Apache Kafka คืออะไร?
Apache Kafka คือ distributed event streaming platform ที่พัฒนาโดย LinkedIn และใช้โดยบริษัตต่างๆ เช่น
- High Throughput: จัดการข้อมูลจำนวนได้หลายล้านรายชั่วโมง
- Scalability: ขยาย cluster จาก 3 ถึงหลาย nodes
- Durability: รักษาข้อมูลด้วย replication
- Real-time: ประมวลผมากต่ำมาก (milliseconds)
สถาประกอบ Apache Kafka + Node.js
สิ่งที่ต้องเตรียม
Java Requirements
- Java 8+ (OpenJDK 11)
- Apache Kafka (version 3.x)
- Zookeeper 3.5+
Node.js Requirements
- Node.js 14+ แนะนำ
- kafkajs (npm install kafkajs)
- Docker & Docker Compose (optional)
ขั้นตอนการติดตั้ง
ขั้นตอนที่ 1: ติดตั้ง Apache Kafka
ดาวน์โหลดและติดตั้ง Apache Kafka
# Ubuntu/Debian
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.1.0.tgz
tar -xzf kafka_2.13-3.1.0.tgz
cd kafka_2.13-3.1.0# macOS
brew install kafka
ขั้นตอนที่ 2: สร้าง Docker Compose
สร้าง docker-compose.yml สำหรับ Kafka + Zookeeper
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
volumes:
- zookeeper-data:/var/lib/zookeeper/data
- zookeeper-logs:/var/lib/zookeeper/log
kafka:
image: confluentinc/cp-kafka:7.5.0
hostname: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_LOG_RETENTION_HOURS: 168
volumes:
- kafka-data:/var/lib/kafka/data
- kafka-logs:/var/lib/kafka/log
networks:
- kafka-network
networks:
kafka-network:
driver: bridge
ขั้นตอนที่ 3: เริ่ม Docker Compose
รัน docker-compose เพื่อเริ่ม Kafka และ Zookeeper
docker-compose up -d
ขั้นตอนที่ 4: ตรวจสอบ Kafka
ตรวจสอบว่า Kafka รันอยู่เป็นปกติด
docker psdocker logs kafka# ตรวจสอบ topicsdocker exec kafka kafka-topics.sh --list
ตรวจสอบ: เปิด browser ไปที่ http://localhost:9092
ควรเห็น Kafka UI และ zookeeper connection
Node.js Producer - ส่งข้อความ
สร้าง Producer เพื่อส่งข้อความไปยัง Kafka topic
// producer.js
const { Kafka } = require('kafkajs');
const fs = require('fs');
// สร้าง Kafka producer
const kafka = new Kafka({
clientId: 'my-producer',
brokers: ['kafka:9092'],
});
// สร้าง producer
const producer = kafka.producer();
// ฟังก์ชันส่งข้อความ
async function sendMessage() {
const message = {
topic: 'my-topic',
messages: [
{ key: 'user-123', value: JSON.stringify({ name: 'John', action: 'login' }) },
{ key: 'user-456', value: JSON.stringify({ name: 'Jane', action: 'purchase' }) },
],
};
try {
await producer.send(message);
console.log('Message sent:', message);
} catch (error) {
console.error('Error sending message:', error);
}
}
// ส่งข้อความ 100 ครั้ง
setInterval(() => {
sendMessage();
}, 1000);
// จัดการ shutdown
process.on('SIGINT', async () => {
await producer.flush();
process.exit(0);
});
Node.js Consumer - รับข้อความ
สร้าง Consumer เพื่อรับข้อความจาก Kafka topic
// consumer.js
const { Kafka, Consumer } = require('kafkajs');
// สร้าง Kafka consumer
const kafka = new Kafka({
clientId: 'my-consumer',
brokers: ['kafka:9092'],
groupId: 'my-group',
});
// สร้าง consumer
const consumer = new Consumer({
groupId: 'my-group',
topic: 'my-topic',
fromBeginning: false,
});
// รัน consumer
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
topic,
partition,
offset: message.offset,
key: message.key.toString(),
value: message.value.toString(),
});
// ประมวลผม
console.log('Processing message...');
// Commit offset เมื่อประมวลผมเสร็จ
if (shouldCommit) {
consumer.commitOffsets();
}
},
});
// จัดการ shutdown
process.on('SIGINT', async () => {
await consumer.disconnect();
process.exit(0);
});
การตั้งค่า Kafka
# Broker ID
broker.id=1
# Listeners
listeners=PLAINTEXT://:9092
# Log directories
log.dir=/var/log/kafka
log.retention.hours=168
# Number of partitions (per topic)
num.partitions=3
# Replication factor
default.replication.factor=1
# Zookeeper connection
zookeeper.connect=localhost:2181
# Compression
compression.type=gzip
compression.level=6
# Batch size
batch.size=16384
# Acknowledgment
acks=all
# Linger time
linger.ms=0
# Retries
retries=3
# Buffer size
buffer.memory=33554432
การแก้ปัญหา
Kafka ไม่เริ่ม
ตรวจสอบ logs และดูว่า zookeeper มีปัญหา
docker logs kafkadocker logs zookeeper# ตรวจสอบ connectionsdocker exec kafka netstat -an | grep 9092
Connection timeout
Node.js ไม่สามารถเชื่อมต่อไปยัง Kafka
# เพิ่ม session timeoutsessionTimeout: 30000,requestTimeout: 30000,connectionTimeout: 10000
No messages
Consumer รับข้อความแต่ไม่มีข้อความใน topic
# ตรวจสอบ topicdocker exec kafka kafka-topics.sh --list# สร้าง test topicdocker exec kafka kafka-console-producer.sh --topic test-topic
Best Practices
1. ใช้ Producer Batching
ส่งข้อความหลายครั้งใน batch แทนการส่งทีละ
2. ตั้งค่า acks=all
รับรับรองจาก Kafka ว่าข้อความถูกส่งแล้ว
3. ใช้ Compression
เปิด gzip compression สำหรับ broker-to-broker
4. Monitor Lag
ตรวจสอบ consumer lag เพื่อให้ consumer รับข้อความทันเวลา
5. ปิด Producer เมื่อไม่ใช้
ปิด connections เพื่อประหลัด resources
6. ใช้ Consumer Groups
ให้ consumer ที่อ่าน topic เดียวกัน consume ข้อความ