調大zookeeper的heap記憶體,預設是1G,可以根據伺服器大小設定其堆記憶體為2G或者4G,kafka實時傳輸的資料如果達到PB級別的話,得觀察一下YGC和FGC的值可以適當再次調大。
修改kafka的副本數,預設的副本數是1,建議修改為2,如果副本數為2,那麼容災能力就是1,如果副本數3,則容災能力就是2,當然副本數越多,可能會導致叢集的效能下降,但是可靠性更強,各有利弊,推薦副本數為2。
kafka推薦分割區數,預設的分割區數是1,理論上來說,parition的數量小於core的數量的話,值越大,kafka的吞吐量就越高,但是你必須得考慮你的磁碟IO的瓶頸,因此我不推薦你將分割區數這隻過大,建議這個值大於broker的數量,比如叢集broker的只有5臺,叢集的partition數量是20。
kafka的heap記憶體,預設也是1G,生成環境中建議將它調大,不知道大家有沒有發現,你broker的heap記憶體不管有多的,它都能給你吃滿!在生成環境中給kafka的heap記憶體是6G,kafka主要使用堆外記憶體,即大量使用作業系統的頁快取,因此其並不需要分配太多的堆記憶體空間,zookeeper給的是2G,剩下的全部給作業系統預留著,否則你的機器會非常的卡頓。
kafka組態檔調優
參見kafka的設定分為 broker、producter、consumer三個不同的設定詳解
切記,不要弄太高版本,否則zookeeper-3.5.*安裝報錯:找不到或無法載入主類 org.apache.zookeeper.server.quorum.QuorumPeerMain
[root@z_k zookeeper]# wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
[root@z_k zookeeper]# tar -zxf zookeeper-3.4.13.tar.gz -C /soft/
[root@z_k ~]# cp -r /soft/zookeeper-3.4.13/ /soft/zk
[root@z_k ~]# cat /soft/zk/conf/java.env
#!/bin/bash
#指定JDK的安裝路徑
export JAVA_HOME=/soft/jdk
#指定zookeeper的heap記憶體大小
export JVMFLAGS="-Xms2048m -Xmx2048m $JVMFLAGS"
需要手動建立,或者從「 /soft/zk/conf/zoo_sample.cfg 」賦值一個模板即可
[root@z_k ~]# vim /soft/zk/conf/zoo.cfg
# 滴答,計時的基本單位,預設是2000毫秒,即2秒。它是zookeeper最小的時間單位,
# 用於丈量心跳時間和超時時間等,通常設定成預設2秒即可。
tickTime=2000
# 初始化限制是10滴答,預設是10個滴答,即預設是20秒。
# 指定follower節點初始化是連結leader節點的最大tick次數。
initLimit=5
# 資料同步的時間限制,預設是5個滴答,即預設時間是10秒。設定了follower節點與leader節點進行同步的最大時間。
# 與initLimit類似,它也是以tickTime為單位進行指定的。
syncLimit=2
# 指定zookeeper的工作目錄,這是一個非常重要的引數,zookeeper會在記憶體中在記憶體只能中儲存系統快照,
# 並定期寫入該路徑指定的資料夾中。生產環境中需要注意該資料夾的磁碟佔用情況。
dataDir=/home/z_k/zookeeper
# 監聽zookeeper的預設埠。zookeeper監聽使用者端連結的埠,一般設定成預設2181即可。
clientPort=2181
# 這個操作將限制連線到 ZooKeeper 的使用者端的數量,限制並行連線的數量,它通過 IP 來區分不同的使用者端。
# 此設定選項可以用來阻止某些類別的 Dos 攻擊。將它設定為 0 或者忽略而不進行設定將會取消對並行連線的限制。
#maxClientCnxns=60
# 在上文中已經提到,3.4.0及之後版本,ZK提供了自動清理事務紀錄檔和快照檔案的功能,這個引數指定了清理頻率,單位是小時,
# 需要設定一個1或更大的整數,預設是0,表示不開啟自動清理功能。
#autopurge.purgeInterval=1
# 這個引數和上面的引數搭配使用,這個引數指定了需要保留的檔案數目。預設是保留3個。
#autopurge.snapRetainCount=3
#server.x=[hostname]:nnnnn[:nnnnn],這裡的x是一個數位,與myid檔案中的id是一致的。
# 右邊可以設定兩個埠,第一個埠用於F和L之間的資料同步和其它通訊,第二個埠用於Leader選舉過程中投票通訊。
server.117=10.1.3.117:2888:3888
server.118=10.1.3.118:2888:3888
server.119=10.1.3.119:2888:3888
[root@z_k ~]#
[root@z_k ~]# vim /usr/local/bin/xzk.sh
#判斷使用者是否傳參
if [ $# -ne 1 ];then
echo "無效引數,用法為: $0 {start|stop|restart|status}"
exit
fi
#獲取使用者輸入的命令
cmd=$1
#定義函數功能
function zookeeperManger(){
case $cmd in
start)
echo "啟動服務"
remoteExecution start
;;
stop)
echo "停止服務"
remoteExecution stop
;;
restart)
echo "重新啟動服務"
remoteExecution restart
;;
status)
echo "檢視狀態"
remoteExecution status
;;
*)
echo "無效引數,用法為: $0 {start|stop|restart|status}"
;;
esac
}
#定義執行的命令
function remoteExecution(){
for (( i=117 ; i<=119 ; i++ )) ; do
tput setaf 2
echo ========== kafka${i} zkServer.sh $1 ================
tput setaf 9
ssh kafka${i} "source /etc/profile ; zkServer.sh $1"
done
}
#呼叫函數
zookeeperManger
賦予執行許可權
[root@z_k ~]#
[root@z_k ~]# chmod +x /usr/local/bin/xzk.sh
[root@z_k ~]#
[root@z_k ~]# ll /usr/local/bin/xzk.sh
-rwxr-xr-x 1 root root 1101 Nov 7 10:53 /usr/local/bin/xzk.sh
[root@z_k ~]#
[root@z_k ~]# vim /usr/local/bin/xrsync.sh
#判斷使用者是否傳參
if [ $# -lt 1 ];then
echo "請輸入引數";
exit
fi
#獲取檔案路徑
file=$@
#獲取子路徑
filename=`basename $file`
#獲取父路徑
dirpath=`dirname $file`
#獲取完整路徑
cd $dirpath
fullpath=`pwd -P`
#同步檔案到Kafka叢集
for (( i=117;i<=119;i++ ))
do
#使終端變綠色
tput setaf 2
echo =========== kafka${i} : $file ===========
#使終端變回原來的顏色,即白灰色
tput setaf 7
#遠端執行命令
rsync -lr $filename `whoami`@kafka${i}:$fullpath
#判斷命令是否執行成功
if [ $? == 0 ];then
echo "命令執行成功"
fi
done
[root@z_k ~]# chmod +x /usr/local/bin/xrsync.sh
[root@z_k ~]# tail -5 /etc/profile
#ADD Zookeeper PATH BY z_k
export ZOOKEEPER=/soft/zk
export PATH=$PATH:$ZOOKEEPER/bin
export KAFKA_HOME=/soft/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@z_k ~]#
[root@z_k ~]# xrsync.sh /etc/profile
=========== z_k : /etc/profile ===========
命令執行成功
=========== kafka117 : /etc/profile ===========
命令執行成功
=========== kafka118 : /etc/profile ===========
命令執行成功
=========== kafka119 : /etc/profile ===========
命令執行成功
[root@z_k ~]#
同步zookeeper安裝檔案,到各個節點,接下來需要在「/home/z_k/zookeeper/」目錄中建立一個myid,並寫入組態檔。也可以使用一個shell迴圈搞定,僅供參考。
for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /soft/kafka/config" ;done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /home/data_zk/kafka/logs" ;done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /home/data_zk/kafka/logs2" ;done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "mkdir -p /home/data_zk/kafka/logs3" ;done
xrsync.sh /soft/zk
for (( i=117;i<=119;i++ )) do ssh kafka${i} "echo -n $i > /home/z_k/zookeeper/myid" ;done
[root@z_k ~]# xzk.sh status
#這是檢視zookeeper的狀態,如果是啟動,或停止zookeeper,
#直接呼叫start或者stop方法即可
檢視狀態
========== kafka117 zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: follower
========== kafka118 zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: leader #很顯然,該節點為zookeeper節點。
========== kafka119 zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: follower
[root@z_k ~]#
[root@z_k ~]# wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
[root@z_k ~]# tar -zxf kafka_2.11-0.10.2.1.tgz -C /soft/
[root@z_k ~]# cp -r /soft/kafka_2.11-0.10.2.1/ /soft/kafka
[root@z_k ~]# cat /soft/kafka/config/server.properties | grep -v ^# | grep -v ^$
broker.id=117
delete.topic.enable=true
auto.create.topics.enable=false
port=9092
host.name=10.1.3.117
num.network.threads=30
num.io.threads=30
socket.send.buffer.bytes=5242880
socket.receive.buffer.bytes=5242880
socket.request.max.bytes=104857600
queued.max.requests=1000
log.dirs=/home/z_k/kafka/logs,/home/z_k/kafka/logs2,/home/z_k/kafka/log3
num.partitions=20
num.recovery.threads.per.data.dir=1
default.replication.factor=2
message.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=600000
zookeeper.connect=10.1.3.117:2181,10.1.3.118:2181,10.1.3.119:2181
zookeeper.session.timeout.ms=180000
zookeeper.connection.timeout.ms=6000
max.request.size=104857600
fetch.message.max.bytes=104857600
replica.fetch.max.bytes=104857600
replica.fetch.wait.max.ms=2000
unclean.leader.election=false
num.replica.fetchers=5
[root@z_k ~]#
[root@z_k ~]# cat /soft/kafka/config/server.properties #上述組態檔詳解
主要按需求,調堆大記憶體。
[root@z_k ~]# cat /soft/kafka/bin/kafka-server-start.sh
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ $# -lt 1 ];
then
echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
exit 1
fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
#在這裡指定堆記憶體為20G
export KAFKA_HEAP_OPTS="-Xmx20G -Xms20G"
fi
EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
[root@z_k ~]# cat /usr/local/bin/xkafka.sh
#!/bin/bash
#判斷使用者是否傳參
if [ $# -ne 1 ];then
echo "無效引數,用法為: $0 {start|stop}"
exit
fi
#獲取使用者輸入的命令
cmd=$1
for (( i=117 ; i<=119 ; i++ )) ; do
tput setaf 2
echo ========== kafka${i} $cmd ================
tput setaf 9
case $cmd in
start)
ssh kafka${i} "source /etc/profile ; nohup kafka-server-start.sh /soft/kafka/config/server.properties >> /home/z_k/kafka/console/kafka-`date +%F`.log &"
#ssh kafka${i} "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
echo kafka${i} "服務已啟動"
;;
stop)
ssh kafka${i} "source /etc/profile ; kafka-server-stop.sh"
echo kafka${i} "服務已停止"
;;
*)
echo "無效引數,用法為: $0 {start|stop}"
exit
;;
esac
done
[root@z_k ~]# chmod +x /usr/local/bin/xkafka.sh
xrsync.sh /soft/kafka
for (( i=117;i<=119;i++ )) do ssh kafka${i} "sed -i 's#broker.id=117#broker.id=${i}#g' /soft/kafka/config/server.properties ";done
for (( i=117;i<=119;i++ )) do ssh kafka${i} "sed -i 's#host.name=10.1.3.117#host.name=10.1.3.${i}#g' /soft/kafka/config/server.properties ";done
編寫批次執行的指令碼,需要你手動設定SSH免密要登入。
[root@z_k ~]# cat /usr/local/bin/xcall.sh
#!/bin/bash
#判斷使用者是否傳參
if [ $# -lt 1 ];then
echo "請輸入引數"
exit
fi
#獲取使用者輸入的命令
cmd=$@
#Kafka叢集批次執行命令
for (( i=117;i<=119;i++ ))
do
#使終端變綠色
tput setaf 2
echo =========== kafka${i} : $cmd ===========
#使終端變回原來的顏色,即白灰色
tput setaf 7
#遠端執行命令
ssh kafka${i} $cmd
#判斷命令是否執行成功
if [ $? == 0 ];then
echo "命令執行成功"
fi
done
[root@z_k ~]# chmod +x /usr/local/bin/xkafka.sh
[root@z_k ~]# xcall.sh jps
命令執行成功
=========== kafka117 : jps ===========
953 Jps
8236 Kafka
4735 QuorumPeerMain
命令執行成功
=========== kafka118 : jps ===========
4616 QuorumPeerMain
2425 Jps
8382 Kafka
命令執行成功
=========== kafka119 : jps ===========
23953 Jps
4763 QuorumPeerMain
8079 Kafka
搭建KafkaEagle,可以監控kafka及zookeeper叢集的是否正常,下載kafka-eagle-web-2.0.2-bin.tar.gz
搭建教學,詳見https://blog.csdn.net/m0_46522455/article/details/108827979
監控kafka叢集http://172.16.6.13:8048/system/resource
大屏展示