1.手機端向主題 topic111 傳送訊息,並接收。(手機測試工具名稱:MQTT偵錯程式)
2.控制檯列印
MQTT 是用於物聯網 (IoT) 的 OASIS 標準訊息傳遞協定。它被設計為一種極其輕量級的釋出/訂閱訊息傳輸,非常適合連線具有小程式碼足跡和最小網路頻寬的遠端裝置。
MQTT 是使用者端伺服器釋出/訂閱訊息傳輸協定。它重量輕、開放、簡單,並且易於實施。這些特性使其非常適合在許多情況下使用,包括受限制的環境,例如機器對機器 (M2M) 和物聯網 (IoT) 環境中的通訊,其中需要小程式碼足跡和/或網路頻寬非常寶貴。
該協定通過 TCP/IP 或其他提供有序、無失真、雙向連線的網路協定執行。其特點包括:
· 使用釋出/訂閱訊息模式,提供一對多的訊息分發和應用程式的解耦。
· 與有效負載內容無關的訊息傳輸。
· 訊息傳遞的三種服務質量:
o 「最多一次」,根據操作環境的最大努力傳遞訊息。可能會發生訊息丟失。例如,此級別可用於環境感測器資料,其中單個讀數是否丟失並不重要,因為下一個讀數將很快釋出。
o 「至少一次」,保證訊息到達但可能出現重複。
o 「Exactly once」,保證訊息只到達一次。例如,此級別可用於重複或丟失訊息可能導致應用不正確費用的計費系統。
· 最小化傳輸開銷和協定交換以減少網路流量。
· 發生異常斷開時通知相關方的機制。
通過開放標準物聯網協定 MQTT、CoAP 和 LwM2M 連線任何裝置。使用 EMQX Enterprise 叢集輕鬆擴充套件到數千萬並行 MQTT 連線。
並且EMQX還是開源的,又支援叢集,所以還是一個比較不錯的選擇
1.兩臺伺服器:我的兩個伺服器一臺是騰訊雲、一臺是阿里雲的(不要問為什麼,薅羊毛得來的)咱們暫且叫他們 mqtt_service_aliyun和
## 1.下載 wget https://www.emqx.com/zh/downloads/broker/4.4.4/emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 2.安裝 sudo yum install emqx-4.4.4-otp24.1.5-3-el8-amd64.rpm ## 3.修改組態檔 vim /etc/emqx/emqx.conf ## 4.修改以下內容 ## 注意node.name是當前這臺伺服器名稱 node.name = [email protected] cluster.static.seeds = [email protected],[email protected] cluster.discovery = static cluster.name = my-mqtt-cluster
sudo emqx start
nginx搭建很簡單略過,大家只需要修改以下nginx.conf裡面的內容即可
stream { upstream mqtt.zhouhong.icu { zone tcp_servers 64k; hash $remote_addr; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; server xxx.xx.xxx.xx:1883 weight=1 max_fails=3 fail_timeout=30s; } server { listen 8883 ssl; status_zone tcp_server; proxy_pass mqtt.zhouhong.icu; proxy_buffer_size 4k; ssl_handshake_timeout 15s; ssl_certificate /etc/nginx/7967358_www.mqtt.zhouhong.icu.pem; ssl_certificate_key /etc/nginx/7967358_www.mqtt.zhouhong.icu.key; } }
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
server: port: 8080 mqtt:
## 單機版--只需要把域名改為ip既可 hostUrl: tcp://mqtt.zhouhong.icu:1883 username: admin password: public ## 伺服器端 clientId (傳送端自己定義) clientId: service_client_id cleanSession: true reconnect: true timeout: 100 keepAlive: 100 defaultTopic: topic111 qos: 0
/** * description: * date: 2022/6/16 15:51 * @author: zhouhong */ @Component @ConfigurationProperties("mqtt") @Data public class MqttProperties { /** * 使用者名稱 */ private String username; /** * 密碼 */ private String password; /** * 連線地址 */ private String hostUrl; /** * 使用者端Id,同一臺伺服器下,不允許出現重複的使用者端id */ private String clientId; /** * 預設連線主題 */ private String topic; /** * 超時時間 */ private int timeout; /** * 設定對談心跳時間 單位為秒 伺服器會每隔1.5*20秒的時間向用戶端 * 傳送個訊息判斷使用者端是否線上,但這個方法並沒有重連的機制 */ private int keepAlive; /** * 設定是否清空session,這裡如果設定為false表示伺服器會保留使用者端的連 * 接記錄,這裡設定為true表示每次連線到伺服器都以新的身份連線 */ private Boolean cleanSession; /** * 是否斷線重連 */ private Boolean reconnect; /** * 連線方式 */ private Integer qos; }
/** * description: 發生訊息成功後 的 回撥 * date: 2022/6/16 15:55 * * @author: zhouhong */ @Component @Log4j2 public class MqttSendCallBack implements MqttCallbackExtended { /** * 使用者端斷開後觸發 * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("傳送訊息回撥: 連線斷開,可以做重連"); } /** * 使用者端收到訊息觸發 * * @param topic 主題 * @param mqttMessage 訊息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("傳送訊息回撥: 接收訊息主題 : " + topic); log.info("傳送訊息回撥: 接收訊息內容 : " + new String(mqttMessage.getPayload())); } /** * 釋出訊息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("傳送訊息回撥: 向主題:" + topic + "傳送訊息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("傳送訊息回撥: 訊息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連線emq伺服器後觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "使用者端連線成功!--------------------"); } }
/** * description: 接收訊息後的回撥 * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptCallback implements MqttCallbackExtended { @Resource private MqttAcceptClient mqttAcceptClient; /** * 使用者端斷開後觸發 * * @param throwable */ @Override public void connectionLost(Throwable throwable) { log.info("接收訊息回撥: 連線斷開,可以做重連"); if (MqttAcceptClient.client == null || !MqttAcceptClient.client.isConnected()) { log.info("接收訊息回撥: emqx重新連線...................................................."); mqttAcceptClient.reconnection(); } } /** * 使用者端收到訊息觸發 * * @param topic 主題 * @param mqttMessage 訊息 */ @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { log.info("接收訊息回撥: 接收訊息主題 : " + topic); log.info("接收訊息回撥: 接收訊息內容 : " + new String(mqttMessage.getPayload())); } /** * 釋出訊息成功 * * @param token token */ @Override public void deliveryComplete(IMqttDeliveryToken token) { String[] topics = token.getTopics(); for (String topic : topics) { log.info("接收訊息回撥: 向主題:" + topic + "傳送訊息成功!"); } try { MqttMessage message = token.getMessage(); byte[] payload = message.getPayload(); String s = new String(payload, "UTF-8"); log.info("接收訊息回撥: 訊息的內容是:" + s); } catch (MqttException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } /** * 連線emq伺服器後觸發 * * @param b * @param s */ @Override public void connectComplete(boolean b, String s) { log.info("--------------------ClientId:" + MqttAcceptClient.client.getClientId() + "使用者端連線成功!--------------------"); // 以/#結尾表示訂閱所有以test開頭的主題 // 訂閱所有機構主題 mqttAcceptClient.subscribe("topic111", 0); } }
/** * description: 傳送訊息 * date: 2022/6/16 16:01 * * @author: zhouhong */ @Component public class MqttSendClient { @Autowired private MqttSendCallBack mqttSendCallBack; @Autowired private MqttProperties mqttProperties; public MqttClient connect() { MqttClient client = null; try { String uuid = UUID.randomUUID().toString().replaceAll("-",""); client = new MqttClient(mqttProperties.getHostUrl(),uuid , new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setCleanSession(true); options.setAutomaticReconnect(false); try { // 設定回撥 client.setCallback(mqttSendCallBack); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } return client; } /** * 釋出訊息 * 主題格式: server:report:$orgCode(引數實際使用機構程式碼) * * @param retained 是否保留 * @param pushMessage 訊息體 */ public void publish(boolean retained, String topic, String pushMessage) { MqttMessage message = new MqttMessage(); message.setQos(mqttProperties.getQos()); message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttClient mqttClient = connect(); try { mqttClient.publish(topic, message); } catch (MqttException e) { e.printStackTrace(); } finally { disconnect(mqttClient); close(mqttClient); } } /** * 關閉連線 * * @param mqttClient */ public static void disconnect(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.disconnect(); } } catch (MqttException e) { e.printStackTrace(); } } /** * 釋放資源 * * @param mqttClient */ public static void close(MqttClient mqttClient) { try { if (mqttClient != null) { mqttClient.close(); } } catch (MqttException e) { e.printStackTrace(); } } }
/** * description: 伺服器段端連線訂閱訊息、監控topic * date: 2022/6/16 15:52 * * @author: zhouhong */ @Component @Log4j2 public class MqttAcceptClient { @Autowired @Lazy private MqttAcceptCallback mqttAcceptCallback; @Autowired private MqttProperties mqttProperties; public static MqttClient client; private static MqttClient getClient() { return client; } private static void setClient(MqttClient client) { MqttAcceptClient.client = client; } /** * 使用者端連線 */ public void connect() { MqttClient client; try { // clientId 使用伺服器 yml裡面設定的 clientId client = new MqttClient(mqttProperties.getHostUrl(), mqttProperties.getClientId(), new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttProperties.getUsername()); options.setPassword(mqttProperties.getPassword().toCharArray()); options.setConnectionTimeout(mqttProperties.getTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAlive()); options.setAutomaticReconnect(mqttProperties.getReconnect()); options.setCleanSession(mqttProperties.getCleanSession()); MqttAcceptClient.setClient(client); try { // 設定回撥 client.setCallback(mqttAcceptCallback); client.connect(options); } catch (Exception e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } } /** * 重新連線 */ public void reconnection() { try { client.connect(); } catch (MqttException e) { e.printStackTrace(); } } /** * 訂閱某個主題 * * @param topic 主題 * @param qos 連線方式 */ public void subscribe(String topic, int qos) { log.info("==============開始訂閱主題==============" + topic); try { client.subscribe(topic, qos); } catch (MqttException e) { e.printStackTrace(); } } /** * 取消訂閱某個主題 * * @param topic */ public void unsubscribe(String topic) { log.info("==============開始取消訂閱主題==============" + topic); try { client.unsubscribe(topic); } catch (MqttException e) { e.printStackTrace(); } } }
/** * description: 啟動後連線 MQTT 伺服器, 監聽 mqtt/my_topic 這個topic傳送的訊息 * date: 2022/6/16 15:57 * @author: zhouhong */ @Configuration public class MqttConfig { @Resource private MqttAcceptClient mqttAcceptClient; @Bean public MqttAcceptClient getMqttPushClient() { mqttAcceptClient.connect(); return mqttAcceptClient; } }
/** * description: 發訊息控制類 * date: 2022/6/16 15:58 * * @author: zhouhong */ @RestController public class SendController { @Resource private MqttSendClient mqttSendClient; @PostMapping("/mqtt/sendmessage") public void sendMessage(@RequestBody SendParam sendParam) { mqttSendClient.publish(false,sendParam.getTopic(),sendParam.getMessageContent()); } }
2. 控制檯列印
本文來自部落格園,作者:Tom-shushu,轉載請註明原文連結:https://www.cnblogs.com/Tom-shushu/p/16390187.html