Redis+Hbase+RocketMQ 實際使用問題案例分享

2023-01-19 18:00:47

需求

  1. 將Hbase資料,解析後推播到RocketMQ。
  2. redis使用list資料型別,儲存了需要推播的資料的RowKey及表名。

簡單畫個流程圖就是:

分析及確定方案

Redis

  1. 明確list中元素結構{"rowkey":rowkey,"table":table}解析出rowkey;
  2. 一次取多個元素加快效率;
  3. 取了之後放入重試佇列,並刪除原來的元素;
  4. 處理資料永遠是重試佇列裡的,成功之後刪除,失敗就加上重試次數並重新放回;
  5. 明確從list中取值所使用的redis命令;範圍獲取LRANGE;範圍刪除(留下指定範圍的資料)LTRIM;判斷list長度LLEN;加入listRPUSH;刪除LREM等等;
  6. 從Hbase獲取資料失敗和傳送到mq失敗都令重試次數加一;
  7. 每次碰到重試次數不為0的資料都休眠1s;
  8. 設定最大重試次數,達到限制後丟棄;
  9. 考慮客戶redis部署方式,單機、主從、叢集、哨兵等;
  10. 選擇合適的使用者端,Jedis、Redisson、Lettuce等;
  11. 編寫不同的操作程式碼,也可以利用組態檔、環境變數、工廠模式等適配各種部署模式;

Hbase

  1. 基本理論知識學習(原來沒接觸過),rowkey是沒條資料的主鍵,限定符是欄位名,列族是多個限定名的集合等;當時看這個覺得不錯https://juejin.cn/post/6844903797655863309
  2. 因為是不停讀取資料、連結、Table不用close,可以快取起來,沒必要每次都建立;
  3. 確定批次獲取資料方式為批次Get,沒用scan
  4. 瞭解解析方式,一些網上的解析試了之後會亂碼,這邊用的是它自帶的CellUtil.clone相關方法;
  5. 考慮所有都沒資料時休眠10s;

RocketMQ

  1. 有現成的傳送程式碼,公司封裝好的;
  2. 調整傳送的速度、太快了伺服器端會吃不消(獲取Hbase資料速度太快了,最開始沒限制一會兒就入了百萬資料),設定超時時間(預設3s);
  3. 調整伺服器端的記憶體、執行緒數等引數;

實現

設定

#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.password=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# Zookeeper 叢集地址,逗號分隔
hbase.zookeeper.quorum=
# Zookeeper 埠
hbase.zookeeper.property.clientPort=2181
# 訊息目的rocketmq地址
rocketmq.server.host=
# 傳送訊息間隔時間,防止傳送過快mq受不了
rocketmq.send.interval.millisec=10
# 每次從redis讀取資料量限制。
data.access.redisDataSize=100
# 失敗資料重試次數,超過的直接丟棄
data.access.retryNum=10
# 需要接入的表,需要傳送到rocketmq的topic和在redis中的key的對映。xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back

部分程式碼

獲取設定,其餘的直接@Value("${}")

@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {

    /**
     * key:topic; value:redis的key
     */
    private Map<String, String> topicKeyMap = new HashMap<>();

    /**
     * 一次從redis中讀取資料量限制
     */
    private long redisDataSize = 50;

    /**
     * 失敗資料重試次數
     */
    private int retryNum = 10;

}

開啟接入:

@Component
public class AdapterRunner implements ApplicationRunner {

    @Resource
    private DataAccessService dataAccessService;

    @Override
    public void run(ApplicationArguments args) {
        System.out.println("專案已啟動,開始接入資料到RocketMQ……");
        dataAccessService.accessData2Mq();
    }
}

其他程式碼其實也在分析裡了。

踩坑

  1. mq傳送問題
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
	at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
	at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
	at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
	at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

上面分析也說了,注意傳送速度,有多少資源就接入多快。還有注意相關三個埠是否開放。

總結

程式很簡單,主要涉及方案的是,獲取redis的list資料時,是考慮效率,及加入重試策略,保證資料不丟失等。