本文涉及到Flink SQL UDAF,Window 狀態管理等部分,希望能起到拋磚引玉的作用,讓大家可以藉此深入瞭解這個領域。
大家知道,Flink的自定義聚合函數(UDAF)可以將多條記錄聚合成1條記錄,這功能是通過accumulate方法來完成的,官方參考指出:
在系統執行過程中,底層runtime程式碼會把歷史狀態accumulator,和您指定的上遊數據(支援任意數量,任意型別的數據)作爲參數,一起發送給accumulate計算。
但是實時計算還有一些特殊的場景,在此場景下,還需要提供merge方法才能 纔能完成。
在實時計算中一些場景需要merge,例如session window。 由於實時計算具有out of order的特性,後輸入的數據有可能位於2個原本分開的session中間,這樣就把2個session合爲1個session。此時,需要使用merge方法把多個accumulator合爲1個accumulator。
之前因爲沒親身操作,所以一直忽略merge的特殊性。最近無意中看到了一個UDAF的實現,突然覺得有一個地方很奇怪,即 accumulate 和 merge 這兩個函數不應該定義在一個類中。因爲這是兩個完全不同的處理方法。應該定義在兩個不同的類中。
比如用UDAF做word count,則:
然後又想出了一個問題:Flink是如何管理 UDAF的accumulator?其狀態存在哪裏?
看起來應該是Flink在背後做了一些黑魔法,把這兩個函數從一個類中拆分了。爲了驗證我們的推測,讓我們從原始碼入手來看看這些問題:
範例程式碼摘要如下 :
public class CountUdaf extends AggregateFunction<Long, CountUdaf.CountAccum> {
//定義存放count UDAF狀態的accumulator的數據的結構。
public static class CountAccum {
public long total;
}
//初始化count UDAF的accumulator。
public CountAccum createAccumulator() {
CountAccum acc = new CountAccum();
acc.total = 0;
return acc;
}
//accumulate提供了,如何根據輸入的數據,更新count UDAF存放狀態的accumulator。
public void accumulate(CountAccum accumulator, Object iValue) {
accumulator.total++;
}
public void merge(CountAccum accumulator, Iterable<CountAccum> its) {
for (CountAccum other : its) {
accumulator.total += other.total;
}
}
}
批次處理相對簡單,因爲數據是有邊界的,其邏輯比較清晰。
首先給出測試程式碼
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
// register the DataSet as a view "WordCount"
tEnv.createTemporaryView("WordCount", input, 'word, 'frequency)
tEnv.registerFunction("countUdaf", new CountUdaf())
// run a SQL query on the Table and retrieve the result as a new Table
val table = tEnv.sqlQuery("SELECT word, countUdaf(frequency), SUM(frequency) FROM WordCount GROUP BY word")
case class WC(word: String, frequency: Long)
在 DataSetAggregate.translateToPlan
中生成了執行計劃。原來Flink把 SQL 語句分割成兩個階段:
於是我們推斷,這很有可能就是 combineGroup 呼叫accumulate,reduceGroup 呼叫 merge。
關於combineGroup,如果有興趣,可以看看我之前文章 [原始碼解析] Flink的groupBy和reduce究竟做了什麼 以及 原始碼解析] GroupReduce,GroupCombine 和 Flink SQL group by
override def translateToPlan(tableEnv: BatchTableEnvImpl,
queryConfig: BatchQueryConfig): DataSet[Row] = {
if (grouping.length > 0) {
// grouped aggregation
if (preAgg.isDefined) {
// 執行到這裏
inputDS
// pre-aggregation
.groupBy(grouping: _*)
.combineGroup(preAgg.get) // 第一階段
.returns(preAggType.get)
.name(aggOpName)
// final aggregation
.groupBy(grouping.indices: _*)
.reduceGroup(finalAgg.right.get) // 第二階段
.returns(rowTypeInfo)
.name(aggOpName)
}
}
}
SQL語句對應的執行計劃大致爲:
在執行看,確實對應了兩個階段。
階段 1 確實是 GroupReduceCombineDriver 呼叫到了 accumulate。
//堆疊如下
accumulate:25, CountUdaf (mytest)
accumulate:-1, DataSetAggregatePrepareMapHelper$5
combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)
sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
//SQL UDAF生成的程式碼如下
function = {DataSetAggregatePrepareMapHelper$5@10085}
function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10079} "CountUdaf"
function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10087} "LongSumAggFunction"
階段 2 中 GroupReduceDriver 呼叫到了 merge
//堆疊如下
merge:29, CountUdaf (mytest)
mergeAccumulatorsPair:-1, DataSetAggregateFinalHelper$6
reduce:71, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)
run:131, GroupReduceDriver (org.apache.flink.runtime.operators)
//SQL UDAF生成的程式碼如下
function = {DataSetAggregateFinalHelper$6@10245}
function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10238} "CountUdaf"
function_org$apache$flink$table$functions$aggfunctions$LongSumAggFunction$a5214701531789b3139223681d = {LongSumAggFunction@10247} "LongSumAggFunction"
Flink對使用者定義的UDAF程式碼分別生成了兩個不同的功能類:
UDAF有一個accumulator,這個會在程式執行過程中始終存在,Flink是如何管理這個accumulator呢?
GroupReduceCombineDriver類有一個成員變數 combiner,
public class GroupReduceCombineDriver<IN, OUT> implements Driver<GroupCombineFunction<IN, OUT>, OUT> {
private GroupCombineFunction<IN, OUT> combiner;
}
而 combiner 被賦予了 DataSetPreAggFunction 類的一個範例。
class DataSetPreAggFunction(genAggregations: GeneratedAggregationsFunction)
extends AbstractRichFunction{
private var accumulators: Row = _ //這裏儲存歷史狀態
private var function: GeneratedAggregations = _
}
Flink就是把 UDAF的accumulator 儲存在 combiner.accumulators
中,我們可以看到,無論使用者定義了什麼型別作爲 accumulator,Flink都用萬能型別 Row 搞定。
combiner = {DataSetPreAggFunction@10063}
genAggregations = {GeneratedAggregationsFunction@10070}
accumulators = {Row@10117} "mytest.CountUdaf$CountAccum@1e343db7,(0,false)"
function = {DataSetAggregatePrepareMapHelper$5@10066} // function是包含使用者程式碼的功能類。
function_mytest$CountUdaf$5ae272a09e5f36214da5c4e5436c4c48 = {CountUdaf@10076} "CountUdaf"
讓我們總結一下,批次處理被分成兩個階段:
Flink在GroupReduceCombineDriver類的成員變數 combiner 中儲存 accumulator歷史狀態。
流處理則是和批次處理完全不同的世界,下面 下麪我們看看流處理背後有什麼奧祕。
在流計算場景中,數據沒有邊界源源不斷的流入的,每條數據流入都可能會觸發計算,比如在進行count或sum這些操作是如何計算的呢?
val query: Table = tableEnv.sqlQuery(
"""
|SELECT
|countUdaf(num)
|FROM tb_num
|GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND)
""".stripMargin)
DataStreamGroupWindowAggregateBase.translateToPlan
函數中完成了計劃生成。根據Stream的型別(是否有key),會走不同的邏輯業務。
WindowedStream
代表了根據key分組,並且基於WindowAssigner
切分視窗的數據流。所以WindowedStream
都是從KeyedStream
衍生而來的。在key分組的流上進行視窗切分是比較常用的場景,也能夠很好地並行化(不同的key上的視窗聚合可以分配到不同的task去處理)。AllWindowedStream
。AllWindowedStream
是直接在DataStream
上進行windowAll(...)
操作。在普通流上進行視窗操作,就勢必需要將所有分割區的流都彙集到單個的Task中,而這個單個的Task很顯然就會成爲整個Job的瓶頸。我們的範例程式碼是基於Key的,所以走 WindowedStream
分支,即一個 window 中即做accumulate,又做merge。
// grouped / keyed aggregation
if (grouping.length > 0) {
// 有key,所以是 WindowedStream,我們範例走這裏
val windowFunction = AggregateUtil.createAggregationGroupWindowFunction(...)
val keySelector = new CRowKeySelector(grouping, inputSchema.projectedTypeInfo(grouping))
val keyedStream = timestampedInput.keyBy(keySelector)
val windowedStream =
createKeyedWindowedStream(queryConfig, window, keyedStream)
.asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]]
val (aggFunction, accumulatorRowType) =
AggregateUtil.createDataStreamGroupWindowAggregateFunction(...)
windowedStream
.aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
.name(keyedAggOpName)
}
// global / non-keyed aggregation
else {
// 沒有key,所以是AllWindowedStream
val windowFunction = AggregateUtil.createAggregationAllWindowFunction(...)
val windowedStream =
createNonKeyedWindowedStream(queryConfig, window, timestampedInput)
.asInstanceOf[AllWindowedStream[CRow, DataStreamWindow]]
val (aggFunction, accumulatorRowType) =
AggregateUtil.createDataStreamGroupWindowAggregateFunction(...)
windowedStream
.aggregate(aggFunction, windowFunction, accumulatorRowType, outRowType)
.name(nonKeyedAggOpName)
}
SQL語句對應的執行計劃大致如下,我們能看出來 accumulate & merge 都在 Window 中處理。
可以看到,流處理對UDAF的管理,就完全是進入了Window的地盤,而UDAF歷史狀態管理其實就是Flink Window狀態管理的領域了。
我們以基於key的WindowedStream爲例繼續進行研究。
當Window接受到一個輸入item時候,item會被分配到一個key,由KeySelector完成。WindowOperator 類首先使用使用者選擇的 windowAssigner 將流入的數據分配到響應的window中,有可能是1個,0個甚至多個window。這裏就會做accumulate。
本例 windowAssigner = {TumblingProcessingTimeWindows}
,進入到processElement函數的 非 MergingWindow部分,具體流程如下:
可以看到,是 windowState 新增元素時候,呼叫到State的API,然後間接呼叫到了UDAF。
windowState 以 window 爲 namespace,以隔離不同的window的context。這裏雖然叫做 windowState 。但是可以發現,該類儲存的是不同window中的對應的原始數據(processWindowFunction情況)或結果(ReduceFunction/AggregateFunction情況)。我們此例中,儲存的是執行結果。
本例用到的 window process 是 Incremental Aggregation Functions。即 ReduceFunction 與 AggregateFunction ,其特點是無需儲存 window 中的所有數據,一旦新數據進入,便可與之前的中間結果進行計算,因此這種 window 中其狀態僅需儲存一個結果便可。
因此這裏我們拿到的是 HeapReducingState, HeapAggregatingState,當執行到 windowState.add(element.getValue());
語句時,便呼叫UDAF得出結果。
在flink中state用來存放計算過程的節點中間結果或元數據。在flink內部提供三種state儲存實現
我們這裏拿到的是 HeapAggregatingState。
以三元組的形式儲存儲存數據,即 key, namespace, value。
public abstract class StateTable<K, N, S>
implements StateSnapshotRestore, Iterable<StateEntry<K, N, S>> {
/**
* Map for holding the actual state objects. The outer array represents the key-groups.
* All array positions will be initialized with an empty state map.
*/
protected final StateMap<K, N, S>[] keyGroupedStateMaps;
}
// 真實中變數摘錄如下
keyGroupedStateMaps = {StateMap[1]@9266}
0 = {CopyOnWriteStateMap@9262} // 這裏就是將要儲存使用者accumulator的地方
stateSerializer = {RowSerializer@9254}
snapshotVersions = {TreeSet@9277} size = 0
primaryTable = {CopyOnWriteStateMap$StateMapEntry[128]@9278}
incrementalRehashTable = {CopyOnWriteStateMap$StateMapEntry[2]@9280}
lastNamespace = {TimeWindow@9239} "TimeWindow{start=1593934200000, end=1593934210000}"
在上面提及的 3.1.2)stateMap.transform(key, namespace, value, transformation);
中
@Override
public <T> void transform(
K key,
N namespace,
T value,
StateTransformationFunction<S, T> transformation) throws Exception {
final StateMapEntry<K, N, S> entry = putEntry(key, namespace);
// copy-on-write check for state
entry.state = transformation.apply(
(entry.stateVersion < highestRequiredSnapshotVersion) ?
getStateSerializer().copy(entry.state) : entry.state,
value);
// 當執行完使用者程式碼之後,數據會儲存在這裏,這個就是CopyOnWriteStateMap的一個Entry
entry.stateVersion = stateMapVersion;
流處理對UDAF的管理,就完全是進入了Window的地盤,而UDAF歷史狀態管理其實就是Flink Window狀態管理的領域了。
Flink-SQL原始碼解讀(一)window運算元的建立的原始碼分析
★★★★★★關於生活和技術的思考★★★★★★
微信公衆賬號:羅西的思考
如果您想及時得到個人撰寫文章的訊息推播,或者想看看個人推薦的技術資料,可以掃描下面 下麪二維條碼(或者長按識別二維條碼)關注個人公衆號)。