本篇將對 Yarn 排程器中的資源搶佔方式進行探究。分析當叢集資源不足時,佔用量資源少的佇列,是如何從其他佇列中搶奪資源的。我們將深入原始碼,一步步分析搶奪資源的具體邏輯。
在資源排程器中,以 CapacityScheduler 為例(Fair 類似),每個佇列可設定一個最小資源量和最大資源量。其中,最小資源量是資源緊缺情況下每個佇列需保證的資源量,而最大資源量則是極端情況下佇列也不能超過的資源使用量。
資源搶佔發生的原因,是為了提高資源利用率,資源排程器(包括 Capacity Scheduler 和 Fair Scheduler)會將負載較輕的佇列的資源暫時分配給負載重的佇列。
僅當負載較輕佇列突然收到新提交的應用程式時,排程器才進一步將本屬於該佇列的資源歸還給它。
但由於此時資源可能正被其他佇列使用,因此排程器必須等待其他佇列釋放資源後,才能將這些資源「物歸原主」,為了防止應用程式等待時間過長,RM 在等待一段時間後強制回收。
開啟容器搶佔需要設定的引數 yarn-site.xml
:
yarn.resourcemanager.scheduler.monitor.enable
yarn.resourcemanager.scheduler.monitor.policies
這裡我們主要分析如何選出待搶佔容器這一過程。
整理流程如下圖所示:
接下來我們深入原始碼,看看具體的邏輯:
首先 ResourceManager 通過 ResourceManager#createPolicyMonitors
方法建立資源搶佔服務:
protected void createPolicyMonitors() {
// 只有 capacity scheduler 實現了 PreemptableResourceScheduler 介面,fair 是如何實現資源搶佔的?
if (scheduler instanceof PreemptableResourceScheduler
&& conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
LOG.info("Loading policy monitors");
// 是否設定了 scheduler.monitor.policies
// 預設值是 ProportionalCapacityPreemptionPolicy? 程式碼中沒看到預設值,但是 yarn-site.xml doc 中有預設值
List<SchedulingEditPolicy> policies = conf.getInstances(
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
SchedulingEditPolicy.class);
if (policies.size() > 0) {
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
// periodically check whether we need to take action to guarantee
// constraints
// 此處建立了資源搶佔服務類。
// 當此服務啟動時,會啟動一個執行緒每隔 PREEMPTION_MONITORING_INTERVAL(預設 3s)呼叫一次
// ProportionalCapacityPreemptionPolicy 類中的 editSchedule方法,
// 【重點】在此方法中實現了具體的資源搶佔邏輯。
SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
addService(mon);
}
資源搶佔服務會啟動一個執行緒每隔 3 秒鐘呼叫設定的搶佔規則,這裡以 ProportionalCapacityPreemptionPolicy
(比例容量搶佔規則)為例介紹其中的搶佔具體邏輯(editSchedule
方法):
// ProportionalCapacityPreemptionPolicy#editSchedule
public void editSchedule() {
updateConfigIfNeeded();
long startTs = clock.getTime();
CSQueue root = scheduler.getRootQueue();
// 獲取叢集當前資源快照
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
// 具體的資源搶佔邏輯
containerBasedPreemptOrKill(root, clusterResources);
if (LOG.isDebugEnabled()) {
LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
}
}
editSchedule
方法很簡單,邏輯都被封裝到 containerBasedPreemptOrKill()
方法中,我們繼續深入。
其中主要分三步:
// 僅保留重要邏輯
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// ------------ 第一步 ------------ (生成資源快照)
// extract a summary of the queues from scheduler
// 將所有佇列資訊拷貝到 queueToPartitions - Map<佇列名, Map<資源池, 佇列詳情>>。生成快照,防止佇列變化造成計算問題。
for (String partitionToLookAt : allPartitions) {
cloneQueues(root, Resources
.clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)), partitionToLookAt);
}
// ------------ 第二步 ------------ (找出待搶佔的容器)
// compute total preemption allowed
// based on ideal allocation select containers to be preemptionCandidates from each queue and each application
// candidatesSelectionPolicies 預設會放入 FifoCandidatesSelector,
// 如果設定了 INTRAQUEUE_PREEMPTION_ENABLED,會增加 IntraQueueCandidatesSelector
for (PreemptionCandidatesSelector selector :
candidatesSelectionPolicies) {
// 【核心方法】 計算待搶佔 Container 放到 preemptMap
toPreempt = selector.selectCandidates(toPreempt,
clusterResources, totalPreemptionAllowed);
}
// 這裡有個類似 dryrun 的引數 yarn.resourcemanager.monitor.capacity.preemption.observe_only
if (observeOnly) {
return;
}
// ------------ 第三步 ------------ (執行容器資源搶佔 或 kill超時未自動停止的容器)
// preempt (or kill) the selected containers
preemptOrkillSelectedContainerAfterWait(toPreempt);
// cleanup staled preemption candidates
cleanupStaledPreemptionCandidates();
}
第一步資源快照沒什麼好說的,直接進入到重點:第二步找出待搶佔的容器。
即 selector.selectCandidates()
,以預設的 FifoCandidatesSelector
實現為例講解,其他的同理。
主要分兩步:
// yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptionAllowed) {
// ------------ 第一步 ------------ (根據使用量和需求量重新分配資源)
// Calculate how much resources we need to preempt
// 計算出每個資源池每個佇列當前資源分配量,和實際要 preempt 的量
preemptableAmountCalculator.computeIdealAllocation(clusterResource,
totalPreemptionAllowed);
// ------------ 第二步 ------------ (根據資源差額,計算要 kill 的 container)
// 選 container 是有優先順序的: 使用共用池的資源 -> 佇列中後提交的任務 -> amContainer
for (String queueName : preemptionContext.getLeafQueueNames()) {
synchronized (leafQueue) {
// 省略了大部分邏輯,在後面介紹
// 從 application 中選出要被搶佔的容器
preemptFrom(fc, clusterResource, resToObtainByPartition,
skippedAMContainerlist, skippedAMSize, selectedCandidates,
totalPreemptionAllowed);
}
}
我們先來看「根據使用量和需求量重新分配資源」,即 PreemptableResourceCalculator#computeIdealAllocation()
// 計算每個佇列實際要被 preempt 的量
public void computeIdealAllocation(Resource clusterResource,
Resource totalPreemptionAllowed) {
for (String partition : context.getAllPartitions()) {
TempQueuePerPartition tRoot = context.getQueueByPartition(
CapacitySchedulerConfiguration.ROOT, partition);
// 這裡計算好每個佇列超出資源設定的部分,存在 TempQueuePerPartition
// preemptableExtra 表示可以被搶佔的
// untouchableExtra 表示不可被搶佔的(佇列設定了不可搶佔)
// yarn.scheduler.capacity.<queue>.disable_preemption
updatePreemptableExtras(tRoot);
tRoot.idealAssigned = tRoot.getGuaranteed();
// 【重點】遍歷佇列樹,重新計算資源分配,並計算出每個佇列計劃要 Preempt 的量
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
}
// 計算實際每個佇列要被 Preempt 的量 actuallyToBePreempted(有個阻尼因子,不會一下把所有超量的都幹掉)
calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
clusterResource);
}
}
我們直接深入到 recursivelyComputeIdealAssignment()
方法中的核心邏輯:重新計算各佇列資源分配值 AbstractPreemptableResourceCalculator#computeFixpointAllocation()
主要邏輯如下:
// 按一定規則將資源分給各個佇列
protected void computeFixpointAllocation(Resource totGuarant,
Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
boolean ignoreGuarantee) {
// 傳進來 unassigned = totGuarant
// 有序佇列,(使用量 / 設定量) 從小到大排序
PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
tqComparator);
// idealAssigned = min(使用量,設定量)。 對於不可搶佔佇列,則再加上超出的部分,防止資源被再分配。
if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
} else {
q.idealAssigned = Resources.clone(used);
}
// 如果該佇列有超出設定資源需求,就把這個佇列放到 orderedByNeed 有序佇列中(即這個佇列有資源缺口)
if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
orderedByNeed.add(q);
}
}
// 此時 unassigned 是 整體可用資源 排除掉 所有已使用的資源(used)
// 把未分配的資源(unassigned)分配出去
// 方式就是從 orderedByNeed 中每次取出 most under-guaranteed 佇列,按規則分配一塊資源給他,如果仍不滿足就按順序再放回 orderedByNeed
// 直到滿足所有佇列資源,或者沒有資源可分配
while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
unassigned, Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
// 對於有資源缺口的佇列,重新計算他們的資源保證比例:normalizedGuarantee。
// 即 (該佇列保證量 / 所有資源缺口佇列保證量)
resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
// 這裡返回是個列表,是因為可能有需求度(優先順序)相等的情況
Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
orderedByNeed, tqComparator);
for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
.hasNext();) {
TempQueuePerPartition sub = i.next();
// 按照 normalizedGuarantee 比例能從剩餘資源中分走多少。
Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
sub.normalizedGuarantee, Resource.newInstance(1, 1));
// 【重點】按一定規則將資源分配給佇列,並返回剩下的資源。
Resource wQidle = sub.offer(wQavail, rc, totGuarant,
isReservedPreemptionCandidatesSelector);
// 分配給佇列的資源
Resource wQdone = Resources.subtract(wQavail, wQidle);
// 這裡 wQdone > 0 證明本次迭代分配出去了資源,那麼還會放回到待分配資源的集合中(哪怕本次已滿足資源請求),直到未再分配資源了才退出。
if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
orderedByNeed.add(sub);
}
Resources.addTo(wQassigned, wQdone);
}
Resources.subtractFrom(unassigned, wQassigned);
}
// 這裡有可能整個資源都分配完了,還有佇列資源不滿足
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
}
}
上面第 5 步是重點,也就是 sub.offer()
,是計算給該佇列在保證值之外,還能提供多少資源:
/**
* 計算佇列 idealAssigned,在原有基礎上增加新分配的資源。同時返回 avail 中未使用的資源。
* 引數說明:
* avail 按比例該佇列能從剩餘資源中分配到的
* clusterResource 整體資源量
* considersReservedResource ?
* idealAssigned = min(使用量,設定量)
*/
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource, boolean considersReservedResource) {
// 計算的是還有多少可分配資源的空間( maxCapacity - assigned )
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
Resources.subtract(getMax(), idealAssigned),
Resource.newInstance(0, 0));
// remain = avail - min(avail, (max - assigned), (current + pending - assigned))
// 佇列接受資源的計算方法:可提供的資源,佇列最大資源-已分配資源,當前已使用資源+未滿足的資源-min(使用量,設定量) 三者中的最小值。
Resource accepted = Resources.min(rc, clusterResource,
absMaxCapIdealAssignedDelta,
Resources.min(rc, clusterResource, avail, Resources
.subtract(
Resources.add((considersReservedResource
? getUsed()
: getUsedDeductReservd()), pending),
idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
}
核心的資源重新分配演演算法邏輯已經計算完畢,剩下的就是:
根據重新計算的資源分配,得到各佇列超用的資源,這部分就是要被搶佔的資源。
這裡不會一下把佇列超用的資源都幹掉,有個阻尼因子,用於平滑搶佔處理。
回到 selector.selectCandidates()
,上面已經介紹了各佇列搶佔量的計算邏輯,接下來介紹「如何選出各佇列中的 container」
public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptionAllowed) {
// ......
// ------------ 第二步 ------------ (根據資源差額,計算要 kill 的 container)
// 根據計算得到的要搶佔的量,計算各資源池各佇列要 kill 的 container
List<RMContainer> skippedAMContainerlist = new ArrayList<>();
// Loop all leaf queues
// 這裡是有優先順序的: 使用共用池的資源 -> 佇列中後提交的任務 -> amContainer
for (String queueName : preemptionContext.getLeafQueueNames()) {
// 獲取該佇列在每個資源池要被搶佔的量
Map<String, Resource> resToObtainByPartition =
CapacitySchedulerPreemptionUtils
.getResToObtainByPartitionForLeafQueue(preemptionContext,
queueName, clusterResource);
synchronized (leafQueue) {
// 使用共用池資源的,先處理
Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
leafQueue.getIgnoreExclusivityRMContainers();
for (String partition : resToObtainByPartition.keySet()) {
if (ignorePartitionExclusivityContainers.containsKey(partition)) {
TreeSet<RMContainer> rmContainers =
ignorePartitionExclusivityContainers.get(partition);
// 最後提交的任務,會被最先搶佔
for (RMContainer c : rmContainers.descendingSet()) {
if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
selectedCandidates)) {
// Skip already selected containers
continue;
}
// 將 Container 放到待搶佔集合 preemptMap 中
boolean preempted = CapacitySchedulerPreemptionUtils
.tryPreemptContainerAndDeductResToObtain(rc,
preemptionContext, resToObtainByPartition, c,
clusterResource, selectedCandidates,
totalPreemptionAllowed);
}
}
}
// preempt other containers
Resource skippedAMSize = Resource.newInstance(0, 0);
// 預設是 FifoOrderingPolicy,desc 也就是最後提交的在最前面
Iterator<FiCaSchedulerApp> desc =
leafQueue.getOrderingPolicy().getPreemptionIterator();
while (desc.hasNext()) {
FiCaSchedulerApp fc = desc.next();
if (resToObtainByPartition.isEmpty()) {
break;
}
// 從 application 中選出要被搶佔的容器(後面介紹)
preemptFrom(fc, clusterResource, resToObtainByPartition,
skippedAMContainerlist, skippedAMSize, selectedCandidates,
totalPreemptionAllowed);
}
// Can try preempting AMContainers
Resource maxAMCapacityForThisQueue = Resources.multiply(
Resources.multiply(clusterResource,
leafQueue.getAbsoluteCapacity()),
leafQueue.getMaxAMResourcePerQueuePercent());
preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
totalPreemptionAllowed);
}
}
return selectedCandidates;
}
把要被搶佔的 container 都選出來之後,就剩最後一步, kill 這些 container。
回到 containerBasedPreemptOrKill()
:
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
// ......
// ------------ 第三步 ------------ (執行容器資源搶佔 或 kill超時未自動停止的容器)
// preempt (or kill) the selected containers
preemptOrkillSelectedContainerAfterWait(toPreempt);
// cleanup staled preemption candidates
cleanupStaledPreemptionCandidates();
}
至此,分析完畢整個資源搶佔的過程。
總結一下主要邏輯:
參考文章:
Yarn FairScheduler的搶佔機制詳解_小昌昌的部落格的部落格-CSDN部落格
Yarn搶佔最核心剖析_Geoffrey Turing的部落格-CSDN部落格 - 針對 fair
Yarn排程之CapacityScheduler原始碼分析資源搶佔
Better SLAs via Resource-preemption in YARN's CapacityScheduler - Cloudera Blog