[Azure Event Hubs] Creating an Event Hub Consumer Client with Azure AD Authentication + a Custom Event Position

2022-08-03 21:00:34

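Problem Description

When connecting to Azure Event Hub with the SDK, the most common approach is a connection string. Following the official documentation is enough to get that code working: https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

But how does the code need to change when access goes through Azure AD authentication? How is an AAD TokenCredential used?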

 

Analysis

With a connection string, EventProcessorClientBuilder is configured through its connectionString method.

 

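With Azure AD authentication, first obtain the Client ID, Tenant ID, and Client Secret from the application registered in AAD, then set them as system environment variables:

  1. AZURE_TENANT_ID: the Tenant ID shown on the AAD app registration page
  2. AZURE_CLIENT_ID: the Application (Client) ID shown on the AAD app registration page
  3. AZURE_CLIENT_SECRET: the client secret created under Certificates & secrets of the AAD app registration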
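Because the credential reads these three values from the environment, a missing variable tends to show up only later as an authentication failure. The following is a minimal sketch of a fail-fast check that could run at startup. AadEnvCheck and missingVariables are hypothetical names introduced for illustration; they are not part of the Azure SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of the Azure SDK): reports which of the
// environment variables listed above are absent or blank.
final class AadEnvCheck {
    static final List<String> REQUIRED = Arrays.asList(
            "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET");

    // Returns the names of required variables that are missing or blank in env,
    // in the same order as REQUIRED.
    static List<String> missingVariables(Map<String, String> env) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = missingVariables(System.getenv());
        if (!missing.isEmpty()) {
            throw new IllegalStateException("Missing environment variables: " + missing);
        }
        System.out.println("All AAD environment variables are set.");
    }
}
```

Taking the environment as a Map parameter keeps the check easy to exercise with test data instead of the real process environment.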

Then initialize the EventProcessorClientBuilder object with the credential.

Points to note:

1) DefaultAzureCredentialBuilder must set the authority host to Azure China.

2) The credential method of EventProcessorClientBuilder requires the fully qualified domain name of the Event Hub namespace.
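Both notes come down to using endpoints that match the cloud the namespace lives in. The sketch below pairs each cloud with its authority host and namespace suffix. AzureCloud is a hypothetical enum introduced for illustration, not an SDK type. The authority host strings are assumed to match the values behind the SDK's AzureAuthorityHosts constants, and "my-namespace" is a placeholder.

```java
// Illustrative lookup, not part of the Azure SDK: pairs each cloud with the
// AAD authority host and the Event Hubs namespace suffix it is assumed to use.
enum AzureCloud {
    PUBLIC("https://login.microsoftonline.com/", "servicebus.windows.net"),
    CHINA("https://login.chinacloudapi.cn/", "servicebus.chinacloudapi.cn");

    final String authorityHost;
    final String namespaceSuffix;

    AzureCloud(String authorityHost, String namespaceSuffix) {
        this.authorityHost = authorityHost;
        this.namespaceSuffix = namespaceSuffix;
    }

    // Builds the fully qualified namespace string from a bare namespace name.
    String fullyQualifiedNamespace(String namespaceName) {
        return namespaceName + "." + namespaceSuffix;
    }

    public static void main(String[] args) {
        // "my-namespace" is a placeholder namespace name.
        System.out.println(CHINA.authorityHost);
        System.out.println(CHINA.fullyQualifiedNamespace("my-namespace"));
    }
}
```

In real code the SDK constant AzureAuthorityHosts.AZURE_CHINA is preferable to a hand-written string. The lookup only makes the pairing of authority host and namespace suffix explicit.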

 

Implementation

Step 1: Grant the AAD registered application permission to operate on Event Hub data

Azure provides the following built-in roles for granting access to Event Hubs data through Azure AD and OAuth:

  1. Azure Event Hubs Data Owner: grants full access to Event Hubs resources.
  2. Azure Event Hubs Data Sender: grants send access to Event Hubs resources.
  3. Azure Event Hubs Data Receiver: grants consume/receive access to Event Hubs resources.

This example only needs to receive data, so only the Azure Event Hubs Data Receiver role is assigned.

 

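Step 2: Add the SDK dependencies to the Java project

Add the following artifacts to the dependencies section of pom.xml: azure-identity, azure-messaging-eventhubs, and azure-messaging-eventhubs-checkpointstore-blob. Prefer the latest version of each to avoid type conflicts or class-not-found errors at runtime.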

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>spdemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>spdemo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs</artifactId>
      <version>5.12.2</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-identity</artifactId>
      <version>1.5.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.5.5</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.example.App</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

 

Step 3: Add the complete code

package com.example;

import com.azure.core.credential.TokenCredential;
import com.azure.identity.AzureAuthorityHosts;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.*;
import com.azure.storage.blob.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

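/**
 * Receives events from an Event Hub with an EventProcessorClient that
 * authenticates through Azure AD instead of a connection string.
 */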
public class App {
    // private static final String connectionString ="<connectionString>";
    // private static final String eventHubName = "<eventHubName>";
    // private static final String storageConnectionString = "<storageConnectionString>";
    // private static final String storageContainerName = "<storageContainerName>";

    public static void main(String[] args) throws IOException {
        System.out.println("Hello World!");

        // String connectionString ="<connectionString>";
        // The fully qualified namespace of the Event Hubs instance. On global Azure it
        // looks like {your-namespace}.servicebus.windows.net; on Azure China it is
        // {your-namespace}.servicebus.chinacloudapi.cn
        // String fullyQualifiedNamespace ="<your event hub namespace>.servicebus.chinacloudapi.cn";
        // String eventHubName = "<eventHubName>";

        String storageConnectionString = System.getenv("storageConnectionString");
        String storageContainerName = System.getenv("storageContainerName");
        String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace");
        String eventHubName = System.getenv("eventHubName");

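        // DefaultAzureCredential reads AZURE_TENANT_ID, AZURE_CLIENT_ID and
        // AZURE_CLIENT_SECRET from the environment. The authority host must be
        // Azure China because the namespace lives in that cloud.
        TokenCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_CHINA)
                .build();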

        // Create a blob container client that you use later to build an event processor
        // client to receive and process events
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();
        
    
        // EventHubProducerClient
        // EventHubProducerClient client = new EventHubClientBuilder()
        // .credential(fullyQualifiedNamespace, eventHubName, credential)
        // .buildProducerClient();

        // Initial position for partition "0": read the events that follow
        // sequence number 3000.
        Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
        initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));
 
        // EventProcessorClientBuilder
        // Create a builder object that you will use later to build an event processor
        // client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                // .connectionString(connectionString, eventHubName)
                .credential(fullyQualifiedNamespace, eventHubName, credential)
                .initialPartitionEventPosition(initialPartitionEventPosition)
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();

        System.out.println("Press enter to stop.");
        System.in.read();

        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");

        System.out.println("Exiting process");

    }

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();

        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
                partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());

        // Every 10 events received, it will update the checkpoint stored in Azure Blob
        // Storage.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };

    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
    };
}

 

 

 

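Appendix: Setting a custom Event Position so that the program starts reading from a given Sequence Number in the Event Hub

The fromSequenceNumber method of EventPosition takes a sequence number, and the consumer reads the messages that follow it. Other factory methods are fromOffset (start at a given offset), fromEnqueuedTime (start at a point in time and read the messages enqueued after it), earliest (start from the oldest available event), and latest (read only new data and skip existing data).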

    Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
    initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));

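Note:

In Map<String, EventPosition>, the String key is the Event Hub partition ID. If the Event Hub has 2 partitions, the IDs are "0" and "1".

The configured EventPosition takes effect only when the checkpoint store in the Storage Account holds no checkpoint for that partition. Otherwise, the consumer starts from the sequence number read from the checkpoint.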
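The precedence between a stored checkpoint and the configured initial position can be modelled in a few lines. This is a simplified, standard-library-only model, not SDK code: StartPositionModel and resolveStart are hypothetical names, plain Long sequence numbers stand in for EventPosition, and the model assumes the rule is simply "a stored checkpoint wins whenever one exists".

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model (assumption, not SDK code) of how a start position is
// chosen for one partition: checkpoint first, configured initial position second.
final class StartPositionModel {

    // Returns the sequence number to resume from, or null when neither a
    // checkpoint nor an initial position exists for the partition.
    static Long resolveStart(String partitionId,
                             Map<String, Long> checkpoints,
                             Map<String, Long> initialPositions) {
        Long checkpoint = checkpoints.get(partitionId);
        if (checkpoint != null) {
            return checkpoint; // a stored checkpoint takes precedence
        }
        return initialPositions.get(partitionId);
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("0", 5000L); // partition "0" already has a checkpoint

        Map<String, Long> initialPositions = new HashMap<>();
        initialPositions.put("0", 3000L);
        initialPositions.put("1", 3000L);

        System.out.println(resolveStart("0", checkpoints, initialPositions));
        System.out.println(resolveStart("1", checkpoints, initialPositions));
    }
}
```

In this model partition "0" resumes from its checkpoint (5000), while partition "1", which has no checkpoint, starts from the configured 3000. To make a new initial position apply to a partition that already has a checkpoint, the stored checkpoint would have to be removed first.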

 

 

 

 

References

Authorize access to Azure Event Hubs: https://docs.azure.cn/zh-cn/event-hubs/authorize-access-event-hubs

Authenticate an application with Azure Active Directory to access Event Hubs resources: https://docs.azure.cn/zh-cn/event-hubs/authenticate-application

Use Java to send events to or receive events from Azure Event Hubs (azure-messaging-eventhubs): https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send

 

 
