Elliot Yang 的动态

动态详情

返回列表
Elliot Yang

Elliot Yang

生成结果

typescript
1import { waitFor } from '@/promise'
2import { concatMap, exhaustMap, from, interval, map, mergeMap, of, switchMap } from 'rxjs'
3import { describe, expect, it, vi } from 'vitest'
4
5/**
6 * 比较 RxJS 中不同的 xxxMap 操作符的行为特点
7 * 主要测试四种映射操作符:
8 * - mergeMap: 并发执行所有内部 Observable
9 * - concatMap: 串行执行内部 Observable
10 * - switchMap: 切换到最新的内部 Observable
11 * - exhaustMap: 忽略新值直到当前内部 Observable 完成
12 */
13describe('xxxMap vs xxxMap', () => {
14  beforeAll(() => {
15    vi.useFakeTimers()
16  })
17
18  afterAll(() => {
19    vi.useRealTimers()
20  })
21
22  /**
23   * mergeMap 的并发性:
24   * 1. mergeMap 允许并发处理多个内部 Observable
25   * 2. 当一个内部 Observable 还在进行时,下一个 interval 发射的值会立即被处理
26   * 3. 最终输出 [1, 0] 的原因:
27   *    - index=0 时启动一个延迟100ms的 Observable
28   *    - index=1 时立即发出值1
29   *    - 100ms后第一个 Observable 完成,输出0
30   */
31  it('mergeMap', async () => {
32    const values: number[] = []
33    const subscription = interval(100).pipe(
34      mergeMap((value, index) => {
35        if (index === 1) {
36          return of(value)
37        }
38        return from(waitFor(100)).pipe(map(() => value))
39      }),
40    ).subscribe({
41      next: (value) => {
42        values.push(value)
43      },
44    })
45
46    await vi.advanceTimersByTimeAsync(300)
47    subscription.unsubscribe()
48    expect(values).toEqual([1, 0])
49  })
50
51  /**
52   * concatMap 的串行性:
53   * 1. concatMap 会等待前一个内部 Observable 完成后,才会处理下一个值
54   * 2. 保证了处理顺序与源 Observable 发出值的顺序一致
55   * 3. 最终输出 [0, 1] 的原因:
56   *    - index=0 的 Observable 延迟100ms后输出0
57   *    - 然后处理 index=1,立即输出1
58   */
59  it('contactMap', async () => {
60    const values: number[] = []
61    const subscription = interval(100).pipe(
62      concatMap((value, index) => {
63        if (index === 1) {
64          return of(value)
65        }
66        return from(waitFor(100)).pipe(map(() => value))
67      }),
68    ).subscribe({
69      next: (value) => {
70        values.push(value)
71      },
72    })
73
74    await vi.advanceTimersByTimeAsync(300)
75    subscription.unsubscribe()
76    expect(values).toEqual([0, 1])
77  })
78
79  /**
80   * switchMap 的切换性:
81   * 1. switchMap 会在收到新的源值时取消前一个内部 Observable 的订阅
82   * 2. 只保留最新的内部 Observable 的结果
83   * 3. 最终只输出 [1] 的原因:
84   *    - index=0 启动延迟100ms的 Observable
85   *    - 100ms时收到 index=1,取消前一个 Observable
86   *    - index=1 立即输出值1
87   */
88  it('switchMap', async () => {
89    const values: number[] = []
90    const subscription = interval(100).pipe(
91      switchMap((value, index) => {
92        if (index === 1) {
93          return of(value)
94        }
95        return from(waitFor(100)).pipe(map(() => value))
96      }),
97    ).subscribe({
98      next: (value) => {
99        values.push(value)
100      },
101    })
102
103    await vi.advanceTimersByTimeAsync(300)
104    subscription.unsubscribe()
105    expect(values).toEqual([1])
106  })
107
108  /**
109   * exhaustMap 的防抖性:
110   * 1. exhaustMap 会在处理内部 Observable 时忽略源 Observable 发出的新值
111   * 2. 直到当前内部 Observable 完成才会处理新的值
112   * 3. 最终输出 [0, 2] 的原因:
113   *    - index=0 启动延迟100ms的 Observable
114   *    - 100ms时收到 index=1,但被忽略,因为当前 Observable 还未完成
115   *    - 200ms时,index=0 的 Observable 完成并输出 0
116   *    - 同时收到 index=2,开始新的延迟100ms的 Observable
117   *    - 300ms时,index=2 的 Observable 完成并输出 2
118   */
119  it('exhaustMap', async () => {
120    const values: number[] = []
121    const subscription = interval(100).pipe(
122      exhaustMap((value, index) => {
123        if (index === 1) {
124          return of(value)
125        }
126        return from(waitFor(100)).pipe(map(() => value))
127      }),
128    ).subscribe({
129      next: (value) => {
130        values.push(value)
131      },
132    })
133
134    await vi.advanceTimersByTimeAsync(300)
135    subscription.unsubscribe()
136    expect(values).toEqual([0, 2])
137  })
138})
浏览:126点赞:0