RxPY - 与主题一起工作
主题是一个可观察的序列,也是一个可以多播的观察者,即与许多已订阅的观察者交谈。
我们将讨论以下主题 -
- 创建主题
- 订阅主题
- 将数据传递给主体
- Behave主体
- 重播主题
- 异步主题
创建主题
要使用主题,我们需要导入主题,如下所示 -
from rx.subject import Subject
您可以按如下方式创建主客体 -
subject_test = Subject()
该对象是一个具有三种方法的观察者 -
- on_next(值)
- on_error(错误)和
- on_completed()
订阅主题
您可以对该主题创建多个订阅,如下所示 -
subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.subscribe( lambda x: print("The value is {0}".format(x)) )
将数据传递给主体
您可以将数据传递给使用 on_next(value) 方法创建的主题,如下所示 -
subject_test.on_next("A") subject_test.on_next("B")
数据将传递给所有订阅者,添加到主题上。
这是该主题的一个工作示例。
例子
from rx.subject import Subject subject_test = Subject() subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.on_next("A") subject_test.on_next("B")
subject_test 对象是通过调用Subject() 创建的。subject_test 对象引用了 on_next(value)、on_error(error) 和 on_completed() 方法。上述示例的输出如下所示 -
输出
E:\pyrx>python testrx.py The value is A The value is A The value is B The value is B
我们可以使用 on_completed() 方法来停止主题执行,如下所示。
例子
from rx.subject import Subject subject_test = Subject() subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.subscribe( lambda x: print("The value is {0}".format(x)) ) subject_test.on_next("A") subject_test.on_completed() subject_test.on_next("B")
一旦我们调用complete,后面调用的下一个方法就不会被调用。
输出
E:\pyrx>python testrx.py The value is A The value is A
现在让我们看看如何调用 on_error(error) 方法。
例子
from rx.subject import Subject subject_test = Subject() subject_test.subscribe( on_error = lambda e: print("Error : {0}".format(e)) ) subject_test.subscribe( on_error = lambda e: print("Error : {0}".format(e)) ) subject_test.on_error(Exception('There is an Error!'))
输出
E:\pyrx>python testrx.py Error: There is an Error! Error: There is an Error!
Behave主体
BehaviourSubject 会在调用时为您提供最新值。您可以创建Behave主体,如下所示 -
from rx.subject import BehaviorSubject behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
这是一个使用Behave主题的工作示例
例子
from rx.subject import BehaviorSubject behavior_subject = BehaviorSubject("Testing Behaviour Subject"); behavior_subject.subscribe( lambda x: print("Observer A : {0}".format(x)) ) behavior_subject.on_next("Hello") behavior_subject.subscribe( lambda x: print("Observer B : {0}".format(x)) ) behavior_subject.on_next("Last call to Behaviour Subject")
输出
E:\pyrx>python testrx.py Observer A : Testing Behaviour Subject Observer A : Hello Observer B : Hello Observer A : Last call to Behaviour Subject Observer B : Last call to Behaviour Subject
重播主题
重放主体类似于Behave主体,其中,它可以缓冲值并将其重放给新订阅者。这是重播主题的工作示例。
例子
from rx.subject import ReplaySubject replay_subject = ReplaySubject(2) replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x))) replay_subject.on_next(1) replay_subject.on_next(2) replay_subject.on_next(3) replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x))); replay_subject.on_next(5)
重放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于呼叫的新订阅者。
输出
E:\pyrx>python testrx.py Testing Replay Subject A: 1 Testing Replay Subject A: 2 Testing Replay Subject A: 3 Testing Replay Subject B: 2 Testing Replay Subject B: 3 Testing Replay Subject A: 5 Testing Replay Subject B: 5
异步主题
对于AsyncSubject,最后调用的值将传递给订阅者,并且只有在调用complete() 方法后才会完成。
例子
from rx.subject import AsyncSubject async_subject = AsyncSubject() async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x))) async_subject.on_next(1) async_subject.on_next(2) async_subject.on_completed() async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x))) Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
输出
E:\pyrx>python testrx.py Testing Async Subject A: 2 Testing Async Subject B: 2