Principles and Usage of Flink's Async Operator

2022-10-14 09:01:56

1、Introduction

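Flink is known for high throughput and low latency. However, when a step in a Flink job has to interact with an external system, the unpredictable latency of those calls can significantly degrade cluster performance. In that case an async operator can help: instead of waiting for one slow call to return, the operator can move on and issue the next one.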
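To make the motivation concrete, here is a small plain-JDK sketch with no Flink involved. The class and method names (SyncVsAsyncDemo, slowCall, runSyncMillis, runAsyncMillis) are hypothetical. It issues four simulated slow calls twice: first one after another, then all at once with CompletableFuture. It only illustrates why overlapping the waiting time raises throughput. It does not show how Flink itself schedules calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SyncVsAsyncDemo {

    // Simulated slow external call (e.g. a remote lookup) that takes `millis` ms.
    static String slowCall(String key, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "value-" + key;
    }

    // Synchronous style: every call waits for the previous one, so the latencies add up.
    static long runSyncMillis(List<String> keys, long millis) {
        long start = System.nanoTime();
        for (String key : keys) {
            slowCall(key, millis);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Asynchronous style: issue all calls first, then wait once for all of them.
    static long runAsyncMillis(List<String> keys, long millis) {
        ExecutorService pool = Executors.newFixedThreadPool(keys.size());
        try {
            long start = System.nanoTime();
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String key : keys) {
                futures.add(CompletableFuture.supplyAsync(() -> slowCall(key, millis), pool));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("11", "22", "33", "44");
        System.out.println("sync:  " + runSyncMillis(keys, 200) + " ms");
        System.out.println("async: " + runAsyncMillis(keys, 200) + " ms");
    }
}
```

On a typical machine the synchronous run should take roughly four times as long as the asynchronous one. The asynchronous version spends its waiting time on all four calls in parallel.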

2、What you will learn in this chapter

  • Source code analysis of the async operator
  • Why the async operator can preserve ordering
  • How to define and use a custom async lookup join in Flink SQL

3、Test code for the async operator

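import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Simulates a time-consuming asynchronous operation (copied from the internet)
 */
public class AsyncIODemo implements Serializable {

    /**
     * Shared by all instances: the test code creates a new AsyncIODemo for every record,
     * so a per-instance pool would leak a new set of threads per record
     */
    private static final ExecutorService executorService = Executors.newFixedThreadPool(4);

    public CompletableFuture<String> pullData(final String source) {

        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        executorService.submit(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the future instead of completing it normally
                Thread.currentThread().interrupt();
                completableFuture.completeExceptionally(e);
                return;
            }
            /**
             * After sleeping for a few seconds, call complete with a concatenated result string
             */
            completableFuture.complete("Output value: " + source);
        });

        return completableFuture;
    }
}
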
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Code copied from the internet
 */
public class AsyncTest {

    public static void main(String[] args) throws Exception {
        /**
         * Get the Flink execution environment and set parallelism to 1 to make the output easy to observe
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        /**
         * Build a DataStreamSource from a fixed sequence of elements
         */
        DataStreamSource<String> stream = env.fromElements("11", "22", "33", "44");

        /**
         * Build an ordered async stream with AsyncDataStream. As the name suggests, although the calls
         * are asynchronous, the output keeps the input order; the source code analysis below explains why.
         */
        SingleOutputStreamOperator<String> asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() {
            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
                /**
                 * Call the simulated async request, which returns a CompletableFuture
                 */
                CompletableFuture<String> future = new AsyncIODemo().pullData(input);
                /**
                 * Register a completion callback: once the future has a result, call
                 * resultFuture.complete to actually emit the data (failures are propagated instead)
                 */
                future.whenCompleteAsync((d, t) -> {
                    if (t != null) {
                        resultFuture.completeExceptionally(t);
                        return;
                    }
                    resultFuture.complete(Arrays.asList(d));
                });
            }
            // Set the maximum timeout for each async call to 10 seconds
        }, 10, TimeUnit.SECONDS);
        asyncStream.print();
        env.execute();
    }
}

4、Source code analysis of the async operator

4.1、AsyncDataStream

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperator;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;

import java.util.concurrent.TimeUnit;

/**
 * A helper class to apply an {@link AsyncFunction} to a data stream.
 *
 * <pre>{@code
 * DataStream<String> input = ...
 * AsyncFunction<String, Tuple<String, String>> asyncFunc = ...
 *
 * AsyncDataStream.orderedWait(input, asyncFunc, timeout, TimeUnit.MILLISECONDS, 100);
 * }</pre>
 */
@PublicEvolving
public class AsyncDataStream {

    /** Output mode for asynchronous operations: ordered or unordered. */
    public enum OutputMode {
        ORDERED,
        UNORDERED
    }

    private static final int DEFAULT_QUEUE_CAPACITY = 100;

    /**
     * flag_2: Adds an AsyncWaitOperator.
     *
     * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
     * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
     * @param timeout for the asynchronous operation to complete
     * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
     * @param mode Processing mode for {@link AsyncWaitOperator}.
     * @param <IN> Input type.
     * @param <OUT> Output type.
     * @return A new {@link SingleOutputStreamOperator}
     */
    private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            int bufSize,
            OutputMode mode) {

        TypeInformation<OUT> outTypeInfo =
                TypeExtractor.getUnaryOperatorReturnType(
                        func,
                        AsyncFunction.class,
                        0,
                        1,
                        new int[] {1, 0},
                        in.getType(),
                        Utils.getCallLocationName(),
                        true);

        // An AsyncWaitOperatorFactory is created here
        AsyncWaitOperatorFactory<IN, OUT> operatorFactory =
                new AsyncWaitOperatorFactory<>(
                        in.getExecutionEnvironment().clean(func), timeout, bufSize, mode);

        return in.transform("async wait operator", outTypeInfo, operatorFactory);
    }

    /**
     * Adds an AsyncWaitOperator. Output records may be emitted in a different order from the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operation that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.UNORDERED);
    }

    /**
     * Adds an AsyncWaitOperator. Output records may be emitted in a different order from the input records.
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> unorderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
        return addOperator(
                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.UNORDERED);
    }

    /**
     * flag_1: Adds an AsyncWaitOperator. Output records are guaranteed to be emitted in the same order as the input records.
     *
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param capacity The max number of async i/o operation that can be triggered
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in,
            AsyncFunction<IN, OUT> func,
            long timeout,
            TimeUnit timeUnit,
            int capacity) {
        return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
    }

    /**
     * Adds an AsyncWaitOperator. Output records are guaranteed to be emitted in the same order as the input records.
     * @param in Input {@link DataStream}
     * @param func {@link AsyncFunction}
     * @param timeout for the asynchronous operation to complete
     * @param timeUnit of the given timeout
     * @param <IN> Type of input record
     * @param <OUT> Type of output record
     * @return A new {@link SingleOutputStreamOperator}.
     */
    public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
            DataStream<IN> in, AsyncFunction<IN, OUT> func, long timeout, TimeUnit timeUnit) {
     
        return addOperator(
                in, func, timeUnit.toMillis(timeout), DEFAULT_QUEUE_CAPACITY, OutputMode.ORDERED);
    }
}

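As shown above, starting from the test code, the call chain is AsyncDataStream.orderedWait -> addOperator, and addOperator creates a new AsyncWaitOperatorFactory. We can stop following the code here, because going further down this path is not useful; instead, it is time to guess. A class named XXFactory usually follows the factory pattern and produces XX objects, so this is the factory that creates AsyncWaitOperator objects. We can therefore set a breakpoint on the first line of the AsyncWaitOperator constructor and see when execution reaches it. Why take this approach? Because, as Flink's architecture diagram illustrates, the Flink source code we read does not execute as a single linear process.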

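The code does not all run on one node. Even though we are debugging locally, the job runs in a simulated local cluster. How are different nodes simulated? With threads: each node is represented and executed by its own thread. That is why setting breakpoints blindly will not reveal the whole picture. This illustrates a code-reading tip: make educated guesses based on experience. Here, for example, we guessed from the naming convention of the class.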

4.2、AsyncWaitOperator

Set breakpoints in the constructor and all public methods of the AsyncWaitOperator class, then run the program in debug mode.

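As it turns out, the constructor runs in a separate sink thread. Had we kept setting breakpoints the way we did above, we would probably never have found it.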

public AsyncWaitOperator(
            @Nonnull AsyncFunction<IN, OUT> asyncFunction,
            long timeout,
            int capacity,
            @Nonnull AsyncDataStream.OutputMode outputMode,
            @Nonnull ProcessingTimeService processingTimeService,
            @Nonnull MailboxExecutor mailboxExecutor) {
    super(asyncFunction);

    setChainingStrategy(ChainingStrategy.ALWAYS);

    Preconditions.checkArgument(
            capacity > 0, "The number of concurrent async operation should be greater than 0.");
    this.capacity = capacity;

    this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

    this.timeout = timeout;

    this.processingTimeService = Preconditions.checkNotNull(processingTimeService);

    this.mailboxExecutor = mailboxExecutor;
}

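The constructor only performs initialization and is not very informative. This suggests another code-reading tip: focus on the big picture and let go of the details. Don't be distracted by the wildflowers along the road. Skip unimportant initialization and argument checks and concentrate on the main flow.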

Let execution continue to the next breakpoint.

    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);

        this.inStreamElementSerializer =
                new StreamElementSerializer<>(
                        getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));

        switch (outputMode) {
            case ORDERED:
                queue = new OrderedStreamElementQueue<>(capacity);
                break;
            case UNORDERED:
                queue = new UnorderedStreamElementQueue<>(capacity);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
        }

        this.timestampedCollector = new TimestampedCollector<>(super.output);
    }

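At a glance, the switch statement at the bottom is the interesting part. From the names we can tell that, depending on outputMode, it instantiates either an ordered queue or an unordered queue. The orderedWait and unorderedWait methods of AsyncDataStream select the mode, so this queue is the key to whether the output is ordered. There is nothing else to linger on here, so continue to the next breakpoint.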
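The effect of the two modes can be illustrated with a deterministic, single-threaded sketch. This is a simplified model written for this article, not Flink code. OutputModeDemo and emitOrder are hypothetical names, and the completion order is passed in as input instead of coming from real async calls. The unordered branch also ignores watermarks, which the real unordered mode still has to respect.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class OutputModeDemo {

    enum OutputMode { ORDERED, UNORDERED }

    /**
     * Returns the order in which results are emitted, given the inputs in arrival order and
     * completionOrder, the indices of the inputs in the order their async calls finish.
     */
    static List<String> emitOrder(List<String> inputs, int[] completionOrder, OutputMode mode) {
        List<String> emitted = new ArrayList<>();
        switch (mode) {
            case UNORDERED:
                // Simplification: emit each result as soon as its call completes (watermarks ignored).
                for (int idx : completionOrder) {
                    emitted.add(inputs.get(idx));
                }
                break;
            case ORDERED:
                // Keep entries in arrival order; only the head may leave, and only once it is done.
                boolean[] done = new boolean[inputs.size()];
                Deque<Integer> queue = new ArrayDeque<>();
                for (int i = 0; i < inputs.size(); i++) {
                    queue.add(i);
                }
                for (int idx : completionOrder) {
                    done[idx] = true;
                    while (!queue.isEmpty() && done[queue.peek()]) {
                        emitted.add(inputs.get(queue.poll()));
                    }
                }
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + mode);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> inputs = Arrays.asList("11", "22", "33", "44");
        int[] completionOrder = {2, 0, 3, 1}; // "33" finishes first, "22" finishes last
        System.out.println(emitOrder(inputs, completionOrder, OutputMode.ORDERED));   // [11, 22, 33, 44]
        System.out.println(emitOrder(inputs, completionOrder, OutputMode.UNORDERED)); // [33, 11, 44, 22]
    }
}
```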

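Next the operator initializes its state. There is nothing to dwell on there either, so skip it and continue to the next breakpoint.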

    @Override
    public void open() throws Exception {
        super.open();

        this.isObjectReuseEnabled = getExecutionConfig().isObjectReuseEnabled();

        if (recoveredStreamElements != null) {
            for (StreamElement element : recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                } else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                } else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                } else {
                    throw new IllegalStateException(
                            "Unknown record type "
                                    + element.getClass()
                                    + " encountered while opening the operator.");
                }
            }
            recoveredStreamElements = null;
        }
    }

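In the code above, the if (recoveredStreamElements != null) block appears to start processing data. But the name recoveredStreamElements gives it away: "recovered" means restored, and the block only runs when that field is non-null, so this is clearly logic for re-processing data restored during recovery. We have not even found the main data-processing path yet, so this is another roadside wildflower. Skip ahead to the next breakpoint.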

@Override
public void processElement(StreamRecord<IN> record) throws Exception {
    StreamRecord<IN> element;
    // copy the element avoid the element is reused
    if (isObjectReuseEnabled) {
        //noinspection unchecked
        element = (StreamRecord<IN>) inStreamElementSerializer.copy(record);
    } else {
        element = record;
    }

    // add element first to the queue
    final ResultFuture<OUT> entry = addToWorkQueue(element);

    final ResultHandler resultHandler = new ResultHandler(element, entry);

    // register a timeout for the entry if timeout is configured
    if (timeout > 0L) {
        resultHandler.registerTimeout(getProcessingTimeService(), timeout);
    }

    userFunction.asyncInvoke(element.getValue(), resultHandler);
}

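The method name makes it clear that this is where the actual data is processed. Stepping through a few times shows that every record enters this method exactly once.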

The method's argument is one element of the source stream. Next, let's look at the addToWorkQueue method.

/**
 * Adds the given stream element to the operator's stream element queue. This operation blocks
 * until the element has been added.
 */
private ResultFuture<OUT> addToWorkQueue(StreamElement streamElement)
            throws InterruptedException {

    Optional<ResultFuture<OUT>> queueEntry;
    while (!(queueEntry = queue.tryPut(streamElement)).isPresent()) {
        mailboxExecutor.yield();
    }

    return queueEntry.get();
}

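This method puts the source element into the queue created earlier, which in this example is the ordered queue OrderedStreamElementQueue, and returns a ResultFuture object. If the queue is full, tryPut returns an empty Optional and the loop calls mailboxExecutor.yield(), which runs other pending work on the same thread until a slot frees up; that is the "blocking" the comment refers to. Let's see what the returned ResultFuture object is.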
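The retry loop in addToWorkQueue turns a full queue into backpressure without deadlocking. While it waits, yield() lets the same thread process other pending mails, such as completed results, and those free up slots. The single-threaded sketch below models that interaction under several simplifying assumptions:

- BoundedQueueModel is a hypothetical class.
- The mailbox is a plain ArrayDeque of Runnable.
- Async calls are assumed to complete in FIFO order.
- The real yield() blocks until a mail is available, whereas yieldOnce here throws if there is nothing to run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

class BoundedQueueModel {

    private final int capacity;
    private final Queue<String> queue = new ArrayDeque<>();
    // Pending "mails" that the single (simulated) mailbox thread will run.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final List<String> emitted = new ArrayList<>();
    private int yields = 0;

    BoundedQueueModel(int capacity) {
        this.capacity = capacity;
    }

    // Like OrderedStreamElementQueue#tryPut: reject the element when the queue is full.
    Optional<String> tryPut(String element) {
        if (queue.size() < capacity) {
            queue.add(element);
            return Optional.of(element);
        }
        return Optional.empty();
    }

    // Stand-in for MailboxExecutor#yield: run one pending mail on the same thread.
    private void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("Queue is full and there is no mail to run");
        }
        yields++;
        mail.run();
    }

    // Like addToWorkQueue: retry tryPut, running other mails in between.
    void addToWorkQueue(String element) {
        while (!tryPut(element).isPresent()) {
            yieldOnce();
        }
        // Assumption: the async call completes later, in FIFO order; its completion mail
        // removes and emits the head, which frees one slot in the queue.
        mailbox.add(() -> emitted.add(queue.poll()));
    }

    // Run all remaining mails (e.g. when the input ends).
    void drain() {
        while (!mailbox.isEmpty()) {
            yieldOnce();
        }
    }

    List<String> emitted() {
        return emitted;
    }

    int yields() {
        return yields;
    }

    public static void main(String[] args) {
        BoundedQueueModel model = new BoundedQueueModel(2);
        for (String s : Arrays.asList("a", "b", "c", "d", "e")) {
            model.addToWorkQueue(s);
        }
        System.out.println(model.emitted() + " after " + model.yields() + " yields"); // [a, b, c] after 3 yields
        model.drain();
        System.out.println(model.emitted()); // [a, b, c, d, e]
    }
}
```

With capacity 2, the third, fourth and fifth elements each find the queue full and trigger one yield. That yield emits the current head and makes room.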

4.3、ResultFuture

@PublicEvolving
public interface ResultFuture<OUT> {
    /**
     * Completes the result future with all results in a Collection, which are then emitted.
     */
    void complete(Collection<OUT> result);

    /**
     * 將異常輸出
     */
    void completeExceptionally(Throwable error);
}

Now let's see how tryPut wraps the element into a ResultFuture object.

4.4、OrderedStreamElementQueue

@Internal
public final class OrderedStreamElementQueue<OUT> implements StreamElementQueue<OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(OrderedStreamElementQueue.class);

    /** Capacity of this queue. */
    private final int capacity;

    /** Queue for the inserted StreamElementQueueEntries. */
    private final Queue<StreamElementQueueEntry<OUT>> queue;

    public OrderedStreamElementQueue(int capacity) {
        Preconditions.checkArgument(capacity > 0, "The capacity must be larger than 0.");

        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    @Override
    public boolean hasCompletedElements() {
        return !queue.isEmpty() && queue.peek().isDone();
    }

    @Override
    public void emitCompletedElement(TimestampedCollector<OUT> output) {
        if (hasCompletedElements()) {
            final StreamElementQueueEntry<OUT> head = queue.poll();
            head.emitResult(output);
        }
    }

    @Override
    public List<StreamElement> values() {
        List<StreamElement> list = new ArrayList<>(this.queue.size());
        for (StreamElementQueueEntry e : queue) {
            list.add(e.getInputElement());
        }
        return list;
    }

    @Override
    public boolean isEmpty() {
        return queue.isEmpty();
    }

    @Override
    public int size() {
        return queue.size();
    }

    @Override
    public Optional<ResultFuture<OUT>> tryPut(StreamElement streamElement) {
        if (queue.size() < capacity) {
            StreamElementQueueEntry<OUT> queueEntry = createEntry(streamElement);

            queue.add(queueEntry);

            LOG.debug(
                    "Put element into ordered stream element queue. New filling degree "
                            + "({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.of(queueEntry);
        } else {
            LOG.debug(
                    "Failed to put element into ordered stream element queue because it "
                            + "was full ({}/{}).",
                    queue.size(),
                    capacity);

            return Optional.empty();
        }
    }

    private StreamElementQueueEntry<OUT> createEntry(StreamElement streamElement) {
        if (streamElement.isRecord()) {
            return new StreamRecordQueueEntry<>((StreamRecord<?>) streamElement);
        }
        if (streamElement.isWatermark()) {
            return new WatermarkQueueEntry<>((Watermark) streamElement);
        }
        throw new UnsupportedOperationException("Cannot enqueue " + streamElement);
    }
}

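Focus on the tryPut method and what follows it. When there is room, tryPut calls createEntry to build a StreamElementQueueEntry (a StreamRecordQueueEntry for a record) and adds it to queue. When the queue is full, it returns an empty Optional. Next, we need to look at the StreamRecordQueueEntry class.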

4.5、StreamRecordQueueEntry

@Internal
class StreamRecordQueueEntry<OUT> implements StreamElementQueueEntry<OUT> {
    @Nonnull private final StreamRecord<?> inputRecord;

    private Collection<OUT> completedElements;

    StreamRecordQueueEntry(StreamRecord<?> inputRecord) {
        this.inputRecord = Preconditions.checkNotNull(inputRecord);
    }

    @Override
    public boolean isDone() {
        return completedElements != null;
    }

    @Nonnull
    @Override
    public StreamRecord<?> getInputElement() {
        return inputRecord;
    }

    @Override
    public void emitResult(TimestampedCollector<OUT> output) {
        output.setTimestamp(inputRecord);
        for (OUT r : completedElements) {
            output.collect(r);
        }
    }

    @Override
    public void complete(Collection<OUT> result) {
        this.completedElements = Preconditions.checkNotNull(result);
    }
}

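We now have a rough picture: as the program runs, each record is wrapped into a StreamRecordQueueEntry and put into the OrderedStreamElementQueue, waiting if the queue is full. A StreamRecordQueueEntry has a few telling methods. isDone, as its name suggests, reports whether the entry is complete. In emitResult, anyone who has written a Flink program will recognize output.collect(r) as the code that sends data downstream. complete, as the name says, completes the entry: it null-checks the incoming result and assigns it to the field completedElements.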

Let's return to the main line of processElement:

// First add the element to the queue
final ResultFuture<OUT> entry = addToWorkQueue(element);
final ResultHandler resultHandler = new ResultHandler(element, entry);

// If a timeout is configured, register a timeout for the entry; here it is the 10 seconds set in the test code
if (timeout > 0L) {
    resultHandler.registerTimeout(getProcessingTimeService(), timeout);
}

userFunction.asyncInvoke(element.getValue(), resultHandler);

Note the last line: it calls asyncInvoke, which takes us back into the asyncInvoke method overridden in the test code:

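/**
 * Build an ordered async stream with AsyncDataStream. As the name suggests, although the calls
 * are asynchronous, the output keeps the input order; the source code analysis explains why.
 */
SingleOutputStreamOperator<String> asyncStream = AsyncDataStream.orderedWait(stream, new AsyncFunction<String, String>() {
    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
        /**
         * Call the simulated async request, which returns a CompletableFuture
         */
        CompletableFuture<String> future = new AsyncIODemo().pullData(input);
        /**
         * Register a completion callback: once the future has a result, call
         * resultFuture.complete to actually emit the data (failures are propagated instead)
         */
        future.whenCompleteAsync((d, t) -> {
            if (t != null) {
                resultFuture.completeExceptionally(t);
                return;
            }
            resultFuture.complete(Arrays.asList(d));
        });
    }
    // Set the maximum timeout for each async call to 10 seconds
}, 10, TimeUnit.SECONDS);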

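Now set a breakpoint on the line resultFuture.complete(Arrays.asList(d)); in the test code above and step into it. Execution actually lands in the complete method of the inner class org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.ResultHandler: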

private void outputCompletedElement() {
    /*
     * Check whether the OrderedStreamElementQueue has a completed element. From the queue code above:
     *
     *     @Override
     *     public boolean hasCompletedElements() {
     *         return !queue.isEmpty() && queue.peek().isDone();
     *     }
     *
     * It only looks at the head of the queue (a StreamRecordQueueEntry) and calls its isDone method:
     *
     *     @Override
     *     public boolean isDone() {
     *         return completedElements != null;
     *     }
     *
     * which checks whether the completedElements field has been set. If the head is the entry
     * that was just completed (or one that completed earlier), isDone returns true; if the head
     * is still waiting for its result, nothing is emitted yet.
     */
    if (queue.hasCompletedElements()) {
        /*
         * Call the OrderedStreamElementQueue's emitCompletedElement method:
         *
         *     @Override
         *     public void emitCompletedElement(TimestampedCollector<OUT> output) {
         *         if (hasCompletedElements()) {
         *             final StreamElementQueueEntry<OUT> head = queue.poll();
         *             head.emitResult(output);
         *         }
         *     }
         *
         * It removes the head StreamElementQueueEntry from the queue and calls its emitResult method:
         *
         *     @Override
         *     public void emitResult(TimestampedCollector<OUT> output) {
         *         output.setTimestamp(inputRecord);
         *         for (OUT r : completedElements) {
         *             output.collect(r);
         *         }
         *     }
         *
         * This is where collect is called in a loop to actually send the data downstream.
         */
        queue.emitCompletedElement(timestampedCollector);
        // if there are more completed elements, emit them with subsequent mails
        if (queue.hasCompletedElements()) {
            try {
                mailboxExecutor.execute(
                        this::outputCompletedElement,
                        "AsyncWaitOperator#outputCompletedElement");
            } catch (RejectedExecutionException mailboxClosedException) {
                // This exception can only happen if the operator is cancelled which means all
                // pending records can be safely ignored since they will be processed one more
                // time after recovery.
                LOG.debug(
                        "Attempt to complete element is ignored since the mailbox rejected the execution.",
                        mailboxClosedException);
            }
        }
    }
}

/** A handler for the results of a specific input record. */
private class ResultHandler implements ResultFuture<OUT> {
    /** Optional timeout timer used to signal the timeout to the AsyncFunction. */
    private ScheduledFuture<?> timeoutTimer;
    /** Record for which this result handler exists. Used only to report errors. */
    private final StreamRecord<IN> inputRecord;
    /**
     * The handle received from the queue to update the entry. Should only be used to inject the
     * result; exceptions are handled here.
     */
    private final ResultFuture<OUT> resultFuture;
    /**
     * A guard against ill-written AsyncFunction. Additional (parallel) invocations of {@link
     * #complete(Collection)} or {@link #completeExceptionally(Throwable)} will be ignored. This
     * guard also helps for cases where proper results and timeouts happen at the same time.
     */
    private final AtomicBoolean completed = new AtomicBoolean(false);

    ResultHandler(StreamRecord<IN> inputRecord, ResultFuture<OUT> resultFuture) {
        this.inputRecord = inputRecord;
        this.resultFuture = resultFuture;
    }

    @Override
    public void complete(Collection<OUT> results) {
        Preconditions.checkNotNull(
                results, "Results must not be null, use empty collection to emit nothing");

        // CAS the completed flag from false to true; if it was already set, return
        if (!completed.compareAndSet(false, true)) {
            return;
        }

        processInMailbox(results);
    }

    private void processInMailbox(Collection<OUT> results) {
        // move further processing into the mailbox thread
        mailboxExecutor.execute(
                () -> processResults(results),
                "Result in AsyncWaitOperator of input %s",
                results);
    }

    private void processResults(Collection<OUT> results) {
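        /*
         * If a timeout timer exists, cancel it: we are already in the completion path, so this
         * record has been processed and its timeout task is no longer needed.
         */
        if (timeoutTimer != null) {
            // canceling in mailbox thread avoids
            // https://issues.apache.org/jira/browse/FLINK-13635
            timeoutTimer.cancel(true);
        }

        /*
         * Call StreamRecordQueueEntry's complete method, which assigns the completedElements
         * field (see the StreamRecordQueueEntry class above).
         */
        resultFuture.complete(results);
        // See outputCompletedElement above
        outputCompletedElement();
    }

    // ... completeExceptionally, registerTimeout and the timeout callback are omitted from this excerpt
}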

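Start from the complete method of the ResultHandler class above; the comments explain the details. After the compareAndSet guard, complete hands the results to the mailbox thread via processInMailbox, and processResults then performs the following steps: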

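  1. Cancel the timeout timer of the current ResultHandler.
  2. Call StreamRecordQueueEntry's complete method to assign its completedElements field.
  3. Check whether the head of the OrderedStreamElementQueue, a StreamRecordQueueEntry, has a non-null completedElements field.
  4. If it does, call the queue's emitCompletedElement method, which removes the head StreamElementQueueEntry and calls its emitResult method to actually send the data downstream. If more head elements are already complete, another outputCompletedElement mail is scheduled to emit them as well.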
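Before these steps run, complete passes through the completed flag. The comment on that field notes that it also covers a proper result and a timeout arriving at the same time: whichever calls compareAndSet first wins, and the other is ignored. The sketch below models only that guard, with a ScheduledExecutorService standing in for Flink's ProcessingTimeService. It rests on three assumptions:

- ResultHandlerModel and runScenario are hypothetical names.
- The model records the string "TIMEOUT" where Flink would call the AsyncFunction's timeout method, which by default completes the record exceptionally.
- The model cancels the timer directly, not from the mailbox thread as the real processResults does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ResultHandlerModel {

    // Same guard idea as ResultHandler#completed: only the first outcome is accepted.
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private volatile ScheduledFuture<?> timeoutTimer;
    private volatile String outcome;

    void registerTimeout(ScheduledExecutorService timerService, long timeoutMillis) {
        timeoutTimer = timerService.schedule(
                () -> finish("TIMEOUT"), timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Called by the async callback, like ResultHandler#complete.
    void complete(String result) {
        finish(result);
    }

    private void finish(String value) {
        if (!completed.compareAndSet(false, true)) {
            return; // a result or a timeout already won; ignore this call
        }
        ScheduledFuture<?> timer = timeoutTimer;
        if (timer != null) {
            timer.cancel(false); // the record is finished, so its timeout is no longer needed
        }
        outcome = value;
    }

    String outcome() {
        return outcome;
    }

    /** Registers a timeout, delivers a result after resultDelayMillis, returns the outcome. */
    static String runScenario(long timeoutMillis, long resultDelayMillis) {
        ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
        try {
            ResultHandlerModel handler = new ResultHandlerModel();
            handler.registerTimeout(timers, timeoutMillis);
            Thread.sleep(resultDelayMillis); // simulated latency of the async call
            handler.complete("result");
            handler.complete("duplicate");   // a second call is ignored by the CAS guard
            return handler.outcome();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            timers.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario(500, 10)); // result arrives first: prints result
        System.out.println(runScenario(20, 200)); // timeout fires first: prints TIMEOUT
    }
}
```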

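From the above we can see that every time a completableFuture completes and resultFuture.complete is called, the operator checks whether the head of the queue is done. If it is, the operator removes it and emits its data. Every incoming record goes through processElement, which has already wrapped it in a StreamElementQueueEntry and appended it to the queue. The operator works asynchronously, and the elements complete in no particular order. However, each completion only checks whether the head of the queue is done, and only the head is ever removed and emitted downstream. The output therefore strictly follows the order in which processElement received the records, which is the order in which the upstream sent them.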
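The ordering argument can be checked with a small multi-threaded model. This is a simplified sketch, not Flink code, built on the following assumptions:

- OrderedQueueModel and Entry are hypothetical stand-ins for OrderedStreamElementQueue and StreamRecordQueueEntry.
- A single-thread executor plays the role of the mailbox thread.
- Each lookup sleeps for a random 0 to 49 ms, so results come back out of order.
- The real operator emits one element per mail and re-schedules itself. The while loop here collapses that into one step.

Every completion is handed to the mailbox thread, and only the head of the queue is ever emitted. The printed list should therefore always be in input order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;

class OrderedQueueModel {

    /** Simplified stand-in for StreamRecordQueueEntry. */
    static final class Entry {
        private Collection<String> completedElements; // null until complete() is called

        boolean isDone() {
            return completedElements != null;
        }

        void complete(Collection<String> result) {
            completedElements = result;
        }
    }

    private final Queue<Entry> queue = new ArrayDeque<>();
    private final List<String> output = new ArrayList<>();
    // A single thread plays the mailbox thread: every queue access happens on it, so no locks.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);
    private CountDownLatch remaining;

    /** Like outputCompletedElement: emit only from the head, and only while it is done. */
    private void outputCompletedElements() {
        while (!queue.isEmpty() && queue.peek().isDone()) {
            output.addAll(queue.poll().completedElements);
            remaining.countDown();
        }
    }

    /** Like processElement: enqueue the entry first, then start the async call. */
    private void processElement(String input) {
        Entry entry = new Entry();
        queue.add(entry);
        CompletableFuture.supplyAsync(() -> slowLookup(input), ioPool)
                // The callback runs on an I/O thread, so hand the result back to the mailbox.
                .thenAccept(result -> mailbox.execute(() -> {
                    entry.complete(Collections.singletonList(result));
                    outputCompletedElements();
                }));
    }

    private static String slowLookup(String input) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(50)); // random latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "Output value: " + input;
    }

    /** Feeds the inputs through the model and returns everything emitted, in emission order. */
    List<String> run(List<String> inputs) {
        remaining = new CountDownLatch(inputs.size());
        try {
            for (String input : inputs) {
                mailbox.execute(() -> processElement(input));
            }
            remaining.await();
            // Read the output on the mailbox thread so all of its writes are visible.
            return mailbox.submit(() -> new ArrayList<String>(output)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            mailbox.shutdown();
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        // [Output value: 11, Output value: 22, Output value: 33, Output value: 44]
        System.out.println(new OrderedQueueModel().run(Arrays.asList("11", "22", "33", "44")));
    }
}
```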

5、Custom AsyncLookupFunction in Flink SQL

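To use an external data source in Flink SQL, you usually add a connector jar such as flinksql-connector-xx. For example, to join a Kafka stream table with a Redis dimension table, you would typically query the Redis dimension table with a lookup join. However, most examples online implement a synchronous lookup join, and in some scenarios an asynchronous lookup join is needed to increase throughput. For the full implementation, see the code: https://gitee.com/rongdi/flinksql-connector-redis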
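The idea behind an async lookup join can be sketched in plain Java, with no Flink or Redis dependencies. The sketch makes these assumptions:

- An in-memory map stands in for the Redis dimension table.
- AsyncLookupJoinSketch and its methods are hypothetical. This class is not the connector in the linked repository.
- The method name asyncLookup only echoes the shape of Flink's AsyncLookupFunction#asyncLookup, which takes a key and returns a CompletableFuture of matching rows.

Every lookup is started before any result is awaited, and the joined rows are then collected in input order, similar to orderedWait.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncLookupJoinSketch implements AutoCloseable {

    // Hypothetical in-memory dimension table standing in for Redis: userId -> userName.
    private final Map<String, String> dimTable;
    private final ExecutorService ioPool = Executors.newFixedThreadPool(4);

    AsyncLookupJoinSketch(Map<String, String> dimTable) {
        this.dimTable = dimTable;
    }

    // Shaped like an async lookup: key in, future of matching rows out.
    CompletableFuture<Collection<String>> asyncLookup(String key) {
        return CompletableFuture.supplyAsync(() -> {
            String value = dimTable.get(key); // a real connector would query Redis asynchronously here
            Collection<String> rows =
                    value == null ? Collections.emptyList() : Collections.singletonList(value);
            return rows;
        }, ioPool);
    }

    /** Inner lookup join of stream rows {orderId, userId} against the dimension table. */
    List<String> join(List<String[]> streamRows) {
        List<CompletableFuture<List<String>>> pending = new ArrayList<>();
        for (String[] row : streamRows) {
            // Start every lookup before waiting for any result.
            pending.add(asyncLookup(row[1]).thenApply(matches -> {
                List<String> joined = new ArrayList<>();
                for (String name : matches) {
                    joined.add(row[0] + "," + row[1] + "," + name);
                }
                return joined;
            }));
        }
        List<String> out = new ArrayList<>();
        for (CompletableFuture<List<String>> f : pending) {
            out.addAll(f.join()); // collect in input order; rows without a match are dropped
        }
        return out;
    }

    @Override
    public void close() {
        ioPool.shutdown();
    }

    public static void main(String[] args) {
        Map<String, String> users = new HashMap<>();
        users.put("u1", "Alice");
        users.put("u2", "Bob");
        List<String[]> orders = Arrays.asList(
                new String[] {"o1", "u1"}, new String[] {"o2", "u3"}, new String[] {"o3", "u2"});
        try (AsyncLookupJoinSketch lookup = new AsyncLookupJoinSketch(users)) {
            System.out.println(lookup.join(orders)); // [o1,u1,Alice, o3,u2,Bob]
        }
    }
}
```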