@
Druid servers建議將它們組織為三種伺服器型別:Master主伺服器、Query查詢伺服器和Data資料伺服器。
服務程序型別細分如下:
Druid能夠實現海量資料實時分析採取瞭如下特殊⼿段:
Apache Druid可以在攝入原始資料時使用稱為「roll-up」的過程進行彙總。roll-up是針對選定列集的一級聚合操作,可減小儲存資料的大小。分析查詢逃不開聚合操作,Druid在資料⼊庫時就提前進⾏了聚合,這就是所謂的預聚合(roll-up)。Druid把資料按照選定維度的相同的值進⾏分組聚合,可以⼤⼤降低儲存⼤⼩。資料查詢的時候只需要預聚合的資料基礎上進⾏輕量的⼆次過濾和聚合即可快速拿到分析結果。要做預聚合,Druid要求資料能夠分為三個部分:
使用網路流事件資料的一個小樣本,表示在特定秒內發生的從源到目的IP地址的流量的包和位元組計數,資料如下:
{"timestamp":"2018-01-01T01:01:35Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":20,"bytes":9024}
{"timestamp":"2018-01-01T01:01:51Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":255,"bytes":21133}
{"timestamp":"2018-01-01T01:01:59Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":11,"bytes":5780}
{"timestamp":"2018-01-01T01:02:14Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":38,"bytes":6289}
{"timestamp":"2018-01-01T01:02:29Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":377,"bytes":359971}
{"timestamp":"2018-01-01T01:03:29Z","srcIP":"1.1.1.1","dstIP":"2.2.2.2","packets":49,"bytes":10204}
{"timestamp":"2018-01-02T21:33:14Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":38,"bytes":6289}
{"timestamp":"2018-01-02T21:33:45Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":123,"bytes":93999}
{"timestamp":"2018-01-02T21:35:45Z","srcIP":"7.7.7.7","dstIP":"8.8.8.8","packets":12,"bytes":2818}
timestamp是Timestamp列,srcIP和dstIP是Dimension列(維度),packets和bytes是Metric列。資料⼊庫到Druid時如果開啟預聚合功能(可以不開啟聚合,資料量大建議開啟),要求對packets和bytes進⾏累加(sum),並且要求按條計數(count *),聚合之後的資料如下,可以看出聚合是以犧牲明細資料分析查詢為代價。
列式儲存的概念已經非常耳熟,但凡在⼤資料領域想要解決快速儲存和分析海量資料基本都會採⽤列式儲存,一般來說OLTP資料庫使用行式儲存,OLAP資料使用列式儲存。
Druid的資料在儲存層⾯是按照Datasource和Segments實現多級分割區儲存的,並建⽴了點陣圖索引。
[外連圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-xkKQdpk9-1675265402036)(http://www.itxiaoshen.com:3001/assets/1675159187424CTMTr68s.png)]
Segment跟Chunk
Druid資料儲存的攝取方式、聚合方式、每列資料儲存的位元組起始位都有儲存。
例如有一份資料如下
以tp為時間列,appkey和city為維度,以value為度量值,導⼊Druid後按天聚合,最終結果如下
聚合後資料經過聚合之後查詢本身就很快了,為了進⼀步加速對聚合之後資料的查詢,Druid會建立點陣圖索引如下
上⾯的點陣圖索引不是針對列⽽是針對列的值,記錄了列的值在資料的哪⼀行出現過,第一列是具體列的值,後續列標識該列的值在某⼀⾏是否出現過,依次是第1列到第n列。例如appkey1在第⼀⾏出現過,在其他⾏沒出現,那就是1000(例子中只有四個列)。
Select sum(value)
from xxx
where time='2019-11-11' and appkey in('appkey1','appkey2') and area='北京'
當我們有如上查詢時,⾸先根據時間段定位到segment,然後根據appkey in (‘appkey1’,’appkey2’) and area=’北京’ 查到各⾃的bitmap:(appkey1(1000) or appkey2(0110)) and 北京(1100) = (1100) 也就是說,符合條件的列是第⼀行和第⼆行,這兩⾏的metric的和為125.
在Druid中載入資料稱為攝取或索引。當攝取資料到Druid時,Druid從源系統讀取資料並將其儲存在稱為段的資料檔案中;通常,每個段檔案包含幾百萬行。
對於大多數攝取方法,Druid MiddleManager程序或Indexer程序載入源資料。唯一的例外是基於Hadoop的攝取,它在YARN上使用Hadoop MapReduce作業。
在攝入過程中,Druid建立片段並將它們儲存在深層儲存中。歷史節點將段載入到記憶體中以響應查詢。對於流輸入,中層管理人員和索引人員可以使用到達的資料實時響應查詢。
Druid包含流式和批次攝取方法,以下描述了適用於所有攝入方法的攝入概念和資訊。
流攝取:有兩個可用的選項;流攝取由一個持續執行的管理器控制。
批次攝取:有三種可供批次攝入的選擇。批次攝取作業與在作業期間執行的控制器任務相關聯。
Apache Druid支援兩種查詢語言:Druid SQL和本機查詢;可以使用Druid SQL查詢Druid資料來源中的資料。Druid將SQL查詢翻譯成其本地查詢語言。Druid SQL計劃發生在Broker上。設定Broker執行時屬性以設定查詢計劃和JDBC查詢。
Apache Druid 包含的API如下:
Apache Druid的本地查詢型別和本地查詢元件內容如下:
使用hadoop1、hadoop2、hadoop3共3臺搭建druid的叢集,如果有更多伺服器可以隨時啟動相應元件即可,叢集規模不大Master Server3臺和Query Server2臺即可,更多的是根據處理資料規模增加Data Server節點。
主機 | 元件 |
---|---|
hadoop1 | Master Server(Coordinator和Overlords) |
hadoop2 | Data Server(Historical和MiddleManager) |
hadoop3 | Query Server(Broker和Router) |
-- 建立一個druid資料庫,確保使用utf8mb4作為編碼
CREATE DATABASE druid DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_bin;
-- 建立一個druid使用者
CREATE USER 'druid'@'%' IDENTIFIED BY 'diurd';
-- 向用戶授予剛剛建立的資料庫的所有許可權
GRANT ALL PRIVILEGES ON druid.* TO druid@'%' WITH GRANT OPTION;
ALTER USER 'druid'@'%' IDENTIFIED WITH mysql_native_password BY 'druid';
FLUSH PRIVILEGES;
druid.host=hadoop1
# 在擴充套件載入列表中包含mysql-metadata-storage和下面使用的druid-hdfs-storage
druid.extensions.loadList=["druid-hdfs-storage", "druid-kafka-indexing-service", "druid-datasketches", "druid-multi-stage-query","mysql-metadata-storage"]
#druid.metadata.storage.type=derby
#druid.metadata.storage.connector.connectURI=jdbc:derby://localhost:1527/var/druid/metadata.db;create=true
#druid.metadata.storage.connector.host=localhost
#druid.metadata.storage.connector.port=1527
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://mysqlserver:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
#druid.storage.type=local
#druid.storage.storageDirectory=var/druid/segments
druid.storage.type=hdfs
druid.storage.storageDirectory=/druid/segments
#druid.indexer.logs.type=file
#druid.indexer.logs.directory=var/druid/indexing-logs
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=/druid/indexing-logs
druid.indexer.task.baseTaskDir=/var/druid/task
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=/var/druid/hadoop-tmp
vi conf/druid/cluster/_common/common.runtime.properties
druid.zk.service.host=zk1:2181,zk2:2181,zk3:2181
# 將apache-druid分別到另外兩臺伺服器上,並修改druid.host
rsync -az apache-druid-25.0.0/ hadoop2:/home/commons/apache-druid-25.0.0/
rsync -az apache-druid-25.0.0/ hadoop3:/home/commons/apache-druid-25.0.0/
# hadoop1上啟動Master Serve
bin/start-cluster-master-no-zk-server
# hadoop2上啟動Data Server
bin/start-cluster-data-server
# hadoop3上啟動Query Server
bin/start-cluster-query-server
# 如果叢集規模較大需要分離程序模組,也可以單獨啟動
bin/coordinator.sh start
bin/overlord.sh start
bin/historical.sh start
bin/middleManager.sh start
bin/broker.sh start
bin/jconsole.sh start
# 單獨關閉
bin/coordinator.sh stop
bin/overlord.sh stop
bin/historical.sh stop
bin/middleManager.sh stop
bin/broker.sh stop
bin/jconsole.sh stop
啟動完畢後存取查詢節點的Druid的控制檯UI,http://hadoop3:8888/,點選Services欄目可以看到所有程序服務詳細資訊
# 先將官方提供的範例資料上傳到hdfs
hdfs dfs -put wikiticker-2015-09-12-sampled.json.gz /tmp/my-druid
然後和前面單機版匯入操作流程相似,只是選擇輸入型別為HDFS,填寫Paths為上面上傳的路徑/tmp/my-druid/wikiticker-2015-09-12-sampled.json.gz
生成SQL如下,修改表名為wikipedia(原來為data)
REPLACE INTO "wikipedia" OVERWRITE ALL
WITH "ext" AS (SELECT *
FROM TABLE(
EXTERN(
'{"type":"hdfs","paths":"/tmp/my-druid/wikiticker-2015-09-12-sampled.json.gz"}',
'{"type":"json"}',
'[{"name":"time","type":"string"},{"name":"channel","type":"string"},{"name":"cityName","type":"string"},{"name":"comment","type":"string"},{"name":"countryIsoCode","type":"string"},{"name":"countryName","type":"string"},{"name":"isAnonymous","type":"string"},{"name":"isMinor","type":"string"},{"name":"isNew","type":"string"},{"name":"isRobot","type":"string"},{"name":"isUnpatrolled","type":"string"},{"name":"metroCode","type":"long"},{"name":"namespace","type":"string"},{"name":"page","type":"string"},{"name":"regionIsoCode","type":"string"},{"name":"regionName","type":"string"},{"name":"user","type":"string"},{"name":"delta","type":"long"},{"name":"added","type":"long"},{"name":"deleted","type":"long"}]'
)
))
SELECT
TIME_PARSE("time") AS "__time",
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
"delta",
"added",
"deleted"
FROM "ext"
PARTITIONED BY DAY
檢視資料來源可以看到wikipedia表資訊
檢視HDFS上也有相應的段資料
輸入SQL,點選執行查詢資料
SELECT
channel,
COUNT(*)
FROM "wikipedia"
GROUP BY channel
ORDER BY COUNT(*) DESC
可以通過http請求查詢,這裡以官方範例TopN查詢為例
curl -X POST 'http://hadoop3:8888/druid/v2/?pretty' -H 'Content-Type:application/json' -d @wikipedia-top-pages.json
檢視資料攝取的任務資訊
檢視段資訊