前置條件
# 優先設定解析
vim /etc/hosts
192.168.25.106 kafka01
192.168.25.107 kafka02
192.168.25.108 kafka03
# 臨時生效
hostname kafka01
# 再配合永久生效修改, 避免重新啓動後失效
hostnamectl set-hostname kafka01
或(兩種方式等價, 上面命令會直接修改 /etc/hostname 檔案)
vim /etc/hostname
kafka01
# 停止firewall
systemctl stop firewalld.service
# 禁止firewall開機啓動
systemctl disable firewalld.service
# 檢視firewall狀態( Active: inactive (dead) )
systemctl status firewalld.service
安裝指令碼
install_kafka.sh
#!/bin/bash
###############################################
#
# Kafka broker installer.
#
# Usage: ./install_kafka.sh <broker-id> <hostname>
#   e.g. ./install_kafka.sh 1 kafka01
#
# Settings to review before running:
#   INSTALL_PATH       directory the Kafka distribution is moved into
#   ZOOKEEPER_CLUSTER  comma-separated host:port list; a single
#                      host:port is enough for a one-node setup
###############################################

# ---- Site configuration ----
# Installation root for Kafka.
INSTALL_PATH="/home/kafka"
# ZooKeeper ensemble the broker connects to.
ZOOKEEPER_CLUSTER="kafka01:2181,kafka02:2181,kafka03:2181"

# ---- Script arguments ----
# $1: ordinal of this machine in the cluster; the broker id is set to
#     the same value.
MACHINE_ID="$1"
# $2: host name of this machine; an IP address must not be used.
#     (The variable name is misspelled but is referenced throughout the script.)
LOACL_HOSTNAME="$2"

# ---- Defaults that normally need no change ----
DIR_NAME="kafka_2.11-2.2.0"
PKG_NAME="${DIR_NAME}.tgz"
#######################################
# Abort the installation unless JAVA_HOME names an existing directory.
# Globals: JAVA_HOME (read)
# Outputs: "JAVA_HOME=<value>" on success, an error notice otherwise
#          (both on stdout)
# Exits:   1 when JAVA_HOME is empty or is not a directory
#######################################
check_jdk() {
  # Guard clause: the happy path returns early, everything else is fatal.
  if [[ -d $JAVA_HOME ]]; then
    echo "JAVA_HOME=$JAVA_HOME"
    return
  fi

  echo "JAVA_HOME not set"
  exit 1
}
#######################################
# Prepare the current directory for an installation run.
#   1. Create INSTALL_PATH when it does not exist yet.
#   2. Locate the Kafka .tgz package in the current directory.
#   3. Remove a leftover unpacked Kafka directory from an earlier run so
#      the fresh extraction is the only Kafka directory present.
# Globals: INSTALL_PATH (read), PKG_NAME (written), DIR_NAME (written:
#          name of the removed stale directory, or empty)
# Outputs: progress and error messages on stdout
# Exits:   1 when INSTALL_PATH cannot be created, when zero or several
#          packages are found, or when several stale directories exist
#######################################
check_package() {
  local entry
  local -a packages=()
  local -a stale_dirs=()

  ### check install_path exists
  if [[ ! -d "${INSTALL_PATH}" ]]; then
    echo "${INSTALL_PATH} not exit, mkdir"
    mkdir -p "${INSTALL_PATH}" || exit 1
  else
    echo "${INSTALL_PATH} is exit"
  fi

  ### find the package with a glob instead of parsing `ls` output
  for entry in *kafka*.tgz; do
    # An unmatched glob stays literal; the -f test filters it out.
    if [[ -f "${entry}" ]]; then
      packages+=("${entry}")
    fi
  done
  if (( ${#packages[@]} == 0 )); then
    echo "you need install package!"
    exit 1
  fi
  if (( ${#packages[@]} > 1 )); then
    # Installing an arbitrary one of several packages would be a guess.
    echo "more than one kafka package found: ${packages[*]}"
    exit 1
  fi
  PKG_NAME=${packages[0]}

  ### look for leftover unpacked directories (real directories only,
  ### symlinks are skipped just like `ls -l | grep '^d'` skipped them)
  for entry in *kafka*/; do
    entry=${entry%/}
    if [[ -d "${entry}" && ! -L "${entry}" ]]; then
      stale_dirs+=("${entry}")
    fi
  done

  DIR_NAME=''
  case ${#stale_dirs[@]} in
    0)
      echo "DIR_NAME is ok"
      ;;
    1)
      DIR_NAME=${stale_dirs[0]}
      echo "${DIR_NAME} is exit, rm unzip path"
      # :? refuses to run rm -rf with an empty operand.
      rm -rf -- "${DIR_NAME:?}"
      ;;
    *)
      # Ambiguous: do not delete several directories on a name match.
      echo "more than one kafka directory found: ${stale_dirs[*]}"
      exit 1
      ;;
  esac
}
#######################################
# Fill in defaults for the two script arguments, print the effective
# configuration and ask the operator to confirm it.
# Globals: MACHINE_ID, LOACL_HOSTNAME (read; set to "1" / "kafka01"
#          when empty), INSTALL_PATH, ZOOKEEPER_CLUSTER (read)
# Input:   a y/n answer on stdin
# Outputs: the configuration summary and prompts on stdout
# Exits:   0 when the operator answers "n";
#          1 when stdin ends before an answer is given
#######################################
install_info() {
  local yn

  ### execute shell param confirm 1
  if [[ -n "${MACHINE_ID}" ]]; then
    echo "current MACHINE_ID=${MACHINE_ID}"
  else
    ### default value
    MACHINE_ID="1"
    echo "empty input param('$1'),using default 1"
  fi

  ### execute shell param confirm 2
  if [[ -n "${LOACL_HOSTNAME}" ]]; then
    echo "current LOACL_HOSTNAME=${LOACL_HOSTNAME}"
  else
    ### default value
    LOACL_HOSTNAME="kafka01"
    echo "empty input param('$2'),using default kafka01"
  fi

  echo
  echo "INSTALL_PATH: ${INSTALL_PATH}"
  echo "MACHINE_ID: ${MACHINE_ID}"
  echo "LOACL_HOSTNAME: ${LOACL_HOSTNAME}"
  echo "ZOOKEEPER_CLUSTER: ${ZOOKEEPER_CLUSTER}"
  echo

  while true; do
    # read fails on EOF (closed or exhausted stdin); without this check
    # the loop would spin forever printing the reminder below.
    if ! read -r -p "Check that the configuration, press [y/n] to continue: " yn; then
      echo
      echo "no answer received, aborting."
      exit 1
    fi
    case "${yn}" in
      [Yy]*) break ;;
      [Nn]*) exit 0 ;;
      *) echo "please input y/n." ;;
    esac
  done
}
#######################################
# Unpack the Kafka package, move it under INSTALL_PATH and adapt
# config/server.properties for this broker.
# Globals: PKG_NAME, INSTALL_PATH, MACHINE_ID, LOACL_HOSTNAME,
#          ZOOKEEPER_CLUSTER, KAFKA_HOME (read);
#          DIR_NAME, TARGET_PATH (written)
# Side effects: creates ${TARGET_PATH}/data; appends KAFKA_HOME/PATH
#          exports to /etc/profile when KAFKA_HOME is empty in the
#          current environment
# Outputs: start-up instructions on stdout
# Exits:   1 when the package cannot be read or unpacked, when the
#          target directory already exists, or when moving/editing fails
#######################################
install_kfk() {
  local kfk_conf kfk_data

  ### get the top-level directory name from the archive itself rather
  ### than by parsing `ls` output after extraction
  DIR_NAME=$(tar -tf "${PKG_NAME}" \
    | awk -F/ 'NR == 1 { print ($1 == "." ? $2 : $1) }')
  if [[ -z "${DIR_NAME}" ]]; then
    echo "cannot determine top-level directory of ${PKG_NAME}"
    exit 1
  fi

  ### kafka path
  TARGET_PATH="${INSTALL_PATH}/${DIR_NAME}"
  if [[ -e "${TARGET_PATH}" ]]; then
    # mv would nest the new directory inside the old one and the edits
    # below would then touch the stale configuration.
    echo "${TARGET_PATH} already exists, remove it first"
    exit 1
  fi

  if ! tar -xf "${PKG_NAME}"; then
    echo "failed to unpack ${PKG_NAME}"
    exit 1
  fi
  # Trailing slash: fail instead of renaming when INSTALL_PATH is missing.
  if ! mv -- "${DIR_NAME}" "${INSTALL_PATH}/"; then
    echo "failed to move ${DIR_NAME} to ${INSTALL_PATH}"
    exit 1
  fi

  ### configuration file path and data path
  kfk_conf="${TARGET_PATH}/config/server.properties"
  kfk_data="${TARGET_PATH}/data"
  mkdir -p "${kfk_data}" || exit 1

  ### config cluster: broker id, listener host name, log data path and
  ### the zookeeper connection string (ZOOKEEPER_CLUSTER, upper case)
  if ! sed -i \
    -e "s|broker.id=0|broker.id=${MACHINE_ID}|g" \
    -e "s|#listeners=PLAINTEXT://:9092|listeners=PLAINTEXT://${LOACL_HOSTNAME}:9092|g" \
    -e "s|log.dirs=/tmp/kafka-logs|log.dirs=${kfk_data}|g" \
    -e "s|zookeeper.connect=localhost:2181|zookeeper.connect=${ZOOKEEPER_CLUSTER}|g" \
    "${kfk_conf}"; then
    echo "failed to update ${kfk_conf}"
    exit 1
  fi

  ### config env
  if [[ -z "${KAFKA_HOME}" ]]; then
    echo "### kfk env begin" >> /etc/profile
    echo "export KAFKA_HOME=${TARGET_PATH}" >> /etc/profile
    ### single quotes keep $PATH/$KAFKA_HOME literal in the profile
    echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
    echo "### kfk env end" >> /etc/profile
  fi

  ### start kfk
  echo
  echo "you can use command to start kfk..."
  echo "${TARGET_PATH}/bin/kafka-server-start.sh -daemon ${TARGET_PATH}/config/server.properties"
  echo "or"
  echo "cd ${TARGET_PATH}/"
  echo "bin/kafka-server-start.sh -daemon config/server.properties"
  echo
}
#######################################
# Run the installation steps in order: JDK check, package check,
# configuration confirmation, then the installation itself.
# Any step may terminate the script via exit.
#######################################
main() {
  local step
  for step in check_jdk check_package install_info install_kfk; do
    "${step}"
  done
}
# Script entry point: hand control to main.
main "$@"
# END
單機安裝
# 先將指令碼中的 ZOOKEEPER_CLUSTER 改爲單一 主機名:埠 (例: kafka01:2181)
./install_kafka.sh 1 kafka01
叢集安裝
./install_kafka.sh 1 kafka01
./install_kafka.sh 2 kafka02
./install_kafka.sh 3 kafka03
單機測試
# 檢視命令參數
kafka-topics.sh --help
# 指定server端hostname及port, 指定分割區數, 指定副本數, 建立 topic, 名稱爲 topic01
kafka-topics.sh --bootstrap-server kafka01:9092 --create --partitions 2 --replication-factor 2 --topic topic01
# 上述命令執行會報 Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1. 的錯誤
# 因爲目前爲單機, broker數量=1, 副本數不應該大於broker的數量(大於則多個副本存放一臺伺服器, 該伺服器宕機, 多個副本均失效, 失去了副本容災的意義)
# 建立 topic
kafka-topics.sh --bootstrap-server kafka01:9092 --create --partitions 2 --replication-factor 1 --topic topic01
# 檢視 topic
kafka-topics.sh --bootstrap-server kafka01:9092 --list
# 開啓5個shell視窗, 分別執行下列命令
# 分組1消費者1: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic topic01 --group group01
# 分組1消費者2: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic topic01 --group group01
# 分組1消費者3: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic topic01 --group group01
# 分組2消費者1: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092 --topic topic01 --group group02
# 生產者: 指定 --broker-list
kafka-console-producer.sh --broker-list kafka01:9092 --topic topic01
# 測試1
# 在 producer 所在shell依次輸入 1, 2, 3
# 可以看到:
# "分組1消費者1" 輸出了 1, 3
# "分組1消費者2" 輸出了 2
# "分組1消費者3" 什麼都沒輸出(沒有分配到分割區, 目前分割區數爲2)
# "分組2消費者1" 輸出了 1, 2, 3
# 測試2
# 在 "分組1消費者1" 所在shell視窗, 按下 ctrl+c
# 再在 producer 所在shell依次輸入 01, 02, 03
# 可以看到:
# "分組1消費者1" 停止了服務
# "分組1消費者2" 輸出了 01, 03
# "分組1消費者3" 輸出了 02
# "分組2消費者1" 輸出了 01, 02, 03
# 可知: 每個 group(分組) 都會收到完整的訊息(分組之間相當於廣播); 同一分組內, 分割區會分配給組內的 consumer, 每則訊息只被組內一個 consumer 消費
# 一般 consumer 數量等於分割區數; 超出分割區數的 consumer 分配不到分割區, 可以作爲備機
# 當已分配到分割區的 consumer 宕機, 備機會接手該分割區繼續消費