1. Giới thiệu về Kafka
Kafka là một hệ thống nhật ký phân tán, hỗ trợ nhiều bản sao và người đăng ký, với khả năng phân vùng dựa trên Zookeeper để quản lý. Các tính năng chính của nó bao gồm:
- Cung cấp khả năng lưu trữ dữ liệu theo độ phức tạp thời gian O(1), cho phép truy cập hiệu quả ngay cả với lượng dữ liệu lớn hơn TB.
- Khả năng xử lý cao. Dù chạy trên phần cứng thương mại giá rẻ, Kafka vẫn có thể đạt được tốc độ truyền tải lên đến 100.000 tin nhắn mỗi giây trên một máy chủ đơn lẻ.
- Hỗ trợ phân vùng và tiêu thụ phân tán giữa các server Kafka, đảm bảo thứ tự và truyền tải thông tin trong từng partition.
- Hỗ trợ xử lý dữ liệu trực tuyến và xử lý dữ liệu ngoại tuyến.
2. Ứng dụng thực tế
- Thu thập nhật ký
- Phát dữ liệu
- Là bộ đệm lớn
- Middleware dịch vụ
3. Kiến trúc ứng dụng
Trong kiến trúc Kafka, một cụm bao gồm nhiều Producer (dữ liệu từ máy chủ, dữ liệu nghiệp vụ, page view từ frontend web), nhiều Broker (có thể mở rộng ngang, số lượng broker càng nhiều thì khả năng xử lý càng cao), nhiều Consumer Group, và một cụm Zookeeper (được dùng để quản lý cấu hình cụm, bầu chọn leader, và thực hiện rebalance khi có thay đổi trong nhóm người tiêu dùng).
3.1 Giải thích thuật ngữ
- Broker: Nút xử lý trung gian (máy chủ), mỗi máy chủ tương ứng với một broker, một cụm Kafka bao gồm một hoặc nhiều broker.
- Topic: Phân loại tin nhắn, mỗi tin nhắn gửi đến cụm đều phải chỉ định một topic.
- Partition: Khái niệm vật lý, mỗi topic chứa một hoặc nhiều partition, mỗi partition tương ứng với một thư mục chứa dữ liệu và file chỉ mục. Trong một partition, dữ liệu được sắp xếp theo trình tự.
- Producer: Người tạo ra tin nhắn, chịu trách nhiệm gửi tin nhắn tới broker.
- Consumer: Người đọc tin nhắn, nhận tin nhắn từ broker.
- Consumer Group: Mỗi consumer thuộc về một nhóm người tiêu dùng cụ thể, có thể chỉ định tên nhóm. Một tin nhắn có thể được gửi đến nhiều nhóm người tiêu dùng khác nhau, nhưng chỉ có duy nhất một consumer trong mỗi nhóm có thể xử lý một tin nhắn nhất định.
4. Triển khai Kafka trên Kubernetes
4.1 Chuẩn bị
- Một cụm Kubernetes có ít nhất ba nút (phiên bản sử dụng là 1.13.10)
- Một StorageClass đã được thiết lập (ở đây sử dụng CephFS)
4.2 Tệp YAML triển khai
1. Triển khai Zookeeper
apiVersion: v1
kind: Service
metadata:
name: zk-hs
namespace: kafka
labels:
app: zk
spec:
ports:
- port: 2888
name: server
- port: 3888
name: leader-election
clusterIP: None
selector:
app: zk
---
apiVersion: v1
kind: Service
metadata:
name: zk-cs
namespace: kafka
labels:
app: zk
spec:
ports:
- port: 2181
name: client
selector:
app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
name: zk-pdb
namespace: kafka
spec:
selector:
matchLabels:
app: zk
maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zk
namespace: kafka
spec:
selector:
matchLabels:
app: zk
serviceName: zk-hs
replicas: 3
updateStrategy:
type: RollingUpdate
podManagementPolicy: Parallel
template:
metadata:
labels:
app: zk
spec:
nodeSelector:
travis.io/schedule-only: "kafka"
tolerations:
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "NoSchedule"
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "NoExecute"
tolerationSeconds: 3600
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "PreferNoSchedule"
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
containers:
- name: kubernetes-zookeeper
imagePullPolicy: IfNotPresent
image: fastop/zookeeper:3.4.10
resources:
requests:
memory: "200Mi"
cpu: "0.1"
ports:
- containerPort: 2181
name: client
- containerPort: 2888
name: server
- containerPort: 3888
name: leader-election
command:
- sh
- -c
- "start-zookeeper \
--servers=3 \
--data_dir=/var/lib/zookeeper/data \
--data_log_dir=/var/lib/zookeeper/data/log \
--conf_dir=/opt/zookeeper/conf \
--client_port=2181 \
--election_port=3888 \
--server_port=2888 \
--tick_time=2000 \
--init_limit=10 \
--sync_limit=5 \
--heap=512M \
--max_client_cnxns=60 \
--snap_retain_count=3 \
--purge_interval=12 \
--max_session_timeout=40000 \
--min_session_timeout=4000 \
--log_level=INFO"
readinessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2181"
initialDelaySeconds: 10
timeoutSeconds: 5
livenessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2181"
initialDelaySeconds: 10
timeoutSeconds: 5
volumeMounts:
- name: datadir
mountPath: /var/lib/zookeeper
securityContext:
runAsUser: 0
fsGroup: 0
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: cephfs
resources:
requests:
storage: 20Gi
2. Triển khai Kafka
apiVersion: v1
kind: Service
metadata:
name: kafka-svc
namespace: kafka
labels:
app: kafka
spec:
ports:
- port: 9092
name: server
clusterIP: None
selector:
app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
name: kafka-pdb
namespace: kafka
spec:
selector:
matchLabels:
app: kafka
minAvailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
namespace: kafka
spec:
selector:
matchLabels:
app: kafka
serviceName: kafka-svc
replicas: 3
template:
metadata:
labels:
app: kafka
spec:
nodeSelector:
travis.io/schedule-only: "kafka"
tolerations:
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "NoSchedule"
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "NoExecute"
tolerationSeconds: 3600
- key: "travis.io/schedule-only"
operator: "Equal"
value: "kafka"
effect: "PreferNoSchedule"
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- kafka
topologyKey: "kubernetes.io/hostname"
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zk
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
containers:
- name: k8s-kafka
imagePullPolicy: IfNotPresent
image: fastop/kafka:2.2.0
resources:
requests:
memory: "600Mi"
cpu: 500m
ports:
- containerPort: 9092
name: server
command:
- sh
- -c
- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
--override listeners=PLAINTEXT://:9092 \
--override zookeeper.connect=zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181 \
--override log.dir=/var/lib/kafka \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override background.threads=10 \
--override compression.type=producer \
--override delete.topic.enable=false \
--override leader.imbalance.check.interval.seconds=300 \
--override leader.imbalance.per.broker.percentage=10 \
--override log.flush.interval.messages=9223372036854775807 \
--override log.flush.offset.checkpoint.interval.ms=60000 \
--override log.flush.scheduler.interval.ms=9223372036854775807 \
--override log.retention.bytes=-1 \
--override log.retention.hours=168 \
--override log.roll.hours=168 \
--override log.roll.jitter.hours=0 \
--override log.segment.bytes=1073741824 \
--override log.segment.delete.delay.ms=60000 \
--override message.max.bytes=1000012 \
--override min.insync.replicas=1 \
--override num.io.threads=8 \
--override num.network.threads=3 \
--override num.recovery.threads.per.data.dir=1 \
--override num.replica.fetchers=1 \
--override offset.metadata.max.bytes=4096 \
--override offsets.commit.required.acks=-1 \
--override offsets.commit.timeout.ms=5000 \
--override offsets.load.buffer.size=5242880 \
--override offsets.retention.check.interval.ms=600000 \
--override offsets.retention.minutes=1440 \
--override offsets.topic.compression.codec=0 \
--override offsets.topic.num.partitions=50 \
--override offsets.topic.replication.factor=3 \
--override offsets.topic.segment.bytes=104857600 \
--override queued.max.requests=500 \
--override quota.consumer.default=9223372036854775807 \
--override quota.producer.default=9223372036854775807 \
--override replica.fetch.min.bytes=1 \
--override replica.fetch.wait.max.ms=500 \
--override replica.high.watermark.checkpoint.interval.ms=5000 \
--override replica.lag.time.max.ms=10000 \
--override replica.socket.receive.buffer.bytes=65536 \
--override replica.socket.timeout.ms=30000 \
--override request.timeout.ms=30000 \
--override socket.receive.buffer.bytes=102400 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=true \
--override zookeeper.session.timeout.ms=6000 \
--override zookeeper.set.acl=false \
--override broker.id.generation.enable=true \
--override connections.max.idle.ms=600000 \
--override controlled.shutdown.enable=true \
--override controlled.shutdown.max.retries=3 \
--override controlled.shutdown.retry.backoff.ms=5000 \
--override controller.socket.timeout.ms=30000 \
--override default.replication.factor=1 \
--override fetch.purgatory.purge.interval.requests=1000 \
--override group.max.session.timeout.ms=300000 \
--override group.min.session.timeout.ms=6000 \
--override inter.broker.protocol.version=2.2.0 \
--override log.cleaner.backoff.ms=15000 \
--override log.cleaner.dedupe.buffer.size=134217728 \
--override log.cleaner.delete.retention.ms=86400000 \
--override log.cleaner.enable=true \
--override log.cleaner.io.buffer.load.factor=0.9 \
--override log.cleaner.io.buffer.size=524288 \
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
--override log.cleaner.min.cleanable.ratio=0.5 \
--override log.cleaner.min.compaction.lag.ms=0 \
--override log.cleaner.threads=1 \
--override log.cleanup.policy=delete \
--override log.index.interval.bytes=4096 \
--override log.index.size.max.bytes=10485760 \
--override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=4 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 "
env:
- name: KAFKA_HEAP_OPTS
value : "-Xmx512M -Xms512M"
- name: KAFKA_OPTS
value: "-Dlogging.level=INFO"
volumeMounts:
- name: datadir
mountPath: /var/lib/kafka
readinessProbe:
tcpSocket:
port: 9092
timeoutSeconds: 1
initialDelaySeconds: 5
securityContext:
runAsUser: 1000
fsGroup: 1000
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes: [ "ReadWriteMany" ]
storageClassName: cephfs
resources:
requests:
storage: 20Gi
4.3 Triển khai cụm
Zookeeper và Kafka đều là các dịch vụ có trạng thái, không thể triển khai bằng Deployment hay RC, nên chúng ta sử dụng StatefulSet.
4.3.1 Đánh dấu nút
Để xác định nơi nào sẽ chạy Kafka, cần đánh dấu các nút:
kubectl label node <node-name> travis.io/schedule-only=kafka
Nếu muốn tránh chạy trên một số nút, có thể sử dụng taint:
kubectl taint node <node-name> travis.io/schedule-only=kafka:NoSchedule
4.3.2 Tạo namespace
kubectl create ns kafka
4.3.3 Triển khai Zookeeper
kubectl apply -f zookeeper.yaml
kubectl get pod -n kafka
4.3.4 Triển khai Kafka
kubectl apply -f kafka.yaml
kubectl get pod -n kafka
4.3.5 Kiểm tra hoạt động
kubectl exec -it zk-0 -n kafka -- zkServer.sh status
kubectl exec -it zk-0 -n kafka -- zkCli.sh create /hello world
kubectl delete -f zookeeper.yaml
kubectl apply -f zookeeper.yaml
kubectl exec -it zk-0 -n kafka -- zkCli.sh get /hello
kubectl exec -it kafka-0 -n kafka -- bash
kafka-topics.sh --create \
--topic test \
--zookeeper zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181 \
--partitions 3 \
--replication-factor 2
kafka-topics.sh --list --zookeeper zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181
kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
# Mở terminal mới, truy cập vào kafka-1
kubectl exec -it kafka-1 -n kafka -- bash
kafka-console-producer.sh --topic test --broker-list localhost:9092