簡單畫個流程圖就是:
Redis
{"rowkey":rowkey,"table":table}
解析出rowkey;LRANGE
;範圍刪除(留下指定範圍的資料)LTRIM
;判斷list長度LLEN
;加入listRPUSH
;刪除LREM
等等;Hbase
Get
,沒用scan
;CellUtil.clone
相關方法;RocketMQ
設定
#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.password=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# Zookeeper 叢集地址,逗號分隔
hbase.zookeeper.quorum=
# Zookeeper 埠
hbase.zookeeper.property.clientPort=2181
# 訊息目的rocketmq地址
rocketmq.server.host=
# 傳送訊息間隔時間,防止傳送過快mq受不了
rocketmq.send.interval.millisec=10
# 每次從redis讀取資料量限制。
data.access.redisDataSize=100
# 失敗資料重試次數,超過的直接丟棄
data.access.retryNum=10
# 需要接入的表,需要傳送到rocketmq的topic和在redis中的key的對映。xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back
部分程式碼
獲取設定,其餘的直接@Value("${}")
:
@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {
/**
* key:topic; value:redis的key
*/
private Map<String, String> topicKeyMap = new HashMap<>();
/**
* 一次從redis中讀取資料量限制
*/
private long redisDataSize = 50;
/**
* 失敗資料重試次數
*/
private int retryNum = 10;
}
開啟接入:
@Component
public class AdapterRunner implements ApplicationRunner {
@Resource
private DataAccessService dataAccessService;
@Override
public void run(ApplicationArguments args) {
System.out.println("專案已啟動,開始接入資料到RocketMQ……");
dataAccessService.accessData2Mq();
}
}
其他程式碼其實也在分析裡了。
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
上面分析也說了,注意傳送速度,有多少資源就接入多快。還有注意相關三個埠是否開放。
程式很簡單,主要涉及方案的是,獲取redis的list資料時,是考慮效率,及加入重試策略,保證資料不丟失等。
作者: letscrazy
出處: https://www.cnblogs.com/letscrazy/
關於作者:letscrazy
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出, 原文連結 如有問題, 可郵件([email protected])諮詢.