首先我提出來兩個問題:
1.Rxjava是如何做到執行緒切換的
2.執行緒切換我多次呼叫subscribeOn和多次呼叫observeOn,對資料流有什麼影響
對Rxjava的資料處理流程不是很清楚的,可以看我上一篇文章
資料處理及訂閱流程分析
下面這段程式碼相信很多人都寫過
upstream.subscribeOn(SchedulerProvider.net())
.observeOn(AndroidSchedulers.mainThread());
上游的Observable進行執行緒切換在子執行緒發射,然後訂閱者程式碼執行到主執行緒。
接下來我們看一下subscribeOn的原始碼
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
建立了一個ObservableSubscribeOn並返回,把自身與scheduler通過構造方法傳入到ObservableSubscribeOn。顯然ObservableSubscribeOn一定繼承了Observable.這裡我們發現,向下遊傳遞的Observable已經被替換成了ObservableSubscribeOn.上游的Observable被ObservableSubscribeOn接管了,又是代理模式的應用。分析ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
構造方法裡把上游的observable賦值到自己的source成員變數,同時把Scheduler儲存到自己的成員變數。然後唯一的功能方法subscribeActual(observer),非常得簡潔。把下游的observer通過SubscribeOnObserver代理,然後呼叫parent的setDisposable方法,引數為scheduler.scheduleDirect(new SubscribeTask(parent))。
我們一看SubscribeOnObserver這個observer的原始碼便能清楚是什麼鬼了
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
沒幾行程式碼,也很簡潔,關注一下setDisposable(Disposable )方法,其實就是把scheduler返回的Disposable賦值給自己。也就是說ObservableSubscribeOn可以dispose,scheduler返回的那個disposable.通過我的上篇文章資料處理及訂閱流程分析我們知道ObservableSubscribeOn的subscribeActual方法執行發生在資料來源被訂閱的時候。也就是說訂閱的時候把下游的observer包裝為一個SubscribeTask,在scheduler的scheduleDirect方法中執行。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
這裡的source就是上游的Observable,上游的observable的subscribe方法在run方法中執行。scheduler作為後續文章講,它的scheduleDirect方法就是把SubscribeTask執行在子執行緒。
再梳理一遍:當最下游的observable發生訂閱的時候 -->會呼叫ObservableSubscribeOn的subscribeActual(observer)方法,這裡的引數就是我們下游的observer,這個observer被包裝後作為一個task,上游的source.subscribe(parent)方法執行在子執行緒,parent範例為SubscribeOnObserver,,上游source.subscribe方法內部實現,都會想方設法執行parent的onNext,onComplete等方法,就達成了包裝類SubscribeOnObserver(即parent)這些方法在子執行緒執行,同時返回一個disposable,這個disposable給Observer用來取消訂閱。又有些饒了,其實一個時序圖就能說明一切。點選檢視大圖
建議讀者跟著原始碼一起看,思考一下,就明白了。
回答第二個問題,多次到用subscribeOn會怎樣?subscribeOn會把下游的Observer方法的onNext,onComplete,onError方法包裝在SubscribeOnObserver方法中,這些方法會在訂閱時執行在子執行緒的task中,多次呼叫會多次包裝下游的Observer,進行執行緒切換到子執行緒。那多次切換最終會切換成啥樣呢?其實仔細想一下,答案就在這段程式碼中
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
下游的subscrbeActual先執行,當然最靠近上游的,是程式最後一次執行subscrbeActual方法啦,就是第一次呼叫subscribeOn在哪個執行緒中執行,最終結果會在哪個執行緒中執行。這個要牢記下游的subscribeActual最先執行就能知道結果了。因為每次subscribeActual都會切換執行緒(僅針對ObservableSubscribeOn裡的方法)。
接下來分析observerOn方法
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
建立一個 ObservableObserveOn返回,老樣子建立了一個新的Observable向下傳遞,看ObservableSubscribeOn原始碼
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
構造方法與上面分析相似 ,直接看subscribeActual方法,暫時不關心scheduler instanceof TrampolineScheduler的情況,接下來就是上游Observable source訂閱ObserveOnObserver,看ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Throwable error;
volatile boolean done;
volatile boolean disposed;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
upstream.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return disposed;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
....省略一些
}
程式碼稍微有些多,看schedule()方法,worker.schedule(this);拿worker執行自己的runable方法,runable方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainFused();
}
}
有興趣的可以分析drainFused,drainFused,我在以後的文章中會單獨分析這drain相關方法。其實它就是呼叫了下游對應的onNext,onCompete,onError等方法,
梳理一下:也就是說我們每次呼叫observeOn(scheduler)方法,都會包裝一個新的Observable向下傳遞,同時把下游的Observer包裝成ObserveOnObserver,又是代理,通過它裡面的worker在上游的Observer執行onNext等方法,在ObserveOnObserver的onNext做執行緒切換,呼叫下游的observer的onNext方法,從而實現執行緒切換。
那麼多次呼叫observeOn方法時,會怎樣呢?思考一下,其實很容易了,Observer的發射方法呼叫順序是從上游,一直傳遞到包裝下游,也就是說最後一次執行執行緒切換有效,當然中間的執行緒切換也有效果,只是最後一次Observer為我們傳遞的Observer而已,所以我們的Observer方法的onNext相關方法在哪個執行緒,就取決於最後一次呼叫了。
備註:Observer的onSubscribe方法,是從上游向下遊呼叫的,這一點恰好跟subscribeOnActual相反。仔細想一下就知道了。其實說一千道一萬,也沒有自己梳理一遍強。