操作符
操作符本质上是一个纯函数 (pure function),它接收一个 Observable
作为输入,并生成一个新的 Observable
作为输出。并且前面的 Observable
保持不变。
操作符有两类:
- Pipeable 操作符:是指可以使用
observable.pipe(operator)
或更常用的observable.pipe(operatorFactory())
语法将操作符管道化的操作符。操作符工厂函数包括filter(...)
和mergeMap(...)
等。 - 创建操作符可以作为独立函数调用,可以创建一个新的
Observable
。例如:of(1, 2, 3)
。
创建操作符
range
range(start, length)
,调用方法后返回 Observable
对象,被订阅后会发出指定范围的数值。
import { range } from "rxjs"
range(0, 5).subscribe(n => console.log(n))
// Output:
// 0
// 1
// 2
// 3
// 4
of
将参数列表作为数据流返回。
of("a", "b", [], {}, true, 20).subscribe(v => console.log(v))
// Output:
// a
// b
// []
// {}
// true
// 20
from, fromEvent, fromPromise
// 将数组转为 Observable
from(["a", "b", "c"]).subscribe(v => console.log(v))
// Output:
// a
// b
// c
// 将事件转为 Observable
fromEvent(document.querySelector('button'), 'click');
// 将 Promise 转为 Observable
function p() {
return new Promise(function (resolve) {
resolve([100, 200])
})
}
fromPromise(p()).subscribe(v => console.log(v))
// Output:
// [100, 200]
fromPromise(fetch('/users'));
interval、timer
interval每隔一段时间发出一个数值,数值递增。
import {interval, take} from 'rxjs';
const numbers = interval(1000);
const takeFourNumbers = numbers.pipe(take(4));
takeFourNumbers.subscribe(x => console.log('Next: ', x));
// Output:
// Next: 0
// Next: 1
// Next: 2
// Next: 3
timer间隔时间过去以后发出数值,行为终止,或间隔时间发出数值后,继续按第二个参数的时间间隔继续发出值
import { timer } from "rxjs"
timer(2000).subscribe(n => console.log(n))
timer(0, 1000).subscribe(n => console.log(n))
concat
合并数据流,先让第一个数据流发出值,结束后再让第二个数据流发出值,进行整体合并。
import {interval, take, range, concat} from 'rxjs';
const timer = interval(1000).pipe(take(4), map(x => `${x}s`));
const sequence = range(1, 10);
const result = concat(timer, sequence);
result.subscribe(x => console.log(x));
// Output:
// 0s-> 1s-> 2s -> 3s -> 1 -> 2 ... -> 10
const timer = interval(1000).pipe(take(2));
concat(timer, timer) // concatenating the same Observable!
.subscribe({
next: value => console.log(value),
complete: () => console.log('...and it is done!')
});
// Logs:
// 0 after 1s
// 1 after 2s
// 0 after 3s
// 1 after 4s
// '...and it is done!' also after 4s
merge
合并数据流,多个参数一起发出数据流,按照时间线进行交叉合并。
import {merge, interval, take, map} from "rxjs";
const timer = interval(1000).pipe(take(3), map(v => 'timer1: ' + v + 's'));
const timer2 = interval(1000).pipe(take(3), map(v => 'timer2: ' + v + 's'));
merge(timer, timer2).subscribe(console.log);
// Output:
// timer1: 0s
// timer2: 0s
// timer1: 1s
// timer2: 1s
// timer1: 2s
// timer2: 2s
combineLatest
将两个 Observable 中最新发出的数据流进行组合成新的数据流,其值由每个输入观测值的最新值计算得出。
import { combineLatest, timer } from "rxjs"
const firstTimer = timer(0, 1000) // emit 0, 1, 2... after every second, starting from now
const secondTimer = timer(500, 1000) // emit 0, 1, 2... after every second, starting 0,5s from now
combineLatest(firstTimer, secondTimer).subscribe(console.log)
// Output:
// [0, 0] after 0.5s
// [1, 0] after 1s
// [1, 1] after 1.5s
// [2, 1] after 2s
传入字典参数:
import { of, delay, startWith, combineLatest } from 'rxjs';
const observables = {
a: of(1).pipe(delay(1000), startWith(0)),
b: of(5).pipe(delay(5000), startWith(0)),
c: of(10).pipe(delay(10000), startWith(0))
};
const combined = combineLatest(observables);
combined.subscribe(value => console.log(value));
// Output:
// { a: 0, b: 0, c: 0 } immediately
// { a: 1, b: 0, c: 0 } after 1s
// { a: 1, b: 5, c: 0 } after 5s
// { a: 1, b: 5, c: 10 } after 10s
zip
将多个 Observable 中的数据流进行组合。其值按顺序从每个输入 Observable 的值中计算得出。
import { zip, of } from "rxjs"
import { map } from "rxjs/operators"
let age = of(27, 25, 29)
let name = of("Foo", "Bar", "Beer")
let isDev = of(true, true, false)
zip(name, age, isDev)
.pipe(map(([name, age, isDev]) => ({ name, age, isDev })))
.subscribe(console.log)
// { name: 'Foo', age: 27, isDev: true }
// { name: 'Bar', age: 25, isDev: true }
// { name: 'Beer', age: 29, isDev: false }
forkJoin
forkJoin
是 RxJS 版本的 Promise.all()
,即表示等到所有的 Observable 都完成后,才一次性返回值。
传入字典参数:
import {forkJoin, of, timer} from 'rxjs';
const observable = forkJoin({
foo: of(1, 2, 3, 4),
bar: Promise.resolve(8),
baz: timer(4000)
});
observable.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});
// Logs:
// { foo: 4, bar: 8, baz: 0 } after 4 seconds
// 'This is how it ends!' immediately after
传入数组参数:
const observable = forkJoin([
of(1, 2, 3, 4),
Promise.resolve(8),
timer(4000)
]);
observable.subscribe({
next: value => console.log(value),
complete: () => console.log('This is how it ends!'),
});
// Logs:
// [4, 8, 0] after 4 seconds
// 'This is how it ends!' immediately after
throwError
返回可观察对象并向订阅者抛出错误。
import { throwError } from "rxjs"
throwError("unknown error").subscribe({ error: console.log })
race
接收并同时执行多个可观察对象,只将最快发出的数据流传递给订阅者。
import {interval, map, race} from 'rxjs';
const obs1 = interval(7000).pipe(map(() => 'slow one'));
const obs2 = interval(3000).pipe(map(() => 'fast one'));
const obs3 = interval(5000).pipe(map(() => 'medium one'));
race(obs1, obs2, obs3)
.subscribe(winner => console.log(winner));
// Outputs
// a series of 'fast one'
Pipeable 操作符
转换操作符
map
map:对数据流进行转换,基于原有值进行转换。
import { interval } from "rxjs"
import { map } from "rxjs/operators"
interval(1000)
.pipe(map(n => n * 2))
.subscribe(n => console.log(n))
reduce、scan
reduce: 用于将源 Observable 发出的所有的值累计成一个单一的结果。它类似于的数组 reduce
方法,但它是处理流的每个值,而不是数组。
import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';
// 定义一个源 Observable 发出一些数字
const source$ = of(1, 2, 3, 4, 5);
// 使用 reduce 操作符计算数字的总和
const sum$ = source$.pipe(
reduce((acc, value) => acc + value, 0)
);
// 订阅并打印输出
sum$.subscribe(result => console.log(result));
// 15
reduce(accumulator, [seed])
接受两个参数:
accumulator
是一个函数,接受两个参数:acc
(累计值):上一次迭代的结果,或者是初始值(如果提供了seed
)。value
:当前发出的值。 该函数需要返回新的累计值。
seed
:可选。acc
的初始值。如果未提供,acc
会默认是流中的第一个值,从第二个值开始累计。
当 reduce
接收到源 Observable 的每个值时,会使用 accumulator
函数对当前值和累计值进行计算。 等到源 Observable 完成发射所有值时,reduce
会将最终的累计值作为结果发射出来。
scan:类似 reduce
,进行累计操作,但执行时机不同,数据源每次发出数据流 scan
都会执行。reduce
是发送出最终计算的结果,而 scan
是发出每次计算的结果。
import { of } from 'rxjs';
import { scan } from 'rxjs/operators';
// 定义一个源 Observable 发出一些数字
const source$ = of(1, 2, 3, 4, 5);
// 使用 scan 操作符逐步计算总和
const sum$ = source$.pipe(
scan((acc, value) => acc + value, 0)
);
// 订阅并打印输出
sum$.subscribe(result => console.log(result));
// 1
// 3
// 6
// 10
// 15
groupBy
用于将源 Observable 发出的数据根据某个条件分组,并返回一个 GroupedObservable
。groupBy
根据给定的键(通常是某个属性值)将数据分组(例如按用户 ID、地区、日期等对流中的数据进行分组),每个分组会返回一个新的 Observable,这些新的 Observable 会在后续操作符中被处理。
import { of } from 'rxjs';
import { groupBy, mergeMap, map } from 'rxjs/operators';
const source$ = of(1, 2, 3, 4, 5, 6, 7, 8, 9);
source$.pipe(
groupBy(value => value % 2 === 0 ? 'even' : 'odd'), // 按奇偶分组
mergeMap(group => group.pipe(
map(value => `${group.key}: ${value}`) // 为每个组的元素加上标识
))
).subscribe(console.log);
// odd: 1
// odd: 3
// odd: 5
// odd: 7
// odd: 9
// even: 2
// even: 4
// even: 6
// even: 8
按对象属性分组:
import { of } from 'rxjs';
import { groupBy, mergeMap, map } from 'rxjs/operators';
const source$ = of(
{ name: 'Apple', category: 'Fruit' },
{ name: 'Carrot', category: 'Vegetable' },
{ name: 'Banana', category: 'Fruit' },
{ name: 'Broccoli', category: 'Vegetable' }
);
source$.pipe(
groupBy(item => item.category), // 按类别分组
mergeMap(group => group.pipe(
map(item => `${group.key}: ${item.name}`) // 对每个组的项进行处理
))
).subscribe(console.log);
// Fruit: Apple
// Fruit: Banana
// Vegetable: Carrot
// Vegetable: Broccoli
过滤操作符
filter
对数据流进行过滤。
import {range} from "rxjs"
import {filter} from "rxjs/operators"
range(1, 10).pipe(filter((n) => n > 5))
.subscribe((even) => console.log(even));
// Output:
// 6
// 7
// 8
// 9
// 10
first
获取数据流中的第一个值或者查找数据流中第一个符合条件的值,类似数组中的 find
方法。获取到值以后终止行为。
import {interval} from "rxjs"
import {first} from "rxjs/operators"
interval(1000)
.pipe(first())
.subscribe(n => console.log(n))
interval(1000)
.pipe(first(n => n === 3))
.subscribe(n => console.log(n))
last
获取数据流中的最后一个。不可以用于不会终止的流。
import {range} from "rxjs"
import {last} from "rxjs/operators"
range(1, 10).pipe(last()).subscribe(console.log)
// 10
如果数据源不变成完成状态,则没有最后一个。
import {interval} from "rxjs"
import {last} from "rxjs/operators"
interval(1000).pipe(last()).subscribe(console.log)
// 不会有输出,程序会 hang 住
take、takeWhile、takeUtil、takeLast
take:获取数据流中的前几个
import { range } from "rxjs"
import { take } from "rxjs/operators"
range(1, 10).pipe(take(3)).subscribe(console.log)
// 1
// 2
// 3
takeWhile:只要数据源发出的值满足 takeWhile
中的条件,就发出该值,一旦不满足,就结束发射。
import {range} from "rxjs"
import {takeWhile} from "rxjs/operators"
range(1, 10)
.pipe(takeWhile(n => n < 5))
.subscribe(console.log)
// 1
// 2
// 3
// 4
takeUntil:接收一个可观察对象,当接收的可观察对象发出值时,终止主数据源。
import {interval, timer} from "rxjs"
import {takeUntil} from "rxjs/operators"
interval(1000)
.pipe(takeUntil(timer(5000)))
.subscribe(console.log)
// 0
// 1
// 2
// 3
takeLast:获取数据流中最后的一个值,takeLast
会将值缓存在内存中,等到流结束时取最后的值。
它不可以用于不会终止的流,在无尽的流上使用 takeLast
永远不会发出数据。
range(1, 100).pipe(takeLast(3)).subscribe(x => console.log(x));
skip、skipWhile、skipUntil、skipLast
skip:跳过前几个数据流。
import {range} from "rxjs"
import {skip} from "rxjs/operators"
range(1, 10).pipe(skip(8)).subscribe(console.log)
// 9
// 10
skipWhile:只要数据源发出的值满足 skipWhile
中的条件,就跳过该值。
import {range} from "rxjs"
import {skipWhile} from "rxjs/operators"
range(1, 10)
.pipe(skipWhile(n => n < 5))
.subscribe(console.log)
// 5
// 6
// 7
// 8
// 9
// 10
skipUntil:接收一个可观察对象,并跳过主数据源发出的值,直到接收的可观察对象发出值,主数据源的值才能发出。
import {timer, interval} from "rxjs"
import {skipUntil} from "rxjs/operators"
interval(500)
.pipe(skipUntil(timer(2000)))
.subscribe(console.log)
// 3
// 4
// 5
// ...
skipLast:跳过后面几个值,它的实现和 takeLast
类似,当数据发送完成后跳过最后的值发出其他的值。不可以用于不会终止的流。
of(1, 2, 3, 4, 5).pipe(skipLast(2)).subscribe(x => console.log(x));
// 1
// 2
// 3
elementAt
elementAt
类似数组的下标索引,获取源 Observable 中指定索引位置的元素。
elementAt(index, [defaultValue])
接受两个参数:
index
: 要获取的元素的索引。defaultValue
(可选):如果索引超出范围,可以指定一个默认值。在没有提供默认值时,默认行为是发出一个错误通知。
import { of } from 'rxjs';
import { elementAt } from 'rxjs/operators';
// 定义一个源 Observable 发出一些数字
const source$ = of(10, 20, 30, 40, 50);
// 使用 elementAt 获取索引为 2 的元素(即第三个元素,值为 30)
const element$ = source$.pipe(
elementAt(2)
);
// 订阅并打印输出
element$.subscribe({
next: value => console.log(value),
error: err => console.error(err)
});
// 30
ignoreElements
忽略源 Observable 发出的所有值,只传递异常和结束信息,可以用于不关注数据只关注异常的场景。
import { of } from 'rxjs';
import { ignoreElements } from 'rxjs/operators';
// 定义一个源 Observable 发出一些数字
const source$ = of(1, 2, 3, 4, 5);
// 使用 ignoreElements 操作符
const ignored$ = source$.pipe(
ignoreElements()
);
// 订阅并打印输出
ignored$.subscribe({
next: value => console.log('Next:', value), // 不会打印任何值
error: err => console.error('Error:', err), // 如果有错误会打印
complete: () => console.log('Completed!') // 会打印 "Completed!"
});
// Completed!
debounceTime
常用于处理输入框等需要等待用户停止输入的场景。debounceTime
会延迟发出值,直到观察源 Observable 发出新的值后经过指定的时间,只有在这段时间内没有其他值发出时,它才会发出最新的值。
import { fromEvent } from "rxjs"
import { debounceTime } from "rxjs/operators"
fromEvent(document, "click")
.pipe(debounceTime(1000))
.subscribe(x => console.log(x))
distinctUntilChanged
用于过滤重复值。它会比较相邻的值,如果当前的值与上一个值不同,则允许这个值通过,否则会被忽略。
import { of } from "rxjs"
import { distinctUntilChanged } from "rxjs/operators"
of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
.pipe(distinctUntilChanged())
.subscribe(x => console.log(x)) // 1, 2, 1, 2, 3, 4
自定义的比较函数
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
const source$ = of(
{ id: 1, name: 'John' },
{ id: 1, name: 'John' },
{ id: 2, name: 'Alice' },
{ id: 2, name: 'Alice' },
{ id: 3, name: 'Bob' }
);
source$.pipe(
distinctUntilChanged((prev, curr) => prev.id === curr.id) // 自定义比较逻辑,比较 id 字段
).subscribe(value => {
console.log(value);
});
throttleTime
节流,它能够控制源 Observable 发出的值的频率。throttleTime
会在指定的时间间隔内只允许第一次发出的值通过,其他值会被丢弃,直到这个时间间隔过去,才允许下一个值通过。
import { interval } from 'rxjs';
import { throttleTime } from 'rxjs/operators';
const source$ = interval(100); // 每 100ms 发出一个数值
source$.pipe(
throttleTime(500) // 只允许每 500ms 发出一个值
).subscribe(value => {
console.log(value);
});
// 0
// 5
// 10
// 15
// ...
throttleTime
与 debounceTime
的区别
throttleTime
:每个时间窗口内发出第一个值,之后的值会被丢弃,直到时间窗口结束。debounceTime
:在时间间隔结束之前,所有的值都会被丢弃,只有最后一个值会被发出。
连接操作符
startWith
创建一个新的 Observable 对象并将参数值发送出去,然后再发送源 Observable 对象发出的值。
在异步编程中提供默认值的时候非常有用。将 startWith
发出的值当做默认值。
import {interval} from "rxjs"
import {map, startWith} from "rxjs/operators"
interval(1000)
.pipe(
map(n => n + 100),
startWith(505)
)
.subscribe(n => console.log(n))
// 505
// 100
// 101
// 102
// ...
withLatestFrom
withLatestFrom
适用于:
- 你需要从另一个流(可能是一个状态流)获取最新的数据,并将其与源 Observable 的当前数据结合处理。
- 用于处理那些只关心最新状态的情况,而不需要每次都处理整个流的每个值。
将点击事件与最新的计数结合:
import { fromEvent, interval } from 'rxjs';
import { withLatestFrom, map } from 'rxjs/operators';
const button = document.getElementById('button');
const counter$ = interval(1000); // 每 1000 毫秒发出一个递增的计数值
fromEvent(button, 'click').pipe(
withLatestFrom(counter$), // 将点击事件与当前计数值结合
map(([clickEvent, counterValue]) => `Button clicked! Current counter: ${counterValue}`)
).subscribe(console.log);
// Button clicked! Current counter: 2
// Button clicked! Current counter: 3
// Button clicked! Current counter: 4
// ...
工具操作符
tap
tap
用来查看 Observable 流中的数据,同时不会对数据流产生任何影响。通常用来打印日志。
import {of, tap, map} from 'rxjs';
of(Math.random()).pipe(
tap(console.log),
map(n => n > 0.5 ? 'big' : 'small')
).subscribe(console.log);
import { of } from 'rxjs';
import { tap, map } from 'rxjs/operators';
const source = of(1, 2, 3, 4, 5);
const example = source
.pipe(
map(val => val + 10),
tap({
next: val => {
console.log('on next', val);
},
error: error => {
console.log('on error', error.message);
},
complete: () => console.log('on complete')
})
)
.subscribe(val => console.log(val));
// 输出:
// on next 11
// 11
// on next 12
// 12
// on next 13
// 13
// on next 14
// 14
// on next 15
// 15
// on complete
delay、delayWhen
delay:对 Observable 发出的值延迟指定的时间,只执行一次。
import {interval, take} from "rxjs"
import {delay} from "rxjs/operators"
const start = new Date();
interval(500)
.pipe(
take(5),
delay(3000)
)
.subscribe((v) => {
console.log('val', v);
console.log(new Date() - start);
})
// 3500ms 之后发出值,接下来每 500ms 发出一个值
delayWhen:将源 Observable 发出的值延迟一段时间,这个延迟时间由另一个 Observable 来决定。源 Observable 发出多少值,传入的回调函数就会执行多少次。
import { of, timer } from 'rxjs';
import { delayWhen } from 'rxjs/operators';
// 定义一个源 Observable 发出一些数字
const source$ = of(1, 2, 3, 4);
const delayed$ = source$.pipe(
delayWhen(value => {
// 根据每个值决定延迟的时间
// 比如这里是每个值的延迟时间都基于 1000ms + 当前数字 * 500ms
return timer(1000 + value * 500);
})
);
// 订阅并打印输出
delayed$.subscribe(console.log);
every
every
操作符用于判断源 Observable 发出的每个值是否都符合某个条件。返回布尔值。类似数组的 every
方法。
import { range } from "rxjs"
import { every, map } from "rxjs/operators"
range(1, 9)
.pipe(
every(n => n > 5)
)
.subscribe(console.log)
// false,因为源 Observable 发出的值只有 6,7,8,9 大于 5
range(1, 9)
.pipe(
every(n => n > 0)
)
.subscribe(console.log)
// true
错误处理
catchError
catchError
用于捕获 Observable 流中的错误,并返回一个新的 Observable 来替代原来的错误流。这个操作符允许你处理错误并继续执行流,而不是直接终止整个 Observable。
import { of, throwError } from 'rxjs';
import { catchError } from 'rxjs/operators';
// 模拟一个会抛出错误的 Observable
const source$ = throwError(() => new Error('Something went wrong!'));
// 使用 catchError 捕获错误并返回替代的 Observable
const result$ = source$.pipe(
catchError(err => {
console.error('Caught error:', err.message);
return of('Fallback value');
// 重新抛出一个新的错误
// return throwError(() => new Error('Re-thrown error'));
})
);
// 订阅并打印输出
result$.subscribe({
next: value => console.log(value),
error: err => console.error('Final error:', err),
complete: () => console.log('Completed!')
});
// Caught error: Something went wrong!
// Fallback value
// Completed!
retry
retry
操作符用于自动重新订阅源 Observable,并重试失败的操作。当源 Observable 发出错误通知时,retry 会尝试重新发起新的订阅,直到达到指定的重试次数为止。如果重试次数超过了指定次数,则会将错误传递给观察者。
import { of, throwError } from 'rxjs';
import { retry } from 'rxjs/operators';
// 模拟一个会发出错误的 Observable
const source$ = throwError(() => new Error('Something went wrong!'));
// 使用 retry 操作符,最多重试 3 次
const retried$ = source$.pipe(
retry(3) // 重试 3 次
);
// 订阅并打印输出
retried$.subscribe({
next: value => console.log(value),
error: err => console.error('Error:', err.message), // 在尝试了 3 次后,错误通知会被最终发出
complete: () => console.log('Completed!')
});
// Error: Something went wrong!
如果 count
参数未指定,retry
会无限次重试,直到成功或手动取消订阅。
重试指定条件的错误:
retryWhen
操作符已被弃用,使用 retry({ delay: () => notify$ })
来替代 retryWhen(() => notify$)
。
通过传递一个带有 delay
选项的对象给 retry
,来控制每次重试之间的延迟时间。
delay
:可以是一个数字,表示延迟时间。delay
:可以是一个函数delay: () => notify$
,返回一个 Observable。每次重试之前会等待notify$
发出的值。
import { of, throwError, timer } from 'rxjs';
import { retry } from 'rxjs/operators';
// 模拟一个会发出错误的 Observable
const source$ = throwError(() => new Error('Network Error'));
// 定义一个控制延迟的 Observable
const notify$ = timer(1000); // 每次重试之前等待 1 秒
// 使用 retry 操作符并指定 delay 属性
const retried$ = source$.pipe(
retry({
count: 3,
delay: () => notify$ // 每次重试之间延迟 1 秒
})
);
// 订阅并打印输出
retried$.subscribe({
next: value => console.log(value),
error: err => console.error('Error:', err.message),
complete: () => console.log('Completed!')
});
// Error: Network Error
动态延迟,指数退避:
import { of, throwError, timer } from 'rxjs';
import { retry } from 'rxjs/operators';
// 模拟一个会发出错误的 Observable
const source$ = throwError(() => new Error('Network Error'));
// 动态计算每次重试的延迟:指数退避(Exponential Backoff)
const retryDelay = (attempt: number) => timer(Math.pow(2, attempt) * 1000); // 每次重试间隔 2^attempt 秒
// 使用 retry 操作符并指定延迟策略
const retried$ = source$.pipe(
retry({
count: 3,
delay: (error, attempt) => retryDelay(attempt) // 基于尝试次数来计算延迟
})
);
// 订阅并打印输出
retried$.subscribe({
next: value => console.log(value),
error: err => console.error('Error:', err.message),
complete: () => console.log('Completed!')
});
// Error: Network Error
根据不同错误类型重试:
import { of, throwError, timer } from 'rxjs';
import { retry, delay } from 'rxjs/operators';
// 模拟一个会发出错误的 Observable
const source$ = throwError(() => new Error('Network Error'));
// 条件重试
const retryDelay = (err: any) => {
if (err.message === 'Network Error') {
// 如果是网络错误,延迟 1 秒
console.log('Network Error: Retrying...');
return of(err).pipe(delay(1000));
} else if (err.message === 'Timeout Error') {
// 如果是超时错误,延迟 2 秒
console.log('Timeout Error: Retrying...');
return of(err).pipe(delay(2000));
} else {
// 其他错误直接抛出
return throwError(() => err);
}
}
// 使用 retry 操作符并指定 delay 属性
const retried$ = source$.pipe(
retry({
count: 3,
delay: (error, attempt) => retryDelay(error)
})
);
// 订阅并打印输出
retried$.subscribe({
next: value => console.log(value),
error: err => console.error('Error:', err.message),
complete: () => console.log('Completed!')
});
高阶操作符
所谓的高阶操作符(Higher Order Observable)就是指一个 Observable 发送出的值还是一个 Observable。
concatAll、concatMap
concatAll:用于将一个包含多个内部 Observables 的高阶 Observable(即 Observable 的 Observable)“展平”(flatten)为一个单一的 Observable,并按顺序依次合并这些内部 Observables。
concatAll
确保它按顺序执行每个内部 Observable,前一个 Observable 完成后才会处理下一个。
import { of } from 'rxjs';
import { concatAll } from 'rxjs/operators';
// 创建多个内部 Observables
const source$ = of(
of('A', 'B', 'C'),
of('D', 'E'),
of('F', 'G')
);
// 使用 concatAll 将其展平为一个流
const result$ = source$.pipe(concatAll());
// 订阅并打印输出
result$.subscribe(value => console.log(value));
// A
// B
// C
// D
// E
// F
// G
处理异步:
import { of, timer } from 'rxjs';
import { concatAll, map } from 'rxjs/operators';
// 模拟异步操作(例如:HTTP 请求)
const simulateHttpRequest = (id) => {
return timer(1000 * id).pipe(map(() => `Request ${id} completed`));
};
// 创建一个包含多个 HTTP 请求的 Observable
const source$ = of(1, 2, 3).pipe(
map(id => simulateHttpRequest(id)) // 为每个请求返回一个内部 Observable
);
// 使用 concatAll 依次处理这些 HTTP 请求
const result$ = source$.pipe(concatAll());
// 订阅并打印输出
result$.subscribe(value => console.log(value));
// Request 1 completed
// Request 2 completed
// Request 3 completed
concatMap:用于将源 Observable 发出的每个值映射为一个内部 Observable,并按顺序依次处理这些内部 Observables。
- 每次只会处理一个内部 Observable,前一个 Observable 完成后才会处理下一个。
- 保证了顺序,不会并发执行内部 Observables。
import { of, timer } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';
// 模拟异步操作(例如 HTTP 请求)
const simulateHttpRequest = (id) => {
return timer(1000 * id).pipe(map(() => `Request ${id} completed`));
};
// 创建一个 Observable 发出请求 ID
const source$ = of(1, 2, 3);
// 使用 concatMap 按顺序执行 HTTP 请求
const result$ = source$.pipe(
concatMap(id => simulateHttpRequest(id))
);
// 订阅并打印输出
result$.subscribe(value => console.log(value));
// Request 1 completed
// Request 2 completed
// Request 3 completed
mergeAll、mergeMap
mergeAll:用于将高阶 Observable(即一个发出多个内部 Observable 的 Observable)展平(flatten),并将所有内部 Observable 合并为一个单一的输出流。
mergeAll
会立即订阅所有内部 Observable,然后并发地发出它们的值,而不会按顺序等待每个内部 Observable 完成。
mergeAll(concurrent: number = Infinity)
有一个可选参数 concurrent
可以用来控制并发数量。
import { of } from 'rxjs';
import { mergeAll } from 'rxjs/operators';
// 创建一个包含多个内部 Observables 的高阶 Observable
const source$ = of(
of('A', 'B', 'C'),
of('D', 'E'),
of('F', 'G')
);
// 使用 mergeAll 将这些内部 Observables 合并为一个单一的流
const result$ = source$.pipe(mergeAll());
// 订阅并打印输出
result$.subscribe(value => console.log(value));
// A
// B
// C
// D
// E
// F
// G
mergeMap:允许将源 Observable 发出的每一个值映射为一个内部 Observable(或称为“内部流”),并将这些内部 Observable 合并到一个单一的输出流中。
- 不同于
concatMap
,mergeMap
会并发地处理多个内部 Observable,并不会等待前一个内部 Observable 完成后再处理下一个。
mergeMap(project: (value: T) => ObservableInput, concurrent: number = Infinity)
,接受两个参数
project
:一个函数,接收源 Observable 发出的每个值,并返回一个内部 Observable(或其他类型的值)。这个内部 Observable 会被合并到输出流中。concurrent
(可选):指定最多可以同时订阅多少个内部 Observable,默认为Infinity
,表示无限制并发。
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';
// 模拟异步操作
const simulateAsyncTask = (value) => {
return of(`Task ${value} completed`).pipe(delay(1000 * value)); // 延时模拟异步操作
};
// 创建一个源 Observable
const source$ = of(1, 2, 3);
// 使用 mergeMap 来并发处理多个异步任务
// 任务 Task 1, Task 2, 和 Task 3 会同时开始处理
const result$ = source$.pipe(
mergeMap(value => simulateAsyncTask(value))
);
// 订阅并打印输出
result$.subscribe(value => console.log(value));
// Task 1 completed
// Task 2 completed
// Task 3 completed
限制并发数量:
import { of } from 'rxjs';
import { mergeMap, delay } from 'rxjs/operators';
// 模拟异步操作
const simulateAsyncTask = (value) => {
return of(`Task ${value} completed`).pipe(delay(1000 * value)); // 延时模拟异步操作
};
// 创建一个源 Observable
const source$ = of(1, 2, 3, 4, 5);
// 使用 mergeMap 并限制并发数为 2
const result$ = source$.pipe(
mergeMap(value => simulateAsyncTask(value), 2) // 这里限制并发数为 2
);
// 订阅并打印输出
result$.subscribe(value => console.log(value));
// Task 1 completed
// Task 2 completed
// Task 3 completed
// Task 4 completed
// Task 5 completed
switchMap
用于将源 Observable 发出的每个值映射为一个新的 Observable,并且在接收到新的值时,取消订阅当前的内部 Observable,并订阅新的内部 Observable。
这意味着只有最后一个内部 Observable 的值会被发出,其它之前的内部 Observable 会被自动取消订阅。
import { fromEvent, fromPromise } from 'rxjs';
import { switchMap, debounceTime, map } from 'rxjs/operators';
// 模拟网络请求
const fakeApiRequest = (query) => {
return new Promise((resolve) => {
setTimeout(() => resolve(`Result for: ${query}`), 1000);
});
};
// 获取搜索框元素
const searchInput = document.getElementById('search');
// 用户输入事件流
const search$ = fromEvent(searchInput, 'input').pipe(
map(event => event.target.value), // 提取输入的值
debounceTime(300), // 防抖,避免每次输入都发出请求
switchMap(query => {
if (query) {
return fromPromise(fakeApiRequest(query)); // 返回一个新的 Observable
} else {
return of(''); // 如果没有输入,返回一个空字符串
}
})
);
// 订阅并打印搜索结果
search$.subscribe(result => console.log(result));
fakeApiRequest(query)
模拟一个异步请求,switchMap
会发出该请求,并在新的输入发生时取消之前的请求。
exhaustAll, exhaustMap
exhaustMap:映射源 Observable 发出的每个值为一个内部 Observable,并且忽略在内部 Observable 完成之前发出的所有新值。
exhaustMap
会等待当前的内部 Observable 完成。- 在当前的内部 Observable 执行期间,任何新的源值都会被忽略。
- 只有在内部 Observable 完成后,才会处理源 Observable 中的下一个值。
这种行为通常用于限制并发任务的数量,尤其是在处理需要等待的异步操作时,例如避免用户频繁点击按钮触发多个请求。
import { fromEvent } from 'rxjs';
import { exhaustMap, map, debounceTime } from 'rxjs/operators';
// 模拟异步搜索请求
const simulateSearchRequest = (query) => {
console.log(`Searching for: ${query}`);
return new Promise(resolve => {
setTimeout(() => {
resolve(`Search results for: ${query}`);
}, 1500); // 模拟请求延时
});
};
// 获取搜索框元素
const searchInput = document.getElementById('searchBox');
// 用户输入事件流
const search$ = fromEvent(searchInput, 'input').pipe(
debounceTime(500), // 防抖,避免每次输入都发出请求
map(event => event.target.value), // 提取输入的值
exhaustMap(query => {
if (query) {
return from(simulateSearchRequest(query)); // 返回一个新的 Observable
} else {
return of(''); // 如果没有输入,返回空字符串
}
})
);
// 订阅并打印搜索结果
search$.subscribe(result => console.log(result));
exhaustMap
会确保每次用户输入时只处理最后一次的请求,而忽略在请求进行中产生的其他输入。
exhaustAll:类似于 exhaustMap
,但它的用途稍有不同。exhaustAll
用于将源 Observable 发出的每个值(这些值本身通常是 Observable)转换为内部 Observable,并且在当前内部 Observable 完成之前,会忽略所有新的值,直到当前内部 Observable 完成。
exhaustAll
监听源 Observable 中发出的每个值,这些值通常是 Observable。- 当源 Observable 发出一个新的 Observable 时,
exhaustAll
会订阅该内部 Observable。 - 如果当前的内部 Observable 还没有完成,
exhaustAll
会忽略源 Observable 发出的所有后续值,直到当前的内部 Observable 完成。 - 一旦当前的内部 Observable 完成,
exhaustAll
会继续处理源 Observable 中的下一个值(如果存在)。
import { of, interval } from 'rxjs';
import { exhaustAll, map, take } from 'rxjs/operators';
// 模拟内部 Observable(例如 HTTP 请求)
const simulateAsyncTask = (id) => {
return of(`Request ${id} completed`).pipe(
// 模拟任务延迟
map(val => val),
take(1) // 模拟每个请求只会发出一个值,然后完成
);
};
// 创建一个源 Observable,发出多个 Observable
const source$ = of(
simulateAsyncTask(1),
simulateAsyncTask(2),
simulateAsyncTask(3),
simulateAsyncTask(4)
);
// 使用 exhaustAll 来处理内部 Observable
const result$ = source$.pipe(
exhaustAll() // 忽略当前任务未完成时的其他任务
);
// 订阅并打印输出
result$.subscribe(value => console.log(value));
// Request 1 completed
// Request 2 completed
// Request 3 completed
// Request 4 completed