RocketMQ部署

2022-09-15 21:03:46

RocketMQ部署手冊

單MasterRocketMQ叢集

系統要求與準備條件

  1. 64位元作業系統,推薦 Linux/Unix/macOS

  2. 64位元 JDK 1.8+

  3. Maven

    tips

    檢驗java環境與maven環境

    java -version

    mvn -v

下載安裝Apache RocketMQ

RocketMQ 的安裝包分為兩種,二進位制包和原始碼包,二進位制包是已經編譯完成後可以直接執行的,原始碼包是需要編譯後執行的。

啟動NameServer

### 啟動namesrv
$ nohup sh bin/mqnamesrv &
 
### 驗證namesrv是否啟動成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

我們可以在namesrv.log 中看到 'The Name Server boot success..', 表示NameServer 已成功啟動。

啟動Broker

### 先啟動broker
$ nohup sh bin/mqbroker -n localhost:9876 &

### 驗證broker是否啟動成功, 比如, broker的ip是192.168.1.2 然後名字是broker-a
$ tail -f ~/logs/rocketmqlogs/Broker.log 
The broker[broker-a,192.169.1.2:10911] boot success...

我們可以在 Broker.log 中看到「The broker[brokerName,ip:port] boot success..」,這表明 broker 已成功啟動。

工具測試訊息收發

在進行工具測試訊息收發之前,我們需要告訴使用者端NameServer的地址,RocketMQ有多種方式在使用者端中設定NameServer地址,這裡我們利用環境變數NAMESRV_ADDR

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
 SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
 ConsumeMessageThread_%d Receive New Messages: [MessageExt...

安裝視覺化控制檯

1.下載專案

在 GitHub 中搜尋 rocketmq-externals,其中 rocketmq-console 就是 RocketMQ 視覺化控制檯,我們可以將原始碼克隆下來,然後自己 mvn package,然後執行 jar 包。

或者直接下載官方提供的 1.0.0 版本的 rocketmq-console

https://github.com/apache/rocketmq-externals/releases/tag/rocketmq-console-1.0.0

下載 zip 包或者 tar 包

  • 修改組態檔application.properties

    設定rocketmq.config.namesrvAddr屬性的值,即nameserver的服務地址

    rocketmq.config.namesrvAddr=localhost:9876
    

  • 儲存修改後的組態檔,返回rocketmq-console目錄

  • 使用maven打包命令打包

    mvn clean package -Dmaven.test.skip=true
    
  • 打包完成後進入target目錄

rocketmq-console-ng-2.0.0.jar即為打包後得到的jar包

  • 啟動程式

    nohup java -jar rocketmq-console-ng-2.0.0.jar &
    

SDK測試訊息收發

準備工作

啟動 NameServer 和 broker

nohup sh bin/mqnamesrv >mqnamesrv-log.txt &

nohup sh bin/mqbroker -n 127.0.0.1:9876 >mqbroker-log.txt &

啟動控制檯

mvn spring-boot:run

建立一個 topic名為 test_quick_topic

工具測試完成後,我們可以嘗試使用 SDK 收發訊息。這裡以 Java SDK 為例介紹一下訊息收發過程。

  • 在IDEA中建立一個Java工程。

  • pom.xml 檔案中新增以下依賴引入Java依賴庫。

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.13.graal</version>
        </dependency>
    </dependencies>

生產者

  • 同步投遞10條訊息
package TestProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class TestProducer01 {
    public static final String NAMESRV_ADDR = "127.0.0.1:9876";
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
        producer.setNamesrvAddr(NAMESRV_ADDR);
        producer.start();
        for (int i = 0; i <10; i++) {
            Message message = new Message("test_quick_topic",//主題
                    "tagA", //標籤
                    "key" + i, //自定義key,唯一標識
                    ("第" + i+"次訊息").getBytes()); //訊息內容實體 (byte[])
            SendResult result = producer.send(message);
            System.out.println("第" + i + "條訊息發出,結果:" + result);
        }
        producer.shutdown();
    }
}

消費者

  • 消費上面生產者生產的10條訊息
package TestConsumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.List;

public class TestConsumer01 {
    public static final String NAMESRV_ADDR = "127.0.0.1:9876";
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        //從最後開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//        consumer.subscribe("test_quick_topic","tagA"); //過濾:消費tag為tagA的訊息
        consumer.subscribe("test_quick_topic", "*"); //消費所有的
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt messageExt = list.get(0);
                try {
                    String topic = messageExt.getTopic();
                    String tags = messageExt.getTags();
                    String keys = messageExt.getKeys();
                    String msgBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.out.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ", body: " + msgBody);
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("comsumer start");
    }
}

關閉伺服器

$ sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK

$ sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK