kafka_02_安裝

2020-08-08 13:23:56
安裝

前置條件

  • 安裝jdk1.8+, 設定 JAVA_HOME
    通過shell安裝鏈接: install_jdk.sh.
  • 設定主機名和IP對映(需特別注意!!!)
# 優先設定解析
vim /etc/hosts
192.168.25.106 kafka01
192.168.25.107 kafka02
192.168.25.108 kafka03

# 臨時生效
hostname kafka01

# 再配合永久生效修改, 避免重新啓動後失效
hostnamectl set-hostname kafka01
或(兩種方式等價, 上面命令會直接修改 /etc/hostname 檔案)
vim /etc/hostname 
kafka01

  • 關閉防火牆( CentOS7 , 每台伺服器都需要 )
# 停止firewall
systemctl stop firewalld.service
# 禁止firewall開機啓動
systemctl disable firewalld.service
# 檢視firewall狀態( Active: inactive (dead) )
systemctl status firewalld.service
  • 安裝&啓動 zookeeper(zookeeper-3.4.6.tar.gz)
    通過shell安裝鏈接: install_zookeeper.sh.
  • 安裝&啓動 kafka(kafka_2.11-2.2.0.tgz)

安裝指令碼
install_kafka.sh

#!/bin/bash

###############################################
#
#   GIN INSTALL KAFKA ENV
#   Installs and configures a single Kafka broker (standalone or one
#   node of a cluster).
#   Before running: set INSTALL_PATH and ZOOKEEPER_CLUSTER below.
#   Usage: ./install_kafka.sh <broker-id> <local-hostname>
#   e.g.:  ./install_kafka.sh 1 kafka01
#   For a single zookeeper node, ZOOKEEPER_CLUSTER is one hostname:port.
###############################################

# Directory the unpacked Kafka tree is moved into.
INSTALL_PATH='/home/kafka'
### Zookeeper ensemble Kafka connects to (comma-separated hostname:port list).
ZOOKEEPER_CLUSTER='kafka01:2181,kafka02:2181,kafka03:2181'

### -------- script arguments --------
### Broker ordinal within the cluster; written as broker.id into
### server.properties. e.g.: ./install_kafka.sh 1 kafka01
MACHINE_ID=$1
### Hostname of the current machine; must be a hostname, not an IP
### (it is substituted into the PLAINTEXT listener address).
### NOTE(review): 'LOACL' is a typo for 'LOCAL'; kept as-is because the
### rest of the script references this exact variable name.
LOACL_HOSTNAME=$2

### -------- no need to modify --------
### Defaults only; check_package/install_kfk re-derive these from the
### files actually present in the current directory.
PKG_NAME='kafka_2.11-2.2.0.tgz'
DIR_NAME='kafka_2.11-2.2.0'



function check_jdk()
{
	### Kafka needs a JDK: verify JAVA_HOME points at an existing directory,
	### otherwise abort the install.
	if [[ -d $JAVA_HOME ]];
	then
		echo "JAVA_HOME=$JAVA_HOME"
	else
		echo "JAVA_HOME not set"
		exit 1
	fi
}

function check_package()
{
	### Ensure the install target directory exists (create it if missing).
	if [ ! -d "${INSTALL_PATH}" ];
	then
		echo "${INSTALL_PATH} does not exist, mkdir"
		mkdir -p "${INSTALL_PATH}"
	else
		echo "${INSTALL_PATH} exists"
	fi

	### Locate the kafka tarball in the current directory via globbing.
	### (Was: `ls | grep kafka | grep .tgz` — parsing ls output is fragile
	### and the unescaped '.' matched any character, not a literal dot.)
	PKG_NAME=
	for pkg in kafka*.tgz; do
		if [ -f "$pkg" ]; then
			PKG_NAME=$pkg
			break
		fi
	done

	### Abort when no install package is present.
	if [ ! -f "${PKG_NAME}" ]
	then
		echo "you need install package!"
		exit
	fi

	### Remove any stale extraction directory left by a previous run so
	### install_kfk starts from a clean unpack.
	DIR_NAME=
	for d in kafka*/; do
		if [ -d "$d" ]; then
			DIR_NAME=${d%/}
			break
		fi
	done
	if [ -d "${DIR_NAME}" ];
	then
		echo "${DIR_NAME} exists, rm unzip path"
		rm -rf -- "${DIR_NAME}"
	else
		echo "DIR_NAME is ok"
	fi
}

function install_info(){
	### Apply defaults for missing script arguments, print a configuration
	### summary, and ask the operator to confirm before installing.
	### NB: $1/$2 inside a function are the FUNCTION's own arguments, not
	### the script's, so the old "empty input param('$1')" messages always
	### interpolated an empty string — messages reworded accordingly.
	if [ -n "$MACHINE_ID" ];
	then
		echo 'current MACHINE_ID='$MACHINE_ID
	else
		### default broker id
		MACHINE_ID="1"
		echo "empty broker-id param, using default 1"
	fi

	if [ -n "$LOACL_HOSTNAME" ];
	then
		echo 'current LOACL_HOSTNAME='$LOACL_HOSTNAME
	else
		### default hostname
		LOACL_HOSTNAME="kafka01"
		echo "empty hostname param, using default kafka01"
	fi

	echo
	echo "INSTALL_PATH: ${INSTALL_PATH}"
	echo "MACHINE_ID: ${MACHINE_ID}"
	echo "LOACL_HOSTNAME: ${LOACL_HOSTNAME}"
	echo "ZOOKEEPER_CLUSTER: ${ZOOKEEPER_CLUSTER}"
	echo

	### Interactive confirmation gate; 'n' aborts the whole script.
	while true; do
	    read -p "Check that the configuration, press [y/n] to continue: " yn
	    case $yn in
	        [Yy]* ) break;;
	        [Nn]* ) exit;;
	        * ) echo "please input y/n.";;
	    esac
	done
}

function install_kfk(){
	### Unpack the tarball and move the tree under INSTALL_PATH.
	tar -xf "$PKG_NAME"
	### Derive the top-level directory name from the archive listing itself
	### (was: parsing `ls -l` output, which is fragile).
	DIR_NAME=$(tar -tf "$PKG_NAME" | head -n 1 | cut -d/ -f1)
	mv "$DIR_NAME" "$INSTALL_PATH"

	### kafka install root
	TARGET_PATH=$INSTALL_PATH/$DIR_NAME

	### configuration file path and data path
	kfk_conf=$TARGET_PATH/config/server.properties
	kfk_data=$TARGET_PATH/data

	### broker.id must be unique per cluster node
	sed -i 's|broker.id=0|broker.id='"$MACHINE_ID"'|g' "$kfk_conf"
	### advertise this node under its hostname
	sed -i 's|#listeners=PLAINTEXT://:9092|listeners=PLAINTEXT://'"$LOACL_HOSTNAME"':9092|g' "$kfk_conf"
	### message/log data directory
	mkdir -p "$kfk_data"
	sed -i 's|log.dirs=/tmp/kafka-logs|log.dirs='"$kfk_data"'|g' "$kfk_conf"
	### BUG FIX: was '$zookeeper_cluster' (lowercase, never defined), which
	### left zookeeper.connect= empty; use the ZOOKEEPER_CLUSTER global.
	sed -i 's|zookeeper.connect=localhost:2181|zookeeper.connect='"$ZOOKEEPER_CLUSTER"'|g' "$kfk_conf"

	### Append env vars to /etc/profile only when KAFKA_HOME is not yet set,
	### so repeated runs do not duplicate the block.
	if [[ -z $KAFKA_HOME ]];then
		echo "### kfk env begin" >> /etc/profile
		echo "export KAFKA_HOME=$TARGET_PATH" >> /etc/profile
### use '' avoid $PATH resolved to actual value,if used "" $PATH need to be escaped like \$PATH
		echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile
		echo "### kfk env end" >> /etc/profile
	fi

	### print the start commands instead of starting the broker automatically
	echo
	echo "you can use command to start kfk..."
	echo "$TARGET_PATH/bin/kafka-server-start.sh -daemon $TARGET_PATH/config/server.properties"
	echo "or"
	echo "cd $TARGET_PATH/"
	echo "bin/kafka-server-start.sh -daemon config/server.properties"
	echo
}

 
function main()
{
	### Install pipeline: validate JDK, stage the package, confirm the
	### configuration with the operator, then install and configure Kafka.
	check_jdk
	check_package
	install_info
	install_kfk
}
 
# Script entry point: run the full install pipeline defined above.
main
# END


單機安裝

  • 將 kafka_2.11-2.2.0.tgz 和 install_kafka.sh 放在同一個目錄下
  • 設定 INSTALL_PATH 和 ZOOKEEPER_CLUSTER
  • 執行 ./install_kafka.sh 1 kafka01

叢集安裝

  • 將 kafka_2.11-2.2.0.tgz 和 install_kafka.sh 放在同一個目錄下
  • 設定 INSTALL_PATH 和 ZOOKEEPER_CLUSTER
  • 每臺機器分別執行:

./install_kafka.sh 1 kafka01
./install_kafka.sh 2 kafka02
./install_kafka.sh 3 kafka03

單機測試

  • 建立 topic
# 檢視命令參數
kafka-topics.sh --help
# 指定server端hostname及port, 指定分割區數, 指定副本數, 建立 topic, 名稱爲 topic01
kafka-topics.sh --bootstrap-server kafka01:9092 --create --partitions 2 --replication-factor 2 --topic topic01
# 上述命令執行會報 Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1. 的錯誤
# 因爲目前爲單機, broker數量=1, 副本數不應該大於broker的數量(大於則多個副本存放一臺伺服器, 該伺服器宕機, 多個副本均失效, 失去了副本容災的意義)
# 建立 topic
kafka-topics.sh --bootstrap-server kafka01:9092 --create --partitions 2 --replication-factor 1 --topic topic01
# 檢視 topic
kafka-topics.sh --bootstrap-server kafka01:9092 --list
  • 生產消費 topic
# 開啓5個shell視窗, 分別執行下列命令
# 分組1消費者1: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092  --topic topic01 --group group01 
# 分組1消費者2: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092  --topic topic01 --group group01 
# 分組1消費者3: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092  --topic topic01 --group group01 
# 分組2消費者1: kafka按分組進行消費, 需指定組名
kafka-console-consumer.sh --bootstrap-server kafka01:9092  --topic topic01 --group group02 
# 生產者: 指定 --broker-list 
kafka-console-producer.sh --broker-list kafka01:9092  --topic topic01

# 測試1
# 在 producer 所在shell依次輸入 1, 2, 3
# 可以看到: 
# "分組1消費者1" 輸出了 1, 3
# "分組1消費者2" 輸出了 2
# "分組1消費者3" 什麼都沒輸出(沒有分配到分割區, 目前分割區數爲2)
# "分組2消費者1" 輸出了 1, 2, 3

# 測試2
# 在 "分組1消費者1" 所在shell視窗, 按下 ctrl+c
# 再在 producer 所在shell依次輸入 01, 02, 03
# 可以看到: 
# "分組1消費者1" 停止了服務
# "分組1消費者2" 輸出了 01, 03
# "分組1消費者3" 輸出了 02
# "分組2消費者1" 輸出了 01, 02, 03

# 可知 kafka 按group(分組), 進行廣播訊息, 訊息根據 consumer 數量進行輪詢消費
# 一般 consumer數量 等於分割區數, 超過分割區數的consumer數量的伺服器可以作爲備機
# 當分配到分割區的伺服器宕機, 備機可以分配到分割區進行消費