K8s

2023-10-26 15:01:02
This article demonstrates how to deploy a Kafka cluster on a K8s cluster. Once set up, the Kafka service can be accessed both from inside the K8s cluster and from outside it. Clustered services are usually deployed in one of two ways: as a StatefulSet, or as a Service & Deployment. Here we use a StatefulSet to build the ZooKeeper cluster, and Service & Deployment to build the Kafka cluster.

I. Create NFS Storage

    NFS storage provides stable backend storage for Kafka and ZooKeeper, so that when a Kafka or ZooKeeper Pod restarts after a failure or is migrated to another node, it still has access to its original data.

1. Install NFS

Here I set up the NFS storage on the master node. First run the following commands to install NFS:
yum -y install nfs-utils
yum -y install rpcbind

2. Create the Shared Directories

(1) Run the following commands to create six directories:
mkdir -p /usr/local/k8s/zookeeper/pv{1..3}
mkdir -p /usr/local/k8s/kafka/pv{1..3}

(2) Edit the /etc/exports file:

vim /etc/exports

(3) Add the following entries:

/usr/local/k8s/kafka/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv3 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv3 *(rw,sync,no_root_squash)

(4) After saving and exiting, restart the services with the following commands:

systemctl restart rpcbind
systemctl restart nfs
systemctl enable nfs

Note: if systemctl restart nfs fails with "Failed to restart nfs.service: Unit nfs.service not found.", try the following command instead:

sudo service nfs-server start

(5) Run the exportfs -v command to list all the shared directories:
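
For reference, the output should look roughly like this (the exact option list varies with your NFS version; what matters is that all six directories are listed):

/usr/local/k8s/kafka/pv1        <world>(rw,sync,wdelay,no_root_squash)
/usr/local/k8s/kafka/pv2        <world>(rw,sync,wdelay,no_root_squash)
/usr/local/k8s/kafka/pv3        <world>(rw,sync,wdelay,no_root_squash)
/usr/local/k8s/zookeeper/pv1    <world>(rw,sync,wdelay,no_root_squash)
/usr/local/k8s/zookeeper/pv2    <world>(rw,sync,wdelay,no_root_squash)
/usr/local/k8s/zookeeper/pv3    <world>(rw,sync,wdelay,no_root_squash)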

(6) On each of the other Node machines, run the following command to install the nfs-utils client:

yum -y install nfs-utils

(7) Then, on the other Nodes, run the following command (the IP is the Master node's IP, i.e. the NFS server) to view the directories shared by the Master node:

showmount -e 170.106.37.33
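
If the exports are visible, the output should look something like this:

Export list for 170.106.37.33:
/usr/local/k8s/zookeeper/pv3 *
/usr/local/k8s/zookeeper/pv2 *
/usr/local/k8s/zookeeper/pv1 *
/usr/local/k8s/kafka/pv3 *
/usr/local/k8s/kafka/pv2 *
/usr/local/k8s/kafka/pv1 *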

 

II. Create the ZooKeeper Cluster

1. Create the ZooKeeper PVs

(1) First create a file named zookeeper-pv.yaml with the following content:

Note: replace 170.106.37.33 with your actual NFS server address.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk01
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv1"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk02
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv2"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk03
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv3"
  persistentVolumeReclaimPolicy: Recycle

(2) Then run the following command to create the PVs:

kubectl apply -f zookeeper-pv.yaml

(3) Run the following command to check whether they were created successfully:

kubectl get pv
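
All three PVs should show up as Available (the output below is illustrative; ages will differ):

NAME          CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM   STORAGECLASS   REASON   AGE
k8s-pv-zk01   1Gi        RWO            Recycle          Available           anything                10s
k8s-pv-zk02   1Gi        RWO            Recycle          Available           anything                10s
k8s-pv-zk03   1Gi        RWO            Recycle          Available           anything                10s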

2. Create the ZooKeeper Cluster

(1) We are going to build a 3-node ZooKeeper cluster. First create a file named zookeeper.yaml with the following content:
apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
    - name: server
      port: 2888
    - name: leader-election
      port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
    - name: client
      port: 2181
      nodePort: 31811
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
spec:
  serviceName: "zk-hs"
  replicas: 3 # by default is 1
  selector:
    matchLabels:
      app: zk # has to match .spec.template.metadata.labels
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk # has to match .spec.selector.matchLabels
    spec:
      containers:
        - name: zk
          imagePullPolicy: Always
          image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
        --servers=3 \
        --data_dir=/var/lib/zookeeper/data \
        --data_log_dir=/var/lib/zookeeper/data/log \
        --conf_dir=/opt/zookeeper/conf \
        --client_port=2181 \
        --election_port=3888 \
        --server_port=2888 \
        --tick_time=2000 \
        --init_limit=10 \
        --sync_limit=5 \
        --heap=4G \
        --max_client_cnxns=60 \
        --snap_retain_count=3 \
        --purge_interval=12 \
        --max_session_timeout=40000 \
        --min_session_timeout=4000 \
        --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "anything"
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 1Gi

(2) Then run the following command to create everything:

kubectl apply -f zookeeper.yaml

(3) Run the following commands to check whether everything was created successfully:

kubectl get pods
kubectl get service
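
Once all three zk Pods are Running, you can sanity-check the ensemble. A minimal check, assuming the image follows the layout of the official Kubernetes ZooKeeper tutorial (which this image mirrors):

# each server should report a unique myid (1, 2, 3)
for i in 0 1 2; do kubectl exec zk-$i -- cat /var/lib/zookeeper/data/myid; done

# each server should resolve to a stable name of the form zk-<n>.zk-hs.default.svc.cluster.local
for i in 0 1 2; do kubectl exec zk-$i -- hostname -f; done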

 

III. Create the Kafka Cluster

(1) We are going to build a 3-node Kafka cluster. First create a file named kafka.yaml with the following content:
Note:
  • Replace the nfs server address with your actual NFS server address.
  • status.hostIP is the IP of the host machine, i.e. the Node the Pod is actually scheduled onto (in this article I deploy directly onto the Master node). Setting KAFKA_ADVERTISED_HOST_NAME to the host IP ensures that Kafka can also be accessed from outside the K8s cluster.

apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
  labels:
    app: kafka-service-1
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-1
      targetPort: 9092
      nodePort: 30901
      protocol: TCP
  selector:
    app: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
  labels:
    app: kafka-service-2
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-2
      targetPort: 9092
      nodePort: 30902
      protocol: TCP
  selector:
    app: kafka-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
  labels:
    app: kafka-service-3
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-3
      targetPort: 9092
      nodePort: 30903
      protocol: TCP
  selector:
    app: kafka-3
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  template:
    metadata:
      labels:
        app: kafka-1
    spec:
      containers:
        - name: kafka-1
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CREATE_TOPICS
              value: mytopic:2:1
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30901"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 170.106.37.33
            path: "/usr/local/k8s/kafka/pv1"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-2
  template:
    metadata:
      labels:
        app: kafka-2
    spec:
      containers:
        - name: kafka-2
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "2"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30902"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 170.106.37.33
            path: "/usr/local/k8s/kafka/pv2"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-3
  template:
    metadata:
      labels:
        app: kafka-3
    spec:
      containers:
        - name: kafka-3
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "3"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30903"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 170.106.37.33
            path: "/usr/local/k8s/kafka/pv3"

(2) Then run the following command to create everything:

kubectl apply -f kafka.yaml

(3) Run the following commands to check whether everything was created successfully:

kubectl get pods
kubectl get service
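
If everything worked, each broker's Service should be a NodePort mapping 9092 to its external port (cluster IPs below are illustrative):

NAME              TYPE       CLUSTER-IP      EXTERNAL-IP   PORT(S)          AGE
kafka-service-1   NodePort   10.96.111.201   <none>        9092:30901/TCP   1m
kafka-service-2   NodePort   10.96.55.18     <none>        9092:30902/TCP   1m
kafka-service-3   NodePort   10.96.203.77    <none>        9092:30903/TCP   1m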

 

IV. Testing

1. Testing Inside the K8s Cluster

(1) First run the following command to enter one of the containers (substitute your actual Pod name from kubectl get pods):

kubectl exec -it kafka-deployment-1-59f87c7cbb-99k46 -- /bin/bash

(2) Next, run the following command to create a topic named test_topic:

kafka-topics.sh --create --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 --partitions 1 --replication-factor 1
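
You can optionally confirm the topic and inspect its partition assignment. A quick check using the same ZooKeeper addresses (on newer Kafka versions you can pass --bootstrap-server instead of the deprecated --zookeeper):

kafka-topics.sh --describe --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181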

(3) After the topic is created, run the following command to start a producer. Once it starts, you can type messages directly into the console to send them; each line you enter is sent as one message.

kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic

(4) Open another terminal session to the server, enter the container again, and run the following command to start a consumer:

kafka-console-consumer.sh --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic

(5) Switch back to the producer client, send a few messages, and watch them appear on the consumer side to see Kafka's basic message flow.

 

2. Testing from Outside the Cluster

    Use a Kafka client tool such as Offset Explorer to connect to the Kafka cluster (you can connect via the ZooKeeper address or via the Kafka broker addresses). The connection should succeed and you should be able to browse the data.
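
If you have the Kafka command-line tools on a machine outside the cluster, you can also test through the NodePorts directly. A sketch, assuming 170.106.37.33 is the externally reachable Node IP and ports 30901-30903 are open in the firewall:

# produce from outside the cluster
kafka-console-producer.sh --broker-list 170.106.37.33:30901 --topic test_topic

# consume from outside the cluster
kafka-console-consumer.sh --bootstrap-server 170.106.37.33:30901 --topic test_topic --from-beginning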

 

More test commands for reference:

Kafka producer and consumer examples (command line)
1. Create a topic named itcasttopic
Example:
kafka-topics.sh --create --topic itcasttopic --partitions 3 --replication-factor 2 --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181

2. Produce from hadoop01
Example:
kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic itcasttopic

3. Consume from hadoop02
Example:
kafka-console-consumer.sh --from-beginning --topic itcasttopic --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092

4. List all topics with --list
Example:
kafka-topics.sh --list --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181

5. Delete a topic
Example:
kafka-topics.sh --delete --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181 --topic itcasttopic

6. Stop Kafka
Example:
bin/kafka-server-stop.sh config/server.properties