• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 14278092116

05 Apr 2025 03:32AM UTC coverage: 88.48% (-6.7%) from 95.167%
14278092116

push

github

bordoley
tests

929 of 1256 branches covered (73.96%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 2 files covered. (100.0%)

229 existing lines in 27 files now uncovered.

6037 of 6617 relevant lines covered (91.23%)

421.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.87
/src/computations/AsyncIterable.ts
1
import {
1✔
2
  Array_length,
3
  Array_map,
4
  Iterator_done,
5
  Iterator_next,
6
  Iterator_value,
7
  MAX_SAFE_INTEGER,
8
} from "../__internal__/constants.js";
9
import {
1✔
10
  AsyncIterableLike,
11
  AsyncIterableWithSideEffectsLike,
12
  ComputationLike_isDeferred,
13
  ComputationLike_isPure,
14
  ComputationLike_isSynchronous,
15
  ComputationModule,
16
  ComputationTypeLike,
17
  ComputationTypeLike_T,
18
  ComputationTypeLike_baseOfT,
19
  ConcurrentDeferredComputationModule,
20
  DeferredComputationModule,
21
  InteractiveComputationModule,
22
  PureAsyncIterableLike,
23
} from "../computations.js";
24
import {
1✔
25
  Equality,
26
  Factory,
27
  Function1,
28
  Optional,
29
  Predicate,
30
  Reducer,
31
  SideEffect1,
32
  Tuple2,
33
  alwaysTrue,
34
  bindMethod,
35
  error,
36
  invoke,
37
  isFunction,
38
  isNone,
39
  isSome,
40
  newInstance,
41
  none,
42
  pick,
43
  pipe,
44
  raiseError,
45
  returns,
46
  strictEquality,
47
  tuple,
48
} from "../functions.js";
49
import { clampPositiveInteger, clampPositiveNonZeroInteger } from "../math.js";
1✔
50
import * as Disposable from "../utils/Disposable.js";
1✔
51
import * as Queue from "../utils/Queue.js";
1✔
52
import * as Iterator from "../utils/__internal__/Iterator.js";
1✔
53
import {
1✔
54
  DisposableLike,
55
  DisposableLike_dispose,
56
  DropOldestBackpressureStrategy,
57
  EnumeratorLike_current,
58
  EnumeratorLike_moveNext,
59
  QueueLike_enqueue,
60
} from "../utils.js";
61
import * as ComputationM from "./Computation.js";
1✔
62
import {
1✔
63
  Observable_genAsync,
64
  Observable_genPureAsync,
65
} from "./Observable/__private__/Observable.genAsync.js";
66
import {
1✔
67
  Producer_genAsync,
68
  Producer_genPureAsync,
69
} from "./Producer/__private__/Producer.genAsync.js";
70

71
/**
 * Computation type descriptor for async iterables.
 *
 * The optional `ComputationTypeLike_baseOfT` member declares
 * `AsyncIterableLike<T>` as the base type of this computation, where `T` is
 * read from the descriptor's `ComputationTypeLike_T` member.
 */
export interface AsyncIterableComputation extends ComputationTypeLike {
  readonly [ComputationTypeLike_baseOfT]?: AsyncIterableLike<
    this[typeof ComputationTypeLike_T]
  >;
}

/** Module-local alias of {@link AsyncIterableComputation}. */
export type Computation = AsyncIterableComputation;
77

78
/**
 * Type of the operator set exported by this module.
 *
 * Combines the generic, deferred, interactive and concurrent-deferred
 * computation module contracts for {@link AsyncIterableComputation}, and adds
 * the two async-iterable specific constructors declared below.
 * `toObservable` accepts an optional `bufferSize` option.
 */
export interface AsyncIterableModule
  extends ComputationModule<AsyncIterableComputation>,
    DeferredComputationModule<AsyncIterableComputation>,
    InteractiveComputationModule<
      AsyncIterableComputation,
      {
        toObservable: {
          bufferSize?: number;
        };
      }
    >,
    ConcurrentDeferredComputationModule<AsyncIterableComputation> {
  /**
   * Returns a function converting an async factory (a function that may
   * receive an `AbortSignal` and returns a promise) into an async iterable
   * with side effects.
   */
  fromAsyncFactory<T>(): Function1<
    (options?: { signal?: AbortSignal }) => Promise<T>,
    AsyncIterableWithSideEffectsLike<T>
  >;

  /**
   * Returns a function wrapping a native `AsyncIterable<T>` as an async
   * iterable with side effects.
   */
  of<T>(): Function1<AsyncIterable<T>, AsyncIterableWithSideEffectsLike<T>>;
}

/** Module-local alias of {@link AsyncIterableModule}. */
export type Signature = AsyncIterableModule;
99

100
/**
 * Async iterable that groups the values of a source into arrays of `c` items.
 *
 * A group is emitted as soon as it holds exactly `c` items; a partially filled
 * group is emitted after the source completes. Purity is taken from the source
 * through `ComputationM.isPure`.
 */
class BufferAsyncIterable<T> implements AsyncIterableLike<ReadonlyArray<T>> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly s: AsyncIterableLike<T>,
    private readonly c: number,
  ) {
    this[ComputationLike_isPure] = ComputationM.isPure(s);
  }

  async *[Symbol.asyncIterator]() {
    const source = this.s;
    const groupSize = this.c;
    let pending: T[] = [];

    for await (const item of source) {
      pending.push(item);

      if (pending.length !== groupSize) {
        continue;
      }

      // Hand the full group to the consumer and start a fresh array so the
      // emitted one is never mutated afterwards.
      const full = pending;
      pending = [];
      yield full;
    }

    // Flush whatever is left once the source is exhausted.
    if (pending.length > 0) {
      yield pending;
    }
  }
}
130

131
/**
 * Creates an operator that wraps a source in a {@link BufferAsyncIterable}.
 *
 * `options.count` defaults to `MAX_SAFE_INTEGER` and is passed through
 * `clampPositiveNonZeroInteger` before use.
 */
export const buffer: Signature["buffer"] = (<T>(options?: { count?: number }) =>
  (source: AsyncIterableLike<T>) => {
    const groupSize = clampPositiveNonZeroInteger(
      options?.count ?? MAX_SAFE_INTEGER,
    );
    return newInstance(BufferAsyncIterable<T>, source, groupSize);
  }) as Signature["buffer"];
138

139
/**
 * Async iterable that forwards a source and, if the source throws, invokes an
 * error handler.
 *
 * When the handler returns a value, that value is treated as a replacement
 * async iterable and its items are forwarded. When the handler itself throws,
 * a new error built from both the handler error and the source error is thrown.
 * The instance is pure only if both the source and the `isPure` flag say so.
 */
class CatchErrorAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(
    private readonly s: AsyncIterableLike<T>,
    private readonly onError:
      | SideEffect1<Error>
      | Function1<Error, AsyncIterableLike<T>>,
    isPure: boolean,
  ) {
    this[ComputationLike_isPure] = ComputationM.isPure(s) && isPure;
  }

  async *[Symbol.asyncIterator]() {
    try {
      yield* this.s;
    } catch (sourceFailure) {
      const sourceError = error(sourceFailure);
      let fallback: Optional<AsyncIterableLike<T>> = none;

      try {
        fallback = this.onError(sourceError) as Optional<AsyncIterableLike<T>>;
      } catch (handlerFailure) {
        // Report both failures rather than losing the original one.
        throw error([error(handlerFailure), sourceError]);
      }

      // Errors raised by the fallback are not caught again.
      if (isSome(fallback)) {
        yield* fallback;
      }
    }
  }
}
170

171
/**
 * Creates an operator that wraps a source in a {@link CatchErrorAsyncIterable}.
 *
 * The handler is assumed pure unless `options[ComputationLike_isPure]` is
 * provided.
 */
export const catchError: Signature["catchError"] = (<T>(
    onError: SideEffect1<Error> | Function1<Error, AsyncIterableLike<T>>,
    options?: {
      readonly [ComputationLike_isPure]: Optional<boolean>;
    },
  ) =>
  (source: AsyncIterableLike<T>) => {
    const handlerIsPure = options?.[ComputationLike_isPure] ?? true;
    return newInstance(CatchErrorAsyncIterable, source, onError, handlerIsPure);
  }) as Signature["catchError"];
184

185
/**
 * Async iterable that flattens a source of async iterables by draining each
 * inner iterable completely, in arrival order, before requesting the next one.
 *
 * The instance is pure only if the outer source is pure and the `isPure` flag
 * passed to the constructor is true.
 */
class ConcatAllAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(
    private readonly s: AsyncIterableLike<AsyncIterableLike<T>>,
    isPure: boolean,
  ) {
    this[ComputationLike_isPure] = ComputationM.isPure(s) && isPure;
  }

  async *[Symbol.asyncIterator]() {
    const outer = this.s;

    for await (const inner of outer) {
      yield* inner;
    }
  }
}
203
/**
 * Creates an operator that wraps a source of async iterables in a
 * {@link ConcatAllAsyncIterable}.
 *
 * Inner iterables are assumed pure unless `options[ComputationLike_isPure]`
 * is provided.
 */
export const concatAll: Signature["concatAll"] = (<T>(options?: {
    readonly [ComputationLike_isPure]: Optional<boolean>;
  }) =>
  (outer: AsyncIterableLike<AsyncIterableLike<T>>) => {
    const innerIsPure = options?.[ComputationLike_isPure] ?? true;
    return newInstance(ConcatAllAsyncIterable, outer, innerIsPure);
  }) as Signature["concatAll"];
212

213
/**
 * Async iterable that emits every item of each given source, one source after
 * another, in array order.
 *
 * Purity is computed over all sources with `ComputationM.areAllPure`.
 */
class ConcatAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(private readonly s: ReadonlyArray<AsyncIterableLike<T>>) {
    this[ComputationLike_isPure] = ComputationM.areAllPure(s);
  }

  async *[Symbol.asyncIterator]() {
    const sources = this.s;

    for (const source of sources) {
      yield* source;
    }
  }
}
228

229
/**
 * Builds a {@link ConcatAsyncIterable} over the given sources, preserving the
 * argument order.
 */
export const concat: Signature["concat"] = (<T>(
  ...iterables: ReadonlyArray<AsyncIterableLike<T>>
) => {
  const concatenated = newInstance(ConcatAsyncIterable, iterables);
  return concatenated;
}) as unknown as Signature["concat"];
236

237
/**
 * Async iterable that decodes a source of `ArrayBuffer` chunks into strings
 * using a streaming `TextDecoder`.
 *
 * The charset defaults to `"utf-8"`; the full options object is also passed to
 * the `TextDecoder` constructor. Empty decode results are not emitted. After
 * the source completes the decoder is flushed and any remaining text is
 * emitted.
 */
class DecodeWithCharsetAsyncIterable implements AsyncIterableLike<string> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly s: AsyncIterableLike<ArrayBuffer>,
    private readonly o: Optional<{
      charset?: string;
      fatal?: boolean;
      ignoreBOM?: boolean;
    }>,
  ) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const source = this.s;
    const decoderOptions = this.o;

    const decoder = newInstance(
      TextDecoder,
      decoderOptions?.charset ?? "utf-8",
      decoderOptions,
    );

    for await (const chunk of source) {
      // `stream: true` keeps incomplete byte sequences buffered in the decoder.
      const text = decoder.decode(chunk, { stream: true });

      if (text[Array_length] > 0) {
        yield text;
      }
    }

    // Final non-streaming call flushes anything still buffered.
    const tail = decoder.decode(newInstance(Uint8Array, []), {
      stream: false,
    });

    if (tail[Array_length] > 0) {
      yield tail;
    }
  }
}
284

285
/**
 * Creates an operator that wraps an `ArrayBuffer` source in a
 * {@link DecodeWithCharsetAsyncIterable}, forwarding `options` unchanged.
 */
export const decodeWithCharset: Signature["decodeWithCharset"] = ((options?: {
    readonly charset?: string;
    readonly fatal?: boolean;
    readonly ignoreBOM?: boolean;
  }) =>
  (source: AsyncIterableLike<ArrayBuffer>) => {
    const decoded = newInstance(DecodeWithCharsetAsyncIterable, source, options);
    return decoded;
  }) as Signature["decodeWithCharset"];
296

297
/**
 * Async iterable that suppresses consecutive duplicates.
 *
 * The first source value is always emitted; each later value is emitted only
 * when `eq(value, previouslyEmitted)` returns false. Purity is inherited from
 * the source.
 *
 * Declares `implements AsyncIterableLike<T>` and `readonly` fields so that it
 * matches the other operator classes in this module.
 */
class DistinctUntilChangedAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(
    private readonly s: AsyncIterableLike<T>,
    private readonly eq: Equality<T>,
  ) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const equals = this.eq;

    let hasPrev = false;
    let prev: T = none as T;

    for await (const v of this.s) {
      // Skip only when there is a previous value and it compares equal.
      if (hasPrev && equals(v, prev)) {
        continue;
      }

      hasPrev = true;
      prev = v;
      yield v;
    }
  }
}
327

328
/**
 * Creates an operator that wraps a source in a
 * {@link DistinctUntilChangedAsyncIterable}.
 *
 * Values are compared with `options.equality`, or `strictEquality` when none
 * is given.
 */
export const distinctUntilChanged: Signature["distinctUntilChanged"] = (<
    T,
  >(options?: {
    readonly equality?: Equality<T>;
  }) =>
  (source: AsyncIterableLike<T>) => {
    const equality = options?.equality ?? strictEquality;
    return newInstance(DistinctUntilChangedAsyncIterable, source, equality);
  }) as Signature["distinctUntilChanged"];
339

340
/**
 * Async iterator that produces the resolved value of a single promise and then
 * reports completion.
 *
 * `return()` and `throw()` abort the supplied `AbortController` when the value
 * has not been produced yet, and always resolve with a completed result.
 */
class AsyncFactoryIterator<T> implements AsyncIterator<T> {
  // Set once the promise's value has been handed to the consumer.
  private hv = false;

  constructor(
    private readonly a: AbortController,
    private readonly p: Promise<T>,
  ) {}

  async next(): Promise<IteratorResult<T, any>> {
    if (this.hv) {
      return { done: true, value: none };
    }

    const value = await this.p;
    this.hv = true;
    return { value };
  }

  async return?(): Promise<IteratorResult<T, any>> {
    if (!this.hv) {
      this.a.abort();
    }
    return { done: true, value: none };
  }

  async throw?(e?: any): Promise<IteratorResult<T, any>> {
    if (!this.hv) {
      // The thrown value is forwarded as the abort reason.
      this.a.abort(e);
    }
    return { done: true, value: none };
  }
}
375

376
/**
 * Async iterable backed by a promise-returning factory.
 *
 * Every call to `[Symbol.asyncIterator]` creates a new `AbortController`,
 * invokes the factory with its signal, and returns an
 * {@link AsyncFactoryIterator} over the resulting promise. Always impure.
 */
class FromAsyncFactoryIterable<T>
  implements AsyncIterableWithSideEffectsLike<T>
{
  public [ComputationLike_isPure]: false = false as const;
  public [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(private f: (options?: { signal: AbortSignal }) => Promise<T>) {}

  [Symbol.asyncIterator]() {
    const controller: AbortController = newInstance(AbortController);
    const { signal } = controller;
    const pending = this.f({ signal });

    return newInstance(AsyncFactoryIterator<T>, controller, pending);
  }
}
391

392
/**
 * Returns a function that wraps an async factory in a
 * {@link FromAsyncFactoryIterable}.
 */
export const fromAsyncFactory: Signature["fromAsyncFactory"] = returns(
  factory => {
    const iterable = newInstance(FromAsyncFactoryIterable, factory);
    return iterable;
  },
);
395

396
/**
 * Async iterable that encodes each string of a source with a `TextEncoder`
 * and emits the resulting `Uint8Array`. Purity is inherited from the source.
 */
class EncodeUtf8AsyncIterable
  implements AsyncIterableLike<Uint8Array<ArrayBufferLike>>
{
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(private readonly s: AsyncIterableLike<string>) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    // One encoder per iteration, shared by all chunks of that iteration.
    const encoder = newInstance(TextEncoder);
    const source = this.s;

    for await (const text of source) {
      const bytes = encoder.encode(text);
      yield bytes;
    }
  }
}
415

416
/**
 * Creates an operator that wraps a string source in an
 * {@link EncodeUtf8AsyncIterable}.
 */
export const encodeUtf8: Signature["encodeUtf8"] = (() =>
  (source: AsyncIterableLike<string>) => {
    const encoded = newInstance(EncodeUtf8AsyncIterable, source);
    return encoded;
  }) as Signature["encodeUtf8"];
3✔
419

420
/**
 * Async iterable that calls a side effect with every source value before
 * re-emitting that value. Always marked impure.
 */
class ForEachAsyncIterable<T> implements AsyncIterableWithSideEffectsLike<T> {
  public [ComputationLike_isPure]: false = false as const;
  public [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(
    private readonly d: AsyncIterableLike<T>,
    private readonly ef: SideEffect1<T>,
  ) {}

  async *[Symbol.asyncIterator]() {
    const { d: source, ef: sideEffect } = this;

    for await (const item of source) {
      // The effect runs first; if it throws, the item is never emitted.
      sideEffect(item);
      yield item;
    }
  }
}
440

441
/**
 * Creates an operator that wraps a source in a {@link ForEachAsyncIterable}
 * running `effect` for each value.
 */
export const forEach: Signature["forEach"] = (<T>(effect: SideEffect1<T>) =>
  (source: AsyncIterableLike<T>) => {
    const tapped = newInstance(ForEachAsyncIterable, source, effect);
    return tapped;
  }) as Signature["forEach"];
2✔
444

445
/**
 * Impure async iterable driven by a factory of synchronous iterators.
 *
 * Each iteration calls the factory, adapts the iterator with
 * `Iterator.toEnumerator`, emits every enumerated value, and finally calls
 * `Disposable.raiseIfDisposedWithError` on the enumerator.
 */
class GenAsyncIterable<T> implements AsyncIterableWithSideEffectsLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isPure]: false = false as const;

  constructor(readonly f: Factory<Iterator<T>>) {}

  async *[Symbol.asyncIterator](): AsyncIterator<T, any, any> {
    const iterator = this.f();
    const enumerator = pipe(iterator, Iterator.toEnumerator<T>());

    let hasNext = enumerator[EnumeratorLike_moveNext]();
    while (hasNext) {
      yield Promise.resolve(enumerator[EnumeratorLike_current]);
      hasNext = enumerator[EnumeratorLike_moveNext]();
    }

    Disposable.raiseIfDisposedWithError(enumerator);
  }
}
461

462
/** Builds a {@link GenAsyncIterable} from a factory of synchronous iterators. */
export const gen: Signature["gen"] = (<T>(factory: Factory<Iterator<T>>) => {
  const iterable = newInstance(GenAsyncIterable<T>, factory);
  return iterable;
}) as Signature["gen"];
×
464

465
/**
 * Impure async iterable whose `[Symbol.asyncIterator]` returns whatever
 * async iterator the factory produces.
 */
class GenAsyncAsyncIterable<T> implements AsyncIterableWithSideEffectsLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isPure]: false = false as const;

  constructor(readonly f: Factory<AsyncIterator<T>>) {}

  [Symbol.asyncIterator]() {
    const iterator = this.f();
    return iterator;
  }
}
476

477
/** Builds a {@link GenAsyncAsyncIterable} from a factory of async iterators. */
export const genAsync: Signature["genAsync"] = (<T>(
  factory: Factory<AsyncIterator<T>>,
) => {
  const iterable = newInstance(GenAsyncAsyncIterable, factory);
  return iterable;
}) as Signature["genAsync"];
×
480

481
/**
 * Pure async iterable driven by a factory of synchronous iterators.
 *
 * Each iteration calls the factory, adapts the iterator with
 * `Iterator.toEnumerator`, emits every enumerated value, and finally calls
 * `Disposable.raiseIfDisposedWithError` on the enumerator.
 */
class GenPureAsyncIterable<T> implements PureAsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isPure]: true = true as const;

  constructor(readonly f: Factory<Iterator<T>>) {}

  async *[Symbol.asyncIterator]() {
    const iterator = this.f();
    const enumerator = pipe(iterator, Iterator.toEnumerator<T>());

    let hasNext = enumerator[EnumeratorLike_moveNext]();
    while (hasNext) {
      yield Promise.resolve(enumerator[EnumeratorLike_current]);
      hasNext = enumerator[EnumeratorLike_moveNext]();
    }

    Disposable.raiseIfDisposedWithError(enumerator);
  }
}
497

498
/** Builds a {@link GenPureAsyncIterable} from a factory of synchronous iterators. */
export const genPure: Signature["genPure"] = (<T>(
  factory: Factory<Iterator<T>>,
) => {
  const iterable = newInstance(GenPureAsyncIterable<T>, factory);
  return iterable;
}) as Signature["genPure"];
73✔
501

502
/**
 * Pure async iterable whose `[Symbol.asyncIterator]` returns whatever async
 * iterator the factory produces.
 */
class GenPureAsyncAsyncIterable<T> implements PureAsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isPure]: true = true as const;

  constructor(readonly f: Factory<AsyncIterator<T>>) {}

  [Symbol.asyncIterator]() {
    const iterator = this.f();
    return iterator;
  }
}
513

514
/** Builds a {@link GenPureAsyncAsyncIterable} from a factory of async iterators. */
export const genPureAsync: Signature["genPureAsync"] = (<T>(
  factory: Factory<AsyncIterator<T>>,
) => {
  const iterable = newInstance(GenPureAsyncAsyncIterable, factory);
  return iterable;
}) as Signature["genPureAsync"];
×
518

519
/**
 * Async iterable that emits only the source values for which the predicate
 * returns a truthy result. Purity is inherited from the source.
 */
class KeepAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly d: AsyncIterableLike<T>,
    private readonly p: Predicate<T>,
  ) {
    this[ComputationLike_isPure] = d[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const { d: source, p: accepts } = this;

    for await (const item of source) {
      if (!accepts(item)) {
        continue;
      }
      yield item;
    }
  }
}
542

543
/**
 * Creates an operator that wraps a source in a {@link KeepAsyncIterable}
 * filtering by `predicate`.
 */
export const keep: Signature["keep"] = (<T>(predicate: Predicate<T>) =>
  (source: AsyncIterableLike<T>) => {
    const filtered = newInstance(KeepAsyncIterable, source, predicate);
    return filtered;
  }) as Signature["keep"];
2✔
546

547
/**
 * Async iterable that emits the result of applying a mapping function to each
 * source value. Purity is inherited from the source.
 */
class MapAsyncIterable<TA, TB> implements AsyncIterableLike<TB> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly d: AsyncIterableLike<TA>,
    private readonly m: Function1<TA, TB>,
  ) {
    this[ComputationLike_isPure] = d[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const { d: source, m: transform } = this;

    for await (const item of source) {
      yield transform(item);
    }
  }
}
568

569
/**
 * Creates an operator that wraps a source in a {@link MapAsyncIterable}
 * applying `mapper` to each value.
 */
export const map: Signature["map"] = (<TA, TB>(mapper: Function1<TA, TB>) =>
  (source: AsyncIterableLike<TA>) => {
    const mapped = newInstance(MapAsyncIterable, source, mapper);
    return mapped;
  }) as Signature["map"];
2✔
572

573
/**
 * Adapter that exposes a native `AsyncIterable<T>` as an `AsyncIterableLike<T>`
 * by delegating `[Symbol.asyncIterator]` to it. Always marked impure.
 */
class AsyncIterableOf<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: false = false as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(private d: AsyncIterable<T>) {}

  [Symbol.asyncIterator]() {
    const wrapped = this.d;
    return wrapped[Symbol.asyncIterator]();
  }
}
584

585
/**
 * Returns a function that wraps a native async iterable in an
 * {@link AsyncIterableOf}.
 */
export const of: Signature["of"] = /*@__PURE__*/ returns(iter => {
  const wrapped = newInstance(AsyncIterableOf, iter);
  return wrapped;
});
588

589
/**
 * Async iterable that emits `[previous, current]` tuples for each pair of
 * consecutive source values. Nothing is emitted for the first value, so a
 * source with fewer than two values yields nothing. Purity is inherited from
 * the source.
 */
class PairwiseAsyncIterable<T> implements AsyncIterableLike<Tuple2<T, T>> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(private s: AsyncIterableLike<T>) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    let seenFirst = false;
    let previous: T = none as T;

    for await (const current of this.s) {
      if (seenFirst) {
        const pair = tuple(previous, current);
        previous = current;

        yield pair;
        continue;
      }

      // The first value only primes the window.
      seenFirst = true;
      previous = current;
    }
  }
}
615

616
/**
 * Creates an operator that wraps a source in a {@link PairwiseAsyncIterable}.
 */
export const pairwise: Signature["pairwise"] = (<T>() =>
  (source: AsyncIterableLike<T>) => {
    const paired = newInstance(PairwiseAsyncIterable<T>, source);
    return paired;
  }) as Signature["pairwise"];
2✔
619

620
/**
 * Async iterable that folds source values with a reducer and emits every
 * intermediate accumulator.
 *
 * The initial accumulator is obtained from the `iv` factory at the start of
 * each iteration, before the source is read. Purity is inherited from the
 * source.
 */
class ScanAsyncIterable<T, TAcc> implements AsyncIterableLike<TAcc> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly s: AsyncIterableLike<T>,
    private readonly r: Reducer<T, TAcc>,
    private readonly iv: Factory<TAcc>,
  ) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const step = this.r;
    let accumulator = this.iv();

    for await (const item of this.s) {
      accumulator = step(accumulator, item);
      yield accumulator;
    }
  }
}
643

644
/**
 * Async iterable that replays a source repeatedly.
 *
 * After each complete pass the pass counter is incremented and handed to the
 * predicate; iteration stops as soon as the predicate returns a falsy value.
 * The source is therefore always played at least once. Purity is inherited
 * from the source.
 */
class RepeatAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private i: AsyncIterableLike<T>,
    private p: Predicate<number>,
  ) {
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const { i: source, p: shouldRepeat } = this;

    let completedPasses = 0;

    do {
      yield* source;
      completedPasses++;
    } while (shouldRepeat(completedPasses));
  }
}
672

673
/**
 * Creates an operator that wraps a source in a {@link RepeatAsyncIterable}.
 *
 * `predicate` may be:
 * - omitted: repeat for as long as the consumer keeps pulling;
 * - a function: called with the number of completed passes;
 * - a number: repeat while the number of completed passes is below it.
 */
export const repeat: Signature["repeat"] = (<T>(
  predicate?: Predicate<number> | number,
) => {
  const repeatPredicate = isNone(predicate)
    ? alwaysTrue
    : isFunction(predicate)
      ? predicate
      : (completedPasses: number) => completedPasses < predicate;

  return (source: AsyncIterableLike<T>) => {
    const repeated = newInstance(RepeatAsyncIterable, source, repeatPredicate);
    return repeated;
  };
}) as Signature["repeat"];
685

686
/**
 * Async iterable that restarts its source when the source throws.
 *
 * On each failure the predicate receives the 1-based failure count and the
 * error (normalized with `error`). A falsy result rethrows the originally
 * thrown value; otherwise the source is iterated again from the start.
 * Purity is inherited from the source.
 */
class RetryAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private i: AsyncIterableLike<T>,
    private p: (count: number, error: Error) => boolean,
  ) {
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const { i: source, p: shouldRetry } = this;

    // `failures` is the failure count including the one being handled.
    for (let failures = 1; ; failures++) {
      try {
        yield* source;
        return;
      } catch (e) {
        if (!shouldRetry(failures, error(e))) {
          throw e;
        }
      }
    }
  }
}
717

718
/**
 * Creates an operator that wraps a source in a {@link RetryAsyncIterable}.
 *
 * When `shouldRetry` is omitted, `alwaysTrue` is used in its place.
 */
export const retry: Signature["retry"] = (<T>(
    shouldRetry?: (count: number, error: Error) => boolean,
  ) =>
  (source: AsyncIterableLike<T>) => {
    const retryPredicate = shouldRetry ?? alwaysTrue;
    return newInstance(RetryAsyncIterable, source, retryPredicate);
  }) as Signature["retry"];
727

728
/**
 * Creates an operator that wraps a source in a {@link ScanAsyncIterable}
 * using `scanner` as the reducer and `initialValue` as the accumulator
 * factory.
 */
export const scan: Signature["scan"] = (<T, TAcc>(
    scanner: Reducer<T, TAcc>,
    initialValue: Factory<TAcc>,
  ) =>
  (source: AsyncIterableLike<T>) => {
    const scanned = newInstance(ScanAsyncIterable, source, scanner, initialValue);
    return scanned;
  }) as Signature["scan"];
739

740
/**
 * Async iterable that drops the first `c` source values and emits the rest.
 * Purity is inherited from the source.
 *
 * Declares `implements AsyncIterableLike<T>` and `readonly` fields so that it
 * matches the other operator classes in this module.
 */
class SkipFirstAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(
    private readonly s: AsyncIterableLike<T>,
    private readonly c: number,
  ) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const skipCount = this.c;
    // Number of source values seen so far.
    let seen = 0;

    for await (const v of this.s) {
      if (seen >= skipCount) {
        yield v;
      }
      seen++;
    }
  }
}
764

765
/**
 * Creates an operator that wraps a source in a {@link SkipFirstAsyncIterable}.
 *
 * `options.count` defaults to 1 and is passed through `clampPositiveInteger`.
 */
export const skipFirst: Signature["skipFirst"] = (<T>(options?: {
    readonly count?: number;
  }) =>
  (source: AsyncIterableLike<T>) => {
    const skipCount = clampPositiveInteger(options?.count ?? 1);
    return newInstance(SkipFirstAsyncIterable, source, skipCount);
  }) as Signature["skipFirst"];
774

775
/**
 * Async iterable that emits at most the first `c` source values and then
 * completes. Purity is inherited from the source.
 *
 * The source is never asked for more than `c` values: the loop exits right
 * after the last wanted value is emitted, which closes the source iterator,
 * and the source is not started at all when `c` is zero or negative. Pulling
 * one extra value would trigger an unnecessary side effect in impure sources
 * and would stall on a source that never produces another value.
 */
class TakeFirstAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private s: AsyncIterableLike<T>,
    private c: number,
  ) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const takeCount = this.c;

    // Nothing requested: complete without touching the source.
    if (takeCount <= 0) {
      return;
    }

    let taken = 0;

    for await (const v of this.s) {
      yield v;
      taken++;

      // Leave before the loop requests another value from the source.
      if (taken >= takeCount) {
        break;
      }
    }
  }
}
801

802
/**
 * Creates an operator that wraps a source in a {@link TakeFirstAsyncIterable}.
 *
 * `options.count` defaults to 1 and is passed through `clampPositiveInteger`.
 */
export const takeFirst: Signature["takeFirst"] = (<T>(options?: {
    readonly count?: number;
  }) =>
  (source: AsyncIterableLike<T>) => {
    const takeCount = clampPositiveInteger(options?.count ?? 1);
    return newInstance(TakeFirstAsyncIterable, source, takeCount);
  }) as Signature["takeFirst"];
811

812
/**
 * Async iterable that emits the trailing values of a source.
 *
 * The whole source is first drained into a queue created with capacity `c`
 * and the drop-oldest backpressure strategy; once the source completes, the
 * queue's contents are emitted in order. Purity is inherited from the source.
 *
 * (Renamed from the misspelled `TakeLasAsyncIterable`.)
 */
class TakeLastAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private s: AsyncIterableLike<T>,
    private c: number,
  ) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const { s: src, c: capacity } = this;

    const queue = Queue.create<T>({
      backpressureStrategy: DropOldestBackpressureStrategy,
      capacity,
    });

    // Nothing can be emitted until the source has completed.
    for await (const v of src) {
      queue[QueueLike_enqueue](v);
    }

    while (queue[EnumeratorLike_moveNext]()) {
      yield queue[EnumeratorLike_current];
    }
  }
}

/**
 * Creates an operator that wraps a source in a {@link TakeLastAsyncIterable}.
 *
 * `options.count` defaults to 1 and is passed through `clampPositiveInteger`.
 */
export const takeLast: Signature["takeLast"] = (<T>(options?: {
    readonly count?: number;
  }) =>
  (iterable: AsyncIterableLike<T>) =>
    newInstance(
      TakeLastAsyncIterable,
      iterable,
      clampPositiveInteger(options?.count ?? 1),
    )) as Signature["takeLast"];
851

852
/**
 * Async iterable that emits source values while the predicate holds and stops
 * at the first value that fails it.
 *
 * When `i` (inclusive) is true, the failing value is emitted before stopping.
 * Purity is inherited from the source.
 */
class TakeWhileAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private s: AsyncIterableLike<T>,
    private p: Predicate<T>,
    private i: boolean,
  ) {
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const { p: holds, i: emitFailing } = this;

    for await (const item of this.s) {
      if (holds(item)) {
        yield item;
        continue;
      }

      // First failing value: optionally emit it, then stop.
      if (emitFailing) {
        yield item;
      }
      break;
    }
  }
}
882

883
/**
 * Creates an operator that wraps a source in a {@link TakeWhileAsyncIterable}.
 *
 * `options.inclusive` defaults to `false`.
 */
export const takeWhile: Signature["takeWhile"] = (<T>(
    predicate: Predicate<T>,
    options?: {
      readonly inclusive?: boolean;
    },
  ) =>
  (source: AsyncIterableLike<T>) => {
    const inclusive = options?.inclusive ?? false;
    return newInstance(TakeWhileAsyncIterable, source, predicate, inclusive);
  }) as Signature["takeWhile"];
896

897
/**
 * Async iterable that forwards a source and raises an error if the source
 * completes without producing any value.
 *
 * The raised error is built by passing the result of the `f` factory to
 * `error`. Purity is inherited from the source.
 */
class ThrowIfEmptyAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly i: AsyncIterableLike<T>,
    private readonly f: Factory<unknown>,
  ) {
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    let producedAny = false;

    for await (const item of this.i) {
      producedAny = true;
      yield item;
    }

    if (!producedAny) {
      raiseError(error(this.f()));
    }
  }
}
921

922
/**
 * Creates an operator that wraps a source in a
 * {@link ThrowIfEmptyAsyncIterable} using `factory` to produce the error
 * payload.
 */
export const throwIfEmpty: Signature["throwIfEmpty"] = (<T>(
    factory: Factory<unknown>,
  ) =>
  (source: AsyncIterableLike<T>) => {
    const guarded = newInstance(ThrowIfEmptyAsyncIterable, source, factory);
    return guarded;
  }) as Signature["throwIfEmpty"];
931

932
/**
 * Creates a converter from an async iterable to an observable.
 *
 * Sources reported pure by `ComputationM.isPure` go through
 * `Observable_genPureAsync`; all others go through `Observable_genAsync`.
 * In both cases the source's bound `[Symbol.asyncIterator]` method and the
 * given `options` are passed along.
 */
export const toObservable: Signature["toObservable"] = (options =>
  (iter: AsyncIterableLike) => {
    if (ComputationM.isPure(iter)) {
      return Observable_genPureAsync(
        bindMethod(iter, Symbol.asyncIterator),
        options,
      );
    }

    return Observable_genAsync(bindMethod(iter, Symbol.asyncIterator), options);
  }) as Signature["toObservable"];
940

941
/**
 * Returns a converter from an async iterable to a producer.
 *
 * Sources reported pure by `ComputationM.isPure` go through
 * `Producer_genPureAsync`; all others go through `Producer_genAsync`. In both
 * cases the source's bound `[Symbol.asyncIterator]` method is passed along.
 */
export const toProducer: Signature["toProducer"] =
  //   @__PURE__
  returns((iter: AsyncIterableLike) => {
    if (ComputationM.isPure(iter)) {
      return Producer_genPureAsync(bindMethod(iter, Symbol.asyncIterator));
    }

    return Producer_genAsync(bindMethod(iter, Symbol.asyncIterator));
  }) as Signature["toProducer"];
948

949
/**
 * Async iterable that runs a setup effect when iteration starts and a matching
 * cleanup when iteration ends.
 *
 * The effect may return nothing, a cleanup function, or a disposable:
 * - a cleanup function is called with the error when the delegate fails, or
 *   with `none` when iteration finishes or is closed by the consumer;
 * - any other non-empty result is treated as a `DisposableLike`: it is checked
 *   with `Disposable.raiseIfDisposedWithError` before iteration and before
 *   each emitted value, and disposed (with the error, if any) at the end.
 *
 * A failure of the delegate is rethrown after cleanup has run, so the
 * consumer observes the error instead of a normal completion.
 */
class WithEffectAsyncIterable<T>
  implements AsyncIterableWithSideEffectsLike<T>
{
  public [ComputationLike_isSynchronous]: false = false as const;
  public [ComputationLike_isPure]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;

  constructor(
    private readonly d: AsyncIterableLike<T>,
    private readonly e: () =>
      | void
      | DisposableLike
      | SideEffect1<Optional<Error>>,
  ) {}

  async *[Symbol.asyncIterator]() {
    const delegate = this.d;
    const effect = this.e;

    const cleanup = effect();
    if (isSome(cleanup) && !isFunction(cleanup)) {
      Disposable.raiseIfDisposedWithError(cleanup as DisposableLike);
    }

    let didThrow = false;

    try {
      for await (const v of delegate) {
        if (isSome(cleanup) && !isFunction(cleanup)) {
          Disposable.raiseIfDisposedWithError(cleanup as DisposableLike);
        }
        yield v;
      }
    } catch (e) {
      didThrow = true;
      if (isFunction(cleanup)) {
        cleanup(error(e));
      } else if (isSome(cleanup)) {
        (cleanup as DisposableLike)[DisposableLike_dispose](error(e));
      }

      // Propagate the failure; cleanup must not turn an error into a normal
      // completion.
      throw e;
    } finally {
      // Error-path cleanup already ran in the catch block above.
      if (!didThrow && isFunction(cleanup)) {
        cleanup(none);
      } else if (!didThrow && isSome(cleanup)) {
        (cleanup as DisposableLike)[DisposableLike_dispose]();
      }
    }
  }
}
998

999
/**
 * Creates an operator that wraps a source in a
 * {@link WithEffectAsyncIterable} driven by `effect`.
 */
export const withEffect: Signature["withEffect"] = (<T>(
    effect: () => void | DisposableLike | SideEffect1<Optional<Error>>,
  ) =>
  (source: AsyncIterableLike<T>) => {
    const wrapped = newInstance(WithEffectAsyncIterable, source, effect);
    return wrapped;
  }) as Signature["withEffect"];
1008

1009
/**
 * Async iterable that combines several sources in lock step.
 *
 * Each round requests the next result from every source concurrently and
 * emits an array of their values; iteration ends as soon as any source reports
 * completion. Purity is computed over all sources with
 * `ComputationM.areAllPure`.
 *
 * All source iterators are closed (via their optional `return()` method) when
 * zipping ends for any reason — a source finished, the consumer stopped early,
 * or a source failed — so that cleanup logic in the remaining sources runs.
 * With no sources at all, nothing is emitted.
 */
class ZipAsyncIterable implements AsyncIterableLike {
  public readonly [ComputationLike_isPure]: Optional<boolean>;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(private readonly iters: readonly AsyncIterableLike<any>[]) {
    this[ComputationLike_isPure] = ComputationM.areAllPure(iters);
  }

  async *[Symbol.asyncIterator]() {
    const iterators = this.iters[Array_map](invoke(Symbol.asyncIterator));

    // Without sources no round can ever report completion, which would
    // otherwise produce an endless stream of empty tuples.
    if (iterators.length === 0) {
      return;
    }

    try {
      while (true) {
        const next = await Promise.all(
          iterators[Array_map](invoke(Iterator_next)),
        );

        if (next.some(x => x[Iterator_done] ?? false)) {
          break;
        }
        yield next[Array_map](pick(Iterator_value));
      }
    } finally {
      // Close every source; `return` is optional in the iterator protocol.
      await Promise.all(iterators[Array_map](it => it.return?.()));
    }
  }
}
1033

1034
/** Builds a {@link ZipAsyncIterable} over the given sources. */
export const zip: Signature["zip"] = ((
  ...iters: readonly AsyncIterableLike<any>[]
) => {
  const zipped = newInstance(ZipAsyncIterable, iters);
  return zipped;
}) as unknown as Signature["zip"];
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc