อัปเดตบทความใหม่ทุกสัปดาห์

Apache Kafka ด้วย Node.js

Event Streaming ที่แข็งและมีประสิทธิ์ สำหรับ Producer/Consumer จริง

คู่มือสอนขั้นตอนการตั้งค่า Apache Kafka Cluster ด้วย Node.js พร้อม Docker Compose และ Best Practices

Apache Kafka
Node.js
Docker
Zookeeper

Apache Kafka คืออะไร?

Apache Kafka คือ distributed event streaming platform ที่พัฒนาโดย LinkedIn และใช้โดยบริษัตต่างๆ เช่น

  • High Throughput: จัดการข้อมูลจำนวนได้หลายล้านรายชั่วโมง
  • Scalability: ขยาย cluster จาก 3 ถึงหลาย nodes
  • Durability: รักษาข้อมูลด้วย replication
  • Real-time: ประมวลผมากต่ำมาก (milliseconds)

สถาประกอบ Apache Kafka + Node.js

Zookeeper Kafka Brokers Producer Consumer Producer → Kafka Topic → Consumer Legend Kafka/Zookeeper Node.js App

สิ่งที่ต้องเตรียม

Java Requirements

  • Java 8+ (OpenJDK 11)
  • Apache Kafka (version 3.x)
  • Zookeeper 3.5+

Node.js Requirements

  • Node.js 14+ แนะนำ
  • kafkajs (npm install kafkajs)
  • Docker & Docker Compose (optional)

ขั้นตอนการติดตั้ง

ขั้นตอนที่ 1: ติดตั้ง Apache Kafka

ดาวน์โหลดและติดตั้ง Apache Kafka

# Ubuntu/Debian wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.1.0.tgz tar -xzf kafka_2.13-3.1.0.tgz cd kafka_2.13-3.1.0

# macOS brew install kafka

ขั้นตอนที่ 2: สร้าง Docker Compose

สร้าง docker-compose.yml สำหรับ Kafka + Zookeeper

version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data
      - zookeeper-logs:/var/lib/zookeeper/log

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LOG_RETENTION_HOURS: 168
    volumes:
      - kafka-data:/var/lib/kafka/data
      - kafka-logs:/var/lib/kafka/log
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge

ขั้นตอนที่ 3: เริ่ม Docker Compose

รัน docker-compose เพื่อเริ่ม Kafka และ Zookeeper

docker-compose up -d

ขั้นตอนที่ 4: ตรวจสอบ Kafka

ตรวจสอบว่า Kafka รันอยู่เป็นปกติด

docker ps
docker logs kafka

# ตรวจสอบ topics
docker exec kafka kafka-topics.sh --list

ตรวจสอบ: เปิด browser ไปที่ http://localhost:9092 ควรเห็น Kafka UI และ zookeeper connection

Node.js Producer - ส่งข้อความ

สร้าง Producer เพื่อส่งข้อความไปยัง Kafka topic

// producer.js
const { Kafka } = require('kafkajs');
const fs = require('fs');

// สร้าง Kafka producer
const kafka = new Kafka({
    clientId: 'my-producer',
    brokers: ['kafka:9092'],
});

// สร้าง producer
const producer = kafka.producer();

// ฟังก์ชันส่งข้อความ
async function sendMessage() {
    const message = {
        topic: 'my-topic',
        messages: [
            { key: 'user-123', value: JSON.stringify({ name: 'John', action: 'login' }) },
            { key: 'user-456', value: JSON.stringify({ name: 'Jane', action: 'purchase' }) },
        ],
    };

    try {
        await producer.send(message);
        console.log('Message sent:', message);
    } catch (error) {
        console.error('Error sending message:', error);
    }
}

// ส่งข้อความ 100 ครั้ง
setInterval(() => {
    sendMessage();
}, 1000);

// จัดการ shutdown
process.on('SIGINT', async () => {
    await producer.flush();
    process.exit(0);
});

Node.js Consumer - รับข้อความ

สร้าง Consumer เพื่อรับข้อความจาก Kafka topic

// consumer.js
const { Kafka, Consumer } = require('kafkajs');

// สร้าง Kafka consumer
const kafka = new Kafka({
    clientId: 'my-consumer',
    brokers: ['kafka:9092'],
    groupId: 'my-group',
});

// สร้าง consumer
const consumer = new Consumer({
    groupId: 'my-group',
    topic: 'my-topic',
    fromBeginning: false,
});

// รัน consumer
await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            topic,
            partition,
            offset: message.offset,
            key: message.key.toString(),
            value: message.value.toString(),
        });

        // ประมวลผม
        console.log('Processing message...');
        
        // Commit offset เมื่อประมวลผมเสร็จ
        if (shouldCommit) {
            consumer.commitOffsets();
        }
    },
});

// จัดการ shutdown
process.on('SIGINT', async () => {
    await consumer.disconnect();
    process.exit(0);
});

การตั้งค่า Kafka

server.properties
# Broker ID
broker.id=1

# Listeners
listeners=PLAINTEXT://:9092

# Log directories
log.dir=/var/log/kafka
log.retention.hours=168

# Number of partitions (per topic)
num.partitions=3

# Replication factor
default.replication.factor=1

# Zookeeper connection
zookeeper.connect=localhost:2181
producer.config
# Compression
compression.type=gzip
compression.level=6

# Batch size
batch.size=16384

# Acknowledgment
acks=all

# Linger time
linger.ms=0

# Retries
retries=3

# Buffer size
buffer.memory=33554432

การแก้ปัญหา

Kafka ไม่เริ่ม

ตรวจสอบ logs และดูว่า zookeeper มีปัญหา

docker logs kafka
docker logs zookeeper

# ตรวจสอบ connections
docker exec kafka netstat -an | grep 9092

Connection timeout

Node.js ไม่สามารถเชื่อมต่อไปยัง Kafka

# เพิ่ม session timeout
sessionTimeout: 30000,
requestTimeout: 30000,
connectionTimeout: 10000

No messages

Consumer รับข้อความแต่ไม่มีข้อความใน topic

# ตรวจสอบ topic
docker exec kafka kafka-topics.sh --list

# สร้าง test topic
docker exec kafka kafka-console-producer.sh --topic test-topic

Best Practices

1. ใช้ Producer Batching

ส่งข้อความหลายครั้งใน batch แทนการส่งทีละ

2. ตั้งค่า acks=all

รับรับรองจาก Kafka ว่าข้อความถูกส่งแล้ว

3. ใช้ Compression

เปิด gzip compression สำหรับ broker-to-broker

4. Monitor Lag

ตรวจสอบ consumer lag เพื่อให้ consumer รับข้อความทันเวลา

5. ปิด Producer เมื่อไม่ใช้

ปิด connections เพื่อประหลัด resources

6. ใช้ Consumer Groups

ให้ consumer ที่อ่าน topic เดียวกัน consume ข้อความ