Rxjava執行緒切換原始碼分析

2020-10-15 13:01:22

 首先我提出來兩個問題:
1.Rxjava是如何做到執行緒切換的
2.執行緒切換我多次呼叫subscribeOn和多次呼叫observeOn,對資料流有什麼影響
對Rxjava的資料處理流程不是很清楚的,可以看我上一篇文章
資料處理及訂閱流程分析

下面這段程式碼相信很多人都寫過

  upstream.subscribeOn(SchedulerProvider.net())
          .observeOn(AndroidSchedulers.mainThread());

上游的Observable進行執行緒切換在子執行緒發射,然後訂閱者程式碼執行到主執行緒。
接下來我們看一下subscribeOn的原始碼

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

建立了一個ObservableSubscribeOn並返回,把自身與scheduler通過構造方法傳入到ObservableSubscribeOn。顯然ObservableSubscribeOn一定繼承了Observable.這裡我們發現,向下遊傳遞的Observable已經被替換成了ObservableSubscribeOn.上游的Observable被ObservableSubscribeOn接管了,又是代理模式的應用。分析ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

構造方法裡把上游的observable賦值到自己的source成員變數,同時把Scheduler儲存到自己的成員變數。然後唯一的功能方法subscribeActual(observer),非常得簡潔。把下游的observer通過SubscribeOnObserver代理,然後呼叫parent的setDisposable方法,引數為scheduler.scheduleDirect(new SubscribeTask(parent))。
我們一看SubscribeOnObserver這個observer的原始碼便能清楚是什麼鬼了

   static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) {
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

沒幾行程式碼,也很簡潔,關注一下setDisposable(Disposable )方法,其實就是把scheduler返回的Disposable賦值給自己。也就是說ObservableSubscribeOn可以dispose,scheduler返回的那個disposable.通過我的上篇文章資料處理及訂閱流程分析我們知道ObservableSubscribeOn的subscribeActual方法執行發生在資料來源被訂閱的時候。也就是說訂閱的時候把下游的observer包裝為一個SubscribeTask,在scheduler的scheduleDirect方法中執行。

 final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

 這裡的source就是上游的Observable,上游的observable的subscribe方法在run方法中執行。scheduler作為後續文章講,它的scheduleDirect方法就是把SubscribeTask執行在子執行緒。
 再梳理一遍:當最下游的observable發生訂閱的時候 -->會呼叫ObservableSubscribeOn的subscribeActual(observer)方法,這裡的引數就是我們下游的observer,這個observer被包裝後作為一個task,上游的source.subscribe(parent)方法執行在子執行緒,parent範例為SubscribeOnObserver,,上游source.subscribe方法內部實現,都會想方設法執行parent的onNext,onComplete等方法,就達成了包裝類SubscribeOnObserver(即parent)這些方法在子執行緒執行,同時返回一個disposable,這個disposable給Observer用來取消訂閱。又有些饒了,其實一個時序圖就能說明一切。點選檢視大圖
在這裡插入圖片描述
 建議讀者跟著原始碼一起看,思考一下,就明白了。
 回答第二個問題,多次到用subscribeOn會怎樣?subscribeOn會把下游的Observer方法的onNext,onComplete,onError方法包裝在SubscribeOnObserver方法中,這些方法會在訂閱時執行在子執行緒的task中,多次呼叫會多次包裝下游的Observer,進行執行緒切換到子執行緒。那多次切換最終會切換成啥樣呢?其實仔細想一下,答案就在這段程式碼中

 @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

下游的subscrbeActual先執行,當然最靠近上游的,是程式最後一次執行subscrbeActual方法啦,就是第一次呼叫subscribeOn在哪個執行緒中執行,最終結果會在哪個執行緒中執行。這個要牢記下游的subscribeActual最先執行就能知道結果了。因為每次subscribeActual都會切換執行緒(僅針對ObservableSubscribeOn裡的方法)。
 接下來分析observerOn方法

        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

建立一個 ObservableObserveOn返回,老樣子建立了一個新的Observable向下傳遞,看ObservableSubscribeOn原始碼

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
 }

構造方法與上面分析相似 ,直接看subscribeActual方法,暫時不關心scheduler instanceof TrampolineScheduler的情況,接下來就是上游Observable source訂閱ObserveOnObserver,看ObserveOnObserver

 static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;

        Throwable error;
        volatile boolean done;

        volatile boolean disposed;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        downstream.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        downstream.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                downstream.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                upstream.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
         ....省略一些
    }

程式碼稍微有些多,看schedule()方法,worker.schedule(this);拿worker執行自己的runable方法,runable方法

  @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainFused();
            }
        }

有興趣的可以分析drainFused,drainFused,我在以後的文章中會單獨分析這drain相關方法。其實它就是呼叫了下游對應的onNext,onCompete,onError等方法,
梳理一下:也就是說我們每次呼叫observeOn(scheduler)方法,都會包裝一個新的Observable向下傳遞,同時把下游的Observer包裝成ObserveOnObserver,又是代理,通過它裡面的worker在上游的Observer執行onNext等方法,在ObserveOnObserver的onNext做執行緒切換,呼叫下游的observer的onNext方法,從而實現執行緒切換。
那麼多次呼叫observeOn方法時,會怎樣呢?思考一下,其實很容易了,Observer的發射方法呼叫順序是從上游,一直傳遞到包裝下游,也就是說最後一次執行執行緒切換有效,當然中間的執行緒切換也有效果,只是最後一次Observer為我們傳遞的Observer而已,所以我們的Observer方法的onNext相關方法在哪個執行緒,就取決於最後一次呼叫了。
備註:Observer的onSubscribe方法,是從上游向下遊呼叫的,這一點恰好跟subscribeOnActual相反。仔細想一下就知道了。其實說一千道一萬,也沒有自己梳理一遍強。