Subject
RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,这意味着 Subject 确保每个观察者之间共享 Observable 的值。
而普通的 Observable 是单播的,它会为每一个观察者创建一次新的、独立的执行。当观察者进行订阅时,该可观察对象会连上一个事件处理器,并且向那个观察者发送一些值。当第二个观察者订阅时,这个可观察对象就会连上一个新的事件处理器,并独立执行一次,把这些值发送给第二个可观察对象。
在 RxJS 中有四种 Subject 分别是:Subject,BehaviorSubject,AsyncSubject,ReplaySubject;这四种 Subject 都是特殊的 Observable。
Subject 既是 Observable 也是 Observer。
Subject
Subject 其实是观察者模式的实现,所以当观察者订阅 Subject 对象时,它会把订阅者添加到观察者列表中,每当有接收到新值时,它就会遍历观察者列表,依次调用观察者内部的 next 方法,把值一一送出。
import { Subject } from 'rxjs';
const subject$ = new Subject<number>();
subject$.next(1);
subject$.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
subject$.next(2);
subject$.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
subject$.next(3);
// Output:
// observerA: 2
// observerA: 3
// observerB: 3
- 创建了一个 Subject
- 发出了一个值 1,但由于此时并没有订阅者,所以这个值不会被订阅到
- 创建了 observerA
- 又发出一个值 2,这时候 observerA 会接收到这个值
- 又创建一个 observerB
- 最后发出一个值 3,这时候已经订阅的都会接收到这个值
BehaviorSubject
BehaviorSubject,它有一个当前值的概念。它保存了发送给消费者的最新值,当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到最后发出的值。
在定义一个 BehaviorSubject 时需要有初始值。
import { BehaviorSubject } from 'rxjs';
const behavior$ = new BehaviorSubject(0); // 0 is the initial value
behavior$.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
behavior$.next(1);
behavior$.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
behavior$.next(2);
// Output:
// observerA: 0
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
- 创建了一个 BehaviorSubject的实例behavior$,初始值为 0。
- 然后订阅了这个 behavior$,由于BehaviorSubject的特点是把最新的值发布给订阅者,observerA 会得到初始值 0,所以会输出observerA: 0
- behavior$发出一个新的值 1,这时候 observerA 将会收到新的值,输出- observerA: 1
- 增加一个订阅者 observerB,这时候它会得到最新的值 1,所以输出 observerB: 1,
- 最后再一次发出一个新的值 2,这个时候有两个订阅者 observerA 和 observerB,它们都会接收到新的值 2,所以会输出 observerA: 2和observerB: 2
ReplaySubject
ReplaySubject 有点像 BehaviorSubject,区别是不仅是当前值,之前的旧值也可以发送给新的订阅者。ReplaySubject 会记录多个值,并重放给新的订阅者。
它的第一个参数 bufferSize 指定了缓存的大小,默认为 Infinity,即缓存所有发出的值。还可以向其传递第二个参数 windowTime,指定缓存的时间限制,默认为 Infinity,即不限制值的失效时间。
import { ReplaySubject } from 'rxjs';
const replay$ = new ReplaySubject(2); // buffer 2 values for new subscribers
replay$.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
replay$.next(4);
// Output:
// observerA: 1
// observerA: 2
// observerA: 3
// observerB: 2
// observerB: 3
// observerA: 4
// observerB: 4
- 创建了一个 ReplaySubject的实例replay$,并设置缓存为 2.
- 创建了一个订阅者 observerA
- 调用三次的 next 方法,把值发布给订阅者。这时订阅者 observerA 会输出三次
- 创建一个新的订阅者 observerB,由于 ReplaySubject缓存了两个值,因此它将直接向订阅者 observerB 发出这两个值,订阅者 observerB 打印这两个值。
- 发出另外一个值 4,这时候 observerA 和 observerB 都接收到值的改变,分别打印这个值。
指定一个以毫秒为单位的窗口时间,示例:
import { ReplaySubject } from 'rxjs';
const replay$ = new ReplaySubject(2, 100 /* windowTime 100ms */);
replay$.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
let i = 1;
setInterval(() => replay$.next(i++), 200);
setTimeout(() => {
  replay$.subscribe({
    next: (v) => console.log(`observerB: ${v}`),
  });
}, 1000);
// Output:
// observerA: 0
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 4
// observerA: 5
// observerB: 5
// ...
- 创建 ReplaySubject并设置缓存为 2,缓存时间不超过 100ms
- 创建一个 observerA
- 每 200ms 发出一个新的值。observerA 会接收到发出的所有值
- 创建一个 observerB,由于是在 1000ms 后进行订阅。这意味着在开始订阅之前,replay$已经发出了 5 个值。在创建ReplaySubject时,指定最多存储 2 个值,并且不能超过 100ms。这意味着在 1000ms 后,observerB 开始订阅时,由于replay$是 200ms 发出一个值,因此 observerB 只会接收到 1 个值。
AsyncSubject
只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。 类似 last() 操作符。
import { AsyncSubject } from 'rxjs';
const async$ = new AsyncSubject();
async$.subscribe({
  next: (v) => console.log(`observerA: ${v}`),
});
async$.next(1);
async$.next(2);
async$.subscribe({
  next: (v) => console.log(`observerB: ${v}`),
});
async$.next(3);
async$.complete();
// Logs:
// observerA: 3
// observerB: 3
- 创建 AsyncSubject的实例
- 创建一个 observerA
- async$发出 2 个值,但是 observerA 不会有输出。
- 创建一个 observerB
- 发出新的值,但是 observerA 和 observerB 都不会有输出。
- 执行 complate完成,这时候将最后一个值发送给所有订阅者
Void subject
有时,subject 的值并不重要,重要的是有一个事件被触发了。
可以声明一个 void subject,表示这个值是不相关的。只有事件本身才是重要的。
import { Subject } from 'rxjs';
const subject = new Subject(); // Shorthand for Subject<void>
subject.subscribe({
    next: () => console.log('One second has passed'),
});
setTimeout(() => subject.next(), 1000);