摘要:物聯網是現在比較熱門的軟體領域,眾多物聯網廠商都有自己的物聯網平臺,而物聯網平臺其中一個核心的模組就是Mqtt閘道器。
本文分享自華為雲社群《一文帶你掌握物聯網mqtt閘道器搭建背後的技術原理》,作者:張儉。
物聯網是現在比較熱門的軟體領域,眾多物聯網廠商都有自己的物聯網平臺,而物聯網平臺其中一個核心的模組就是Mqtt閘道器。這篇文章的目的是手把手教大家寫書寫一個mqtt閘道器,後端儲存支援Kafka/Pulsar,支援mqtt 連線、斷鏈、傳送訊息、訂閱訊息。技術選型:
核心pom依賴如下
<dependency> <groupId>io.netty</groupId> <artifactId>netty-codec-mqtt</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-transport</artifactId> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-original</artifactId> <version>${pulsar.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>${mqtt-client.version}</version> <scope>test</scope> </dependency>
軟體引數可謂是非常常見,複雜的開源專案,引數甚至可以達到上百個、組態檔長達數千行。我們需要的設定有
監聽埠的設定即使是寫demo也非常必要,常常用在單元測試中,由於單元測試跑完之後,即使網路伺服器關閉,作業系統也不會立即釋放埠,所以單元測試的時候指定隨機埠非常關鍵,在java中,我們可以通過這樣的工具類來獲取一個空閒的埠。未設定的話,我們就使用mqtt的預設埠1883。
package io.github.protocol.mqtt.broker.util; import java.io.IOException; import java.io.UncheckedIOException; import java.net.ServerSocket; public class SocketUtil { public static int getFreePort() { try (ServerSocket serverSocket = new ServerSocket(0)) { return serverSocket.getLocalPort(); } catch (IOException e) { throw new UncheckedIOException(e); } } }
我們的mqtt閘道器是沒有可靠的儲存能力的,依賴後端的訊息中介軟體來做持久化處理。後端規劃支援Pulsar、Kafka兩種型別。定義列舉類如下
public enum ProcessorType { KAFKA, PULSAR, }
對應的KafkaProcessorConfig、PulsarProcessorConfig比較簡單,包含基礎的連線地址即可,如果後續要做效能調優、安全,這塊還是會有更多的設定項
@Setter @Getter public class KafkaProcessorConfig { private String bootstrapServers = "localhost:9092"; public KafkaProcessorConfig() { } } @Setter @Getter public class PulsarProcessorConfig { private String httpUrl = "http://localhost:8080"; private String serviceUrl = "pulsar://localhost:6650"; public PulsarProcessorConfig() { } }
我們通過netty啟動一個mqttServer,新增mqtt解碼器
package io.github.protocol.mqtt.broker; import io.github.protocol.mqtt.broker.processor.KafkaProcessor; import io.github.protocol.mqtt.broker.processor.KafkaProcessorConfig; import io.github.protocol.mqtt.broker.processor.MqttProcessor; import io.github.protocol.mqtt.broker.processor.PulsarProcessor; import io.github.protocol.mqtt.broker.processor.PulsarProcessorConfig; import io.github.protocol.mqtt.broker.util.SocketUtil; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class MqttServer { private final MqttServerConfig mqttServerConfig; public MqttServer() { this(new MqttServerConfig()); } public MqttServer(MqttServerConfig mqttServerConfig) { this.mqttServerConfig = mqttServerConfig; if (mqttServerConfig.getPort() == 0) { mqttServerConfig.setPort(SocketUtil.getFreePort()); } } public void start() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // decoder p.addLast(new MqttDecoder()); p.addLast(MqttEncoder.INSTANCE); } }); // Start the server. ChannelFuture f = b.bind(mqttServerConfig.getPort()).sync(); // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private MqttProcessor processor(MqttServerConfig config) { return switch (config.getProcessorType()) { case KAFKA -> new KafkaProcessor(config.getMqttAuth(), config.getKafkaProcessorConfig()); case PULSAR -> new PulsarProcessor(config.getMqttAuth(), config.getPulsarProcessorConfig()); }; } public int getPort() { return mqttServerConfig.getPort(); } }
我們寫一個簡單的main函數用來啟動mqttServer,方便調測
package io.github.protocol.mqtt.broker; public class MqttServerStarter { public static void main(String[] args) throws Exception { new MqttServer().start(); } }
使用者端使用eclipse mqtt client進行測試
package io.github.protocol.mqtt; import lombok.extern.log4j.Log4j2; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @Log4j2 public class MqttClientPublishExample { public static void main(String[] args) throws Exception { String topic = "MQTT Examples"; String content = "Message from MqttPublishExample"; int qos = 2; String broker = "tcp://127.0.0.1:1883"; String clientId = "JavaSample"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); log.info("Connecting to broker: {}", broker); sampleClient.connect(connOpts); log.info("Connected"); log.info("Publishing message: {}", content); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); sampleClient.publish(topic, message); log.info("Message published"); sampleClient.disconnect(); log.info("Disconnected"); System.exit(0); } catch (MqttException me) { log.error("reason {} msg {}", me.getReasonCode(), me.getMessage(), me); } } }
然後我們先執行MqttServer,再執行MqttClient,發現MqttClient卡住了
Connecting to broker: tcp://127.0.0.1:1883
這是為什麼呢,我們通過抓包發現僅僅只有使用者端傳送了Mqtt connect資訊,伺服器端並沒有響應
但是根據mqtt標準協定,傳送Connect訊息,必須要有ConnAck響應
所以我們需要在接收到Connect後,返回connAck訊息。我們建立一個MqttHandler,讓他繼承ChannelInboundHandlerAdapter, 用來接力MqttDecoder解碼完成後的訊息,這裡要重點繼承其中的channelRead方法,以及channelInactive方法,用來釋放斷鏈時需要釋放的資源
package com.github.shoothzj.mqtt; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; @Slf4j public class MqttHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); } }
然後把這個handler加入到netty的職責鏈中,放到解碼器的後面
在mqtt handler中插入我們的程式碼
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); if (msg instanceof MqttConnectMessage) { handleConnect(ctx, (MqttConnectMessage) msg); } else { log.error("Unsupported type msg [{}]", msg); } } private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) { log.info("connect msg is [{}]", connectMessage); }
列印出connectMessage如下
[MqttConnectMessage[fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=22], variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=60], payload=MqttConnectPayload[clientIdentifier=JavaSample, willTopic=null, willMessage=null, userName=null, password=null]]]
通常,mqtt connect message中會包含qos、使用者名稱、密碼等資訊,由於我們啟動使用者端的時候也沒有攜帶使用者名稱和密碼,這裡獲取到的都為null,我們先不校驗這些訊息,直接給使用者端返回connack訊息,代表連線成功
final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build();
ctx.channel().writeAndFlush(ackMessage);
我們再執行起Server和Client,隨後可以看到已經走過了Connect階段,進入了publish message過程,接下來我們再實現更多的其他場景
附上此階段的MqttHandler程式碼
package com.github.shoothzj.mqtt; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.mqtt.MqttConnAckMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectPayload; import io.netty.handler.codec.mqtt.MqttConnectVariableHeader; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessageBuilders; import lombok.extern.slf4j.Slf4j; import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; @Slf4j public class MqttHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); if (msg instanceof MqttConnectMessage) { handleConnect(ctx, (MqttConnectMessage) msg); } else { log.error("Unsupported type msg [{}]", msg); } } private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage connectMessage) { log.info("connect msg is [{}]", connectMessage); final MqttFixedHeader fixedHeader = connectMessage.fixedHeader(); final MqttConnectVariableHeader variableHeader = connectMessage.variableHeader(); final MqttConnectPayload connectPayload = connectMessage.payload(); final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck().returnCode(CONNECTION_ACCEPTED).build(); ctx.channel().writeAndFlush(ackMessage); } }
我們當前把所有的邏輯都放在MqttHandler裡面,不方便後續的擴充套件。抽象出一個MqttProcessor介面來處理具體的請求,MqttHandler負責解析MqttMessage的型別並分發。MqttProcess介面設計如下
package io.github.protocol.mqtt.broker.processor; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttConnAckMessage; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; public interface MqttProcessor { void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception; void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception; void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception; void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception; void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception; void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception; void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception; void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception; void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception; void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception; void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception; void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception; void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception; void processDisconnect(ChannelHandlerContext ctx) throws Exception; void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception; }
我們允許這些方法丟擲異常,當遇到極難處理的故障時,把mqtt連線斷掉(如後端儲存故障),等待使用者端的重連。
MqttHandler中來呼叫MqttProcessor,相關MqttHandler程式碼如下
Preconditions.checkArgument(message instanceof MqttMessage); MqttMessage msg = (MqttMessage) message; try { if (msg.decoderResult().isFailure()) { Throwable cause = msg.decoderResult().cause(); if (cause instanceof MqttUnacceptableProtocolVersionException) { // Unsupported protocol version MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader( MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, false), null); ctx.writeAndFlush(connAckMessage); log.error("connection refused due to invalid protocol, client address [{}]", ctx.channel().remoteAddress()); ctx.close(); return; } else if (cause instanceof MqttIdentifierRejectedException) { // ineligible clientId MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null); ctx.writeAndFlush(connAckMessage); log.error("ineligible clientId, client address [{}]", ctx.channel().remoteAddress()); ctx.close(); return; } throw new IllegalStateException(msg.decoderResult().cause().getMessage()); } MqttMessageType messageType = msg.fixedHeader().messageType(); if (log.isDebugEnabled()) { log.debug("Processing MQTT Inbound handler message, type={}", messageType); } switch (messageType) { case CONNECT: Preconditions.checkArgument(msg instanceof MqttConnectMessage); processor.processConnect(ctx, (MqttConnectMessage) msg); break; case CONNACK: Preconditions.checkArgument(msg instanceof MqttConnAckMessage); processor.processConnAck(ctx, (MqttConnAckMessage) msg); break; case PUBLISH: Preconditions.checkArgument(msg instanceof MqttPublishMessage); processor.processPublish(ctx, (MqttPublishMessage) msg); break; case PUBACK: Preconditions.checkArgument(msg instanceof MqttPubAckMessage); processor.processPubAck(ctx, (MqttPubAckMessage) msg); break; case PUBREC: processor.processPubRec(ctx, msg); break; case PUBREL: processor.processPubRel(ctx, msg); break; case PUBCOMP: processor.processPubComp(ctx, msg); break; case SUBSCRIBE: Preconditions.checkArgument(msg instanceof MqttSubscribeMessage); processor.processSubscribe(ctx, (MqttSubscribeMessage) msg); break; case SUBACK: Preconditions.checkArgument(msg instanceof MqttSubAckMessage); processor.processSubAck(ctx, (MqttSubAckMessage) msg); break; case UNSUBSCRIBE: Preconditions.checkArgument(msg instanceof MqttUnsubscribeMessage); processor.processUnsubscribe(ctx, (MqttUnsubscribeMessage) msg); break; case UNSUBACK: Preconditions.checkArgument(msg instanceof MqttUnsubAckMessage); processor.processUnsubAck(ctx, (MqttUnsubAckMessage) msg); break; case PINGREQ: processor.processPingReq(ctx, msg); break; case PINGRESP: processor.processPingResp(ctx, msg); break; case DISCONNECT: processor.processDisconnect(ctx); break; case AUTH: processor.processAuth(ctx, msg); break; default: throw new UnsupportedOperationException("Unknown MessageType: " + messageType); } } catch (Throwable ex) { ReferenceCountUtil.safeRelease(msg); log.error("Exception was caught while processing MQTT message, ", ex); ctx.close(); }
這裡的程式碼,主要是針對MqttMessage的不同型別,呼叫MqttProcessor的不同方法,值得一提的有兩點
維護Mqtt對談的session,主要用來持續跟蹤使用者端對談資訊,跟蹤在系統中佔用的資源等,考慮到無論是何種後端實現,都需要維護Mqtt的Session,我們構築一個AbstractMqttProcessor來維護MqttSession
package io.github.protocol.mqtt.broker.processor; import io.github.protocol.mqtt.broker.MqttSessionKey; import io.github.protocol.mqtt.broker.auth.MqttAuth; import io.github.protocol.mqtt.broker.util.ChannelUtils; import io.github.protocol.mqtt.broker.util.MqttMessageUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.mqtt.MqttConnAckMessage; import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader; import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttFixedHeader; import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttMessageFactory; import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; import io.netty.handler.codec.mqtt.MqttPubAckMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttSubscribePayload; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.util.stream.IntStream; @Slf4j public abstract class AbstractProcessor implements MqttProcessor { protected final MqttAuth mqttAuth; public AbstractProcessor(MqttAuth mqttAuth) { this.mqttAuth = mqttAuth; } @Override public void processConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) throws Exception { String clientId = msg.payload().clientIdentifier(); String username = msg.payload().userName(); byte[] pwd = msg.payload().passwordInBytes(); if (StringUtils.isBlank(clientId) || StringUtils.isBlank(username)) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, false), null); ctx.writeAndFlush(connAckMessage); log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress()); ctx.close(); return; } if (!mqttAuth.connAuth(clientId, username, pwd)) { MqttConnAckMessage connAckMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD, false), null); ctx.writeAndFlush(connAckMessage); log.error("the clientId username pwd cannot be empty, client address[{}]", ctx.channel().remoteAddress()); ctx.close(); return; } MqttSessionKey mqttSessionKey = new MqttSessionKey(); mqttSessionKey.setUsername(username); mqttSessionKey.setClientId(clientId); ChannelUtils.setMqttSession(ctx.channel(), mqttSessionKey); log.info("username {} clientId {} remote address {} connected", username, clientId, ctx.channel().remoteAddress()); onConnect(mqttSessionKey); MqttConnAckMessage mqttConnectMessage = (MqttConnAckMessage) MqttMessageFactory.newMessage( new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0), new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false), null); ctx.writeAndFlush(mqttConnectMessage); } protected void onConnect(MqttSessionKey mqttSessionKey) { } @Override public void processConnAck(ChannelHandlerContext ctx, MqttConnAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("conn ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processPublish(ChannelHandlerContext ctx, MqttPublishMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("publish, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); return; } if (msg.fixedHeader().qosLevel() == MqttQoS.FAILURE) { log.error("failure. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername()); return; } if (msg.fixedHeader().qosLevel() == MqttQoS.EXACTLY_ONCE) { log.error("does not support QoS2 protocol. clientId {}, username {} ", mqttSession.getClientId(), mqttSession.getUsername()); return; } onPublish(ctx, mqttSession, msg); } protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttPublishMessage msg) throws Exception { } @Override public void processPubAck(ChannelHandlerContext ctx, MqttPubAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processPubRec(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub rec, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processPubRel(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub rel, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processPubComp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("pub comp, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("sub, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } onSubscribe(ctx, mqttSession, msg.payload()); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); IntStream intStream = msg.payload().topicSubscriptions().stream().mapToInt(s -> s.qualityOfService().value()); MqttSubAckPayload payload = new MqttSubAckPayload(intStream.toArray()); ctx.writeAndFlush(MqttMessageFactory.newMessage( fixedHeader, MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()), payload)); } protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttSubscribePayload subscribePayload) throws Exception { } @Override public void processSubAck(ChannelHandlerContext ctx, MqttSubAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("sub ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("unsub, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processUnsubAck(ChannelHandlerContext ctx, MqttUnsubAckMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("unsub ack, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processPingReq(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { ctx.writeAndFlush(MqttMessageUtil.pingResp()); } @Override public void processPingResp(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("ping resp, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } @Override public void processDisconnect(ChannelHandlerContext ctx) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("disconnect, client address {} not authed", ctx.channel().remoteAddress()); } onDisconnect(mqttSession); } protected void onDisconnect(MqttSessionKey mqttSessionKey) { } @Override public void processAuth(ChannelHandlerContext ctx, MqttMessage msg) throws Exception { MqttSessionKey mqttSession = ChannelUtils.getMqttSession(ctx.channel()); if (mqttSession == null) { log.error("auth, client address {} not authed", ctx.channel().remoteAddress()); ctx.close(); } } }
可以看到,這裡的AbstractProcessor主要是維護了MqttSessionKey,校驗MqttSessionKey,並攔截publish中不支援的Qos2、Failure。同時,也影響了mqtt心跳請求。同樣的,我們允許在onPublish、onSubscribe中丟擲異常。
基於訊息佇列實現的mqtt閘道器的基礎思想也比較簡單,簡而言之就是,有publish訊息的時候向訊息佇列中生產訊息。有訂閱的時候就從訊息佇列中拉取訊息。由此延伸出來,我們可能需要維護每個mqtt topic和producer、consumer的對應關係,因為像kafka、pulsar這些訊息中介軟體的消費者都是區分topic的,片段通用程式碼如下:
protected final ReentrantReadWriteLock.ReadLock rLock; protected final ReentrantReadWriteLock.WriteLock wLock; protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap; protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap; protected final Map<MqttTopicKey, P> producerMap; protected final Map<MqttTopicKey, C> consumerMap; public AbstractMqProcessor(MqttAuth mqttAuth) { super(mqttAuth); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); rLock = lock.readLock(); wLock = lock.writeLock(); this.sessionProducerMap = new HashMap<>(); this.sessionConsumerMap = new HashMap<>(); this.producerMap = new HashMap<>(); this.consumerMap = new HashMap<>(); } @Override protected void onConnect(MqttSessionKey mqttSessionKey) { wLock.lock(); try { sessionProducerMap.put(mqttSessionKey, new ArrayList<>()); sessionConsumerMap.put(mqttSessionKey, new ArrayList<>()); } finally { wLock.unlock(); } } @Override protected void onDisconnect(MqttSessionKey mqttSessionKey) { wLock.lock(); try { // find producers List<MqttTopicKey> produceTopicKeys = sessionProducerMap.get(mqttSessionKey); if (produceTopicKeys != null) { for (MqttTopicKey mqttTopicKey : produceTopicKeys) { P producer = producerMap.get(mqttTopicKey); if (producer != null) { ClosableUtils.close(producer); producerMap.remove(mqttTopicKey); } } } sessionProducerMap.remove(mqttSessionKey); List<MqttTopicKey> consumeTopicKeys = sessionConsumerMap.get(mqttSessionKey); if (consumeTopicKeys != null) { for (MqttTopicKey mqttTopicKey : consumeTopicKeys) { C consumer = consumerMap.get(mqttTopicKey); if (consumer != null) { ClosableUtils.close(consumer); consumerMap.remove(mqttTopicKey); } } } sessionConsumerMap.remove(mqttSessionKey); } finally { wLock.unlock(); } } }
由於kafka producer不區分topic,我們可以在kafka processor中複用producer,在將來單個kafka producer的效能到達上限時,我們可以將kafka producer擴充套件為kafka producer列表進行輪詢處理,消費者由於mqtt協定可能針對每個訂閱topic有不同的行為,不合適複用同一個消費者範例。我們在建構函式中啟動KafkaProducer
private final KafkaProcessorConfig kafkaProcessorConfig; private final KafkaProducer<String, ByteBuffer> producer; public KafkaProcessor(MqttAuth mqttAuth, KafkaProcessorConfig kafkaProcessorConfig) { super(mqttAuth); this.kafkaProcessorConfig = kafkaProcessorConfig; this.producer = createProducer(); } protected KafkaProducer<String, ByteBuffer> createProducer() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProcessorConfig.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer.class); return new KafkaProducer<>(properties); }
處理MqttPublish訊息,MqttPublish訊息包含如下幾個關鍵引數
MqttQoS mqttQoS = publishMessage.fixedHeader().qosLevel(); String topic = publishMessage.variableHeader().topicName(); ByteBuffer byteBuffer = publishMessage.payload().nioBuffer();
其中
根據topic、qos傳送訊息,程式碼如下
String topic = msg.variableHeader().topicName(); ProducerRecord<String, ByteBuffer> record = new ProducerRecord<>(topic, msg.payload().nioBuffer()); switch (msg.fixedHeader().qosLevel()) { case AT_MOST_ONCE -> producer.send(record, (metadata, exception) -> { if (exception != null) { log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, exception); return; } log.debug("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}", mqttSessionKey, metadata.topic(), metadata.partition(), metadata.offset()); }); case AT_LEAST_ONCE -> { try { RecordMetadata recordMetadata = producer.send(record).get(); log.info("mqttSessionKey {} send message to kafka success, topic {}, partition {}, offset {}", mqttSessionKey, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()); ctx.writeAndFlush(MqttMessageUtil.pubAckMessage(msg.variableHeader().packetId())); } catch (Exception e) { log.error("mqttSessionKey {} send message to kafka error", mqttSessionKey, e); } } case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException( String.format("mqttSessionKey %s can not reach here", mqttSessionKey)); }
處理訂閱訊息,我們暫時僅根據訂閱的topic,建立topic進行消費即可,由於kafka原生使用者端建議的消費程式碼模式如下
while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, byte[]> record : records) { // do logic } }
我們需要切換到其他執行緒對consumer進行訊息,書寫一個KafkaConsumerListenerWrapper的wrapper,轉換為listener非同步消費模型
package io.github.protocol.mqtt.broker.processor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; import java.util.concurrent.ExecutionException; @Slf4j public class KafkaConsumerListenerWrapper implements AutoCloseable { private final AdminClient adminClient; private final KafkaConsumer<String, byte[]> consumer; public KafkaConsumerListenerWrapper(KafkaProcessorConfig config, String username) { Properties adminProperties = new Properties(); adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); this.adminClient = KafkaAdminClient.create(adminProperties); Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, username); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); this.consumer = new KafkaConsumer<>(properties); } public void start(String topic, KafkaMessageListener listener) throws Exception { try { TopicDescription topicDescription = adminClient.describeTopics(Collections.singletonList(topic)) .values().get(topic).get(); log.info("topic info is {}", topicDescription); } catch (ExecutionException ee) { if (ee.getCause() instanceof UnknownTopicOrPartitionException) { log.info("topic {} not exist, create it", topic); adminClient.createTopics(Collections.singletonList(new NewTopic(topic, 1, (short) 1))); } else { log.error("find topic info {} error", topic, ee); } } catch (Exception e) { throw new IllegalStateException("find topic info error", e); } consumer.subscribe(Collections.singletonList(topic)); log.info("consumer topic {} start", topic); new Thread(() -> { try { while (true) { ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, byte[]> record : records) { listener.messageReceived(record); } } } catch (WakeupException we) { consumer.close(); } catch (Exception e) { log.error("consumer topic {} consume error", topic, e); consumer.close(); } }).start(); Thread.sleep(5_000); } @Override public void close() throws Exception { log.info("wake up {} consumer", consumer); consumer.wakeup(); } } @Override protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttSubscribePayload subscribePayload) throws Exception { for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) { KafkaConsumerListenerWrapper consumer = createConsumer(mqttSessionKey, topicSubscription.topicName()); subscribe(ctx, consumer, topicSubscription.topicName()); } } private KafkaConsumerListenerWrapper createConsumer(MqttSessionKey mqttSessionKey, String topic) { MqttTopicKey mqttTopicKey = new MqttTopicKey(); mqttTopicKey.setTopic(topic); mqttTopicKey.setMqttSessionKey(mqttSessionKey); wLock.lock(); try { KafkaConsumerListenerWrapper consumer = consumerMap.get(mqttTopicKey); if (consumer == null) { consumer = new KafkaConsumerListenerWrapper(kafkaProcessorConfig, mqttSessionKey.getUsername()); sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> { if (mqttTopicKeys == null) { mqttTopicKeys = new ArrayList<>(); } mqttTopicKeys.add(mqttTopicKey); return mqttTopicKeys; }); consumerMap.put(mqttTopicKey, consumer); } return consumer; } finally { wLock.unlock(); } } protected void subscribe(ChannelHandlerContext ctx, KafkaConsumerListenerWrapper consumer, String topic) throws Exception { BoundInt boundInt = new BoundInt(65535); consumer.start(topic, record -> { log.info("receive message from kafka, topic {}, partition {}, offset {}", record.topic(), record.partition(), record.offset()); MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage( MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), record.value()); ctx.writeAndFlush(mqttPublishMessage); }); }
在上述的程式碼中,有一個需要通篇注意的點:紀錄檔列印的時候,注意要將關鍵的資訊攜帶,比如:topic、mqtt username、mqtt clientId等,在寫demo的時候沒有感覺,但是海量請求下需要定位問題的時候,就知道這些資訊的關鍵之處了。
使用BountInt這個簡單的工具類來生成從0~65535的packageId,滿足協定的要求
pulsar相比kafka來說,更適合作為mqtt協定的代理。原因有如下幾點:
protected final ReentrantReadWriteLock.ReadLock rLock; protected final ReentrantReadWriteLock.WriteLock wLock; protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionProducerMap; protected final Map<MqttSessionKey, List<MqttTopicKey>> sessionConsumerMap; protected final Map<MqttTopicKey, Producer<byte[]>> producerMap; protected final Map<MqttTopicKey, Consumer<byte[]>> consumerMap; private final PulsarProcessorConfig pulsarProcessorConfig; private final PulsarAdmin pulsarAdmin; private final PulsarClient pulsarClient; public PulsarProcessor(MqttAuth mqttAuth, PulsarProcessorConfig pulsarProcessorConfig) { super(mqttAuth); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); rLock = lock.readLock(); wLock = lock.writeLock(); this.sessionProducerMap = new HashMap<>(); this.sessionConsumerMap = new HashMap<>(); this.producerMap = new HashMap<>(); this.consumerMap = new HashMap<>(); this.pulsarProcessorConfig = pulsarProcessorConfig; try { this.pulsarAdmin = PulsarAdmin.builder() .serviceHttpUrl(pulsarProcessorConfig.getHttpUrl()) .build(); this.pulsarClient = PulsarClient.builder() .serviceUrl(pulsarProcessorConfig.getServiceUrl()) .build(); } catch (Exception e) { throw new IllegalStateException("Failed to create pulsar client", e); } }
處理publish訊息
@Override protected void onPublish(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttPublishMessage msg) throws Exception { String topic = msg.variableHeader().topicName(); Producer<byte[]> producer = getOrCreateProducer(mqttSessionKey, topic); int len = msg.payload().readableBytes(); byte[] messageBytes = new byte[len]; msg.payload().getBytes(msg.payload().readerIndex(), messageBytes); switch (msg.fixedHeader().qosLevel()) { case AT_MOST_ONCE -> producer.sendAsync(messageBytes). thenAccept(messageId -> log.info("clientId [{}]," + " username [{}]. send message to pulsar success messageId: {}", mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId)) .exceptionally((e) -> { log.error("clientId [{}], username [{}]. send message to pulsar fail: ", mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e); return null; }); case AT_LEAST_ONCE -> { try { MessageId messageId = producer.send(messageBytes); MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) MqttMessageFactory.newMessage(fixedHeader, MqttMessageIdVariableHeader.from(msg.variableHeader().packetId()), null); log.info("clientId [{}], username [{}]. send pulsar success. messageId: {}", mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), messageId); ctx.writeAndFlush(pubAckMessage); } catch (PulsarClientException e) { log.error("clientId [{}], username [{}]. send pulsar error: {}", mqttSessionKey.getClientId(), mqttSessionKey.getUsername(), e.getMessage()); } } case EXACTLY_ONCE, FAILURE -> throw new IllegalStateException( String.format("mqttSessionKey %s can not reach here", mqttSessionKey)); } } private Producer<byte[]> getOrCreateProducer(MqttSessionKey mqttSessionKey, String topic) throws Exception { MqttTopicKey mqttTopicKey = new MqttTopicKey(); mqttTopicKey.setTopic(topic); mqttTopicKey.setMqttSessionKey(mqttSessionKey); rLock.lock(); try { Producer<byte[]> producer = producerMap.get(mqttTopicKey); if (producer != null) { return producer; } } finally { rLock.unlock(); } wLock.lock(); try { Producer<byte[]> producer = producerMap.get(mqttTopicKey); if (producer == null) { producer = createProducer(topic); sessionProducerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> { if (mqttTopicKeys == null) { mqttTopicKeys = new ArrayList<>(); } mqttTopicKeys.add(mqttTopicKey); return mqttTopicKeys; }); producerMap.put(mqttTopicKey, producer); } return producer; } finally { wLock.unlock(); } } protected Producer<byte[]> createProducer(String topic) throws Exception { return pulsarClient.newProducer(Schema.BYTES).topic(topic).create(); }
處理subscribe訊息
@Override protected void onSubscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, MqttSubscribePayload subscribePayload) throws Exception { for (MqttTopicSubscription topicSubscription : subscribePayload.topicSubscriptions()) { subscribe(ctx, mqttSessionKey, topicSubscription.topicName()); } } protected void subscribe(ChannelHandlerContext ctx, MqttSessionKey mqttSessionKey, String topic) throws Exception { MqttTopicKey mqttTopicKey = new MqttTopicKey(); mqttTopicKey.setTopic(topic); mqttTopicKey.setMqttSessionKey(mqttSessionKey); wLock.lock(); try { Consumer<byte[]> consumer = consumerMap.get(mqttTopicKey); if (consumer == null) { consumer = createConsumer(ctx, mqttSessionKey.getUsername(), topic); sessionConsumerMap.compute(mqttSessionKey, (mqttSessionKey1, mqttTopicKeys) -> { if (mqttTopicKeys == null) { mqttTopicKeys = new ArrayList<>(); } mqttTopicKeys.add(mqttTopicKey); return mqttTopicKeys; }); consumerMap.put(mqttTopicKey, consumer); } } finally { wLock.unlock(); } } protected Consumer<byte[]> createConsumer(ChannelHandlerContext ctx, String username, String topic) throws Exception { BoundInt boundInt = new BoundInt(65535); try { PartitionedTopicStats partitionedStats = pulsarAdmin.topics().getPartitionedStats(topic, false); log.info("topic {} partitioned stats {}", topic, partitionedStats); } catch (PulsarAdminException.NotFoundException nfe) { log.info("topic {} not found", topic); pulsarAdmin.topics().createPartitionedTopic(topic, 1); } return pulsarClient.newConsumer(Schema.BYTES).topic(topic) .messageListener((consumer, msg) -> { log.info("receive message from pulsar, topic {}, message {}", topic, msg.getMessageId()); MqttPublishMessage mqttPublishMessage = MqttMessageUtil.publishMessage( MqttQoS.AT_LEAST_ONCE, topic, boundInt.nextVal(), msg.getData()); ctx.writeAndFlush(mqttPublishMessage); }) .subscriptionName(username).subscribe(); }
我們可以通過embedded-kafka-java這個專案來啟動用做單元測試的kafka broker。通過如下的group引入依賴
<dependency> <groupId>io.github.embedded-middleware</groupId> <artifactId>embedded-kafka-core</artifactId> <version>0.0.2</version> <scope>test</scope> </dependency>
我們就可以通過如下的程式碼啟動基於kafka的mqtt broker
@Slf4j public class MqttKafkaTestUtil { public static MqttServer setupMqttKafka() throws Exception { EmbeddedKafkaServer embeddedKafkaServer = new EmbeddedKafkaServer(); new Thread(() -> { try { embeddedKafkaServer.start(); } catch (Exception e) { log.error("kafka broker started exception ", e); } }).start(); Thread.sleep(5_000); MqttServerConfig mqttServerConfig = new MqttServerConfig(); mqttServerConfig.setPort(0); mqttServerConfig.setProcessorType(ProcessorType.KAFKA); KafkaProcessorConfig kafkaProcessorConfig = new KafkaProcessorConfig(); kafkaProcessorConfig.setBootstrapServers(String.format("localhost:%d", embeddedKafkaServer.getKafkaPort())); mqttServerConfig.setKafkaProcessorConfig(kafkaProcessorConfig); MqttServer mqttServer = new MqttServer(mqttServerConfig); new Thread(() -> { try { mqttServer.start(); } catch (Exception e) { log.error("mqsar broker started exception ", e); } }).start(); Thread.sleep(5000L); return mqttServer; } }
kafka端到端測試用例,比較簡單,通過mqtt client publish一條訊息,然後消費出來
@Log4j2 public class MqttKafkaPubSubTest { @Test public void pubSubTest() throws Exception { MqttServer mqttServer = MqttKafkaTestUtil.setupMqttKafka(); String topic = UUID.randomUUID().toString(); String content = "test-msg"; String broker = String.format("tcp://localhost:%d", mqttServer.getPort()); String clientId = UUID.randomUUID().toString(); MemoryPersistence persistence = new MemoryPersistence(); MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(UUID.randomUUID().toString()); connOpts.setPassword(UUID.randomUUID().toString().toCharArray()); connOpts.setCleanSession(true); log.info("Mqtt connecting to broker"); sampleClient.connect(connOpts); CompletableFuture<String> future = new CompletableFuture<>(); log.info("Mqtt subscribing"); sampleClient.subscribe(topic, (s, mqttMessage) -> { log.info("messageArrived"); future.complete(mqttMessage.toString()); }); log.info("Mqtt subscribed"); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(1); log.info("Mqtt message publishing"); sampleClient.publish(topic, message); log.info("Mqtt message published"); TimeUnit.SECONDS.sleep(3); sampleClient.disconnect(); String msg = future.get(5, TimeUnit.SECONDS); Assertions.assertEquals(content, msg); } }
我們可以通過embedded-pulsar-java這個專案來啟動用做單元測試的pulsar broker。通過如下的group引入依賴
<dependency> <groupId>io.github.embedded-middleware</groupId> <artifactId>embedded-pulsar-core</artifactId> <version>0.0.2</version> <scope>test</scope> </dependency>
我們就可以通過如下的程式碼啟動基於pulsar的mqtt broker
@Slf4j public class MqttPulsarTestUtil { public static MqttServer setupMqttPulsar() throws Exception { EmbeddedPulsarServer embeddedPulsarServer = new EmbeddedPulsarServer(); embeddedPulsarServer.start(); MqttServerConfig mqttServerConfig = new MqttServerConfig(); mqttServerConfig.setPort(0); mqttServerConfig.setProcessorType(ProcessorType.PULSAR); PulsarProcessorConfig pulsarProcessorConfig = new PulsarProcessorConfig(); pulsarProcessorConfig.setHttpUrl(String.format("http://localhost:%d", embeddedPulsarServer.getWebPort())); pulsarProcessorConfig.setServiceUrl(String.format("pulsar://localhost:%d", embeddedPulsarServer.getTcpPort())); mqttServerConfig.setPulsarProcessorConfig(pulsarProcessorConfig); MqttServer mqttServer = new MqttServer(mqttServerConfig); new Thread(() -> { try { mqttServer.start(); } catch (Exception e) { log.error("mqsar broker started exception ", e); } }).start(); Thread.sleep(5000L); return mqttServer; } }
pulsar端到端測試用例,比較簡單,通過mqtt client publish一條訊息,然後消費出來
@Log4j2 public class MqttPulsarPubSubTest { @Test public void pubSubTest() throws Exception { MqttServer mqttServer = MqttPulsarTestUtil.setupMqttPulsar(); String topic = UUID.randomUUID().toString(); String content = "test-msg"; String broker = String.format("tcp://localhost:%d", mqttServer.getPort()); String clientId = UUID.randomUUID().toString(); MemoryPersistence persistence = new MemoryPersistence(); MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setUserName(UUID.randomUUID().toString()); connOpts.setPassword(UUID.randomUUID().toString().toCharArray()); connOpts.setCleanSession(true); log.info("Mqtt connecting to broker"); sampleClient.connect(connOpts); CompletableFuture<String> future = new CompletableFuture<>(); log.info("Mqtt subscribing"); sampleClient.subscribe(topic, (s, mqttMessage) -> { log.info("messageArrived"); future.complete(mqttMessage.toString()); }); log.info("Mqtt subscribed"); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(1); log.info("Mqtt message publishing"); sampleClient.publish(topic, message); log.info("Mqtt message published"); TimeUnit.SECONDS.sleep(3); sampleClient.disconnect(); String msg = future.get(5, TimeUnit.SECONDS); Assertions.assertEquals(content, msg); } }
這裡我們簡單描述幾個效能優化點,像一些調整執行緒數、buffer大小這類的引數調整就不在這裡贅述了,這些需要具體的效能壓測來決定引數的設定。
public class EventLoopUtil { /** * @return an EventLoopGroup suitable for the current platform */ public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) { if (Epoll.isAvailable()) { return new EpollEventLoopGroup(nThreads, threadFactory); } else { return new NioEventLoopGroup(nThreads, threadFactory); } } public static Class<? extends ServerSocketChannel> getServerSocketChannelClass(EventLoopGroup eventLoopGroup) { if (eventLoopGroup instanceof EpollEventLoopGroup) { return EpollServerSocketChannel.class; } else { return NioServerSocketChannel.class; } } }
通過Epollo.isAvailable,以及在指定channel型別的時候通過判斷group的型別選擇對應的channel型別
EventLoopGroup acceptorGroup = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("mqtt-acceptor")); EventLoopGroup workerGroup = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("mqtt-worker")); b.group(acceptorGroup, workerGroup) // key point .channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // decoder p.addLast(new MqttDecoder()); p.addLast(MqttEncoder.INSTANCE); p.addLast(new MqttHandler(processor(mqttServerConfig))); } });
由於mqtt協定本身就有心跳機制,所以可以關閉tcp的keepalive,依賴mqtt協定層的心跳即可,節約海量連線下的效能。設定ChannelOption.SO_KEEPALIVE為false即可
.option(ChannelOption.SO_KEEPALIVE, false)
預設情況下,無論是單元測試中mqtt,還是pulsar producer和kafka producer的生產超時時間,都相對較長(一般為30s),如果在內網環境部署,可以將超時時間調整到5s。來避免無意義的超時等待
單個KafkaProducer會達到tcp鏈路頻寬的瓶頸,當有海量請求,而延時在kafka生產比較突出的情況下,可以考慮啟動多個KafkaProducer。並根據mqtt協定的特點(鏈路多,單個鏈路上qps不高),用mqttSessionKey的雜湊值來決定使用那個KafkaProducer傳送訊息
在KafkaProcessorConfig中新增如下設定,生產者個數,預設為1
private int producerNum = 1;
在初始化的時候,初始化Producer陣列,而不是單個Producer
this.producerArray = new KafkaProducer[kafkaProcessorConfig.getProducerNum()]; for (int i = 0; i < kafkaProcessorConfig.getProducerNum(); i++) { producerArray[i] = createProducer(); }
封裝一個方法來獲取producer
private Producer<String, ByteBuffer> getProducer(MqttSessionKey mqttSessionKey) { return producerArray[Math.abs(mqttSessionKey.hashCode() % kafkaProcessorConfig.getProducerNum())]; }
本文的程式碼均已上傳到github。我們這裡僅僅只實現了基礎的mqtt 連線、釋出、訂閱功能,甚至不支援暫停、取消訂閱。想要實現一個成熟商用的mqtt閘道器,我們還需要使用者隔離、對協定的更多支援、可靠性、可運維、流控、安全等能力。如有商用生產級別的mqtt需求,又無法快速構築成熟的mqtt閘道器的,可以選擇華為雲IoTDA服務,提供穩定可靠的mqtt服務,支援海量裝置連線上雲、裝置和雲端訊息雙向通訊能力。