RxPY - 使用可观察量
observable 是一个创建观察者并将其附加到需要值的源的函数,例如,来自 dom 元素的单击、鼠标事件等。
本章将详细研究下面提到的主题。
创建可观察对象
订阅并执行 Observable
创建可观察量
为了创建一个 observable,我们将使用create()方法并将具有以下项目的函数传递给它。
on_next() - 当 Observable 发出一个项目时,该函数被调用。
on_completed() - 当 Observable 完成时调用此函数。
on_error() - 当 Observable 发生错误时调用此函数。
要使用 create() 方法,首先导入该方法,如下所示 -
from rx import create
这是一个创建可观察对象的工作示例 -
testrx.py
from rx import create deftest_observable(observer, scheduler): observer.on_next("Hello") observer.on_error("Error") observer.on_completed() source = create(test_observable).
订阅并执行 Observable
要订阅可观察对象,我们需要使用 subscribe() 函数并传递回调函数 on_next、on_error 和 on_completed。
这是一个工作示例 -
testrx.py
from rx import create deftest_observable(observer, scheduler): observer.on_next("Hello") observer.on_completed() source = create(test_observable) source.subscribe( on_next = lambda i: print("Got - {0}".format(i)), on_error = lambda e: print("Error : {0}".format(e)), on_completed = lambda: print("Job Done!"), )
subscribe() 方法负责执行 observable。回调函数on_next、on_error和on_completed必须传递给 subscribe 方法。调用 subscribe 方法,依次执行 test_observable() 函数。
并不强制将所有三个回调函数传递给 subscribe() 方法。您可以根据您的要求传递 on_next()、on_error() 和 on_completed()。
lambda 函数用于 on_next、on_error 和 on_completed。它将接受参数并执行给定的表达式。
这是创建的可观察值的输出 -
E:\pyrx>python testrx.py Got - Hello Job Done!