- RxJava 教程
- RxJava - 主页
- RxJava - 概述
- RxJava - 环境设置
- 观测值
- RxJava - Observable 的工作原理
- RxJava - 创建 Observables
- RxJava - 单个可观察的
- RxJava - 也许可观察
- RxJava - 完整的可观察的
- RxJava - 使用 CompositeDisposable
- 运营商
- RxJava - 创建运算符
- RxJava - 转换运算符
- RxJava - 过滤运算符
- RxJava - 组合运算符
- RxJava - 实用操作符
- RxJava - 条件运算符
- RxJava - 数学运算符
- RxJava - 可连接运算符
- 科目
- RxJava - 主题
- RxJava - 发布主题
- RxJava - Behave主题
- RxJava-ReplaySubject
- RxJava-AsyncSubject
- 调度程序
- RxJava - 调度程序
- RxJava - 蹦床调度程序
- RxJava - 新线程调度程序
- RxJava - 计算调度程序
- RxJava - IO 调度程序
- RxJava - 来自调度程序
- 各种各样的
- RxJava - 缓冲
- RxJava - 窗口化
- RxJava 有用资源
- RxJava - 快速指南
- RxJava - 有用的资源
- RxJava - 讨论
RxJava - 快速指南
RxJava - 概述
RxJava 是 ReactiveX 的基于 Java 的扩展。它提供了 Java 中的实现或 ReactiveX 项目。以下是 RxJava 的主要特征。
扩展观察者模式。
支持数据/事件序列。
提供运算符以声明方式将序列组合在一起。
在内部处理线程、同步、线程安全和并发数据结构。
什么是ReactiveX?
ReactiveX 是一个旨在为各种编程语言提供响应式编程概念的项目。响应式编程是指当数据出现时程序做出反应的场景。它是基于事件的编程概念,事件可以传播到寄存器观察者。
根据Reactive,他们结合了观察者模式、迭代器模式和函数式模式的优点。
观察者模式做得很好。ReactiveX 结合了观察者模式、迭代器模式和函数式编程的最佳思想。
函数式编程
函数式编程围绕使用纯函数构建软件进行。纯函数不依赖于先前的状态,并且对于传递的相同参数始终返回相同的结果。纯函数有助于避免与多线程环境中常见的共享对象、可变数据和副作用相关的问题。
反应式编程
响应式编程是指事件驱动的编程,其中数据流以异步方式进入并在到达时进行处理。
函数式反应式编程
RxJava 同时实现了这两个概念,其中流的数据随着时间的推移而变化,消费者函数也会做出相应的反应。
反应式宣言
反应式宣言是一份在线文档,阐述了应用软件系统的高标准。根据宣言,以下是反应式软件的关键属性 -
响应式- 应始终及时响应。
消息驱动- 应在组件之间使用异步消息传递,以便它们保持松散耦合。
弹性- 即使在高负载下也应保持响应。
弹性- 即使任何组件发生故障,也应保持响应。
RxJava的关键组件
RxJava 有两个关键组件:Observables 和 Observer。
Observable - 它表示类似于 Stream 的对象,可以发出零个或多个数据,可以发送错误消息,在发出一组数据时可以控制其速度,可以发送有限和无限数据。
观察者- 它订阅可观察的序列数据并对可观察的每个项目做出反应。每当 Observable 发出数据时,观察者都会收到通知。观察者一一处理数据。
如果项目不存在或者前一个项目没有返回回调,则观察者永远不会收到通知。
RxJava - 环境设置
本地环境设置
RxJava 是 Java 的库,因此第一个要求是在您的计算机中安装 JDK。
系统要求
JDK | 1.5或以上。 |
---|---|
记忆 | 没有最低要求。 |
磁盘空间 | 没有最低要求。 |
操作系统 | 没有最低要求。 |
第 1 步 - 验证计算机中的 Java 安装
首先,打开控制台并根据您正在使用的操作系统执行java命令。
操作系统 | 任务 | 命令 |
---|---|---|
Windows | 打开命令控制台 | c:\> java -版本 |
Linux | 打开命令终端 | $ java -版本 |
苹果 | 打开终端 | 机器:< joseph$ java -版本 |
让我们验证所有操作系统的输出 -
操作系统 | 输出 |
---|---|
Windows | java版本“1.8.0_101” Java(TM) SE 运行时环境(版本 1.8.0_101) |
Linux | java版本“1.8.0_101” Java(TM) SE 运行时环境(版本 1.8.0_101) |
苹果 | java版本“1.8.0_101” Java(TM) SE 运行时环境(版本 1.8.0_101) |
如果您的系统上没有安装 Java,请从以下链接下载 Java 软件开发工具包 (SDK):https://www.oracle.com。我们假设 Java 1.8.0_101 作为本教程的安装版本。
第2步-设置JAVA环境
设置JAVA_HOME环境变量以指向计算机上安装 Java 的基本目录位置。例如。
操作系统 | 输出 |
---|---|
Windows | 设置环境变量JAVA_HOME为C:\Program Files\Java\jdk1.8.0_101 |
Linux | 导出 JAVA_HOME = /usr/local/java-current |
苹果 | 导出 JAVA_HOME = /Library/Java/Home |
将 Java 编译器位置附加到系统路径。
操作系统 | 输出 |
---|---|
Windows | 将字符串C:\Program Files\Java\jdk1.8.0_101\bin添加到系统变量Path的末尾。 |
Linux | 导出路径 = $PATH:$JAVA_HOME/bin/ |
苹果 | 不需要 |
如上所述,使用命令java -version验证 Java 安装。
第 3 步 - 下载 RxJava2 存档
从RxJava @ MVNRepository 及其依赖项Reactive Streams @ MVNRepository下载最新版本的 RxJava jar 文件 。在撰写本教程时,我们已经下载了 rxjava-2.2.4.jar、reactive-streams-1.0.2.jar 并将其复制到 C:\>RxJava 文件夹中。
操作系统 | 档案名称 |
---|---|
Windows | rxjava-2.2.4.jar、reactive-streams-1.0.2.jar |
Linux | rxjava-2.2.4.jar、reactive-streams-1.0.2.jar |
苹果 | rxjava-2.2.4.jar、reactive-streams-1.0.2.jar |
第 4 步 - 设置 RxJava 环境
将RX_JAVA环境变量设置为指向计算机上存储 RxJava jar 的基本目录位置。假设我们已将 rxjava-2.2.4.jar 和reactive-streams-1.0.2.jar 存储在 RxJava 文件夹中。
先生编号 | 操作系统和描述 |
---|---|
1 | Windows 将环境变量 RX_JAVA 设置为 C:\RxJava |
2 | Linux 导出 RX_JAVA = /usr/local/RxJava |
3 | 苹果 导出 RX_JAVA = /Library/RxJava |
第 5 步 - 设置 CLASSPATH 变量
设置CLASSPATH环境变量以指向 RxJava jar 位置。
先生编号 | 操作系统和描述 |
---|---|
1 | Windows 设置环境变量 CLASSPATH 为 %CLASSPATH%;%RX_JAVA%\rxjava-2.2.4.jar;%RX_JAVA%\reactive-streams-1.0.2.jar;.; |
2 | Linux 导出 CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:。 |
3 | 苹果 导出 CLASSPATH = $CLASSPATH:$RX_JAVA/rxjava-2.2.4.jar:reactive-streams-1.0.2.jar:。 |
第 6 步 - 测试 RxJava 设置
创建一个类 TestRx.java 如下所示 -
import io.reactivex.Flowable; public class TestRx { public static void main(String[] args) { Flowable.just("Hello World!") .subscribe(System.out::println); } }
第 7 步 - 验证结果
使用javac编译器编译类,如下所示 -
C:\RxJava>javac Tester.java
验证输出。
Hello World!
RxJava - Observable 的工作原理
Observables代表观察者(订阅者)监听的数据源。简而言之,Observable 发出项目,然后 Subscriber 消费这些项目。
可观察的
一旦订阅者开始监听,Observable 就会提供数据。
Observable 可以发出任意数量的项目。
Observable 可以只发出完成信号,也可以不发出任何项目。
Observable 可以成功终止。
Observable 可能永远不会终止。例如,一个按钮可以被点击任意多次。
Observable 可能在任何时间点抛出错误。
订户
Observable 可以有多个订阅者。
当 Observable 发出一个项目时,每个订阅者的 onNext() 方法都会被调用。
当 Observable 完成发出项目时,每个订阅者的 onComplete() 方法都会被调用。
如果 Observable 发出错误,每个订阅者的 onError() 方法都会被调用。
RxJava - 创建 Observables
以下是创建可观察量的基类。
Flowable - 0..N 个流,发出 0 或 n 个项目。支持反应流和背压。
可观察- 0..N 流量,但没有背压。
单一- 1 项或错误。可以被视为方法调用的反应式版本。
可完成- 没有发出任何项目。用作完成或错误的信号。可以被视为 Runnable 的响应式版本。
MayBe - 没有发出任何物品或发出 1 个物品。可以被视为可选的响应式版本。
以下是在 Observable 类中创建可观察对象的便捷方法。
just(T item) - 返回一个 Observable,它指示给定的(常量引用)项,然后完成。
fromIterable(Iterable source) - 将 Iterable 序列转换为发出序列中项目的 ObservableSource。
fromArray(T... items) - 将数组转换为发出数组中项目的 ObservableSource。
fromCallable(Callable seller) - 返回一个 Observable,当观察者订阅它时,调用您指定的函数,然后发出从该函数返回的值。
fromFuture(Future future) - 将 Future 转换为 ObservableSource。
Interval(longinitialDelay, long period, TimeUnitunit) - 返回一个 Observable,它在初始延迟后发出 0L,并在其后的每个时间段后发出不断增加的数字。
RxJava - 单个可观察的
Single 类表示单值响应。单个可观察对象只能发出单个成功值或错误。它不会发出 onComplete 事件。
类别声明
以下是io.reactivex.Single<T>类的声明-
public abstract class Single<T> extends Object implements SingleSource<T>
协议
以下是 Single Observable 运行的顺序协议 -
onSubscribe (onSuccess | onError)?
单个示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Single; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create the observable Single<String> testSingle = Single.just("Hello World"); //Create an observer Disposable disposable = testSingle .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Hello World
RxJava - 也许可观察
MayBe 类代表延迟响应。MayBe observable 可以发出单个成功值或不发出值。
类别声明
以下是io.reactivex.Single<T>类的声明-
public abstract class Maybe<T> extends Object implements MaybeSource<T>
协议
以下是 MayBe Observable 运行的顺序协议 -
onSubscribe (onSuccess | onError | OnComplete)?
也许是例子
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Maybe; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Maybe.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Hello World
RxJava - 完整的可观察的
Completable 类代表延迟响应。Completable observable 可以指示成功完成或错误。
类别声明
以下是io.reactivex.Completable类的声明-
public abstract class Completable extends Object implements CompletableSource
协议
以下是 Completable Observable 运行的顺序协议 -
onSubscribe (onError | onComplete)?
可完成的例子
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.concurrent.TimeUnit; import io.reactivex.Completable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableCompletableObserver; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { //Create an observer Disposable disposable = Completable.complete() .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableCompletableObserver() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onStart() { System.out.println("Started!"); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); //start observing disposable.dispose(); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Started! Done!
RxJava - 使用 CompositeDisposable
CompositeDisposable 类表示一个可以容纳多个一次性物品的容器,并提供添加和删除一次性物品的 O(1) 复杂度。
类别声明
以下是io.reactivex.disposables.CompositeDisposable类的声明-
public final class CompositeDisposable extends Object implements Disposable, io.reactivex.internal.disposables.DisposableContainer
复合材料一次性示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Maybe; import io.reactivex.Single; import io.reactivex.disposables.CompositeDisposable; import io.reactivex.disposables.Disposable; import io.reactivex.observers.DisposableMaybeObserver; import io.reactivex.observers.DisposableSingleObserver; import io.reactivex.schedulers.Schedulers; import java.util.concurrent.TimeUnit; public class ObservableTester { public static void main(String[] args) throws InterruptedException { CompositeDisposable compositeDisposable = new CompositeDisposable(); //Create an Single observer Disposable disposableSingle = Single.just("Hello World") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith( new DisposableSingleObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } }); //Create an observer Disposable disposableMayBe = Maybe.just("Hi") .delay(2, TimeUnit.SECONDS, Schedulers.io()) .subscribeWith(new DisposableMaybeObserver<String>() { @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onSuccess(String value) { System.out.println(value); } @Override public void onComplete() { System.out.println("Done!"); } }); Thread.sleep(3000); compositeDisposable.add(disposableSingle); compositeDisposable.add(disposableMayBe); //start observing compositeDisposable.dispose(); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Hello World Hi
RxJava - 创建运算符
以下是用于创建 Observable 的运算符。
先生。 | 运算符及描述 |
---|---|
1 | 创造 从头开始创建一个 Observable 并允许以编程方式调用观察者方法。 |
2 | 推迟 在观察者订阅之前不要创建 Observable。为每个观察者创建一个新的可观察值。 |
3 | 清空/从不/抛出 创建一个具有有限Behave的 Observable。 |
4 | 从 将对象/数据结构转换为 Observable。 |
5 | 间隔 创建一个 Observable,按指定的时间间隔按顺序发射整数。 |
6 | 只是 将对象/数据结构转换为 Observable 以发出相同或相同类型的对象。 |
7 | 范围 创建一个 Observable,按给定范围的顺序发出整数。 |
8 | 重复 创建一个 Observable 并按顺序重复发射整数。 |
9 | 开始 创建一个 Observable 来发出函数的返回值。 |
10 | 定时器 创建一个 Observable 以在给定的延迟后发出单个项目。 |
创建算子示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //Using fromArray operator to create an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
ABCDEFG
RxJava - 转换运算符
以下是用于转换从 Observable 发出的项目的运算符。
先生。 | 运算符及描述 |
---|---|
1 | 缓冲 定期将 Observable 中的项目收集到包中,然后发出包而不是项目。 |
2 | 平面地图 用于嵌套可观察量。将项目转换为可观察对象。然后将这些项目扁平化为单个 Observable。 |
3 | 通过...分组 将一个 Observable 分成按键组织的一组 Observable,以发出不同的项目组。 |
4 | 地图 对每个发出的项目应用一个函数来对其进行转换。 |
5 | 扫描 按顺序将函数应用于每个发出的项目,然后发出连续的值。 |
6 | 窗户 定期将 Observable 中的项目收集到 Observable 窗口中,然后发出窗口而不是项目。 |
转换运算符示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //Using map operator to transform an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .map(String::toUpperCase) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
ABCDEFG
RxJava - 过滤运算符
以下是用于有选择地从 Observable 发出项目的运算符。
先生。 | 运算符及描述 |
---|---|
1 | 去抖动 仅当发生超时时才发出项目,而不发出其他项目。 |
2 | 清楚的 只发出独特的物品。 |
3 | 元素At 只发出 Observable 发出的 n 索引处的项目。 |
4 | 筛选 仅发出那些通过给定谓词函数的项目。 |
5 | 第一的 发出第一个项目或通过给定标准的第一个项目。 |
6 | 忽略元素 不要从 Observable 发出任何项目,但标记完成。 |
7 | 最后的 发出 Observable 中的最后一个元素。 |
8 | 样本 以给定的时间间隔发出最新的项目。 |
9 | 跳过 跳过 Observable 中的前 n 个项目。 |
10 | 跳过最后一个 跳过 Observable 中的最后 n 个项目。 |
11 | 拿 从 Observable 中获取前 n 个项目。 |
12 | 最后取 从 Observable 中获取最后 n 个项目。 |
过滤运算符示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //Using take operator to filter an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable .take(2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
ab
RxJava - 组合运算符
以下是用于从多个 Observable 创建单个 Observable 的运算符。
先生。 | 运算符及描述 |
---|---|
1 |
和/然后/当
使用模式和计划中介组合项目集。 |
2 |
结合最新
通过指定的函数组合每个 Observable 发出的最新项并发出结果项。 |
3 |
加入
如果在第二个 Observable 发出的项目的时间范围内发出,则合并两个 Observable 发出的项目。 |
4 |
合并
组合 Observables 发出的项目。 |
5 |
从...开始
在开始从源 Observable 发出项目之前发出指定的项目序列 |
6 |
转变
发出 Observables 发出的最新项目。 |
7 |
压缩
根据功能组合 Observables 的项目并发出结果项目。 |
组合运算符示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //Using combineLatest operator to combine Observables public class ObservableTester { public static void main(String[] args) { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.combineLatest(observable1, observable2, (a,b) -> a + b) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
g1g2g3g4g5g6
RxJava - 实用操作符
以下是对 Observables 经常有用的运算符。
先生。 | 运算符及描述 |
---|---|
1 | 延迟 注册操作来处理 Observable 生命周期事件。 |
2 | 物化/非物质化 表示发出的项目和发送的通知。 |
3 | 观察 指定要观察的调度程序。 |
4 | 连载 强制 Observable 进行序列化调用。 |
5 | 订阅 对项目和通知的发射进行操作,例如从可观察到的完成 |
6 | 订阅 指定 Observable 订阅时要使用的调度程序。 |
7 | 时间间隔 转换 Observable 以发出发射之间经过的时间量的指示。 |
8 | 暂停 如果指定时间没有发出任何项目,则发出错误通知。 |
9 | 时间戳 将时间戳附加到发出的每个项目。 |
9 |
使用 创建一次性资源或与 Observable 相同的生命周期。 |
公用事业运营商示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //Using subscribe operator to subscribe to an Observable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable = Observable.fromArray(letters); observable.subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
abcdefg
RxJava - 条件运算符
以下是评估一个或多个 Observables 或发出的项目的运算符。
先生。 | 运算符及描述 |
---|---|
1 |
全部 评估发出的所有项目是否满足给定标准。 |
2 |
安布 仅在给定多个 Observable 的情况下发出第一个 Observable 中的所有项目。 |
3 |
包含 检查 Observable 是否发出特定项目。 |
4 |
默认为空 如果 Observable 不发出任何东西,则发出默认项。 |
5 |
序列相等 检查两个 Observable 是否发出相同的项目序列。 |
6 |
跳至 丢弃第一个 Observable 发出的项目,直到第二个 Observable 发出一个项目。 |
7 |
跳过时 丢弃由 Observable 发出的项,直到给定条件变为 false。 |
8 |
直到 在第二个 Observable 发出一个项目或终止后,丢弃由一个 Observable 发出的项目。 |
9 |
稍事休息 在指定条件变为 false 后丢弃由 Observable 发出的项目。 |
条件运算符示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //Using defaultIfEmpty operator to operate on an Observable public class ObservableTester { public static void main(String[] args) { final StringBuilder result = new StringBuilder(); Observable.empty() .defaultIfEmpty("No Data") .subscribe(s -> result.append(s)); System.out.println(result); String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result1 = new StringBuilder(); Observable.fromArray(letters) .firstElement() .defaultIfEmpty("No data") .subscribe(s -> result1.append(s)); System.out.println(result1); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
No Data a
RxJava - 数学运算符
以下是对 Observable 发出的整个项目进行操作的运算符。
先生。 | 运算符及描述 |
---|---|
1 | 平均的 评估所有项目的平均值并发出结果。 |
2 | 康卡特 不交错地从多个 Observable 发出所有项目。 |
3 | 数数 计算所有项目并发出结果。 |
4 | 最大限度 评估所有项目中的最大值并发出结果。 |
5 | 最小 评估所有项目中的最小值项目并发出结果。 |
6 | 减少 对每个项目应用一个函数并返回结果。 |
7 | 和 评估所有项目的总和并发出结果。 |
数学运算符示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; //Using concat operator to operate on multiple Observables public class ObservableTester { public static void main(String[] args) throws InterruptedException { Integer[] numbers = { 1, 2, 3, 4, 5, 6}; String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); Observable<String> observable1 = Observable.fromArray(letters); Observable<Integer> observable2 = Observable.fromArray(numbers); Observable.concat(observable1, observable2) .subscribe( letter -> result.append(letter)); System.out.println(result); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
abcdefg123456
RxJava - 可连接运算符
以下是对订阅有更精确控制的运营商。
先生。 | 运算符及描述 |
---|---|
1 | 连接 指示可连接的 Observable 向其订阅者发出项目。 |
2 | 发布 将 Observable 转换为可连接的 Observable。 |
3 | 参考计数 将可连接的 Observable 转换为普通的 Observable。 |
4 | 重播 确保每个订阅者都能看到相同的发射项目序列,即使在 Observable 已经开始发射项目并且订阅者稍后订阅之后也是如此。 |
可连接操作器示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.observables.ConnectableObservable; //Using connect operator on a ConnectableObservable public class ObservableTester { public static void main(String[] args) { String[] letters = {"a", "b", "c", "d", "e", "f", "g"}; final StringBuilder result = new StringBuilder(); ConnectableObservable<String> connectable = Observable.fromArray(letters).publish(); connectable.subscribe(letter -> result.append(letter)); System.out.println(result.length()); connectable.connect(); System.out.println(result.length()); System.out.println(result); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
0 7 abcdefg
RxJava - 主题
根据Reactive,Subject 既可以充当 Observable 也可以充当 Observer。
主题是一种桥梁或代理,在 ReactiveX 的某些实现中可用,它既充当观察者又充当可观察者。因为它是一个观察者,所以它可以订阅一个或多个Observable,并且因为它是一个Observable,所以它可以通过重新发送它们来传递它所观察到的项目,并且它还可以发送新的项目。
有四种类型的主题 -
先生。 | 主题和描述 |
---|---|
1 | 发布主题 仅发出订阅时间后发出的那些项目。 |
2 |
重播主题
发出由源 Observable 发出的所有项目,无论它何时订阅了 Observable。 |
3 | Behave主体 订阅后,发出最新的项目,然后继续发出源 Observable 发出的项目。 |
4 | 异步主题 发射源 Observable 完成发射后发射的最后一项。 |
RxJava - 发布主题
PublishSubject 向当前订阅的观察者发送项目,并向当前或晚期观察者发送终端事件。
类别声明
以下是io.reactivex.subjects.PublishSubject<T>类的声明-
public final class PublishSubject<T> extends Subject<T>
发布主题示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.PublishSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); PublishSubject<String> subject = PublishSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be d only //as subscribed after c item emitted. System.out.println(result2); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
abcd d
RxJava - Behave主题
BehaviorSubject 向每个订阅的观察者发出它观察到的最新项目,然后发送所有后续观察到的项目。
类别声明
以下是io.reactivex.subjects.BehaviorSubject<T>类的声明-
public final class BehaviorSubject<T> extends Subject<T>
Behave主体示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.BehaviorSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); BehaviorSubject<String> subject = BehaviorSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be cd being BehaviorSubject //(c is last item emitted before subscribe) System.out.println(result2); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
abcd cd
RxJava-ReplaySubject
ReplaySubject 向当前和晚期观察者重播事件/项目。
类别声明
以下是io.reactivex.subjects.ReplaySubject<T>类的声明-
public final class ReplaySubject<T> extends Subject<T>
重播主题示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects.ReplaySubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); ReplaySubject<String> subject = ReplaySubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be abcd System.out.println(result1); //Output will be abcd being ReplaySubject //as ReplaySubject emits all the items System.out.println(result2); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
abcd abcd
RxJava-AsyncSubject
AsyncSubject 向观察者发出唯一的最后一个值,后跟完成事件或收到的错误。
类别声明
以下是io.reactivex.subjects.AsyncSubject<T>类的声明-
public final class AsyncSubject<T> extends Subject<T>
异步主题示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.subjects. AsyncSubject; public class ObservableTester { public static void main(String[] args) { final StringBuilder result1 = new StringBuilder(); final StringBuilder result2 = new StringBuilder(); AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(value -> result1.append(value) ); subject.onNext("a"); subject.onNext("b"); subject.onNext("c"); subject.subscribe(value -> result2.append(value)); subject.onNext("d"); subject.onComplete(); //Output will be d being the last item emitted System.out.println(result1); //Output will be d being the last item emitted System.out.println(result2); } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
d d
RxJava - 调度程序
调度程序用于多线程环境中与 Observable 运算符一起使用。
根据反应式,调度程序用于调度运算符链如何应用于不同的线程。
默认情况下,Observable 和应用到它的运算符链将在调用其 Subscribe 方法的同一线程上完成其工作,并通知其观察者。SubscribeOn 运算符通过指定 Observable 应在其上运行的不同调度程序来更改此Behave。ObserveOn 运算符指定一个不同的调度程序,Observable 将使用该调度程序向其观察者发送通知。
RxJava 中有以下类型的调度程序 -
先生。 | 调度程序和描述 |
---|---|
1 | Schedulers.computation() 创建并返回用于计算工作的调度程序。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。 |
2 | 调度程序.io() 创建并返回用于 IO 绑定工作的调度程序。线程池可以根据需要进行扩展。 |
3 | Schedulers.newThread() 创建并返回一个为每个工作单元创建一个新线程的调度程序。 |
4 | 调度程序.trampoline() 创建并返回一个调度程序,该调度程序将当前线程上的工作排队,以便在当前工作完成后执行。 |
4 | Schedulers.from(java.util.concurrent.Executor执行器) 将 Executor 转换为新的 Scheduler 实例。 |
RxJava - 蹦床调度程序
Schedulers.trampoline() 方法创建并返回一个 Scheduler,该 Scheduler 将当前线程上的工作排队,以便在当前工作完成后执行。
Schedulers.trampoline() 示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.trampoline())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Processing Thread main Receiver Thread main, Item length 1 Processing Thread main Receiver Thread main, Item length 2 Processing Thread main Receiver Thread main, Item length 3
RxJava - 新线程调度程序
Schedulers.newThread() 方法创建并返回一个调度程序,该调度程序为每个工作单元创建一个新线程。
Schedulers.newThread() 示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.newThread())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Processing Thread RxNewThreadScheduler-1 Receiver Thread RxNewThreadScheduler-1, Item length 1 Processing Thread RxNewThreadScheduler-2 Receiver Thread RxNewThreadScheduler-2, Item length 2 Processing Thread RxNewThreadScheduler-3 Receiver Thread RxNewThreadScheduler-3, Item length 3
RxJava - 计算调度程序
Schedulers.computation() 方法创建并返回用于计算工作的调度程序。要调度的线程数取决于系统中存在的 CPU。每个 CPU 允许一个线程。最适合事件循环或回调操作。
Schedulers.computation() 示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.computation())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Processing Thread RxComputationThreadPool-1 Receiver Thread RxComputationThreadPool-1, Item length 1 Processing Thread RxComputationThreadPool-2 Receiver Thread RxComputationThreadPool-2, Item length 2 Processing Thread RxComputationThreadPool-3 Receiver Thread RxComputationThreadPool-3, Item length 3
RxJava - IO 调度程序
Schedulers.io() 方法创建并返回一个用于 IO 绑定工作的调度程序。线程池可以根据需要进行扩展。最适合 I/O 密集型操作。
Schedulers.io() 示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.io())) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 1 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 2 Processing Thread RxCachedThreadScheduler-1 Receiver Thread RxCachedThreadScheduler-1, Item length 3
RxJava - 来自调度程序
Schedulers.from(Executor) 方法将 Executor 转换为新的 Scheduler 实例。
Schedulers.from(Executor) 示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import java.util.Random; import java.util.concurrent.Executors; import io.reactivex.Observable; import io.reactivex.schedulers.Schedulers; public class ObservableTester { public static void main(String[] args) throws InterruptedException { Observable.just("A", "AB", "ABC") .flatMap(v -> getLengthWithDelay(v) .doOnNext(s -> System.out.println("Processing Thread " + Thread.currentThread().getName())) .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3)))) .subscribe(length -> System.out.println("Receiver Thread " + Thread.currentThread().getName() + ", Item length " + length)); Thread.sleep(10000); } protected static Observable<Integer> getLengthWithDelay(String v) { Random random = new Random(); try { Thread.sleep(random.nextInt(3) * 1000); return Observable.just(v.length()); } catch (InterruptedException e) { e.printStackTrace(); } return null; } }
验证结果
使用javac编译器编译该类,如下所示 -
C:\RxJava>javac ObservableTester.java
现在按如下方式运行 ObservableTester -
C:\RxJava>java ObservableTester
它应该产生以下输出 -
Processing Thread pool-1-thread-1 Processing Thread pool-3-thread-1 Receiver Thread pool-1-thread-1, Item length 1 Processing Thread pool-4-thread-1 Receiver Thread pool-4-thread-1, Item length 3 Receiver Thread pool-3-thread-1, Item length 2
RxJava - 缓冲
缓冲运算符允许将 Observable 发出的项目收集到列表或包中,并发出这些包而不是项目。在下面的示例中,我们创建了一个 Observable 来发出 9 个项目,并使用缓冲,3 个项目将一起发出。
缓冲示例
使用您选择的任何编辑器(例如 C:\> RxJava)创建以下 Java 程序。
ObservableTester.java
import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; import j