RxJS - 快速指南


RxJS - 概述

本章介绍有关 RxJS 的特性、优点和缺点的信息。在这里,我们还将了解何时使用 RxJS。

RxJS 的完整形式是Reactive Extension for Javascript。它是一个 JavaScript 库,使用可观察量进行反应式编程,处理异步数据调用、回调和基于事件的程序。RxJS 可以与其他 Javascript 库和框架一起使用。它由 javascript 和 typescript 支持。

什么是 RxJS?

根据RxJS的官方网站,它被定义为一个使用可观察序列编写异步和基于事件的程序的库。它提供了一种核心类型,即 Observable、卫星类型(观察者、调度程序、主题)和受 Array#extras 启发的运算符(map、filter、reduce、every 等),以允许将异步事件作为集合处理。

RxJS 的特点

在 RxJS 中,以下概念负责处理异步任务 -

可观察的

observable 是一个创建观察者并将其附加到需要值的源的函数,例如,来自 dom 元素的单击、鼠标事件或 Http 请求等。

观察者

它是一个具有 next()、error() 和complete() 方法的对象,当与可观察对象进行交互时,即源与示例按钮单击、Http 请求等进行交互时,将调用该方法。

订阅

创建可观察量后,要执行可观察量,我们需要订阅它。它还可用于取消执行。

运营商

运算符是一个纯函数,它将 observable 作为输入,输出也是一个 observable。

主题

主题是一个可以多播的可观察对象,即与许多观察者交谈。考虑一个带有事件侦听器的按钮,每次用户单击按钮时,都会调用使用 addlistener 附加到事件的函数,类似的功能也适用于主题。

调度程序

调度程序控制订阅何时必须启动并收到通知的执行。

何时使用 RxJS?

如果您的项目包含大量异步任务处理,那么 RxJS 是一个不错的选择。它默认随 Angular 项目一起加载。

使用 RxJS 的优点

以下是使用 RxJS 的优点 -

  • RxJS 可以与其他 Javascript 库和框架一起使用。它由 javascript 和 typescript 支持。Angular、ReactJS、Vuejs、nodejs 等都是少数例子。

  • 在处理异步任务方面,RxJS 是一个很棒的库。RxJS 使用可观察量来处理反应式编程,处理异步数据调用、回调和基于事件的程序。

  • RxJS 提供了大量数学、转换、过滤、实用、条件、错误处理、连接类别等运算符,使反应式编程的使用变得轻松。

使用 RxJS 的缺点

以下是使用 RxJS 的缺点 -

  • 使用可观察量调试代码并不困难。

  • 当您开始使用 Observables 时,您最终可以将完整的代码包装在 observables 下。

RxJS - 环境设置

在本章中,我们将安装 RxJS。要使用 RxJS,我们需要以下设置 -

  • NodeJS
  • 尼普
  • RxJS包安装

NODEJS 和 NPM 安装

使用 npm 安装 RxJS 非常容易。您需要在系统上安装 nodejs 和 npm。要验证您的系统上是否安装了 NodeJS 和 npm,请尝试在命令提示符中执行以下命令。

E:\>node -v && npm -v
v10.15.1
6.4.1

如果您获得该版本,则意味着您的系统上已安装了 nodejs 和 npm,并且系统上现在的版本是 10 和 6。

如果它没有打印任何内容,请在您的系统上安装nodejs。要安装nodejs,请进入nodejs主页https://nodejs.org/en/download/,根据您的操作系统安装软件包。

Nodejs 的下载页面如下所示 -

NodeJS

根据您的操作系统,安装所需的软件包。一旦安装了nodejs,npm也会随之安装。要检查 npm 是否已安装,请在终端中输入 npm –v。它应该显示 npm 的版本。

RxJS 包安装

要开始安装 RxJS,首先创建一个名为rxjsproj/的文件夹,我们将在其中练习所有 RxJS 示例。

创建文件夹rxjsproj/后,运行命令npm init进行项目设置,如下所示

E:\>mkdir rxjsproj
E:\>cd rxjsproj
E:\rxjsproj>npm init

Npm init命令在执行过程中会问几个问题,只需按 Enter 即可继续。一旦 npm init 执行完成,它将在 rxjsproj/ 内创建package.json,如下所示 -

rxjsproj/
   package.json

现在您可以使用以下命令安装 rxjs -

npm install ---save-dev rxjs

E:\rxjsproj>npm install --save-dev rxjs
npm notice created a lockfile as package-lock.json. You should commit this file.

npm WARN rxjsproj@1.0.0 No description
npm WARN rxjsproj@1.0.0 No repository field.

+ rxjs@6.5.3
added 2 packages from 7 contributors and audited 2 packages in 21.89s
found 0 vulnerabilities

我们已经完成了 RxJS 安装。现在让我们尝试使用 RxJS,在rxjsproj /中创建一个文件夹src /

所以,现在,我们将拥有如下所示的文件夹结构 -

rxjsproj/
   node_modules/
   src/
   package.json

在src/内创建一个文件testrx.js,并编写以下代码 -

testrx.js

import { of } from 'rxjs;
import { map } from 'rxjs/operators';

map(x => x * x)(of(1, 2, 3)).subscribe((v) => console.log(`Output is: ${v}`));

当我们在命令提示符下使用命令 - node testrx.js执行上述代码时,它将显示导入错误,因为nodejs不知道如何处理导入。

为了使导入能够与 Nodejs 一起使用,我们需要使用 npm 安装 ES6 模块包,如下所示 -

E:\rxjsproj\src>npm install --save-dev esm
npm WARN rxjsproj@1.0.0 No description
npm WARN rxjsproj@1.0.0 No repository field.

+ esm@3.2.25
added 1 package from 1 contributor and audited 3 packages in 9.32s
found 0 vulnerabilities

安装包后,我们现在可以执行testrx.js文件,如下所示 -

E:\rxjsproj\src>node -r esm testrx.js
Output is: 1
Output is: 4
Output is: 9

我们现在可以看到输出,表明 RxJS 已安装并可以使用。上面的方法将帮助我们在命令行中测试RxJS。如果您想在浏览器中测试 RxJS,我们需要一些额外的包。

在浏览器中测试 RxJS

在 rxjsproj/ 文件夹中安装以下软件包 -

npm install --save-dev babel-loader @babel/core @babel/preset-env webpack webpack-cli webpack-dev-server

E:\rxjsproj>npm install --save-dev babel-loader 
@babel/core @babel/preset-env webpack webpack-cli webpack-dev-server

npm WARN rxjsproj@1.0.0 No description
npm WARN rxjsproj@1.0.0 No repository field.
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: fsevents@1.2.9
(node_modules\fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@
1.2.9: wanted {"os":"darwin","arch":"any"} (current: {"os":"win32","arch":"x64"})

+ webpack-dev-server@3.8.0
+ babel-loader@8.0.6
+ @babel/preset-env@7.6.0
+ @babel/core@7.6.0
+ webpack-cli@3.3.8
+ webpack@4.39.3
added 675 packages from 373 contributors and audited 10225 packages in 255.567s
found 0 vulnerabilities

为了启动服务器来执行我们的 Html 文件,我们将使用 webpack-server。package.json 中的命令“publish”将帮助我们启动并使用 webpack 打包所有 js 文件。打包后的js文件是我们最终要使用的js文件,保存在路径/dev文件夹中。

要使用 webpack,我们需要运行npm runpublish命令,并将该命令添加到 package.json 中,如下所示 -

包.json

{
   "name": "rxjsproj",
   "version": "1.0.0",
   "description": "",
   "main": "index.js",
   "scripts": {
      "publish":"webpack && webpack-dev-server --output-public=/dev/",
      "test": "echo \"Error: no test specified\" && exit 1"
   },
   "author": "",
   "license": "ISC",
   "devDependencies": {
      "@babel/core": "^7.6.0",
      "@babel/preset-env": "^7.6.0",
      "babel-loader": "^8.0.6",
      "esm": "^3.2.25",
      "rxjs": "^6.5.3",
      "webpack": "^4.39.3",
      "webpack-cli": "^3.3.8",
      "webpack-dev-server": "^3.8.0"
   }
}

要使用 webpack,我们必须首先创建一个名为 webpack.config.js 的文件,其中包含 webpack 工作的配置详细信息。

文件中的详细信息如下 -

var path = require('path');

module.exports = {
   entry: {
      app: './src/testrx.js'
   },
   output: {
      path: path.resolve(__dirname, 'dev'),
      filename: 'main_bundle.js'
   },
   mode:'development',
   module: {
      rules: [
         {
            test:/\.(js)$/,
            include: path.resolve(__dirname, 'src'),
            loader: 'babel-loader',
            query: {
               presets: ['@babel/preset-env']
            }
         }
      ]
   }
};

文件的结构如上所示。它以给出当前路径详细信息的路径开始。

var path = require('path'); //gives the current path

接下来是 module.exports 对象,它具有条目、输出和模块属性。入口是起点。这里,我们需要给出我们想要编译的start js文件。

entry: {
   app: './src/testrx.js'
},

path.resolve(_dirname, 'src/testrx.js') -- 将在目录中查找 src 文件夹,并在该文件夹中查找 testrx.js。

输出

output: {
   path: path.resolve(__dirname, 'dev'),
   filename: 'main_bundle.js'
},

输出是一个带有路径和文件名详细信息的对象。路径将保存保存已编译文件的文件夹,文件名将告诉您的 .html 文件中要使用的最终文件的名称。

模块

module: {
   rules: [
      {
         test:/\.(js)$/,
         include: path.resolve(__dirname, 'src'),
         loader: 'babel-loader',
         query: {
            presets: ['@babel/preset-env']
         }
      }
   ]
}

模块是具有规则详细信息的对象,其具有测试、包含、加载器、查询等属性。该测试将保存所有以 .js 和 .jsx 结尾的 js 文件的详细信息。它具有将在给定入口点末尾查找 .js 的模式。

Include告诉用于查看文件的文件夹。

加载器使用 babel-loader 来编译代码。

该查询具有属性预设,它是一个值为“@babel/preset-env”的数组。它将根据您需要的 ES 环境转译代码。

最终的文件夹结构如下 -

rxjsproj/
   node_modules/
   src/
      testrx.js
   index.html
   package.json
   webpack.config.js

运行命令

npm runpublish将创建 dev/ 文件夹,其中包含 main_bundle.js 文件。服务器将启动,您可以在浏览器中测试您的index.html,如下所示。

运行命令

打开浏览器并点击 url - http://localhost:8080/

主捆绑包

输出显示在控制台中。

RxJS - 最新更新

我们在本教程中使用 RxJS 版本 6。RxJS 通常用于处理响应式编程,并且更常与 Angular、ReactJS 一起使用。Angular 6 默认加载 rxjs6。

与版本 6 相比,RxJS 版本 5 的处理方式有所不同。如果将 RxJS 5 更新到 6,代码将会中断。在本章中,我们将看到处理版本更新的方式的差异。

如果您将 RxJS 更新到 6 并且不想更改代码,您也可以这样做,并且必须安装以下软件包。

npm install --save-dev rxjs-compact

该包将负责提供向后兼容性,旧代码将在 RxJS 版本 6 中正常工作。如果您想要进行与 RxJS 6 正常工作的代码更改,以下是需要完成的更改。

运营商、可观察物、主题的包进行了重组,因此主要变化发生在导入方面,如下所述。

运营商进口

根据版本 5,对于运算符,应包含以下导入语句 -

import 'rxjs/add/operator/mapTo'
import 'rxjs/add/operator/take'
import 'rxjs/add/operator/tap'
import 'rxjs/add/operator/map'

在 RxJS 版本 6 中,导入如下 -

import {mapTo, take, tap, map} from "rxjs/operators"

导入方法来创建 Observables

根据版本 5,在使用 Observables 时,应包含以下导入方法 -

import "rxjs/add/observable/from";
import "rxjs/add/observable/of";
import "rxjs/add/observable/fromEvent";
import "rxjs/add/observable/interval";

在 RxJS 版本 6 中,导入如下 -

import {from, of, fromEvent, interval} from 'rxjs';

观测值的导入

在 RxJS 版本 5 中,在使用 Observables 时,应包含以下导入语句 -

import { Observable } from 'rxjs/Observable'

在 RxJS 版本 6 中,导入如下 -

import { Observable } from 'rxjs'

导入主题

在 RxJS 版本 5 中,主题应包含如下 -

import { Subject} from 'rxjs/Subject'

在 RxJS 版本 6 中,导入如下 -

import { Subject } from 'rxjs'

如何在 RxJS 6 中使用运算符?

pipeline() 方法可用于创建的可观察对象。它是从 5.5 版本开始添加到 RxJS 中的。现在使用 pipeline(),您可以按顺序同时处理多个运算符。这就是 RxJS 版本 5 中运算符的使用方式。

例子

import "rxjs/add/observable/from";
import 'rxjs/add/operator/max'

let list1 = [1, 6, 15, 10, 58, 2, 40];
from(list1).max((a,b)=>a-b).subscribe(x => console.log("The Max value is "+x));

从 RxJS 5.5 版本开始,我们必须使用 pipeline() 来执行运算符 -

例子

import { from } from 'rxjs';
import { max } from 'rxjs/operators';

from(list1).pipe(max((a,b)=>a-b)).subscribe(x => console.log(
   "The Max value is "+x)
);

运营商更名

在包的重组过程中,一些运算符被重命名,因为它们与 javascript 关键字冲突或匹配。列表如下所示 -

操作员 重命名为
做() 轻敲()
抓住() 捕获错误()
转变() switchAll()
最后() 完成()
扔() 抛出错误()

RxJS - 可观察的

observable 是一个创建观察者并将其附加到期望值的源的函数,例如,来自 dom 元素的单击、鼠标事件或 Http 请求等。

Observer 是一个具有回调函数的对象,当与 Observable 进行交互时,即源已与示例按钮单击、Http 请求等进行交互时,它将被调用。

我们将在本章中讨论以下主题 -

  • 创建可观察的
  • 订阅可观察的
  • 执行可观察的

创建可观察的

可以使用 observable 构造函数创建 observable,也可以使用 observable create 方法并将 subscribe 函数作为参数传递给它,如下所示 -

testrx.js

import { Observable } from 'rxjs';

var observable = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

我们创建了一个 observable,并使用Observable 中可用的subscriber.next方法添加了一条消息“My First Observable”。

我们还可以使用 Observable.create() 方法创建 Observable,如下所示 -

testrx.js

import { Observable } from 'rxjs';
var observer = Observable.create(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);

订阅可观察的

您可以按如下方式订阅可观察的内容 -

testrx.js

import { Observable } from 'rxjs';

var observer = new Observable(
   function subscribe(subscriber) {
      subscriber.next("My First Observable")
   }
);
observer.subscribe(x => console.log(x));

当观察者被订阅时,它将开始执行 Observable。

这是我们在浏览器控制台中看到的 -

订阅可观察的

执行可观察的

一个可观察量在被订阅后就会被执行。观察者是一个具有三个被通知方法的对象,

next() - 此方法将发送数字、字符串、对象等值。

Complete() - 此方法不会发送任何值并指示可观察对象已完成。

error() - 此方法将发送错误(如果有)。

让我们使用所有三个通知创建可观察对象并执行相同的操作。

testrx.js

import { Observable } from 'rxjs';
var observer = new Observable(
   function subscribe(subscriber) {
      try {
         subscriber.next("My First Observable");
         subscriber.next("Testing Observable");
         subscriber.complete();
      } catch(e){
         subscriber.error(e);
      }
   }
);
observer.subscribe(x => console.log(x), (e)=>console.log(e), 
   ()=>console.log("Observable is complete"));

在上面的代码中,我们添加了next、complete和error方法。

try{
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
} catch(e){
   subscriber.error(e);
}

要执行下一步、完成和错误,我们必须调用 subscribe 方法,如下所示 -

observer.subscribe(x => console.log(x), (e)=>console.log(e), 
   ()=>console.log("Observable is complete"));

仅当出现错误时才会调用 error 方法。

这是在浏览器中看到的输出 -

执行可观察的

RxJS - 运算符

运算符是 RxJS 的重要组成部分。运算符是一个纯函数,它将 observable 作为输入,输出也是一个 observable。

与运营商合作

运算符是一个纯函数,它接受可观察值作为输入,输出也是一个可观察值。

为了使用运算符,我们需要一个 pipeline() 方法。

使用 Pipe() 的示例

let obs = of(1,2,3); // an observable
obs.pipe(
   operator1(),
   operator2(),
   operator3(),
   operator3(),
)

在上面的示例中,我们使用of()方法创建了一个可观察对象,该方法接受值 1、2 和 3。现在,在这个可观察对象上,您可以使用任意数量的运算符使用 pipeline() 方法执行不同的操作,如上所示。运算符的执行将在给定的可观察量上按顺序进行。

下面是一个工作示例 -

import { of } from 'rxjs';
import { map, reduce, filter } from 'rxjs/operators';

let test1 = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
let case1 = test1.pipe(
   filter(x => x % 2 === 0),
   reduce((acc, one) => acc + one, 0)
)
case1.subscribe(x => console.log(x));

输出

30

在上面的示例中,我们使用了过滤运算符来过滤偶数,接下来我们使用了reduce()运算符来添加偶数值并在订阅时给出结果。

这是我们将要讨论的可观察量列表。

  • 创建
  • 数学
  • 加入
  • 转型
  • 过滤
  • 公用事业
  • 有条件的
  • 组播
  • 错误处理

创建操作符

以下是我们将在创建运算符类别中讨论的运算符 -

先生编号 运算符及描述
1 AJAX

该操作符将对给定的 URL 发出 ajax 请求。

2

该运算符将从数组、类数组对象、promise、可迭代对象或类可观察对象创建可观察对象。

3 来自事件

该运算符将输出作为可观察量,用于发出事件的元素,例如按钮、单击等。

4 来自事件模式

该运算符将从输入函数创建一个可观察对象,用于注册事件处理程序。

5 间隔

该运算符将为给定时间的每次创建一个 Observable。

6

该运算符将接收传递的参数并将它们转换为可观察的。

7 范围

该运算符将创建一个 Observable,它将根据提供的范围为您提​​供一系列数字。

8 抛出错误

该运算符将创建一个通知错误的可观察对象。

9 计时器

该运算符将创建一个可观察量,该可观察量将在超时后发出值,并且该值将在每次调用后不断增加。

10 投资基金

该操作员将决定订阅哪个 Observable。

数学运算符

以下是我们将在数学运算符类别中讨论的运算符 -

先生编号 运算符及描述
1 数数

count() 运算符接收带有值的 Observable 并将其转换为给出单个值的 Observable

2 最大限度

Max 方法将接受具有所有值的可观察量并返回具有最大值的可观察量

3 最小

Min 方法将接受具有所有值的可观察量并返回具有最小值的可观察量。

4 减少

在reduce运算符中,累加器函数用于输入可观察量,累加器函数将以可观察量的形式返回累加值,并将可选的种子值传递给累加器函数。

reduce() 函数将接受 2 个参数,第一个是累加器函数,第二个是种子值。

加入运营商

以下是我们将在连接运算符类别中讨论的运算符。

先生编号 运算符及描述
1 连接

该运算符将按顺序发出作为输入给出的 Observable,并继续处理下一个。

2 分叉连接

该运算符将在数组或字典对象中作为输入,并等待可观察对象完成并返回从给定可观察对象发出的最后一个值。

3 合并

该运算符将接收输入可观察值,并发出可观察值中的所有值,并发出一个输出可观察值。

4 种族

它将返回一个可观察量,该可观察量将是第一个源可观察量的镜像副本。

转换算子

以下是我们将在转换运算符类别中讨论的运算符。

先生编号 运算符及描述
1 缓冲

缓冲区对可观察量进行操作并接收参数作为可观察量。它将开始缓冲数组中原始可观察量发出的值,并且当可观察量作为参数发出时,将发出相同的值。一旦作为参数的可观察量发出,缓冲区就会重置并在原始数据上再次开始缓冲,直到输入可观察量发出并且重复相同的场景。

2 缓冲区计数

对于 buffercount() 运算符,它将从调用它的可观察对象中收集值,并在给定 buffercount 的缓冲区大小匹配时发出相同的值。

3 缓冲时间

这与 bufferCount 类似,因此在这里,它将从调用它的可观察对象中收集值并发出 bufferTimeSpan 已完成。它接受 1 个参数,即bufferTimeSpan

4 缓冲区切换

对于 bufferToggle(),它需要 2 个参数,即 openings 和 openingSelector。开始参数是可订阅的或承诺启动缓冲区,第二个参数 openingSelector 也是可订阅的或承诺一个指示符来关闭缓冲区并发出收集的值。

5 缓冲时间

该运算符将以数组形式给出值,它接受一个参数作为函数,决定何时关闭、发出和重置缓冲区。

6 扩张

扩展运算符接受一个函数作为参数,该参数递归地应用于源可观察量以及输出可观察量。最终值是可观察的。

7 通过...分组

在 groupBy 运算符中,输出根据特定条件进行分组,并且这些组项作为 GroupedObservable 发出。

8 地图

对于映射运算符,项目函数应用于源 Observable 上的每个值,并且相同的输出作为 Observable 发出。

9 映射到

每当源 Observable 发出一个值时,都会与 Observable 一起给出一个常量值作为输出。

10 合并映射

对于 mergeMap 运算符,项目函数应用于每个源值,并将其输出与输出 Observable 合并。

11 切换映射

对于 switchMap 运算符,项目函数应用于每个源值,并将其输出与输出 Observable 合并,给出的值是最近投影的 Observable。

12 窗户

它接受一个参数 windowboundaries,它是一个可观察的,并在给定的 windowboundaries 发出时返回一个嵌套的可观察

过滤运算符

以下是我们将在过滤运算符类别中讨论的运算符。

先生编号 运算符及描述
1 去抖

一段时间后从源 Observable 发射的值,发射由作为 Observable 或 Promise 给出的另一个输入确定。

2 去抖时间

仅在时间结束后,它才会从可观察源发出值。

3 清楚的

该运算符将给出源可观察值中与前一个值相比不同的所有值。

4 元素At

该运算符将根据给定的索引给出源可观察值的单个值。

5 筛选

该运算符将根据给定的谓词函数过滤源 Observable 中的值。

6 第一的

该运算符将给出源 Observable 发出的第一个值。

7 最后的

该运算符将给出源 Observable 发出的最后一个值。

8 忽略元素

该运算符将忽略源 Observable 中的所有值,仅执行对完成或错误回调函数的调用。

9 样本

该运算符将给出来自源 Observable 的最新值,并且输出将取决于传递给它的参数发出。

10 跳过

该运算符将返回一个可观察值,该可观察值将跳过作为输入的计数项的第一次出现。

11 风门

该运算符将在由作为参数的输入函数确定的时间内输出并忽略源可观察值中的值,并且将重复相同的过程。

公用事业运营商

以下是我们将在实用运算符类别中讨论的运算符。

先生编号 运算符及描述
1 轻敲

该运算符将具有与源可观察值相同的输出,并且可用于将可观察值中的值记录给用户。主要值、错误(如果有)或任务是否完成。

2 延迟

该运算符根据给定的超时延迟从源 Observable 发出的值。

3 延迟时间

该运算符根据作为输入的另一个可观察量的超时延迟从源可观察量发出的值。

4 观察

这个基于输入调度器的操作符将从源 Observable 重新发出通知。

5 订阅

该运算符有助于根据作为输入的调度程序异步订阅源 Observable。

6 时间间隔

该运算符将返回一个对象,其中包含当前值以及使用调度程序输入计算出的当前值与上一个值之间经过的时间。

7 时间戳

返回时间戳以及从源 Observable 发出的值,该值说明了发出值的时间。

8 暂停

如果源 Observable 在给定超时后未发出值,则该运算符将抛出错误。

9 到数组

累积来自 Observable 的所有源值,并在源完成时将它们作为数组输出。

条件运算符

以下是我们将在条件运算符类别中讨论的运算符。

先生编号 运算符及描述
1 默认为空

如果源可观察量为空,则该运算符将返回默认值。

2 每一个

它将根据输入函数满足源 Observable 上每个值的条件返回一个 Observable。

3 寻找

当源 Observable 的第一个值满足作为输入的谓词函数的条件时,这将返回 observable。

4 查找索引

这个基于输入调度器的操作符将从源 Observable 重新发出通知。

5 是空的

如果输入可观察量进行完整回调而不发出任何值,则该运算符将输出 true,如果输入可观察量发出任何值,则输出为 false。

组播运营商

以下是我们将在多播运算符类别中讨论的运算符。

先生编号 运算符及描述
1 组播

多播运营商与其他订阅者共享创建的单个订阅。多播接受的参数是一个主题或一个返回具有 connect() 方法的 ConnectableObservable 的工厂方法。要订阅,必须调用 connect() 方法。

2 发布

该运算符返回 ConnectableObservable,并且需要使用 connect() 方法来订阅 observable。

3 发布Behave

publishBehaviour利用BehaviourSubject,并返回ConnectableObservable。必须使用 connect() 方法来订阅创建的可观察对象。

4 发布最后

publishBehaviour 使用 AsyncSubject,并返回 ConnectableObservable。必须使用 connect() 方法来订阅创建的可观察对象。

5 发布重播

publishReplay 利用Behave主体,其中它可以缓冲值并将其重播给新订阅者并返回 ConnectableObservable。必须使用 connect() 方法来订阅创建的可观察对象。

6 分享

它是 mutlicast() 运算符的别名,唯一的区别是您不必手动调用 connect() 方法来启动订阅。

错误处理运算符

以下是我们将在错误处理运算符类别中讨论的运算符。

先生编号 运算符及描述
1 捕获错误

该运算符负责通过返回新的 Observable 或错误来捕获源 Observable 上的错误。

2 重试

如果出现错误,该运算符将负责重试源 Observable,并且重试将根据给定的输入计数完成。

RxJS - 使用订阅

创建可观察量后,要执行可观察量,我们需要订阅它。

count() 运算符

这是一个如何订阅可观察对象的简单示例。

实施例1

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
final_val.subscribe(x => console.log("The count is "+x));

输出

The count is 6

订阅有一种称为 unsubscribe() 的方法。调用 unsubscribe() 方法将删除用于该 observable 的所有资源,即 observable 将被取消。这是使用 unsubscribe() 方法的工作示例。

实施例2

import { of } from 'rxjs';
import { count } from 'rxjs/operators';

let all_nums = of(1, 7, 5, 10, 10, 20);
let final_val = all_nums.pipe(count());
let test = final_val.subscribe(x => console.log("The count is "+x));
test.unsubscribe();

订阅存储在变量 test 中。我们使用了 test.unsubscribe() 可观察的。

输出

The count is 6

RxJS - 处理主题

主题是一个可以多播的可观察对象,即与许多观察者交谈。考虑一个带有事件监听器的按钮,每次用户单击按钮时,都会调用使用添加监听器附加到事件的函数,类似的功能也适用于主题。

我们将在本章中讨论以下主题 -

  • 创建主题
  • 可观察和主题有什么区别?
  • Behave主体
  • 重播主题
  • 异步主题

创建主题

要使用主题,我们需要导入主题,如下所示 -

import { Subject } from 'rxjs';

您可以按如下方式创建主题对象 -

const subject_test = new Subject();

该对象是一个具有三种方法的观察者 -

  • 下一个(五)
  • 错误(e)
  • 完全的()

订阅主题

您可以对该主题创建多个订阅,如下所示 -

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});

订阅被注册到主题对象,就像我们之前讨论的 addlistener 一样。

将数据传递给主体

您可以将数据传递给使用 next() 方法创建的主题。

subject_test.next("A");

数据将传递到该主题上添加的所有订阅。

例子

这是该主题的一个工作示例 -

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.next("B");

subject_test 对象是通过调用 new subject() 创建的。subject_test 对象引用了 next()、error() 和complete() 方法。上述示例的输出如下所示 -

输出

传递数据

我们可以使用complete()方法来停止主题执行,如下所示。

例子

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(`From Subject : ${v}`)
});
subject_test.subscribe({
   next: (v) => console.log(`From Subject: ${v}`)
});
subject_test.next("A");
subject_test.complete();
subject_test.next("B");

一旦我们调用完成,后面调用的下一个方法就不会被调用。

输出

传递数据方法

现在让我们看看如何调用error()方法。

例子

下面是一个工作示例 -

import { Subject } from 'rxjs';

const subject_test = new Subject();

subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.subscribe({
   error: (e) => console.log(`From Subject : ${e}`)
});
subject_test.error(new Error("There is an error"));

输出

传递数据错误

可观察和主题有什么区别?

可观察者将与订阅者进行一对一的交谈。每当您订阅可观察对象时,执行都会从头开始。使用 ajax 进行 Http 调用,并有 2 个订阅者调用 observable。您将在浏览器网络选项卡中看到 2 个 HttpHttp 请求。

例子

这是一个相同的工作示例 -

import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber1 = final_val.subscribe(a => console.log(a));
let subscriber2 = final_val.subscribe(a => console.log(a));

输出

可观察的

可观察的Ex

现在,问题是,我们希望共享相同的数据,但不共享,代价是 2 个 Http 调用。我们希望进行一次 Http 调用并在订阅者之间共享数据。

使用主题可以实现这一点。它是一个可以多播的可观察对象,即与许多观察者交谈。它可以在订阅者之间分享价值。

例子

这是一个使用主题的工作示例 -

import { Subject } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';

const subject_test = new Subject();

subject_test.subscribe({
   next: (v) => console.log(v)
});
subject_test.subscribe({
   next: (v) => console.log(v)
});

let final_val = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
let subscriber = final_val.subscribe(subject_test);

输出

可观察到的可能

现在您只能看到一个 Http 调用,并且相同的数据在调用的订阅者之间共享。

可观察的订阅者

Behave主体

Behave主体在调用时会给你最新的值。

您可以创建Behave主体,如下所示 -

import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject("Testing Behaviour Subject"); 
// initialized the behaviour subject with value:Testing Behaviour Subject

例子

这是一个使用Behave主题的工作示例 -

import { BehaviorSubject } from 'rxjs';
const behavior_subject = new BehaviorSubject("Testing Behaviour Subject"); 
// 0 is the initial value

behavior_subject.subscribe({
   next: (v) => console.log(`observerA: ${v}`)
});

behavior_subject.next("Hello");
behavior_subject.subscribe({
   next: (v) => console.log(`observerB: ${v}`)
});
behavior_subject.next("Last call to Behaviour Subject");

输出

Behave主体

重播主题

重放主体类似于Behave主体,其中,它可以缓冲值并将其重放给新订阅者。

例子

这是重播主题的一个工作示例 -

import { ReplaySubject } from 'rxjs';
const replay_subject = new ReplaySubject(2); 
// buffer 2 values but new subscribers

replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject A: ${v}`)
});

replay_subject.next(1);
replay_subject.next(2);
replay_subject.next(3);
replay_subject.subscribe({
   next: (v) => console.log(`Testing Replay Subject B: ${v}`)
});

replay_subject.next(5);

重放主题上使用的缓冲区值为 2。因此,最后两个值将被缓冲并用于呼叫的新订阅者。

输出

重播主题

异步主题

在 AsyncSubject 的情况下,最后调用的值将传递给订阅者,并且仅在调用complete()方法后才会完成。

例子

这是一个相同的工作示例 -

import { AsyncSubject } from 'rxjs';

const async_subject = new AsyncSubject();

async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject A: ${v}`)
});

async_subject.next(1);
async_subject.next(2);
async_subject.complete();
async_subject.subscribe({
   next: (v) => console.log(`Testing Async Subject B: ${v}`)
});

这里,在调用完成之前,传递给主题的最后一个值是 2,并且与传递给订阅者的值相同。

输出

异步主题

RxJS - 使用调度程序

调度程序控制订阅何时必须启动并收到通知的执行。

要使用调度程序,我们需要以下内容 -

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

这是一个工作示例,其中我们将使用调度程序来决定执行。

例子

import { Observable, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

var observable = new Observable(function subscribe(subscriber) {
   subscriber.next("My First Observable");
   subscriber.next("Testing Observable");
   subscriber.complete();
}).pipe(
   observeOn(asyncScheduler)
);
console.log("Observable Created");
observable.subscribe(
   x => console.log(x),
   (e)=>console.log(e),
   ()=>console.log("Observable is complete")
);

console.log('Observable Subscribed');

输出

调度程序

如果没有调度程序,输出将如下所示 -

调度程序控制

使用 RxJS 和 Angular

在本章中,我们将了解如何将 RxJ 与 Angular 结合使用。我们不会在这里讨论 Angular 的安装过程,要了解 Angular 安装,请参阅此链接 - https://www.tutorialspoint.com/angular7/angular7_environment_setup.htm

我们将直接处理一个示例,其中将使用 RxJS 中的 Ajax 来加载数据。

例子

应用程序组件.ts

import { Component } from '@angular/core';
import { environment } from './../environments/environment';
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators'

@Component({
   selector: 'app-root',
   templateUrl: './app.component.html',
   styleUrls: ['./app.component.css']
})
export class AppComponent {
   title = '';
   data;
   constructor() {
      this.data = "";
      this.title = "Using RxJs with Angular";
      let a = this.getData();
   }
   getData() {
      const response =
      ajax('https://jsonplaceholder.typicode.com/users')
         .pipe(map(e => e.response));
      response.subscribe(res => {
         console.log(res);
         this.data = res;
      });
   }
}

应用程序组件.html

<div>
   <h3>{{title}}</h3>
   <ul *ngFor="let i of data">
      <li>{{i.id}}: {{i.name}}</li>
   </ul>
</div>

<router-outlet></router-outlet>

我们使用 RxJS 中的 ajax 来从此 url 加载数据 - https://jsonplaceholder.typicode.com/users

编译时显示如下 -

使用 Angular 进行 RxJ

使用 RxJS 和 ReactJS

在本章中,我们将了解如何将 RxJs 与 ReactJS 结合使用。这里我们不会进入Reactjs的安装过程,要了解ReactJS安装请参考这个链接: https: //www.tutorialspoint.com/reactjs/reactjs_environment_setup.htm

例子

我们将直接使用下面的示例,其中将使用 RxJS 中的 Ajax 来加载数据。

索引.js

import React, { Component } from "react";
import ReactDOM from "react-dom";
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
class App extends Component {
   constructor() {
      super();
      this.state = { data: [] };
   }
   componentDidMount() {
      const response = ajax('https://jsonplaceholder.typicode.com/users').pipe(map(e => e.response));
      response.subscribe(res => {
         this.setState({ data: res });
      });
   }
   render() {
      return (
         <div>
            <h3>Using RxJS with ReactJS</h3>
            <ul>
               {this.state.data.map(el => (
                  <li>
                     {el.id}: {el.name}
                  </li>
               ))}
            </ul>
         </div>
      );
   }
}
ReactDOM.render(<App />, document.getElementById("root"));

索引.html

<!DOCTYPE html>
<html>
   <head>
      <meta charset = "UTF-8" />
      <title>ReactJS Demo</title>
   <head>
   <body>
      <div id = "root"></div>
   </body>
</html>

我们使用 RxJS 中的 ajax 来从此 Url 加载数据 - https://jsonplaceholder.typicode.com/users

编译时,显示如下 -

使用 ReactJS 进行 RxJ