可观察对象Observable

可观察对象对在应用的各个部分之间传递消息提供了支持。

观察者模式与发布/订阅模式的区别

观察者模式和发布/订阅模式都是用来处理对象之间的一对多关系的。在观察者模式中,观察者知道被观察者的存在,并且直接从被观察者那里获取更新。而在发布/订阅模式中,发布者和订阅者不知道对方的存在。它们只是通过调度中心进行通信
观察者(Observer)模式是一个软件设计模式,它有一个对象,称之为主体 Subject,负责维护一个依赖项(称之为观察者 Observer)的列表,并且在状态变化时自动通知它们。该模式和发布/订阅模式非常相似(但不完全一样)。

特点

  • 声明式:也就是说,虽然你定义了一个用于发布值的函数,但是在有消费者订阅它之前,这个函数并不会实际执行。订阅之后,当这个函数执行完或取消订阅时,订阅者就会收到通知。
  • 可观察对象可以发送多个任意类型的值 —— 字面量、消息、事件。无论这些值是同步发送的还是异步发送的,接收这些值的 API 都是一样的。由于准备(setup)和清场(teardown)的逻辑都是由可观察对象自己处理的,因此你的应用代码只管订阅并消费这些值就可以了,做完之后,取消订阅。无论这个流是按键流、HTTP 响应流还是定时器,对这些值进行监听和停止监听的接口都是一样的。

示例

1
2
3
4
5
6
7
const myObserverable = of(1, 2, 3);
const myObserver = {
next: (x: number) => console.log('observer got a next value: ', x),
error: (err: Error) => console.log('observer got an error: ', err),
complete: () => console.log('observer got a complete notification')
};
myObserverable.subscribe(myObserver);
1
2
3
4
5
6
7
const ESC_CODE = 'Escape';
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = fromEvent(nameInput, 'keydown').subscribe((e: KeyboardEvent) => {
if(e.code === ESC_CODE){
nameInput.value = '';
}
});

多播

典型的可观察对象会为每一个观察者创建一次新的、独立的执行。有时候,不应该对每一个订阅者都独立执行一次,这就需要通过多播实现。多播是指在同一个可观察对象上多次调用 subscribe()。这种情况下,该可观察对象的多个实例共享同一个执行路径。

RXJS

什么是RxJS

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。响应式编程是一种面向数据流和变更传播的异步编程范式。RxJS(响应式扩展的 JavaScript 版)是一个使用可观察对象进行响应式编程的库,它让组合异步代码和基于回调的代码变得更简单。
响应式编程是一种编程范式,它强调数据流和变化传播。在响应式编程中,您可以定义数据流和数据流之间的变换,并且可以使用观察者模式来订阅和处理这些数据流。
React也可以被视为响应式编程的一种形式。React中的组件状态和属性都可以被视为响应式数据,当它们发生变化时,React会自动重新渲染视图以反映这些变化。此外,React还提供了一些钩子函数,如useEffectuseReducer,可以用于处理异步数据流和状态管理,这些也是响应式编程的重要概念。

RxJS操作符的用途

  • 把现有的异步代码转换成可观察对象
  • 迭代流中的各个值
  • 把这些值映射成其它类型
  • 对流进行过滤
  • 组合多个流

常用的操作符

  • of: 返回一个 Observable 实例,它用同步的方式把参数中提供的这些值发送出来。
  • from: 从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable。
  • fromEvent: 创建一个 Observable,它通过添加事件监听器的方式,根据提供的事件源(比如 DOM 事件、Node.js EventEmitter 或者其他事件源)来实现。
  • combineLatest: 当提供的任何一个 Observable 发出值时,从每个 Observable 中获取最新发出的值,然后使用可选的 project 函数把这些值投射成一个值,然后发出这个投射的结果。
  • startWith: 返回一个 Observable,它在发出原始 Observable 的值之前发出给定的值。
  • debounceTime: 延迟发出值,直到下一个时间段(由 dueTime 参数决定)内没有新的值出现才发出原始值。
  • filter: 使用指定的函数测试源 Observable 发出的值,只有通过测试的才会被发出。
  • take: 只发出源 Observable 发出的前 n 个值(n 是一个 number 类型)。
  • takeUntil: 发出源 Observable 的值,直到提供的 Observable 发出值。
  • map: 将每个源值投射成 Observable,该 Observable 会合并到输出 Observable 中。
  • tap: 拦截源 Observable 的值并且调用一个函数或方法,但返回的输出 Observable 与源 Observable 是相同的。
  • share: 共享源 Observable,以防止副作用或重复的 Observable 执行。
  • ofType: 用于过滤Observable中的特定类型的值。

常用操作符总结

  • 创建操作符:of、from、fromEvent
  • 组合操作符:combineLatest、startWith
  • 过滤操作符:debounceTime、filter、take、takeUntil
  • 转换操作符:map、tap
  • 多播操作符:share

错误处理操作符

  • catchError: 捕获错误,并返回新的 Observable 或错误。
  • retry: 可以尝试失败的请求.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const apiData = ajax('/api/data').pipe(
map((res: any) => {
if(!res.response){
console.log('error occurred');
throw new Error('value exception');
}
return res.response;
}),
retry(3),
catchError(()=>of([]))
);
apiData.subscribe({
next(x: T){console.log('data: ', x);},
error(){console.log('errors already caught... will not run');}
});

可观察对象(Observable)相对于Promise有以下优点:

  1. 支持多个值:Observable可以发出多个值,而Promise只能发出一个值。这使得Observable更适合处理流数据(如鼠标事件、HTTP响应等)。
  2. 可取消:Observable可以通过unsubscribe()方法来取消订阅,从而中止请求并释放资源。而Promise一旦开始执行就无法取消。
  3. 更丰富的操作符:Observable提供了更多的操作符,如map、filter、reduce等,可以对数据进行更复杂的处理和转换。
  4. 更好的错误处理:Observable可以使用catch操作符来捕获和处理错误,而Promise只能使用catch()方法来处理错误。
  5. 更好的内存管理:Observable可以使用操作符如take、takeUntil等来控制内存泄漏,而Promise没有这样的机制。
    总之,Observable比Promise更适合处理流数据和异步操作,并提供了更多的操作符和更好的错误处理和内存管理机制。

可观察对象vs数组

可观察对象会随时间生成值。数组是用一组静态的值创建的。某种意义上,可观察对象是异步的,而数组是同步的。

可观察对象vs事件

一个显著的不同是你可以配置可观察对象,使其在把事件传给事件处理器之前先进行转换。

实例

指数化退避

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import { timer } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retry } from 'rxjs/operators';

export function backoff(maxTries: number, initialDelay: number) {
return retry({
count: maxTries,
delay: (error, retryCount) => timer(initialDelay * retryCount ** 2),
});
}

ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(function handleData(data) { /* ... */ });

Angular中的Observable

  • EventEmitter: 扩展了 RxJS Subject,并添加了一个 emit() 方法,这样它就可以发送任意值了。当你调用 emit() 时,就会把所发送的值传给订阅上来的观察者的 next() 方法。
  • HttpClient: HttpClient[基于XHR].get() 方法返回一个 Observable,它会在 HTTP 响应可用时发出该响应。HttpClient.get() 方法返回的 Observable 会发出单个值,并在此后就会完成。这意味着它不会发出多个值,而是只发出一个值,然后结束。
  • Async管道: Async 管道接受一个 Promise 或者 Observable 作为输入,并且订阅该输入,最终返回该输入发出的值。当 Promise 或者 Observable 发出值时,Async 管道就会把这些值作为最终的结果返回。
  • 响应式表单:FormControl 的 valueChanges 属性和 statusChanges 属性包含了会发出变更事件的可观察对象。

Subscription & BehaviorSubject

Subscription

Subscription 表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,叫做 unsubscribe(),它不需要任何参数,也不返回任何值。调用此方法就可以取消对 Observable 的执行,以及释放资源或者进行清理。

BehaviorSubject

BehaviorSubject 是一种特殊类型的 Subject,它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

Observable & Subject

Observable

Observable 是一个可调用的未来值或事件的集合。它是流中的生产者。Observable 接口声明了用来处理值的方法,这些方法是消费者用来处理 Observable 发出的连续值的。Observable 提供了诸如 map、forEach、reduce、concat、flatMap 等操作符。

Subject

Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

Subject示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { Subject } from 'rxjs';

const subject = new Subject<number>();

// 订阅Subject
subject.subscribe({
next: (v) => console.log(`观察者A收到值:${v}`)
});
subject.subscribe({
next: (v) => console.log(`观察者B收到值:${v}`)
});

// 发出值
subject.next(1);
subject.next(2);
subject.next(3);