Flink SQL 子圖複用邏輯分析

2022-09-13 06:18:42

子圖複用優化是為了找到SQL執行計劃中重複的節點,將其複用,避免這部分重複計算的邏輯。先回顧SQL執行的主要流程 parser -> validate -> logical optimize -> physical optimize -> translateToExecNode。
而子圖複用的邏輯就是在這個階段進行的

private[flink] def translateToExecNodeGraph(
    optimizedRelNodes: Seq[RelNode],
    isCompiled: Boolean): ExecNodeGraph = {
    val nonPhysicalRel = optimizedRelNodes.filterNot(_.isInstanceOf[FlinkPhysicalRel])
    if (nonPhysicalRel.nonEmpty) {
      throw new TableException(
        "The expected optimized plan is FlinkPhysicalRel plan, " +
        s"actual plan is ${nonPhysicalRel.head.getClass.getSimpleName} plan.")
    }

    require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))
    // Rewrite same rel object to different rel objects
    // in order to get the correct dag (dag reuse is based on object not digest)
    val shuttle = new SameRelObjectShuttle()
    val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
    // reuse subplan
    val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, tableConfig)
    // convert FlinkPhysicalRel DAG to ExecNodeGraph
    val generator = new ExecNodeGraphGenerator()
    val execGraph = generator.generate(reusedPlan.map(_.asInstanceOf[FlinkPhysicalRel]), isCompiled)

    // process the graph
    val context = new ProcessorContext(this)
    val processors = getExecNodeGraphProcessors
    processors.foldLeft(execGraph)((graph, processor) => processor.process(graph, context))
  }

可以看到這裡首先會校驗relNodes都是FlinkPhysicalRel 物理執行計劃的節點

require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))

SameRelObjectShuttle

/**
* Rewrite same rel object to different rel objects.
*
* <p>e.g.
* {{{
*      Join                       Join
*     /    \                     /    \
* Filter1 Filter2     =>     Filter1 Filter2
*     \   /                     |      |
*      Scan                  Scan1    Scan2
* }}}
* After rewrote, Scan1 and Scan2 are different object but have same digest.
*/
class SameRelObjectShuttle extends DefaultRelShuttle {
  private val visitedNodes = Sets.newIdentityHashSet[RelNode]()

  override def visit(node: RelNode): RelNode = {
    val visited = !visitedNodes.add(node)
    var change = false
    val newInputs = node.getInputs.map {
      input =>
      val newInput = input.accept(this)
      change = change || (input ne newInput)
      newInput
    }
    if (change || visited) {
      node.copy(node.getTraitSet, newInputs)
    } else {
      node
    }
  }
}

然後進行rel節點重寫,RelShuttle的作用就是提供visit的模式根據實現的邏輯來替換樹中的某些節點。可以看到這個實現中會將 同一個objec(注意這裡儲存visitedNodes使用的是identity hash set) 第二次存取時 copy成一個新的物件,但是有相同的digest,這一步的目的是什麼呢?
我們往下面看在後續生成ExecNode時, 會建立一個IdentityHashMap 來儲存存取過的Rels,所以意思就是真正生成ExecNode時,是和Rels物件一一對應的。

private final Map<FlinkPhysicalRel, ExecNode<?>> visitedRels = new IdentityHashMap();
private ExecNode<?> generate(FlinkPhysicalRel rel, boolean isCompiled) {
    ExecNode<?> execNode = visitedRels.get(rel);
    if (execNode != null) {
        return execNode;
    }

    if (rel instanceof CommonIntermediateTableScan) {
        throw new TableException("Intermediate RelNode can't be converted to ExecNode.");
    }

    List<ExecNode<?>> inputNodes = new ArrayList<>();
    for (RelNode input : rel.getInputs()) {
        inputNodes.add(generate((FlinkPhysicalRel) input, isCompiled));
    }

    execNode = rel.translateToExecNode(isCompiled);
    // connects the input nodes
    List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
    for (ExecNode<?> inputNode : inputNodes) {
        inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
    }
    execNode.setInputEdges(inputEdges);

    visitedRels.put(rel, execNode);
    return execNode;
}

看到這裡上面將同一個object 拆成兩個的目的就更不可理解了,因為本來是一個object的話在這裡天然就複用了,但是拆成2個反而就不能複用了。
這裡的目的是先將相同的object被重複參照的節點拆開,然後再根據digest相同以及內部規則來決定是否複用。這樣就可以有Flink引擎來控制哪些節點是可以合併的。

SubplanReuseContext

在context中通過ReusableSubplanVisitor構造兩組對映關係

// mapping a relNode to its digest
private val mapRelToDigest = Maps.newIdentityHashMap[RelNode, String]()
// mapping the digest to RelNodes
private val mapDigestToReusableNodes = new util.HashMap[String, util.List[RelNode]]()

中間的邏輯比較簡單就是遍歷整棵樹,查詢是否存在可reusable的節點,怎麼判斷可reusable呢?

  • 同一digest下,掛了多個RelNode節點,那麼這一組RelNode是同一語意的,是可以複用的候選
  • 節點沒有disable reusable
/** Returns true if the given node is reusable disabled */
private def isNodeReusableDisabled(node: RelNode): Boolean = {
  node match {
    // TableSourceScan node can not be reused if reuse TableSource disabled
    case _: FlinkLogicalLegacyTableSourceScan | _: CommonPhysicalLegacyTableSourceScan |
    _: FlinkLogicalTableSourceScan | _: CommonPhysicalTableSourceScan =>
    !tableSourceReuseEnabled
    // Exchange node can not be reused if its input is reusable disabled
    case e: Exchange => isNodeReusableDisabled(e.getInput)
    // TableFunctionScan and sink can not be reused
    case _: TableFunctionScan | _: LegacySink | _: Sink => true
    case _ => false
  }
}

例如TableFunctionScan就不能被Reuse(這個原因還沒理解),或者exchange只有input被reuse時,該節點才能複用

SubplanReuseShuttle

在以上的visit執行完之後以及知道哪些節點是可以複用的了,最後通過一個Shuttle來將可複用的節點進行替換

class SubplanReuseShuttle(context: SubplanReuseContext) extends DefaultRelShuttle {
  private val mapDigestToNewNode = new util.HashMap[String, RelNode]()

  override def visit(rel: RelNode): RelNode = {
    val canReuseOtherNode = context.reuseOtherNode(rel)
    val digest = context.getRelDigest(rel)
    if (canReuseOtherNode) {
      val newNode = mapDigestToNewNode.get(digest)
      if (newNode == null) {
        throw new TableException("This should not happen")
      }
      newNode
    } else {
      val newNode = visitInputs(rel)
      mapDigestToNewNode.put(digest, newNode)
      newNode
    }
  }
}

實現的方式就是記錄每個digest對應的newNode,當可以複用時,那麼直接返回該複用digest對應的RelNode(替換了原先的digest相同,物件不同的RelNode),這樣整棵樹中可複用的節點又重新合併了。