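The most common way to connect to Azure Event Hub with the SDK is a connection string. Following the official documentation is enough to get working code: https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send
But how does the code need to change when access is authenticated through Azure AD? How is an AAD TokenCredential used?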
With a connection string, EventProcessorClientBuilder is configured through its connectionString method.
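With Azure AD authentication, first obtain the Client ID, Tenant ID, and Client Secret of the application registered in AAD, then set them as system environment variables (DefaultAzureCredential reads them from AZURE_CLIENT_ID, AZURE_TENANT_ID, and AZURE_CLIENT_SECRET).
Then initialize the EventProcessorClientBuilder object with the credential.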
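Before building the credential, it can help to confirm that the three variables are actually visible to the JVM. The following is a minimal sketch using only the standard library; the class and method names (CredentialEnvCheck, missing) are hypothetical and not part of any Azure SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CredentialEnvCheck {
    // Environment variable names that azure-identity reads for a
    // service principal with a client secret.
    static final List<String> REQUIRED =
            Arrays.asList("AZURE_CLIENT_ID", "AZURE_TENANT_ID", "AZURE_CLIENT_SECRET");

    // Returns the required names that are absent or blank in the given map.
    static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> missing = missing(System.getenv());
        if (missing.isEmpty()) {
            System.out.println("All service principal variables are set.");
        } else {
            System.out.println("Missing environment variables: " + missing);
        }
    }
}
```

Running this once in the same shell or service configuration as the application gives a quick indication of whether an authentication failure comes from missing settings rather than from permissions.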
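Notes:
1) DefaultAzureCredentialBuilder must set the Authority Host to Azure China (AzureAuthorityHosts.AZURE_CHINA).
2) The credential method of EventProcessorClientBuilder must be given the fully qualified domain name of the Event Hub Namespace.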
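For the second note, the value passed to credential is the host name of the namespace, not its short name. The sketch below derives it from a short name, assuming the Azure China suffix .servicebus.chinacloudapi.cn that appears in the commented example in this article's code; the class name NamespaceHost and the sample name "myns" are hypothetical.

```java
import java.util.Locale;

public class NamespaceHost {
    // DNS suffix for Event Hubs namespaces in Azure China, as used in the
    // commented example in this article (assumed here, not read from the SDK).
    static final String CHINA_SUFFIX = ".servicebus.chinacloudapi.cn";

    // Builds the fully qualified namespace from a short namespace name.
    // A value that already ends with the suffix is returned unchanged.
    static String fullyQualified(String namespace) {
        if (namespace == null || namespace.trim().isEmpty()) {
            throw new IllegalArgumentException("Namespace must not be empty");
        }
        String trimmed = namespace.trim().toLowerCase(Locale.ROOT);
        if (trimmed.endsWith(CHINA_SUFFIX)) {
            return trimmed;
        }
        if (trimmed.contains(".")) {
            // Most likely a host from another cloud, e.g. *.servicebus.windows.net.
            throw new IllegalArgumentException("Unexpected host for Azure China: " + trimmed);
        }
        return trimmed + CHINA_SUFFIX;
    }

    public static void main(String[] args) {
        // "myns" is a placeholder namespace name.
        System.out.println(fullyQualified("myns"));
    }
}
```

Rejecting hosts from other clouds early is a design choice of this sketch: a global-Azure host combined with the Azure China authority host would otherwise fail later, at connection time.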
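Azure provides the following built-in roles for granting access to Event Hubs data through Azure AD and OAuth: Azure Event Hubs Data Owner, Azure Event Hubs Data Sender, and Azure Event Hubs Data Receiver.
This example only receives data, so only the Azure Event Hubs Data Receiver role was granted.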
Add the following to the dependencies section of pom.xml: azure-identity, azure-messaging-eventhubs, azure-messaging-eventhubs-checkpointstore-blob. Prefer the latest versions, to avoid type conflicts or class-not-found errors at runtime.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>spdemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>spdemo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs</artifactId>
      <version>5.12.2</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-identity</artifactId>
      <version>1.5.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.5.5</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.example.App</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
package com.example;

import com.azure.core.credential.TokenCredential;
import com.azure.identity.AzureAuthorityHosts;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.messaging.eventhubs.*;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.*;
import com.azure.storage.blob.*;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class App {

    public static void main(String[] args) throws IOException {
        System.out.println("Hello World!");

        // Connection-string alternative:
        // String connectionString = "<connectionString>";

        // The fully qualified namespace for the Event Hubs instance.
        // In global Azure this is similar to {your-namespace}.servicebus.windows.net;
        // in Azure China:
        // String fullyQualifiedNamespace = "<your event hub namespace>.servicebus.chinacloudapi.cn";
        // String eventHubName = "<eventHubName>";
        String storageConnectionString = System.getenv("storageConnectionString");
        String storageContainerName = System.getenv("storageContainerName");
        String fullyQualifiedNamespace = System.getenv("fullyQualifiedNamespace");
        String eventHubName = System.getenv("eventHubName");

        // AAD credential; the authority host must point at Azure China.
        TokenCredential credential = new DefaultAzureCredentialBuilder()
                .authorityHost(AzureAuthorityHosts.AZURE_CHINA)
                .build();

        // Create a blob container client that you use later to build an event
        // processor client to receive and process events.
        BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(storageConnectionString)
                .containerName(storageContainerName)
                .buildAsyncClient();

        // The same credential also works for a producer client:
        // EventHubProducerClient client = new EventHubClientBuilder()
        //         .credential(fullyQualifiedNamespace, eventHubName, credential)
        //         .buildProducerClient();

        // Starting position for partition "0" when no checkpoint applies.
        Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
        initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));

        // Create a builder object that you will use later to build an event
        // processor client to receive and process events and errors.
        EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
                // .connectionString(connectionString, eventHubName)
                .credential(fullyQualifiedNamespace, eventHubName, credential)
                .initialPartitionEventPosition(initialPartitionEventPosition)
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .processEvent(PARTITION_PROCESSOR)
                .processError(ERROR_HANDLER)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

        // Use the builder object to create an event processor client.
        EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

        System.out.println("Starting event processor");
        eventProcessorClient.start();

        System.out.println("Press enter to stop.");
        System.in.read();

        System.out.println("Stopping event processor");
        eventProcessorClient.stop();
        System.out.println("Event processor stopped.");
        System.out.println("Exiting process");
    }

    public static final Consumer<EventContext> PARTITION_PROCESSOR = eventContext -> {
        PartitionContext partitionContext = eventContext.getPartitionContext();
        EventData eventData = eventContext.getEventData();

        System.out.printf("Processing event from partition %s with sequence number %d with body: %s%n",
                partitionContext.getPartitionId(), eventData.getSequenceNumber(), eventData.getBodyAsString());

        // Update the checkpoint stored in Azure Blob Storage whenever the
        // sequence number is a multiple of 10, i.e. roughly every 10 events.
        if (eventData.getSequenceNumber() % 10 == 0) {
            eventContext.updateCheckpoint();
        }
    };

    public static final Consumer<ErrorContext> ERROR_HANDLER = errorContext -> {
        System.out.printf("Error occurred in partition processor for partition %s, %s.%n",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
    };
}
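The checkpoint rule in PARTITION_PROCESSOR is worth a closer look: it is keyed on the sequence number, not on a counter of processed events. The sketch below reproduces only that arithmetic in plain Java so it can be run without Azure; the class and method names (CheckpointCadence, shouldCheckpoint, checkpointed) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointCadence {
    // Same condition as in the event handler: checkpoint when the sequence
    // number is a multiple of the interval.
    static boolean shouldCheckpoint(long sequenceNumber, int interval) {
        return sequenceNumber % interval == 0;
    }

    // Lists the sequence numbers in [from, to] that would trigger a checkpoint.
    static List<Long> checkpointed(long from, long to, int interval) {
        List<Long> result = new ArrayList<>();
        for (long seq = from; seq <= to; seq++) {
            if (shouldCheckpoint(seq, interval)) {
                result.add(seq);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Events 3001..3030 on one partition, checkpoint interval 10.
        System.out.println(checkpointed(3001, 3030, 10)); // prints [3010, 3020, 3030]
    }
}
```

With this rule, a processor that stops at sequence number 3029 has its last checkpoint at 3020, so on restart the events after 3020 would be delivered again; the handler should tolerate that replay.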
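The fromSequenceNumber method of EventPosition specifies a sequence number, and the consumer receives the messages that come after it. The other methods are fromOffset (start from a given offset), fromEnqueuedTime (start from a point in time and receive the messages enqueued after it), earliest (start from the oldest available message), and latest (receive only new data, skipping existing data).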
Map<String, EventPosition> initialPartitionEventPosition = new HashMap<>();
initialPartitionEventPosition.put("0", EventPosition.fromSequenceNumber(3000));
Note:
The String key in Map<String, EventPosition> is the Event Hub partition ID. If the Event Hub has 2 partitions, the keys are "0" and "1".
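The EventPosition set here takes effect only when the checkpoint store in the Storage Account holds no value for the partition or, as observed here, a value smaller than the one set; note that the SDK documentation describes only the no-checkpoint case. Otherwise, the consumer resumes from the SequenceNumber read from the checkpoint.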
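Since the map needs one entry per partition, an Event Hub with several partitions needs several put calls. A minimal sketch of filling the keys in a loop follows; to stay runnable without the SDK it stores plain sequence numbers, whereas with the SDK each value would be EventPosition.fromSequenceNumber(n). The class and method names (InitialPositions, forAllPartitions) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class InitialPositions {
    // Maps every partition ID ("0" .. partitionCount - 1) to the same
    // starting sequence number, keeping the IDs in ascending order.
    static Map<String, Long> forAllPartitions(int partitionCount, long sequenceNumber) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        Map<String, Long> positions = new LinkedHashMap<>();
        for (int i = 0; i < partitionCount; i++) {
            positions.put(String.valueOf(i), sequenceNumber);
        }
        return positions;
    }

    public static void main(String[] args) {
        // Two partitions, both starting after sequence number 3000.
        System.out.println(forAllPartitions(2, 3000L)); // prints {0=3000, 1=3000}
    }
}
```

Sequence numbers are tracked per partition, so in practice each partition may need its own starting value rather than a shared one.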
Authenticate an application with Azure Active Directory to access Event Hubs resources: https://docs.azure.cn/zh-cn/event-hubs/authenticate-application
Use Java to send events to or receive events from Azure Event Hubs (azure-messaging-eventhubs): https://docs.azure.cn/zh-cn/event-hubs/event-hubs-java-get-started-send