Elliot Yang 的动态
动态详情
返回列表
Elliot Yang
1import { waitFor } from '@/promise'
2import { concatMap, exhaustMap, from, interval, map, mergeMap, of, switchMap } from 'rxjs'
3import { describe, expect, it, vi } from 'vitest'
4
5/**
6 * 比较 RxJS 中不同的 xxxMap 操作符的行为特点
7 * 主要测试四种映射操作符:
8 * - mergeMap: 并发执行所有内部 Observable
9 * - concatMap: 串行执行内部 Observable
10 * - switchMap: 切换到最新的内部 Observable
11 * - exhaustMap: 忽略新值直到当前内部 Observable 完成
12 */
13describe('xxxMap vs xxxMap', () => {
14 beforeAll(() => {
15 vi.useFakeTimers()
16 })
17
18 afterAll(() => {
19 vi.useRealTimers()
20 })
21
22 /**
23 * mergeMap 的并发性:
24 * 1. mergeMap 允许并发处理多个内部 Observable
25 * 2. 当一个内部 Observable 还在进行时,下一个 interval 发射的值会立即被处理
26 * 3. 最终输出 [1, 0] 的原因:
27 * - index=0 时启动一个延迟100ms的 Observable
28 * - index=1 时立即发出值1
29 * - 100ms后第一个 Observable 完成,输出0
30 */
31 it('mergeMap', async () => {
32 const values: number[] = []
33 const subscription = interval(100).pipe(
34 mergeMap((value, index) => {
35 if (index === 1) {
36 return of(value)
37 }
38 return from(waitFor(100)).pipe(map(() => value))
39 }),
40 ).subscribe({
41 next: (value) => {
42 values.push(value)
43 },
44 })
45
46 await vi.advanceTimersByTimeAsync(300)
47 subscription.unsubscribe()
48 expect(values).toEqual([1, 0])
49 })
50
51 /**
52 * concatMap 的串行性:
53 * 1. concatMap 会等待前一个内部 Observable 完成后,才会处理下一个值
54 * 2. 保证了处理顺序与源 Observable 发出值的顺序一致
55 * 3. 最终输出 [0, 1] 的原因:
56 * - index=0 的 Observable 延迟100ms后输出0
57 * - 然后处理 index=1,立即输出1
58 */
59 it('contactMap', async () => {
60 const values: number[] = []
61 const subscription = interval(100).pipe(
62 concatMap((value, index) => {
63 if (index === 1) {
64 return of(value)
65 }
66 return from(waitFor(100)).pipe(map(() => value))
67 }),
68 ).subscribe({
69 next: (value) => {
70 values.push(value)
71 },
72 })
73
74 await vi.advanceTimersByTimeAsync(300)
75 subscription.unsubscribe()
76 expect(values).toEqual([0, 1])
77 })
78
79 /**
80 * switchMap 的切换性:
81 * 1. switchMap 会在收到新的源值时取消前一个内部 Observable 的订阅
82 * 2. 只保留最新的内部 Observable 的结果
83 * 3. 最终只输出 [1] 的原因:
84 * - index=0 启动延迟100ms的 Observable
85 * - 100ms时收到 index=1,取消前一个 Observable
86 * - index=1 立即输出值1
87 */
88 it('switchMap', async () => {
89 const values: number[] = []
90 const subscription = interval(100).pipe(
91 switchMap((value, index) => {
92 if (index === 1) {
93 return of(value)
94 }
95 return from(waitFor(100)).pipe(map(() => value))
96 }),
97 ).subscribe({
98 next: (value) => {
99 values.push(value)
100 },
101 })
102
103 await vi.advanceTimersByTimeAsync(300)
104 subscription.unsubscribe()
105 expect(values).toEqual([1])
106 })
107
108 /**
109 * exhaustMap 的防抖性:
110 * 1. exhaustMap 会在处理内部 Observable 时忽略源 Observable 发出的新值
111 * 2. 直到当前内部 Observable 完成才会处理新的值
112 * 3. 最终输出 [0, 2] 的原因:
113 * - index=0 启动延迟100ms的 Observable
114 * - 100ms时收到 index=1,但被忽略,因为当前 Observable 还未完成
115 * - 200ms时,index=0 的 Observable 完成并输出 0
116 * - 同时收到 index=2,开始新的延迟100ms的 Observable
117 * - 300ms时,index=2 的 Observable 完成并输出 2
118 */
119 it('exhaustMap', async () => {
120 const values: number[] = []
121 const subscription = interval(100).pipe(
122 exhaustMap((value, index) => {
123 if (index === 1) {
124 return of(value)
125 }
126 return from(waitFor(100)).pipe(map(() => value))
127 }),
128 ).subscribe({
129 next: (value) => {
130 values.push(value)
131 },
132 })
133
134 await vi.advanceTimersByTimeAsync(300)
135 subscription.unsubscribe()
136 expect(values).toEqual([0, 2])
137 })
138})浏览:126点赞:0