Flink模式

2023-03-13 06:04:04

Per-job Cluster

該模式下,一個作業一個叢集,作業之間相互隔離。

在Per-Job模式下,叢集管理器框架用於為每個提交的Job啟動一個 Flink 叢集。Job完成後,叢集將關閉,所有殘留的資源也將被清除。

此模式可以更好地隔離資源,因為行為異常的Job不會影響任何其他Job。另外,由於每個應用程式都有其自己的JobManager,因此它將記錄的負載分散到多個實體中。

場景:Per-Job模式適合長期執行的Job,這些Job可以接受啟動延遲的增加以支援彈性。

資源管理器支援:Yarn

Application

與per-job 模式相比,在Application 模式下,main() 方法在叢集上而不是在使用者端執行。

場景:任務啟動較慢,適合於長時間執行的大型任務。

資源管理器支援:Yarn、Native kubernetes

Session

該模式下,作業共用叢集資源。Session 模式提交的應用都在該叢集裡執行,會導致資源的競爭。

該模式優勢是無需為每一個提交的任務花費精力去分解叢集。但是,如果Job異常或是TaskManager 宕掉,那麼該TaskManager執行的其他Job都會失敗。除了影響到任務,也意味著潛在需要更多的恢復操作,重啟所有的Job,會並行存取檔案系統,會導致該檔案系統對其他服務不可用。此外,單叢集執行多個Job,意味著JobManager更大的負載。

場景:該模式適合於對啟動延遲要求較高且執行時間較短的作業,例如互動式查詢。任務提交速度快,適合頻繁提交執行的短時間任務。

資源管理器支援:Standalone、Yarn、Native kubernetes

Standalone

Standalone模式需要在任務啟動時就確定TaskManager的數量,不能像Yarn一樣,可以在任務啟動時申請動態資源。

很多時候任務需要多少個TaskManager事先並不知道,TaskManager設定少了,任務可能跑不起來,多了又會造成資源浪費,需要在任務啟動時才能確定需要多少個TaskMananger。

Standalone Application kubernetes

需要先將使用者程式碼都打到映象裡面,然後根據該映象來部署一個flink叢集執行使用者程式碼。

每提交一個任務,單獨啟動一個叢集執行該任務,執行結束叢集被刪除,資源也被釋放。

Standalone Session kubernetes

在Session模式下,先啟動一個Flink叢集,然後向該叢集提交任務,所有任務共用JobManager。

Native kubernetes

Flink 的 Client 內建了一個 K8s Client,可以藉助 K8s Client 去建立 JobManager,當 Job 提交之後,如果對資源有需求,JobManager 會向 Flink 自己的 ResourceManager 去申請資源。這個時候 Flink 的 ResourceManager 會直接跟 K8s 的 API Server 通訊,將這些請求資源直接下發給 K8s Cluster,告訴它需要多少個 TaskManger,每個 TaskManager 多大。當任務執行完之後,它也會告訴 K8s Cluster 釋放沒有使用的資源。相當於 Flink 用很原生的方式瞭解到 K8s Cluster 的存在,並知曉何時申請資源,何時釋放資源。

Native Kubernetes Application

native kubernetes下,application模式相當於提交任務時調k8s api自動拉起一個flink叢集跑該應用,然後跑完就刪除叢集。

這種模式比較適合對啟動時間不敏感、且長時間執行的作業。不適合對任務啟動時間比較敏感的場景。

優點:隔離性比較好,任務之間資源不衝突,一個任務單獨使用一個 Flink 叢集;相對於 Flink session 叢集而且,資源隨用隨建,任務執行完成後立刻銷燬資源,資源利用率會高一些。

缺點:需要提前指定 TaskManager 的數量,如果 TaskManager 指定的少了會導致作業執行失敗,指定的多了仍會降低資源利用率;資源是實時建立的,使用者的作業在被執行前,需要先等待以下過程。

flink on native kubernetes application模式:提交任務範例
./bin/flink run-application --target kubernetes-application
-Dkubernetes.namespace=flink-native-kubernetes
-Dkubernetes.cluster-id=flink-application-cluster
-Dkubernetes.jobmanager.service-account=flink
-Dkubernetes.container.image=flink:1.14.2
-Dkubernetes.rest-service.exposed.type=NodePort
-Djobmanager.heap.size=1024m
-Dkubernetes.jobmanager.cpu=1
-Dkubernetes.taskmanager.cpu=2
-Dtaskmanager.memory.process.size=1024m
-Dtaskmanager.numberOfTaskSlots=2
local:///opt/flink/examples/batch/WordCount.jar
應用映象構建方式:
`FROM flink:1.14.2
 RUN mkdir -p $FLINK_HOME/usrlib
 COPY my-flink-job.jar $FLINK_HOME/usrlib/my-flink-job.jar`

Native Kubernetes Session

native kubernetes下,session模式是提前調k8s api啟動一個常駐的flink叢集,然後使用者端提交任務時,調k8s api自動起一個taskmanager pod 執行任務,然後等任務執行完之後,這個taskmanager的任務pod會被銷燬。

flink on native kubernetes session模式:

1、kubectl create namespace flink-session-cluster

2、kubectl create serviceaccount flink -n flink-session-cluster

3、kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink-session-cluster:flink

啟動session叢集:
4、./bin/kubernetes-session.sh \
 -Dkubernetes.namespace=flink-session-cluster \
-Dkubernetes.jobmanager.service-account=flink \
 -Dkubernetes.cluster-id=flink-session-cluster \
 -Dkubernetes.rest-service.exposed.type=NodePort  \
-Dkubernetes.container.image=flink:1.14.2

flink on native kubernetes session模式:提交任務範例
./bin/flink run -d --target kubernetes-session  \
-Dkubernetes.namespace=flink-session-cluster  \
-Dkubernetes.cluster-id=flink-session-cluster \
 -Dkubernetes.jobmanager.service-account=flink \
-Dkubernetes.rest-service.exposed.type=NodePort \
/opt/flink-1.14.2/examples/batch/WordCount.jar

總結

Flink on K8s :

優點:

Flink 在 K8s 上最簡單的方式是以 Standalone 方式進行部署。這種方式部署的好處在於不需要對 Flink 做任何改動,同時 Flink 對 K8s 叢集是無感知的,通過外部手段即可讓 Flink 執行起來。

缺點:

  • 無論 Operator、Helm Chart 或者是直接使用 Kubectl Yaml 的方式,Flink 都感知不到 K8s 的存在。
  • 目前主要使用靜態的資源分配。需要提前確認好需要多少個 TaskManager,如果 Job 的並行需要做一些調整,TaskManager 的資源情況必須相應的跟上,否則任務無法正常執行。
  • 使用者需要對一些 Container、Operator 或者 K8s 有一些最基本的認識,這樣才能保證順利將 Flink 執行到 K8s 之上。
  • 對於批次處理任務,或者想在一個 Session 裡提交多個任務不太友好。無法實時申請資源和釋放資源。因為 TaskManager 的資源是固定的,批次處理任務可能會分多個階段去執行,需要去實時地申請資源、釋放資源,當前也無法實現。如果需要在一個 Session 裡跑多個 Job 並且陸續執行結束當前也無法實現。
  • 如果維持一個比較大的 Session Cluster,可能會資源浪費。但如果維持的 Session Cluster 比較小,可能會導致 Job 跑得慢或者是跑不起來。

基於這幾點,社群推進了一個 Native 的整合方案。讓Flink 原生的感知到下層 Cluster 的存在。Native 是相對於 Flink 而言的,藉助 Flink 的命令就可以達到自治的一個狀態,不需要引入外部工具就可以通過 Flink 完成任務在 K8s 上的執行。

生產環境上推薦:

Flink on YARN(pre-job、application)、Flink on Native Kubernetes Appliation;

問題

Flink on Kubernetes 需考慮的問題:

紀錄檔問題

紀錄檔需要通過k8s的pod紀錄檔排查。如果出現節點宕機,pod飄移到別的節點,紀錄檔獲取困難。

應用jar包問題

flink on k8s的application模式需要將jar包以及依賴放在映象裡啟動。

應用依賴問題

有依賴的任務,無法通過使用者端獲取資訊。k8s不支援pre-job模式。