Triển khai cụm Kafka trên Kubernetes

1. Giới thiệu về Kafka

Kafka là một hệ thống nhật ký phân tán, hỗ trợ nhiều bản sao và người đăng ký, với khả năng phân vùng dựa trên Zookeeper để quản lý. Các tính năng chính của nó bao gồm:

  1. Cung cấp khả năng lưu trữ dữ liệu theo độ phức tạp thời gian O(1), cho phép truy cập hiệu quả ngay cả với lượng dữ liệu lớn hơn TB.
  2. Khả năng xử lý cao. Dù chạy trên phần cứng thương mại giá rẻ, Kafka vẫn có thể đạt được tốc độ truyền tải lên đến 100.000 tin nhắn mỗi giây trên một máy chủ đơn lẻ.
  3. Hỗ trợ phân vùng và tiêu thụ phân tán giữa các server Kafka, đảm bảo thứ tự và truyền tải thông tin trong từng partition.
  4. Hỗ trợ xử lý dữ liệu trực tuyến và xử lý dữ liệu ngoại tuyến.

2. Ứng dụng thực tế

  1. Thu thập nhật ký
  2. Phát dữ liệu
  3. Là bộ đệm lớn
  4. Middleware dịch vụ

3. Kiến trúc ứng dụng

Trong kiến trúc Kafka, một cụm bao gồm nhiều Producer (dữ liệu từ máy chủ, dữ liệu nghiệp vụ, page view từ frontend web), nhiều Broker (có thể mở rộng ngang, số lượng broker càng nhiều thì khả năng xử lý càng cao), nhiều Consumer Group, và một cụm Zookeeper (được dùng để quản lý cấu hình cụm, bầu chọn leader, và thực hiện rebalance khi có thay đổi trong nhóm người tiêu dùng).

3.1 Giải thích thuật ngữ

  • Broker: Nút xử lý trung gian (máy chủ), mỗi máy chủ tương ứng với một broker, một cụm Kafka bao gồm một hoặc nhiều broker.
  • Topic: Phân loại tin nhắn, mỗi tin nhắn gửi đến cụm đều phải chỉ định một topic.
  • Partition: Khái niệm vật lý, mỗi topic chứa một hoặc nhiều partition, mỗi partition tương ứng với một thư mục chứa dữ liệu và file chỉ mục. Trong một partition, dữ liệu được sắp xếp theo trình tự.
  • Producer: Người tạo ra tin nhắn, chịu trách nhiệm gửi tin nhắn tới broker.
  • Consumer: Người đọc tin nhắn, nhận tin nhắn từ broker.
  • Consumer Group: Mỗi consumer thuộc về một nhóm người tiêu dùng cụ thể, có thể chỉ định tên nhóm. Một tin nhắn có thể được gửi đến nhiều nhóm người tiêu dùng khác nhau, nhưng chỉ có duy nhất một consumer trong mỗi nhóm có thể xử lý một tin nhắn nhất định.

4. Triển khai Kafka trên Kubernetes

4.1 Chuẩn bị

  • Một cụm Kubernetes có ít nhất ba nút (phiên bản sử dụng là 1.13.10)
  • Một StorageClass đã được thiết lập (ở đây sử dụng CephFS)

4.2 Tệp YAML triển khai

1. Triển khai Zookeeper

apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  namespace: kafka
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  namespace: kafka
  labels:
    app: zk
spec:
  ports:
  - port: 2181
    name: client
  selector:
    app: zk
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-pdb
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: zk
  maxUnavailable: 1
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: zk
  serviceName: zk-hs
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      nodeSelector:
          travis.io/schedule-only: "kafka"
      tolerations:
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "NoSchedule"
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "NoExecute"
        tolerationSeconds: 3600
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "PreferNoSchedule"
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: IfNotPresent
        image: fastop/zookeeper:3.4.10
        resources:
          requests:
            memory: "200Mi"
            cpu: "0.1"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=3 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
      securityContext:
        runAsUser: 0
        fsGroup: 0
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteMany" ]
      storageClassName: cephfs
      resources:
        requests:
          storage: 20Gi

2. Triển khai Kafka

apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  namespace: kafka
  labels:
    app: kafka
spec:
  ports:
  - port: 9092
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: kafka-pdb
  namespace: kafka
spec:
  selector:
    matchLabels:
      app: kafka
  minAvailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
spec:
  selector:
     matchLabels:
        app: kafka
  serviceName: kafka-svc
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      nodeSelector:
          travis.io/schedule-only: "kafka"
      tolerations:
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "NoSchedule"
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "NoExecute"
        tolerationSeconds: 3600
      - key: "travis.io/schedule-only"
        operator: "Equal"
        value: "kafka"
        effect: "PreferNoSchedule"
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values: 
                    - kafka
              topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
             - weight: 1
               podAffinityTerm:
                 labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values: 
                        - zk
                 topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8s-kafka
        imagePullPolicy: IfNotPresent
        image: fastop/kafka:2.2.0
        resources:
          requests:
            memory: "600Mi"
            cpu: 500m
        ports:
        - containerPort: 9092
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9092 \
          --override zookeeper.connect=zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=2.2.0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=4 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          tcpSocket:
            port: 9092
          timeoutSeconds: 1
          initialDelaySeconds: 5
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteMany" ]
      storageClassName: cephfs
      resources:
        requests:
          storage:  20Gi

4.3 Triển khai cụm

Zookeeper và Kafka đều là các dịch vụ có trạng thái, không thể triển khai bằng Deployment hay RC, nên chúng ta sử dụng StatefulSet.

4.3.1 Đánh dấu nút

Để xác định nơi nào sẽ chạy Kafka, cần đánh dấu các nút:

kubectl label node <node-name> travis.io/schedule-only=kafka

Nếu muốn tránh chạy trên một số nút, có thể sử dụng taint:

kubectl taint node <node-name> travis.io/schedule-only=kafka:NoSchedule

4.3.2 Tạo namespace

kubectl create ns kafka

4.3.3 Triển khai Zookeeper

kubectl apply -f zookeeper.yaml
kubectl get pod -n kafka

4.3.4 Triển khai Kafka

kubectl apply -f kafka.yaml
kubectl get pod -n kafka

4.3.5 Kiểm tra hoạt động

kubectl exec -it zk-0 -n kafka -- zkServer.sh status
kubectl exec -it zk-0 -n kafka -- zkCli.sh create /hello world
kubectl delete -f zookeeper.yaml
kubectl apply -f zookeeper.yaml
kubectl exec -it zk-0 -n kafka -- zkCli.sh get /hello

kubectl exec -it kafka-0 -n kafka -- bash 
kafka-topics.sh --create \
--topic test \
--zookeeper zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181 \
--partitions 3 \
--replication-factor 2
kafka-topics.sh --list --zookeeper zk-0.zk-hs.kafka.svc.cluster.local:2181,zk-1.zk-hs.kafka.svc.cluster.local:2181,zk-2.zk-hs.kafka.svc.cluster.local:2181
kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092

# Mở terminal mới, truy cập vào kafka-1
kubectl exec -it kafka-1 -n kafka -- bash
kafka-console-producer.sh --topic test --broker-list localhost:9092

Thẻ: Kubernetes kafka zookeeper statefulset storageclass

Đăng vào ngày 9 tháng 6 lúc 17:42