我的 Kafka 旅程

2023-09-04 12:01:45

本文基於 Kafka 3.0+ 的 KRaft 模式來闡述

預設的 Kafka 不受認證約束,可不用賬號就可以連線到服務,也就是預設的 PLAIN 方式,不需要認證;設定了 SASL 認證之後,連線Kafka只能用憑證連線登入。

SASL 支援的認證方式有多種:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER;
其中 GSSAPI / OAUTHBEARER 都需要額外的獨立服務,顯得麻煩。
本文講述的是比較簡單的 SASL_PLAINTEXT 方式,認證機制統一為:SCRAM-SHA-512,自由的建立新使用者並授權到TOPIC。

那麼基於 SASL+PLAINTEXT+SCRAM 的認證模式,本文涵蓋的內容為:

  1. 設定伺服器端的認證模式
  2. 命令列建立新賬號
  3. 授權賬號到Topic的生產/消費許可權
  4. 命令列憑證接入樣例
  5. 使用者端憑證接入樣例

作者:[Sol·wang] - 部落格園,原文出處:https://www.cnblogs.com/Sol-wang/

一、建立新使用者

所以在不受認證約束的預設情況下,使用 kafka-configs.sh,可以在 Kafka 中建立新使用者:

# 建立使用者
bin/kafka-configs.sh --bootstrap-server {host}:9092 --alter \
    --entity-type users --entity-name {u-name} --add-config 'SCRAM-SHA-512=[password={user-password}]'
# 檢視使用者
bin/kafka-configs.sh --bootstrap-server {host}:9092 --describe --entity-type users --entity-name {u-name}

為什麼要先建立一個賬號

預設在沒有賬號的情況下,後續的認證授權生效後,用誰來連線到Kafka建立使用者呢??
當然是先有一個管理員賬號的存在,如上建立的賬號,就假設以上建立的賬號為管理員 admin。

使用者的分類

 - 超級管理員:對 KAFKA 的管理
 - 管理員:  用於第三方UI的接入
 - 寫入Topic:用於生產端的接入
 - 讀取Topic:用於消費端的接入

二、認證授權設定

接下來,讓建立的使用者起作用,就要設定認證授權機制(SASL_PLAINTEXT)

來編輯 config/kraft/server.properties 組態檔吧,如下:

# 認證方式(內/外) 設定
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093     # 對外認證方式 SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://localhost:9092    # Client(生產/消費)認證方式(不設定就用上一行代替)
inter.broker.listener.name=SASL_PLAINTEXT               # 內部節點之間的通訊認證方式
security.inter.broker.protocol=SASL_PLAINTEXT           # 內部通訊安全協定(不設定就用上一行代替)

# 認證機制 設定
sasl.enabled.mechanisms=SCRAM-SHA-512                   # SASL 定義支援的認證機制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512      # SASL 的內部認證機制(必須是上一行包含的值)

# 賬號限制 設定
super.users=User:{user-name};User:{user-name}           # 定義超級管理員
allow.everyone.if.no.acl.found=true                     # 生產/消費/等等,超級管理員之外的使用者是否可以連線

# 授權方式(類名) 設定
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

重啟 Kafka 服務,使其設定生效。此時,只能用定義了的超級管理員來連線操作 Kafka。那麼需要的使用者憑證於下節內容。

三、命令列中的憑證

假設之前已經建立了使用者 admin/*****,並且已設定為超級管理員。

為命令列模式建立此使用者的憑證檔案,這裡命名為 admin-user-jaas 儲存到伺服器目錄。
例如admin使用者憑證內容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="*****";

命令列模式,用憑證連線 Kafka 服務:

之後的每一步操作,都要帶上使用者憑證檔案,假設用超級使用者 admin 繼續建立新使用者的場景:

bin/kafka-configs.sh --bootstrap-server {host}:9092 \
    --alter --add-config 'SCRAM-SHA-512=[password={user-password}]' \
    --entity-type users --entity-name {new-user-name} --command-config admin-user-jaas

與之前不同的是多了個--command-config admin-user-jaas 也就是admin使用者的憑證資訊。這時若不帶憑證資訊會提示 disconnected。

四、使用者授權到TOPIC

使用者建立好了,接下來要為使用者授權到指定的 Topic 了,並指定寫入/讀取許可權。

通常寫入許可權等同於生產端的授權,那麼讀取許可權等同於消費端的授權。

# 授權使用者到指定 topic 的生產端(寫入)許可權
# --allow-principal:指定使用者
# --topic {topic-name}:指定某個主題
# --producer:指定為(生產端的)寫入許可權
bin/kafka-acls.sh --bootstrap-server {host}:9092 --add --producer \
    --allow-principal User:{user-name} --topic {topic-name} --command-config {admin-jaas}
#
# 授權使用者到指定 topic 的消費端(讀取)許可權
# --allow-principal:指定使用者
# --topic {topic-name}:指定某個主題
# --consumer:指定為(消費端的)讀取許可權
# --group {topic-group-name}:必須的消費組歸屬
bin/kafka-acls.sh --bootstrap-server {host}:9092 --add --consumer \
    --allow-principal User:{user-name} --topic {topic-name} --group {topic-group-name} --command-config {jaas-name}

隨著業務不斷的增長,建立更多的賬號並授權到各個TOPIC使用。

五、命令列用憑證接入

所有的使用者憑證檔案JAAS都相同,只不過賬號密碼不同,同樣的為使用者端使用者建立一個憑證檔案。

跟 admin-user-jaas 檔案一樣,建立各自賬號的JAAS憑證檔案後,這裡是生產端/消費端的命令列接入案例:

# 生產端 使用者憑證 連線到 Kafka
# --producer.config {使用者憑證檔案}
bin/kafka-console-producer.sh --bootstrap-server {host}:9092 --topic {topic-name} --producer.config upro-jaas
#
# 消費端 使用者憑證 連線到 Kafka
# --consumer.config {使用者憑證檔案}
bin/kafka-console-consumer.sh --bootstrap-server {host}:9092 --topic {topic-name} --consumer.config ucsm-jaas --from-beginning

六、.NET用憑證接入

也就是把各賬號憑證檔案JAAS中的內容,移到使用者端的設定類中,如下樣例:

var conf = new ConsumerConfig {
    GroupId = "test-cons-group",
    BootstrapServers = "192.168.56.101:9092",
    SaslUsername = "user-name",
    SaslPassword = "*********",
    SaslMechanism = SaslMechanism.ScramSha512,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslOauthbearerConfig = "org.apache.kafka.common.security.scram.ScramLoginModule"
};