NFS storage is used here mainly to provide a stable backend store for Kafka and ZooKeeper, so that when their Pods are restarted after a failure or migrated to another node, they still come back up with their original data.
(1) First install the NFS server packages on the Master (NFS server) node:
yum -y install nfs-utils
yum -y install rpcbind
Then create the directories that will be exported:
mkdir -p /usr/local/k8s/zookeeper/pv{1..3}
mkdir -p /usr/local/k8s/kafka/pv{1..3}
(2) Edit the /etc/exports file:
vim /etc/exports
(3) Add the following entries to it:
/usr/local/k8s/kafka/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv3 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv3 *(rw,sync,no_root_squash)
(4) After saving and exiting, restart the services with the following commands:
systemctl restart rpcbind
systemctl restart nfs
systemctl enable nfs
If systemctl restart nfs fails with "Failed to restart nfs.service: Unit nfs.service not found.", the NFS service unit is most likely named nfs-server on your distribution, so try this instead:
systemctl restart nfs-server
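One extra tip: for later changes to /etc/exports you don't necessarily need a full restart; re-exporting the share list with exportfs (part of nfs-utils) is enough:
exportfs -ra   # re-read /etc/exports and re-export all shares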
(5) Running exportfs -v will display all of the shared directories.
(6) On each of the other Node machines, install the nfs-utils client with the following command:
yum -y install nfs-utils
(7) Then, on those Nodes, run the following command (the IP is the Master node's IP) to view the folders shared by the Master node:
showmount -e 170.106.37.33   # the IP of the NFS server
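If you want to be certain a share is actually mountable before handing it to Kubernetes, you can mount one manually from a Node and unmount it again (the /mnt/nfs-test path is just a scratch directory for this test):
mkdir -p /mnt/nfs-test
mount -t nfs 170.106.37.33:/usr/local/k8s/kafka/pv1 /mnt/nfs-test
umount /mnt/nfs-test   # clean up once the mount succeeds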
(1) With NFS ready, create a zookeeper-pv.yaml file defining the ZooKeeper PersistentVolumes, with the following content:
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk01
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv1"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk02
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv2"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk03
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv3"
  persistentVolumeReclaimPolicy: Recycle
(2) Then run the following command to create the PVs:
kubectl apply -f zookeeper-pv.yaml
(3) Run the following command to check whether they were created successfully:
kubectl get pv
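All three volumes should be listed with STATUS Available. Because the manifest labels each PV with app: zk, you can also filter for just these volumes:
kubectl get pv -l app=zk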
(1) Next, create a zookeeper.yaml file defining the ZooKeeper Services and StatefulSet, with the following content:
apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
    - name: server
      port: 2888
    - name: leader-election
      port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
    - name: client
      port: 2181
      nodePort: 31811
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
spec:
  serviceName: "zk-hs"
  replicas: 3  # by default is 1
  selector:
    matchLabels:
      app: zk  # has to match .spec.template.metadata.labels
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk  # has to match .spec.selector.matchLabels
    spec:
      containers:
        - name: zk
          imagePullPolicy: Always
          image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
              --servers=3 \
              --data_dir=/var/lib/zookeeper/data \
              --data_log_dir=/var/lib/zookeeper/data/log \
              --conf_dir=/opt/zookeeper/conf \
              --client_port=2181 \
              --election_port=3888 \
              --server_port=2888 \
              --tick_time=2000 \
              --init_limit=10 \
              --sync_limit=5 \
              --heap=4G \
              --max_client_cnxns=60 \
              --snap_retain_count=3 \
              --purge_interval=12 \
              --max_session_timeout=40000 \
              --min_session_timeout=4000 \
              --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "anything"
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 1Gi
(2) Then run the following command to create everything:
kubectl apply -f zookeeper.yaml
(3) Run the following commands to check whether the creation succeeded:
kubectl get pods
kubectl get service
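Once the three Pods zk-0, zk-1 and zk-2 are Running, the headless service gives each of them a stable DNS name of the form zk-<n>.zk-hs.default.svc.cluster.local, which is exactly what the Kafka manifests below rely on. As a quick sanity check, you can confirm each replica's hostname and its ZooKeeper ID; the myid path follows from the --data_dir flag in the manifest:
for i in 0 1 2; do kubectl exec zk-$i -- hostname; done
for i in 0 1 2; do kubectl exec zk-$i -- cat /var/lib/zookeeper/data/myid; done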
(1) Now create a kafka.yaml file defining the Kafka Services and Deployments, with the following content:
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
  labels:
    app: kafka-service-1
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-1
      targetPort: 9092
      nodePort: 30901
      protocol: TCP
  selector:
    app: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
  labels:
    app: kafka-service-2
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-2
      targetPort: 9092
      nodePort: 30902
      protocol: TCP
  selector:
    app: kafka-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
  labels:
    app: kafka-service-3
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-3
      targetPort: 9092
      nodePort: 30903
      protocol: TCP
  selector:
    app: kafka-3
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  template:
    metadata:
      labels:
        app: kafka-1
    spec:
      containers:
        - name: kafka-1
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CREATE_TOPICS
              value: mytopic:2:1
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30901"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 170.106.37.33
            path: "/usr/local/k8s/kafka/pv1"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-2
  template:
    metadata:
      labels:
        app: kafka-2
    spec:
      containers:
        - name: kafka-2
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "2"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30902"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 170.106.37.33
            path: "/usr/local/k8s/kafka/pv2"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-3
  template:
    metadata:
      labels:
        app: kafka-3
    spec:
      containers:
        - name: kafka-3
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "3"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30903"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 170.106.37.33
            path: "/usr/local/k8s/kafka/pv3"
(2) Then run the following command to create everything:
kubectl apply -f kafka.yaml
(3) Run the following commands to check whether the creation succeeded:
kubectl get pods
kubectl get service
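Because each broker advertises its Node's IP (status.hostIP) together with its NodePort (30901/30902/30903), clients outside the cluster can reach Kafka as well. As a rough check from any machine with the Kafka command-line tools installed (replace the <node-ip> placeholder with a real Node IP; this assumes a client version new enough to accept --bootstrap-server):
kafka-topics.sh --list --bootstrap-server <node-ip>:30901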
(1) To test the cluster, first enter one of the Kafka containers with the following command (the Pod name suffix will differ in your environment):
kubectl exec -it kafka-deployment-1-59f87c7cbb-99k46 -- /bin/bash
(2) Next, run the following command to create a topic named test_topic:
kafka-topics.sh --create --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 --partitions 1 --replication-factor 1
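To verify the partition and replica layout of the new topic, you can describe it against the same ZooKeeper ensemble (assuming the image's Kafka version still accepts the --zookeeper flag, which the create command above implies):
kafka-topics.sh --describe --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181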
(3) Once the topic exists, run the following command to start a producer. After it starts you can type messages directly into the console; each line you enter is sent as one message.
kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic
(4) Open another terminal session to the server, enter the container again, and run the following command to start a consumer:
kafka-console-consumer.sh --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic
(5) Go back to the producer client, send a few messages, and watch them appear on the consumer side to get a feel for Kafka's basic message handling.
2. Kafka producer/consumer examples (command line)
(1) Create a topic named itcasttopic. Example command:
kafka-topics.sh --create --topic itcasttopic --partitions 3 --replication-factor 2 --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181
(2) Use hadoop01 as the producer. Example command:
kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic itcasttopic
(3) Use hadoop02 as the consumer. Example command:
kafka-console-consumer.sh --from-beginning --topic itcasttopic --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092
(4) View all topics with --list. Example command:
kafka-topics.sh --list --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181
(5) Delete a topic. Example command:
kafka-topics.sh --delete --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181 --topic itcasttopic
(6) Shut down Kafka. Example command:
bin/kafka-server-stop.sh config/server.properties