Khởi động đơn lẻ
Cài đặt
tar -xzf kafka_2.10-0.10.1.1.tgz
cd kafka_2.10-0.10.1.1
Khởi chạy
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
Tạo topic
Mở cửa sổ terminal mới:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Gửi tin nhắn
Mở cửa sổ terminal mới:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Nhập dữ liệu:
hello world
hi
Nhận tin nhắn
Mở cửa sổ terminal mới:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Kết quả hiển thị:
hello world
hi
Thử gửi thêm tin nhắn từ producer, consumer sẽ tự động nhận được.
Thiết lập cụm Kafka
Tạo file cấu hình mới
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
Sửa file config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=logs/kafka-logs-1
Sửa file config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=logs/kafka-logs-2
Khởi chạy cụm
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
Tạo topic với 3 bản sao
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Gửi tin nhắn
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
Nhập:
my test message 1
my test message 2
Nhận tin nhắn
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
Kiểm tra chịu lỗi
Tìm process ID của server1:
ps aux | grep server-1.properties
Kill process (ví dụ PID=43116):
kill -9 43116
Chạy consumer để kiểm tra:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
Kết quả vẫn hiển thị đầy đủ tin nhắn:
my test message 1
my test message 2
Kafka Connect – Kết nối với file
Tạo file nguồn
echo -e "foo\nbar" > test.txt
Khởi chạy Connect
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Kiểm tra kết quả
cat test.sink.txt
Kết quả:
foo
bar
Giải thích cơ chế
File cấu hình connect-file-source.properties (nhập dữ liệu):
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
File cấu hình connect-file-sink.properties (xuất dữ liệu):
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Kiểm tra dữ liệu trong topic connect-test:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
Kết quả:
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
Thử thêm dữ liệu vào test.txt:
echo "Another line" >> test.txt
Chạy lại cat test.sink.txt sẽ thấy dòng mới được đồng bộ:
foo
bar
Another line