Hướng dẫn cài đặt và sử dụng Kafka cơ bản

Khởi động đơn lẻ

Cài đặt

tar -xzf kafka_2.10-0.10.1.1.tgz
cd kafka_2.10-0.10.1.1

Khởi chạy

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

Tạo topic

Mở cửa sổ terminal mới:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Gửi tin nhắn

Mở cửa sổ terminal mới:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Nhập dữ liệu:

hello world
hi

Nhận tin nhắn

Mở cửa sổ terminal mới:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

Kết quả hiển thị:

hello world
hi

Thử gửi thêm tin nhắn từ producer, consumer sẽ tự động nhận được.

Thiết lập cụm Kafka

Tạo file cấu hình mới

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

Sửa file config/server-1.properties:

broker.id=1
listeners=PLAINTEXT://:9093
log.dir=logs/kafka-logs-1

Sửa file config/server-2.properties:

broker.id=2
listeners=PLAINTEXT://:9094
log.dir=logs/kafka-logs-2

Khởi chạy cụm

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

Tạo topic với 3 bản sao

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Gửi tin nhắn

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

Nhập:

my test message 1
my test message 2

Nhận tin nhắn

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

Kiểm tra chịu lỗi

Tìm process ID của server1:

ps aux | grep server-1.properties

Kill process (ví dụ PID=43116):

kill -9 43116

Chạy consumer để kiểm tra:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

Kết quả vẫn hiển thị đầy đủ tin nhắn:

my test message 1
my test message 2

Kafka Connect – Kết nối với file

Tạo file nguồn

echo -e "foo\nbar" > test.txt

Khởi chạy Connect

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kiểm tra kết quả

cat test.sink.txt

Kết quả:

foo
bar

Giải thích cơ chế

File cấu hình connect-file-source.properties (nhập dữ liệu):

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

File cấu hình connect-file-sink.properties (xuất dữ liệu):

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Kiểm tra dữ liệu trong topic connect-test:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Kết quả:

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

Thử thêm dữ liệu vào test.txt:

echo "Another line" >> test.txt

Chạy lại cat test.sink.txt sẽ thấy dòng mới được đồng bộ:

foo
bar
Another line

Thẻ: kafka zookeeper Message Queue Distributed Systems Stream Processing

Đăng vào ngày 7 tháng 6 lúc 21:55