Observable
(可觀察物件),是RxJS
庫裡面的一個物件,可以用來處理非同步事件,例如HTTP請求(實際上,在Angular中,所有的HTTP請求返回的都是Observable)。【相關教學推薦:《》】
或許,你以前接觸過一個叫promise
的東西,它們本質上面是相同的:都是生產者主動向消費者「push」產品,而消費者是被動接收的,但是他們兩者還是有很大區別的:Observable
可以傳送任意多值,並且,在被訂閱之前,它是不會執行的!這是promise
不具備的特點。
Observable
用於在傳送方和接收方之間傳輸訊息,你可以將這些訊息看成是流Observable
物件時,需要傳入一個函數作為建構函式的引數,這個函數叫訂閱者函數,這個函數也就是生產者向消費者推播訊息的地方subscribe
(訂閱)之前,訂閱者函數不會被執行,直到subscribe()
函數被呼叫,該函數返回一個subscription
物件,裡面有一個unsubscribe()
函數,消費者可以隨時拒絕訊息的接收!subscribe()
函數接收一個observer(觀察者)
物件作為入參有了可觀察物件(傳送方)
,就需要一個觀察者(接收方)
來觀察可觀察物件,觀察者要實現observer
介面,它是一個物件,其中包含三個屬性,它們都是函數,如下:
通知型別 | 說明 |
---|---|
next | 必要。以接收的值作為入參,在正常情況下執行。可能執行零次或多次。 |
error | 可選。出錯的情況下執行。錯誤會中斷這個可觀察物件範例的執行過程。 |
complete | 可選。傳輸完成的情況下執行。 |
只有當有人訂閱 Observable
的範例時,它才會開始釋出值。 訂閱時要先呼叫可觀察物件的 subscribe()
方法,並把一個觀察者物件傳給它,用來接收通知。如下:
為了展示訂閱的原理,需要先建立新的可觀察物件。它有一個建構函式可以用來建立新範例,但是為了更簡明,也可以使用
Observable
上定義的一些靜態方法來建立一些常用的簡單可觀察物件:
of(...items)
:返回一個Observable
範例,它用同步的方式把引數中提供的這些值一個一個
傳送出來。from(iterable)
: 把它的引數轉換成一個Observable
範例。 該方法通常用於把一個陣列轉換成一個(傳送多個值的)可觀察物件。
import { of } from "rxjs"; // 1、通過 of() 方法返回一個可觀察物件,並準備將1,2,3三個資料傳送出去 const observable = of(1, 2, 3); // 2、實現 observer 介面,觀察者 const observer = { next: (num: number) => console.log(num), error: (err: Error) => console.error('Observer got an error: ' + err), complete: () => console.log('Observer got a complete notification'), } // 3、訂閱。呼叫可觀察物件的 subscribe() 方法訂閱,subscribe() 方法中傳入的物件就是一個觀察者 observable.subscribe(observer);
執行結果如下:
上面訂閱的寫法可以直接改為如下:引數不是物件
observable.subscribe( num => console.log(num), err => console.error('Observer got an error: ' + err), () => console.log('Observer got a complete notification') );
在上面的例子中使用的是of()
方法來建立可觀察物件,這節使用建構函式建立可觀察物件。
Observable
建構函式可以建立任何型別的可觀察流。 當執行可觀察物件的subscribe()
方法時,這個建構函式就會把它接收到的引數作為訂閱函數
來執行。 訂閱函數會接收一個Observer
物件,並把值釋出給觀察者的next()
方法。
// 1、自定義訂閱者函數 function sequenceSubscriber(observer: Observer<number>) { observer.next(1); // 傳送資料 observer.next(2); // 傳送資料 observer.next(3); // 傳送資料 observer.complete(); return {unsubscribe() {}}; } // 2、通過建構函式建立一個新的可觀察物件,引數就是一個訂閱者函數 const sequence = new Observable(sequenceSubscriber); // 3、訂閱 sequence.subscribe({ next(num) { console.log(num); }, // 接受資料 complete() { console.log('Finished sequence'); } });
執行結果如下:
上面一個例子演示瞭如何自定義訂閱函數,那麼既然可以自定義訂閱者函數,我們就可以將非同步程式碼封裝進可觀察物件的訂閱者函數中,待非同步程式碼執行完再傳送資料。如下:
import { Observable } from 'rxjs' // 非同步函數 function fn(num) { return new Promise((reslove, reject) => { setTimeout(() => { num++ reslove(num) }, 1000) }) } // 建立可觀察物件,並傳入訂閱者函數 const observable = new Observable((x) => { let num = 1 fn(num).then( res => x.next(res) // 非同步程式碼執行完成,傳送資料 ) }) // 訂閱,接收資料,可以改為鏈式呼叫 observable.subscribe(data => console.log(data)) // 2
https://angular.cn/guide/observables#multicasting
我們可以使用一系列的RxJS操作符
,在這些訊息被接收方接收之前,對它們進行一系列的處理、轉換,因為這些操作符都是純函數。
import { of } from 'rxjs'; import { map } from 'rxjs/operators'; // 1、建立可觀察物件,並行送資料 const nums = of(1, 2, 3); // 2、建立函數以接受可觀察物件 const squareValues = map((val: number) => val * val); const squaredNums = squareValues(nums); squaredNums.subscribe(x => console.log(x));
上面的方式我看不懂且難以接受,一般常用下面這種,使用pipe
把多個操作符連結起來
import { map, Observable, filter } from 'rxjs' // 建立可觀察物件,並傳入訂閱者函數 const observable = new Observable((x) => { x.next(1) x.next(2) x.next(3) x.next(4) }).pipe( map(value => value*100), // 操作符 filter(value => value == 200) // 操作符 ) .subscribe(data => console.log(data)) // 200
RxJS
還提供了catchError
操作符,它允許你在管道中處理已知錯誤。
假設你有一個可觀察物件,它發起 API 請求,然後對伺服器返回的響應進行對映。如果伺服器返回了錯誤或值不存在,就會生成一個錯誤。如果你捕獲這個錯誤並提供了一個預設值,流就會繼續處理這些值,而不會報錯。如下:
import { map, Observable, filter, catchError, of } from 'rxjs' const observable = new Observable((x) => { x.next(1) // 傳送資料 1 和 2 x.next(2) }).pipe( map(value => { if (value === 1) { // 1、當傳送的資料為 1 時,將其乘以 100 return value*100 } else { // 2、否則丟擲錯誤 throw new Error('丟擲錯誤'); } }), // 3、此處捕獲錯誤並處理錯誤,對外傳送資料 0 catchError((err) => { console.log(err) return of(0) }) ) .subscribe( data => console.log(data), // 4、由於上面丟擲的錯誤被 catchError 操作符處理(重新傳送資料)了,所以這裡能順利訂閱到資料而不報錯 err => console.log('接受不到資料:', err) )
最後的執行結果如下:
更多程式設計相關知識,請存取:!!
以上就是淺析Angular中的可觀察物件、觀察者和RxJS操作符的詳細內容,更多請關注TW511.COM其它相關文章!