RxPY - 使用调度程序的并发
RxPy 的一项重要特性是并发性,即允许任务并行执行。为了实现这一点,我们有两个运算符 subscribe_on() 和observe_on(),它们将与调度程序一起工作,这将决定订阅任务的执行。
这是一个工作示例,显示了对 subscibe_on()、observe_on() 和调度程序的需求。
例子
import random import time import rx from rx import operators as ops def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) input("Press any key to exit\n")
在上面的例子中,我有2个任务:任务1和任务2。任务的执行是按顺序的。仅当第一个任务完成后,第二个任务才开始。
输出
E:\pyrx>python testrx.py From Task 1: 1 From Task 1: 2 From Task 1: 3 From Task 1: 4 From Task 1: 5 Task 1 complete From Task 2: 1 From Task 2: 2 From Task 2: 3 From Task 2: 4 Task 2 complete
RxPy支持许多Scheduler,在这里,我们将使用ThreadPoolScheduler。ThreadPoolScheduler 主要会尝试管理可用的 CPU 线程。
在我们之前看到的示例中,我们将使用一个多处理模块来提供 cpu_count。该计数将提供给 ThreadPoolScheduler,它将根据可用线程设法使任务并行工作。
这是一个工作示例 -
import multiprocessing import random import time from threading import current_thread import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as ops # calculate cpu count, using which will create a ThreadPoolScheduler thread_count = multiprocessing.cpu_count() thread_pool_scheduler = ThreadPoolScheduler(thread_count) print("Cpu count is : {0}".format(thread_count)) def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) input("Press any key to exit\n")
在上面的示例中,我有 2 个任务,cpu_count 为 4。由于任务为 2,可用线程为 4,因此两个任务都可以并行启动。
输出
E:\pyrx>python testrx.py Cpu count is : 4 Press any key to exit From Task 1: 1 From Task 2: 1 From Task 1: 2 From Task 2: 2 From Task 2: 3 From Task 1: 3 From Task 2: 4 Task 2 complete From Task 1: 4 From Task 1: 5 Task 1 complete
如果您看到输出,则两个任务已并行启动。
现在,考虑一个场景,其中任务超过 CPU 数量,即 CPU 数量为 4,任务为 5。在这种情况下,我们需要检查任务完成后是否有线程获得空闲,这样,就可以分配给队列中可用的新任务。
为此,我们可以使用observe_on() 运算符,它将观察调度程序是否有空闲线程。这是一个使用observe_on()的工作示例
例子
import multiprocessing import random import time from threading import current_thread import rx from rx.scheduler import ThreadPoolScheduler from rx import operators as ops # calculate cpu count, using which will create a ThreadPoolScheduler thread_count = multiprocessing.cpu_count() thread_pool_scheduler = ThreadPoolScheduler(thread_count) print("Cpu count is : {0}".format(thread_count)) def adding_delay(value): time.sleep(random.randint(5, 20) * 0.1) return value # Task 1 rx.of(1,2,3,4,5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 1: {0}".format(s)), lambda e: print(e), lambda: print("Task 1 complete") ) # Task 2 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 2: {0}".format(s)), lambda e: print(e), lambda: print("Task 2 complete") ) #Task 3 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 3: {0}".format(s)), lambda e: print(e), lambda: print("Task 3 complete") ) #Task 4 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.subscribe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 4: {0}".format(s)), lambda e: print(e), lambda: print("Task 4 complete") ) #Task 5 rx.range(1, 5).pipe( ops.map(lambda a: adding_delay(a)), ops.observe_on(thread_pool_scheduler) ).subscribe( lambda s: print("From Task 5: {0}".format(s)), lambda e: print(e), lambda: print("Task 5 complete") ) input("Press any key to exit\n")
输出
E:\pyrx>python testrx.py Cpu count is : 4 From Task 4: 1 From Task 4: 2 From Task 1: 1 From Task 2: 1 From Task 3: 1 From Task 1: 2 From Task 3: 2 From Task 4: 3 From Task 3: 3 From Task 2: 2 From Task 1: 3 From Task 4: 4 Task 4 complete From Task 5: 1 From Task 5: 2 From Task 5: 3 From Task 3: 4 Task 3 complete From Task 2: 3 Press any key to exit From Task 5: 4 Task 5 complete From Task 1: 4 From Task 2: 4 Task 2 complete From Task 1: 5 Task 1 complete
如果您看到输出,则任务 4 完成后,线程将被分配给下一个任务,即任务 5,并且任务 5 开始执行。