反應式程式設計


反應式程式設計是處理資料流和變化傳播的程式設計範例。 這意味著,當資料流由一個元件發出時,更改將通過反應式程式設計庫傳播到其他元件。變化的傳播將持續到最終的接收者。 事件驅動和反應式程式設計的區別在於,事件驅動式程式設計圍繞事件展開,反應式程式設計圍繞著資料展開。

ReactiveX或RX用於反應式程式設計
ReactiveX或者Raective Extension是反應式程式設計最著名的實現。 ReactiveX的工作取決於以下兩個類 -

可觀察的類
這個類是資料流或事件的來源,它打包傳入的資料,以便資料可以從一個執行緒傳遞到另一個執行緒。 在某些觀察者訂閱它之前,它不會提供資料。

觀察員類
該類使用observable發出的資料流。 可以有多個可觀察的觀察者,每個觀察者將接收每個發射的資料項。 觀察者可以通過訂閱可觀察到的三種型別的事件 -

  • on_next()事件 - 它意味著資料流中有一個元素。
  • on_completed()事件 - 它意味著排放已經結束,沒有更多資料項到來。
  • on_error()事件 - 它也意味著排放的結束,但在可觀察到丟擲錯誤的情況下。

RxPY - 用於反應式程式設計的Python模組

RxPY是一個Python模組,可用於反應式程式設計。 我們需要確保模組已安裝。 以下命令可用於安裝RxPY模組 -

pip install RxPY

例子
以下是一個Python指令碼,它使用RxPY模組及Observable類和Observe類來進行反應式程式設計。 基本上有兩類 -

  • get_strings() - 用於從觀察者獲取字串。
  • PrintObserver() - 用於從觀察者列印字串。 它使用觀察員班的所有三個事件。 它也使用subscribe()類。

參考以下實現程式碼 -

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

執行上面範例程式碼,得到以下結果 -

Received Ram
Received Mohan
Received Shyam
Finished

用於反應式程式設計的PyFunctional庫

PyFunctionalis是另一個可用於響應式程式設計的Python庫。 它使我們能夠使用Python程式設計語言建立功能程式。 這很有用,因為它允許我們通過使用鏈式函式操作符來建立資料管道。

RxPY和PyFunctional之間的區別

這兩個庫都用於響應式程式設計,並以類似的方式處理流,但兩者的主要區別取決於資料的處理。 RxPY處理系統中的資料和事件,而PyFunctional專注於使用函式式程式設計範例轉換資料。

安裝PyFunctional模組

需要在使用之前安裝這個模組。可以通過以下pip命令來安裝 -

pip install pyfunctional

例子
以下範例使用PyFunctional模組及其seq類,它們充當可以疊代和操作的流物件。 在這個程式中,它使用將每個值加倍的lamda函式對映序列,然後過濾x大於4的值,最後將序列減少為所有剩餘值的和。

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

執行上面範例程式碼,得到以下結果 -

Result: 6