作者:vivo 網際網路搜尋團隊- Deng Jie
Kafka中的實時資料是以Topic的概念進行分類儲存,而Topic的資料是有一定時效性的,比如儲存24小時、36小時、48小時等。而在定位一些實時資料的Case時,如果沒有對實時資料進行歷史歸檔,在排查問題時,沒有紀錄檔追述,會很難定位是哪個環節的問題。
Kafka中的實時資料是以Topic的概念進行分類儲存,而Topic的資料是有一定時效性的,比如儲存24小時、36小時、48小時等。而在定位一些實時資料的Case時,如果沒有對實時資料進行歷史歸檔,在排查問題時,沒有紀錄檔追述,會很難定位是哪個環節的問題。因此,我們需要對處理的這些實時資料進行記錄歸檔並儲存。
這裡以i視訊和vivo短視訊實時資料為例,之前存在這樣的共同作業問題:
資料上游內容方提供實時Topic(存放i視訊和vivo短視訊相關實時資料),資料側對實時資料進行邏輯處理後,傳送給下游工程去建庫實時索引,當任務執行一段時間後,工程側建索引偶爾會提出資料沒有傳送過去的Case,前期由於沒有對資料做儲存,在定位問題的時候會比較麻煩,經常需求檢視實時紀錄檔,需要花費很長的時間來分析這些Case是出現在哪個環節。
為了解決這個問題,我們可以將實時Topic中的資料,在傳送給其他Topic的時候,新增跟蹤機制,進行資料分流,Sink到儲存媒介(比如HDFS、Hive等)。這裡,我們選擇使用Hive來進行儲存,主要是查詢方便,支援SQL來快速查詢。如下圖所示:
在實現優化後的方案時,有兩種方式可以實現跟蹤機制,它們分別是Flink SQL寫Hive、Flink DataStream寫Hive。接下來,分別對這兩種實現方案進行介紹和實踐。
這種方式比較直接,可以在Flink任務裡面直接操作實時Topic資料後,將消費後的資料進行分流跟蹤,作為紀錄檔記錄寫入到Hive表中,具體實現步驟如下:
構造Hive Catalog;
建立Hive表;
寫入實時資料到Hive表。
在構造Hive Catalog時,需要初始化Hive的相關資訊,部分程式碼片段如下所示:
// 設定執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
// 構造 Hive Catalog 名稱
String name = "video-hive-catalog";
// 初始化資料庫名
String defaultDatabase = "comsearch";
// Hive 組態檔路徑地址
String hiveConfDir = "/appcom/hive/conf";
// Hive 版本號
String version = "3.1.2";
// 範例化一個 HiveCatalog 物件
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
// 註冊HiveCatalog
tEnv.registerCatalog(name, hive);
// 設定當前 HiveCatalog
tEnv.useCatalog(name);
// 設定執行SQL為Hive
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
// 使用資料庫
tEnv.useDatabase("db1");
在以上程式碼中,我們首先設定了 Flink 的執行環境和表環境,然後建立了一個 HiveCatalog,並將其註冊到表環境中。
如果Hive表不存在,可以通過在程式中執行建表語句,具體SQL見表語句程式碼如下所示:
-- 建立表語句
tEnv.executeSql("CREATE TABLE IF NOT EXISTS TABLE `xxx_table`(
`content_id` string,
`status` int)
PARTITIONED BY (
`dt` string,
`h` string,
`m` string)
stored as ORC
TBLPROPERTIES (
'auto-compaction'='true',
'sink.partition-commit.policy.kind'='metastore,success-file',
'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)")
在建立Hive表時我們使用了IF NOT EXISTS關鍵字,如果Hive中該表不存在會自動在Hive上建立,也可以提前在Hive中建立好該表,Flink SQL中就無需再執行建表SQL,因為用了Hive的Catalog,Flink SQL執行時會找到表。這裡,我們設定了auto-compaction屬性為true,用來使小檔案自動合併,1.12版的新特性,解決了實時寫Hive產生的小檔案問題。同時,指定metastore值是專門用於寫入Hive的,也需要指定success-file值,這樣CheckPoint觸發完資料寫入磁碟後會建立_SUCCESS檔案以及Hive metastore上建立後設資料,這樣Hive才能夠對這些寫入的資料可查。
在準備完成2.2.1和2.2.2中的步驟後,接下來就可以在Flink任務中通過SQL來對實時資料進行操作了,具體實現程式碼片段如下所示:
// 編寫業務SQL
String insertSql = "insert into xxx_table SELECT content_id, status, " +
" DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM xxx_rt";
// 執行 Hive SQL
tEnv.executeSql(insertSql);
// 執行任務
env.execute();
將消費後的資料進行分類,編寫業務SQL語句,將消費的資料作為紀錄檔記錄,傳送到Hive表進行儲存,這樣Kafka中的實時資料就儲存到Hive了,方便使用Hive來對Kafka資料進行即席分析。
使用這種方式在處理的過程中,如果設定使用的是EventTime,在程式中設定'sink.partition-commit.trigger'='partition-time',最後會出現無法提交分割區的情況。經過對原始碼PartitionTimeCommitTigger的分析,找到了出現這種異常情況的原因。
我們可以通過看
org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions
中的一個函數,來說明具體的問題,部分原始碼片段如下:
// PartitionTimeCommitTigger原始碼函數程式碼片段
@Override
public List<String> committablePartitions(long checkpointId) {
if (!watermarks.containsKey(checkpointId)) {
throw new IllegalArgumentException(String.format(
"Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
checkpointId, watermarks));
}
long watermark = watermarks.get(checkpointId);
watermarks.headMap(checkpointId, true).clear();
List<String> needCommit = new ArrayList<>();
Iterator<String> iter = pendingPartitions.iterator();
while (iter.hasNext()) {
String partition = iter.next();
// 通過分割區的值來獲取分割區的時間
LocalDateTime partTime = extractor.extract(
partitionKeys, extractPartitionValues(new Path(partition)));
// 判斷水印是否大於分割區建立時間+延遲時間
if (watermark > toMills(partTime) + commitDelay) {
needCommit.add(partition);
iter.remove();
}
}
return needCommit;
}
通過分析上述程式碼片段,我們可以知道系統通過分割區值來抽取相應的分割區來建立時間,然後進行比對,比如我們設定的時間 pattern 是 '$dt $h:$m:00' , 某一時刻我們正在往 /2022-02-26/18/20/ 這個分割區下寫資料,那麼程式根據分割區值,得到的 pattern 將會是2022-02-26 18:20:00,這個值在SQL中是根據 DATA_FORMAT 函數獲取的。
而這個值是帶有時區的,比如我們的時區設定為東八區,2022-02-26 18:20:00這個時間是東八區的時間,換成標準 UTC 時間是減去8個小時,也就是2022-02-26 10:20:00,而在原始碼中的 toMills 函數在處理這個東八區的時間時,並沒有對時區進行處理,把這個其實應該是東八區的時間當做了 UTC 時間來處理,這樣計算出來的值就比實際值大8小時,導致一直沒有觸發分割區的提交。
如果我們在資料來源中構造的分割區是 UTC 時間,也就是不帶分割區的時間,那麼這個邏輯就是沒有問題的,但是這樣又不符合我們的實際情況,比如對於分割區2022-02-26 18:20:00,我希望我的分割區肯定是東八區的時間,而不是比東八區小8個小時的UTC時間2022-02-26 10:20:00。
在明白了原因之後,我們就可以針對上述異常情況進行優化我們的實現方案,比如自定義一個分割區類、或者修改預設的時間分割區類。比如,我們使用TimeZoneTableFunction類來實現一個自定義時區,部分參考程式碼片段如下:
public class CustomTimeZoneTableFunction implements TimeZoneTableFunction {
private transient DateTimeFormatter formatter;
private String timeZoneId;
public CustomTimeZoneTableFunction(String timeZoneId) {
this.timeZoneId = timeZoneId;
}
@Override
public void open(FunctionContext context) throws Exception {
// 初始化 DateTimeFormatter 物件
formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");
formatter = formatter.withZone(ZoneId.of(timeZoneId));
}
@Override
public void eval(Long timestamp, Collector<TimestampWithTimeZone> out) {
// 將時間戳轉換為 LocalDateTime 物件
LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
// 將 LocalDateTime 物件轉換為指定時區下的 LocalDateTime 物件
LocalDateTime targetDateTime = localDateTime.atZone(ZoneId.of(timeZoneId)).toLocalDateTime();
// 將 LocalDateTime 物件轉換為 TimestampWithTimeZone 物件,並輸出到下游
out.collect(TimestampWithTimeZone.fromLocalDateTime(targetDateTime, ZoneId.of(timeZoneId)));
}
}
在一些特殊的場景下,Flink SQL如果無法實現我們複雜的業務需求,那麼我們可以考慮使用Flink DataStream寫Hive這種實現方案。比如如下業務場景,現在需要實現這樣一個業務需求,內容方將實時資料寫入到Kafka訊息佇列中,然後由資料側通過Flink任務消費內容方提供的資料來源,接著對消費的資料進行分流處理(這裡的步驟和Flink SQL寫Hive的步驟類似),每分鐘進行儲存到HDFS(MapReduce任務需要計算和重跑HDFS資料),然後通過MapReduce任務將HDFS上的這些紀錄檔資料生成Hive所需要格式,最後將這些Hive格式資料檔案載入到Hive表中。實現Kafka資料到Hive的即席分析功能,具體實現流程細節如下圖所示:
具體核心實現步驟如下:
消費內容方Topic實時資料;
生成資料預處理策略;
載入資料;
使用Hive SQL對Kafka資料進行即席分析。
編寫消費Topic的Flink程式碼,這裡不對Topic中的資料做邏輯處理,在後面統一交給MapReduce來做資料預處理,直接消費並儲存到HDFS上。具體實現程式碼如下所示:
public class Kafka2Hdfs {
public static void main(String[] args) {
// 判斷引數是否有效
if (args.length != 3) {
LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
return;
}
// 初始化Kafka連線地址和HDFS儲存地址以及Flink並行度
String bootStrapServer = args[0];
String hdfsPath = args[1];
int parallelism = Integer.parseInt(args[2]);
// 範例化一個Flink任務物件
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setParallelism(parallelism);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Flink消費Topic中的資料
DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));
// 範例化一個HDFS儲存物件
BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
// 自定義儲存到HDFS上的檔名,用小時和分鐘來命名,方便後面算策略
sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));
// 設定儲存HDFS的檔案大小和儲存檔案時間頻率
sink.setBatchSize(1024 * 1024 * 4);
sink.setBatchRolloverInterval(1000 * 30);
transction.addSink(sink);
env.execute("Kafka2Hdfs");
}
// 初始化Kafka物件連線資訊
private static Object configByKafkaServer(String bootStrapServer) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", bootStrapServer);
props.setProperty("group.id", "test_bll_group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}
注意事項:
這裡我們把時間視窗設定小一些,每30s做一次Checkpoint,如果該批次的時間視窗沒有資料過來,就生成一個檔案落地到HDFS上;
另外,我們重寫了Bucketer為DateTimeBucketer,邏輯並不複雜,在原有的方法上加一個年-月-日/時-分的檔案生成路徑,例如在HDFS上的生成路徑:xxxx/2022-02-26/00-00。
具體DateTimeBucketer實現程式碼如下所示:
public class DateMinuteBucketer implements Bucketer<String> {
private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");
private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");
@Override
public Path getBucketPath(Clock clock, Path basePath, String element) {
return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));
}
}
這裡,我們需要對落地到HDFS上的檔案進行預處理,處理的邏輯是這樣的。比如,現在是2022-02-26 14:00,那麼我們需要將當天的13:55,13:56,13:57,13:58,13:59這最近5分鐘的資料處理到一起,並載入到Hive的最近5分鐘的一個分割區裡面去。那麼,我們需要生成這樣一個邏輯策略集合,用HH-mm作為key,與之最近的5個檔案作為value,進行資料預處理合並。具體實現程式碼步驟如下:
步驟一:獲取小時迴圈策略;
步驟二:獲取分鐘迴圈策略;
步驟三:判斷是否為5分鐘的倍數;
步驟四:對分鐘級別小於10的數位做0補齊(比如9補齊後變成09);
步驟五:對小時級別小於10的數位做0補齊(比如1補齊後變成01);
步驟六:生成時間範圍;
步驟七:輸出結果。
其中,主要的邏輯是在生成時間範圍的過程中,根據小時和分鐘數的不同情況,生成不同的時間範圍,並輸出結果。在生成時間範圍時,需要注意前導0的處理,以及特殊情況(如小時為0、分鐘為0等)的處理。最後,將生成的時間範圍輸出即可。
根據上述步驟編寫對應的實現程式碼,生成當天所有日期命名規則,預覽部分結果如下:
需要注意的是,如果發生了第二天00:00,那麼我們需要用到前一天的00-00=>23-59,23-58,23-57,23-56,23-55這5個檔案中的資料來做預處理。
在完成2.3.1和2.3.2裡面的內容後,接下來,我們可以使用Hive的load命令直接載入HDFS上預處理後的檔案,把資料載入到對應的Hive表中,具體實現命令如下:
-- 載入資料到Hive表
load data inpath '<hdfs_path_hfile>' overwrite into table xxx.table partition(day='2022-02-26',hour='14',min='05')
之後,我們使用Hive SQL來對Kafka資料進行即席分析,範例SQL如下所示:
-- 查詢某5分鐘分割區資料
select * from xxx.table where day='2022-02-26' and hour='14' and min='05'
Flink SQL 和 Flink DataStream 都是 Flink 中用於處理資料的核心元件,我們可以根據自己實際的業務場景來選擇使用哪一種元件。
Flink SQL 是一種基於 SQL 語言的資料處理引擎,它可以將 SQL 查詢語句轉換為 Flink 的資料流處理程式。相比於 Flink DataStream,Flink SQL 更加易於使用和維護,同時具有更快的開發速度和更高的程式碼複用性。Flink SQL 適用於需要快速開發和部署資料處理任務的場景,比如資料倉儲、實時報表、資料淨化等。
Flink DataStream API是Flink資料流處理標準API,SQL是Flink後期版本提供的新的資料處理操作介面。SQL的引入為提高了Flink使用的靈活性。可以認為Flink SQL是一種通過字串來定義資料流處理邏輯的描述語言。
因此,在選擇 Flink SQL 和 Flink DataStream 時,需要根據具體的業務需求和資料處理任務的特點來進行選擇。如果需要快速開發和部署任務,可以選擇使用 Flink SQL;如果需要進行更為深入和客製化化的資料處理操作,可以選擇使用 Flink DataStream。同時,也可以根據實際情況,結合使用 Flink SQL 和 Flink DataStream 來完成複雜的資料處理任務。
在實際應用中,Kafka實時資料即席查詢可以用於多種場景,如實時監控、實時報警、實時統計、實時分析等。具體應用和實踐中,需要注意以下幾點:
資料質量:Kafka實時資料即席查詢需要保證資料質量,避免資料重複、丟失或錯誤等問題,需要進行資料質量監控和調優。
系統複雜性:Kafka實時資料即席查詢需要涉及到多個系統和元件,包括Kafka、資料處理引擎(比如Flink)、查詢引擎(比如Hive)等,需要對系統進行設定和管理,增加了系統的複雜性。
安全性:Kafka實時資料即席查詢需要加強資料安全性保障,避免資料洩露或資料篡改等安全問題,做好Hive的許可權管控。
效能優化:Kafka實時資料即席查詢需要對系統進行效能優化,包括優化資料處理引擎、查詢引擎等,提高系統的效能和效率。
參考: