• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 13870860679

15 Mar 2025 07:43AM UTC coverage: 96.713% (-0.1%) from 96.812%
13870860679

push

github

bordoley
remove unnecessay subscribe config

1112 of 1232 branches covered (90.26%)

Branch coverage included in aggregate %.

40 of 44 new or added lines in 8 files covered. (90.91%)

6 existing lines in 2 files now uncovered.

5978 of 6099 relevant lines covered (98.02%)

4099.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.19
/src/computations/AsyncIterable.ts
1
import {
3✔
2
  Array_map,
3
  Array_push,
4
  Iterator_done,
5
  Iterator_next,
6
  Iterator_value,
7
} from "../__internal__/constants.js";
8
import parseArrayBounds from "../__internal__/parseArrayBounds.js";
3✔
9
import {
3✔
10
  AsyncIterableLike,
11
  AsyncIterableWithSideEffectsLike,
12
  ComputationLike_isPure,
13
  ComputationLike_isSynchronous,
14
  ComputationModule,
15
  ComputationType,
16
  Computation_T,
17
  Computation_baseOfT,
18
  Computation_deferredWithSideEffectsOfT,
19
  Computation_pureDeferredOfT,
20
  ConcurrentDeferredComputationModule,
21
  DeferredComputationModule,
22
  EventSourceLike,
23
  HigherOrderInnerComputationLike,
24
  InteractiveComputationModule,
25
  IterableLike,
26
  PauseableEventSourceLike,
27
  PauseableObservableLike,
28
  PureAsyncIterableLike,
29
} from "../computations.js";
30
import {
3✔
31
  Factory,
32
  Function1,
33
  Optional,
34
  Predicate,
35
  Reducer,
36
  SideEffect,
37
  SideEffect1,
38
  Updater,
39
  alwaysTrue,
40
  bindMethod,
41
  error,
42
  invoke,
43
  isFunction,
44
  isNone,
45
  isSome,
46
  newInstance,
47
  none,
48
  pick,
49
  pipe,
50
  raiseError,
51
  returns,
52
} from "../functions.js";
53
import { clampPositiveInteger } from "../math.js";
3✔
54
import * as Disposable from "../utils/Disposable.js";
3✔
55
import * as DisposableContainer from "../utils/DisposableContainer.js";
3✔
56
import {
3✔
57
  DisposableLike,
58
  DisposableLike_dispose,
59
  DisposableLike_isDisposed,
60
  EventListenerLike,
61
  EventListenerLike_notify,
62
  ObserverLike,
63
  QueueableLike_isReady,
64
  SchedulerLike,
65
  SchedulerLike_maxYieldInterval,
66
  SchedulerLike_now,
67
  SchedulerLike_schedule,
68
  SinkLike_complete,
69
} from "../utils.js";
70
import * as ComputationM from "./Computation.js";
3✔
71
import EventSource_addEventHandler from "./EventSource/__private__/EventSource.addEventHandler.js";
3✔
72
import EventSource_create from "./EventSource/__private__/EventSource.create.js";
3✔
73
import EventSource_fromAsyncIterable from "./EventSource/__private__/EventSource.fromAsyncIterable.js";
3✔
74
import Observable_create from "./Observable/__private__/Observable.create.js";
3✔
75
import Observable_fromAsyncIterable from "./Observable/__private__/Observable.fromAsyncIterable.js";
3✔
76
import Observable_multicast from "./Observable/__private__/Observable.multicast.js";
3✔
77
import * as PauseableEventSource from "./PauseableEventSource.js";
3✔
78
import * as PauseableObservable from "./PauseableObservable.js";
3✔
79

80
/**
81
 * @noInheritDoc
82
 */
83
export interface AsyncIterableComputation extends ComputationType {
84
  readonly [Computation_baseOfT]?: AsyncIterableLike<
85
    this[typeof Computation_T]
86
  >;
87

88
  readonly [Computation_deferredWithSideEffectsOfT]?: AsyncIterableWithSideEffectsLike<
89
    this[typeof Computation_T]
90
  >;
91

92
  readonly [Computation_pureDeferredOfT]?: PureAsyncIterableLike<
93
    this[typeof Computation_T]
94
  >;
95
}
96
export type Computation = AsyncIterableComputation;
97

98
export interface AsyncIterableModule
99
  extends ComputationModule<AsyncIterableComputation>,
100
    DeferredComputationModule<AsyncIterableComputation>,
101
    InteractiveComputationModule<AsyncIterableComputation>,
102
    ConcurrentDeferredComputationModule<AsyncIterableComputation> {
103
  of<T>(): Function1<AsyncIterable<T>, AsyncIterableWithSideEffectsLike<T>>;
104

105
  toEventSource<T>(): Function1<
106
    AsyncIterableLike<T>,
107
    EventSourceLike<T> & DisposableLike
108
  >;
109

110
  toPauseableEventSource<T>(): Function1<
111
    AsyncIterableLike<T>,
112
    PauseableEventSourceLike<T> & DisposableLike
113
  >;
114

115
  toPauseableObservable<T>(
116
    scheduler: SchedulerLike,
117
    options?: {
118
      readonly replay?: number;
119
    },
120
  ): Function1<
121
    AsyncIterableLike<T>,
122
    PauseableObservableLike<T> & DisposableLike
123
  >;
124
}
125

126
export type Signature = AsyncIterableModule;
127

128
class CatchErrorAsyncIterable<T> implements AsyncIterableLike<T> {
129
  public readonly [ComputationLike_isPure]?: boolean;
130
  public readonly [ComputationLike_isSynchronous]: false = false as const;
10✔
131

132
  constructor(
133
    private readonly s: AsyncIterableLike<T>,
10✔
134
    private readonly onError:
10✔
135
      | SideEffect1<Error>
136
      | Function1<Error, AsyncIterableLike<T>>,
137
    isPure: boolean,
138
  ) {
139
    this[ComputationLike_isPure] = ComputationM.isPure(s) && isPure;
10✔
140
  }
141

142
  async *[Symbol.asyncIterator]() {
143
    try {
4✔
144
      yield* this.s;
4✔
145
    } catch (e) {
146
      const err = error(e);
4✔
147
      let action: Optional<AsyncIterableLike<T>> = none;
4✔
148
      try {
4✔
149
        action = this.onError(err) as Optional<AsyncIterableLike<T>>;
4✔
150
      } catch (e) {
151
        throw error([error(e), err]);
1✔
152
      }
153

154
      isSome(action) && (yield* action);
3✔
155
    }
156
  }
157
}
158

159
export const catchError: Signature["catchError"] = (<
3✔
160
    T,
161
    TInnerLike extends HigherOrderInnerComputationLike,
162
  >(
163
    onError: SideEffect1<Error> | Function1<Error, AsyncIterableLike<T>>,
164
    options?: {
165
      readonly innerType: TInnerLike;
166
    },
167
  ) =>
168
  (iter: AsyncIterableLike<T>) =>
7✔
169
    newInstance(
10✔
170
      CatchErrorAsyncIterable,
171
      iter,
172
      onError,
173
      options?.innerType?.[ComputationLike_isPure] ?? true,
16✔
174
    )) as Signature["catchError"];
175

176
class ConcatAllAsyncIterable<T> implements AsyncIterableLike<T> {
177
  public readonly [ComputationLike_isSynchronous]: false = false as const;
9✔
178
  public readonly [ComputationLike_isPure]?: boolean;
179

180
  constructor(
181
    private readonly s: AsyncIterableLike<AsyncIterableLike<T>>,
9✔
182
    isPure: boolean,
183
  ) {
184
    this[ComputationLike_isPure] = ComputationM.isPure(s) && isPure;
9✔
185
  }
186

187
  async *[Symbol.asyncIterator]() {
188
    for await (const iter of this.s) {
5✔
189
      yield* iter;
8✔
190
    }
191
  }
192
}
193
export const concatAll: Signature["concatAll"] = (<
3✔
194
    T,
195
    TInnerLike extends HigherOrderInnerComputationLike,
196
  >(options?: {
197
    readonly innerType: TInnerLike;
198
  }) =>
199
  (iterable: AsyncIterableLike<AsyncIterableLike<T>>) =>
9✔
200
    newInstance(
9✔
201
      ConcatAllAsyncIterable,
202
      iterable,
203
      options?.innerType?.[ComputationLike_isPure] ?? true,
13✔
204
    )) as Signature["concatAll"];
205

206
class ConcatAsyncIterable<T> implements AsyncIterableLike<T> {
207
  public readonly [ComputationLike_isSynchronous]: false = false as const;
6✔
208
  public readonly [ComputationLike_isPure]?: boolean;
209

210
  constructor(private readonly s: ReadonlyArray<AsyncIterableLike<T>>) {
6✔
211
    this[ComputationLike_isPure] = ComputationM.areAllPure(s);
6✔
212
  }
213

214
  async *[Symbol.asyncIterator]() {
215
    for (const iter of this.s) {
11✔
216
      yield* iter;
20✔
217
    }
218
  }
219
}
220

221
export const concat: Signature["concat"] = (<T>(
3✔
222
  ...iterables: ReadonlyArray<AsyncIterableLike<T>>
223
) =>
224
  newInstance(
6✔
225
    ConcatAsyncIterable,
226
    iterables,
227
  )) as unknown as Signature["concat"];
228

229
class FromAsyncFactoryIterable<T>
230
  implements AsyncIterableWithSideEffectsLike<T>
231
{
232
  public [ComputationLike_isPure]: false = false as const;
5✔
233
  public [ComputationLike_isSynchronous]: false = false as const;
5✔
234

235
  constructor(private f: (options?: { signal: AbortSignal }) => Promise<T>) {}
5✔
236

237
  async *[Symbol.asyncIterator]() {
238
    const result = await this.f();
4✔
239
    yield result;
2✔
240
  }
241
}
242

243
export const fromAsyncFactory: Signature["fromAsyncFactory"] = returns(
3✔
244
  factory => newInstance(FromAsyncFactoryIterable, factory),
5✔
245
);
246

247
class FromIterableAsyncIterable<T> implements PureAsyncIterableLike<T> {
248
  public readonly [ComputationLike_isSynchronous]: false = false as const;
14✔
249

250
  constructor(private s: IterableLike<T>) {}
14✔
251

252
  async *[Symbol.asyncIterator]() {
253
    yield* this.s;
13✔
254
  }
255
}
256

257
export const fromIterable: Signature["fromIterable"] =
3✔
258
  /*@__PURE__*/
259
  returns(arr =>
260
    newInstance(FromIterableAsyncIterable, arr),
14✔
261
  ) as Signature["fromIterable"];
262

263
class FromReadonlyArrayAsyncIterable<T> implements PureAsyncIterableLike<T> {
264
  public readonly [ComputationLike_isSynchronous]: false = false as const;
62✔
265

266
  constructor(
267
    private readonly arr: readonly T[],
62✔
268
    private readonly count: number,
62✔
269
    private readonly start: number,
62✔
270
  ) {}
271

272
  async *[Symbol.asyncIterator]() {
273
    let { arr, start, count } = this;
60✔
274
    while (count !== 0) {
60✔
275
      const next = arr[start];
135✔
276
      yield next;
135✔
277

278
      count > 0 ? (start++, count--) : (start--, count++);
116✔
279
    }
280
  }
281
}
282

283
export const fromReadonlyArray: Signature["fromReadonlyArray"] = (<
3✔
284
    T,
285
  >(options?: {
286
    readonly count?: number;
287
    readonly start?: number;
288
  }) =>
289
  (arr: readonly T[]) => {
61✔
290
    let [start, count] = parseArrayBounds(arr, options);
62✔
291
    return newInstance(FromReadonlyArrayAsyncIterable, arr, count, start);
62✔
292
  }) as Signature["fromReadonlyArray"];
293

294
export const empty: Signature["empty"] = (<T>() =>
3✔
295
  pipe([], fromReadonlyArray<T>(), returns))() as Signature["empty"];
3✔
296

297
class EncodeUtf8AsyncIterable
298
  implements AsyncIterableLike<Uint8Array<ArrayBufferLike>>
299
{
300
  public readonly [ComputationLike_isPure]?: boolean;
301
  public readonly [ComputationLike_isSynchronous]: false = false as const;
3✔
302

303
  constructor(private readonly s: AsyncIterableLike<string>) {
3✔
304
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
3✔
305
  }
306

307
  async *[Symbol.asyncIterator]() {
308
    const textEncoder = newInstance(TextEncoder);
1✔
309

310
    for await (const chunk of this.s) {
1✔
311
      yield textEncoder.encode(chunk);
1✔
312
    }
313
  }
314
}
315

316
export const encodeUtf8: Signature["encodeUtf8"] = (() =>
3✔
317
  (iterable: AsyncIterableLike<string>) =>
2✔
318
    newInstance(EncodeUtf8AsyncIterable, iterable)) as Signature["encodeUtf8"];
3✔
319

320
export const firstAsync: Signature["firstAsync"] = /*@__PURE__*/ returns(
3✔
321
  async (iter: AsyncIterableLike) => {
322
    for await (const v of iter) {
3✔
323
      return v;
1✔
324
    }
325
    return none;
1✔
326
  },
327
) as Signature["firstAsync"];
328

329
class ForEachAsyncIterable<T> implements AsyncIterableWithSideEffectsLike<T> {
330
  public [ComputationLike_isPure]: false = false as const;
4✔
331
  public [ComputationLike_isSynchronous]: false = false as const;
4✔
332

333
  constructor(
334
    private readonly d: AsyncIterableLike<T>,
4✔
335
    private readonly ef: SideEffect1<T>,
4✔
336
  ) {}
337

338
  async *[Symbol.asyncIterator]() {
339
    const delegate = this.d;
2✔
340
    const effect = this.ef;
2✔
341

342
    for await (const v of delegate) {
2✔
343
      effect(v);
4✔
344
      yield v;
3✔
345
    }
346
  }
347
}
348

349
export const forEach: Signature["forEach"] = (<T>(effect: SideEffect1<T>) =>
3✔
350
  (iter: AsyncIterableLike<T>) =>
3✔
351
    newInstance(ForEachAsyncIterable, iter, effect)) as Signature["forEach"];
4✔
352

353
export const fromValue: Signature["fromValue"] =
3✔
354
  /*@__PURE__*/
355
  returns(v => fromReadonlyArray()([v])) as Signature["fromValue"];
2✔
356

357
class GeneratorAsyncIterable<T> implements PureAsyncIterableLike<T> {
358
  public readonly [ComputationLike_isSynchronous]: false = false as const;
4✔
359

360
  constructor(
361
    readonly generator: Updater<T>,
4✔
362
    readonly initialValue: Factory<T>,
4✔
363
    readonly count?: number,
4✔
364
  ) {}
365

366
  async *[Symbol.asyncIterator]() {
367
    const { count, generator } = this;
9✔
368
    let acc = this.initialValue();
9✔
369

370
    for (let cnt = 0; count === none || cnt < count; cnt++) {
9✔
371
      acc = generator(acc);
30✔
372
      yield acc;
30✔
373
    }
374
  }
375
}
376

377
export const generate: Signature["generate"] = (<T>(
3✔
378
  generator: Updater<T>,
379
  initialValue: Factory<T>,
380
  options?: {
381
    readonly count?: number;
382
  },
383
) =>
384
  newInstance(
4✔
385
    GeneratorAsyncIterable,
386
    generator,
387
    initialValue,
388
    options?.count,
389
  )) as Signature["generate"];
390

391
class KeepAsyncIterable<T> implements AsyncIterableLike<T> {
392
  public readonly [ComputationLike_isPure]?: boolean;
393
  public readonly [ComputationLike_isSynchronous]: false = false as const;
4✔
394

395
  constructor(
396
    private readonly d: AsyncIterableLike<T>,
4✔
397
    private readonly p: Predicate<T>,
4✔
398
  ) {
399
    this[ComputationLike_isPure] = d[ComputationLike_isPure];
4✔
400
  }
401

402
  async *[Symbol.asyncIterator]() {
403
    const delegate = this.d;
2✔
404
    const predicate = this.p;
2✔
405

406
    for await (const v of delegate) {
2✔
407
      if (predicate(v)) {
5✔
408
        yield v;
3✔
409
      }
410
    }
411
  }
412
}
413

414
export const keep: Signature["keep"] = (<T>(predicate: Predicate<T>) =>
3✔
415
  (iterable: AsyncIterableLike<T>) =>
3✔
416
    newInstance(KeepAsyncIterable, iterable, predicate)) as Signature["keep"];
4✔
417

418
export const lastAsync: Signature["lastAsync"] = /*@__PURE__*/ returns(
3✔
419
  async (iter: AsyncIterableLike) => {
420
    let result: Optional<unknown> = none;
6✔
421
    for await (const v of iter) {
6✔
422
      result = v;
6✔
423
    }
424
    return result;
3✔
425
  },
426
) as Signature["lastAsync"];
427

428
class MapAsyncIterable<TA, TB> implements AsyncIterableLike<TB> {
429
  public readonly [ComputationLike_isPure]?: boolean;
430
  public readonly [ComputationLike_isSynchronous]: false = false as const;
8✔
431

432
  constructor(
433
    private readonly d: AsyncIterableLike<TA>,
8✔
434
    private readonly m: Function1<TA, TB>,
8✔
435
  ) {
436
    this[ComputationLike_isPure] = d[ComputationLike_isPure];
8✔
437
  }
438

439
  async *[Symbol.asyncIterator]() {
440
    const delegate = this.d;
6✔
441
    const mapper = this.m;
6✔
442

443
    for await (const v of delegate) {
6✔
444
      yield mapper(v);
9✔
445
    }
446
  }
447
}
448

449
export const map: Signature["map"] = (<TA, TB>(mapper: Function1<TA, TB>) =>
3✔
450
  (iter: AsyncIterableLike<TA>) =>
7✔
451
    newInstance(MapAsyncIterable, iter, mapper)) as Signature["map"];
8✔
452

453
class AsyncIterableOf<T> implements AsyncIterableLike<T> {
454
  public readonly [ComputationLike_isPure]: false = false as const;
10✔
455
  public readonly [ComputationLike_isSynchronous]: false = false as const;
10✔
456

457
  constructor(private d: AsyncIterable<T>) {}
10✔
458

459
  [Symbol.asyncIterator]() {
460
    return this.d[Symbol.asyncIterator]();
8✔
461
  }
462
}
463

464
export const of: Signature["of"] = /*@__PURE__*/ returns(iter =>
3✔
465
  newInstance(AsyncIterableOf, iter),
10✔
466
);
467

468
class ScanAsyncIterable<T, TAcc> implements AsyncIterableLike<TAcc> {
469
  public readonly [ComputationLike_isPure]?: boolean;
470
  public readonly [ComputationLike_isSynchronous]: false = false as const;
5✔
471

472
  constructor(
473
    private readonly s: AsyncIterableLike<T>,
5✔
474
    private readonly r: Reducer<T, TAcc>,
5✔
475
    private readonly iv: Factory<TAcc>,
5✔
476
  ) {
477
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
5✔
478
  }
479

480
  async *[Symbol.asyncIterator]() {
481
    const reducer = this.r;
3✔
482
    let acc = this.iv();
3✔
483

484
    for await (const v of this.s) {
2✔
485
      acc = reducer(acc, v);
4✔
486
      yield acc;
3✔
487
    }
488
  }
489
}
490

491
class RaiseAsyncIterable<T> implements AsyncIterableLike<T> {
492
  public readonly [ComputationLike_isPure]?: true;
493
  public readonly [ComputationLike_isSynchronous]: false = false as const;
18✔
494

495
  constructor(private r: SideEffect) {}
18✔
496

497
  async *[Symbol.asyncIterator]() {
498
    raiseError(error(this.r()));
13✔
499
  }
500
}
501

502
export const raise: Signature["raise"] = (<T>(options?: {
3✔
503
  readonly raise?: SideEffect;
504
}) => {
505
  const { raise: factory = raise } = options ?? {};
18✔
506
  return newInstance(RaiseAsyncIterable<T>, factory);
18✔
507
}) as Signature["raise"];
508

509
export const reduceAsync: Signature["reduceAsync"] =
3✔
510
  <T, TAcc>(reducer: Reducer<T, TAcc>, initialValue: Factory<TAcc>) =>
3✔
511
  async (iterable: AsyncIterableLike<T>) => {
3✔
512
    let acc = initialValue();
3✔
513
    for await (let v of iterable) {
3✔
514
      acc = reducer(acc, v);
5✔
515
    }
516

517
    return acc;
2✔
518
  };
519

520
class RepeatAsyncIterable<T> implements AsyncIterableLike<T> {
521
  public readonly [ComputationLike_isPure]?: boolean;
522
  public readonly [ComputationLike_isSynchronous]: false = false as const;
6✔
523

524
  constructor(
525
    private i: AsyncIterableLike<T>,
6✔
526
    private p: Predicate<number>,
6✔
527
  ) {
528
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
6✔
529
  }
530

531
  async *[Symbol.asyncIterator]() {
532
    const iterable = this.i;
4✔
533
    const predicate = this.p;
4✔
534

535
    let cnt = 0;
4✔
536

537
    while (true) {
4✔
538
      yield* iterable;
8✔
539

540
      cnt++;
7✔
541
      if (!predicate(cnt)) {
7✔
542
        break;
2✔
543
      }
544
    }
545
  }
546
}
547

548
export const repeat: Signature["repeat"] = (<T>(
3✔
549
  predicate?: Predicate<number> | number,
550
) => {
551
  const repeatPredicate = isFunction(predicate)
5✔
552
    ? predicate
553
    : isNone(predicate)
3✔
554
      ? alwaysTrue
555
      : (count: number) => count < predicate;
3✔
556

557
  return (src: AsyncIterableLike<T>) =>
5✔
558
    newInstance(RepeatAsyncIterable, src, repeatPredicate);
6✔
559
}) as Signature["repeat"];
560

561
class RetryAsyncIterable<T> implements AsyncIterableLike<T> {
562
  public readonly [ComputationLike_isPure]?: boolean;
563
  public readonly [ComputationLike_isSynchronous]: false = false as const;
6✔
564

565
  constructor(
566
    private i: AsyncIterableLike<T>,
6✔
567
    private p: (count: number, error: Error) => boolean,
6✔
568
  ) {
569
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
6✔
570
  }
571

572
  async *[Symbol.asyncIterator]() {
573
    const iterable = this.i;
4✔
574
    const predicate = this.p;
4✔
575

576
    let cnt = 0;
4✔
577

578
    while (true) {
4✔
579
      try {
9✔
580
        yield* iterable;
9✔
581
      } catch (e) {
582
        cnt++;
7✔
583
        if (!predicate(cnt, error(e))) {
7✔
584
          break;
1✔
585
        }
586
      }
587
    }
588
  }
589
}
590

591
export const retry: Signature["retry"] = (<T>(
3✔
592
    shouldRetry?: (count: number, error: Error) => boolean,
593
  ) =>
594
  (deferable: AsyncIterableLike<T>) =>
5✔
595
    newInstance(
6✔
596
      RetryAsyncIterable,
597
      deferable,
598
      shouldRetry ?? alwaysTrue,
9✔
599
    )) as Signature["retry"];
600

601
export const scan: Signature["scan"] = (<T, TAcc>(
3✔
602
    scanner: Reducer<T, TAcc>,
603
    initialValue: Factory<TAcc>,
604
  ) =>
605
  (iter: AsyncIterableLike<T>) =>
4✔
606
    newInstance(
5✔
607
      ScanAsyncIterable,
608
      iter,
609
      scanner,
610
      initialValue,
611
    )) as Signature["scan"];
612

613
class TakeFirstAsyncIterable<T> implements AsyncIterableLike<T> {
614
  public readonly [ComputationLike_isPure]?: boolean;
615
  public readonly [ComputationLike_isSynchronous]: false = false as const;
15✔
616

617
  constructor(
618
    private s: AsyncIterableLike<T>,
15✔
619
    private c: number,
15✔
620
  ) {
621
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
15✔
622
  }
623

624
  async *[Symbol.asyncIterator]() {
625
    const takeCount = this.c;
13✔
626
    let count = 0;
13✔
627

628
    for await (const v of this.s) {
13✔
629
      if (count < takeCount) {
55✔
630
        yield v;
45✔
631
      } else {
632
        break;
10✔
633
      }
634
      count++;
45✔
635
    }
636
  }
637
}
638

639
export const takeFirst: Signature["takeFirst"] = (<T>(options?: {
3✔
640
    readonly count?: number;
641
  }) =>
642
  (iterable: AsyncIterableLike<T>) =>
14✔
643
    newInstance(
15✔
644
      TakeFirstAsyncIterable,
645
      iterable,
646
      clampPositiveInteger(options?.count ?? 1),
19✔
647
    )) as Signature["takeFirst"];
648

649
class TakeWhileAsyncIterable<T> implements AsyncIterableLike<T> {
650
  public readonly [ComputationLike_isPure]?: boolean;
651
  public readonly [ComputationLike_isSynchronous]: false = false as const;
7✔
652

653
  constructor(
654
    private s: AsyncIterableLike<T>,
7✔
655
    private p: Predicate<T>,
7✔
656
    private i: boolean,
7✔
657
  ) {
658
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
7✔
659
  }
660

661
  async *[Symbol.asyncIterator]() {
662
    const predicate = this.p;
5✔
663
    const inclusive = this.i;
5✔
664

665
    for await (const next of this.s) {
5✔
666
      const satisfiesPredicate = predicate(next);
12✔
667

668
      if (satisfiesPredicate || inclusive) {
11✔
669
        yield next;
10✔
670
      }
671

672
      if (!satisfiesPredicate) {
11✔
673
        break;
2✔
674
      }
675
    }
676
  }
677
}
678

679
export const takeWhile: Signature["takeWhile"] = (<T>(
3✔
680
    predicate: Predicate<T>,
681
    options?: {
682
      readonly inclusive?: boolean;
683
    },
684
  ) =>
685
  (iterable: AsyncIterableLike<T>) =>
6✔
686
    newInstance(
7✔
687
      TakeWhileAsyncIterable,
688
      iterable,
689
      predicate,
690
      options?.inclusive ?? false,
13✔
691
    )) as Signature["takeWhile"];
692

693
class ThrowIfEmptyAsyncIterable<T> implements AsyncIterableLike<T> {
694
  public readonly [ComputationLike_isPure]?: boolean;
695
  public readonly [ComputationLike_isSynchronous]: false = false as const;
5✔
696

697
  constructor(
698
    private readonly i: AsyncIterableLike<T>,
5✔
699
    private readonly f: Factory<unknown>,
5✔
700
  ) {
701
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
5✔
702
  }
703

704
  async *[Symbol.asyncIterator]() {
705
    let isEmpty = true;
3✔
706
    for await (const v of this.i) {
3✔
707
      isEmpty = false;
1✔
708
      yield v;
1✔
709
    }
710

711
    if (isEmpty) {
3✔
712
      raiseError(error(this.f()));
2✔
713
    }
714
  }
715
}
716

717
export const throwIfEmpty: Signature["throwIfEmpty"] = (<T>(
3✔
718
    factory: Factory<unknown>,
719
  ) =>
720
  (iter: AsyncIterableLike<T>) =>
4✔
721
    newInstance(
5✔
722
      ThrowIfEmptyAsyncIterable,
723
      iter,
724
      factory,
725
    )) as Signature["throwIfEmpty"];
726

727
export const toEventSource: Signature["toEventSource"] =
3✔
728
  EventSource_fromAsyncIterable;
729

730
export const toObservable: Signature["toObservable"] =
3✔
731
  Observable_fromAsyncIterable as Signature["toObservable"];
732

733
export const toPauseableEventSource: Signature["toPauseableEventSource"] =
3✔
734
  <T>() =>
3✔
735
  (iterable: AsyncIterableLike<T>) =>
2✔
736
    PauseableEventSource.create<T>((modeObs: EventSourceLike<boolean>) =>
2✔
737
      pipe(
2✔
738
        EventSource_create((listener: EventListenerLike<T>) => {
739
          const iterator = iterable[Symbol.asyncIterator]();
2✔
740

741
          let isPaused = true;
2✔
742

743
          const continuation = async () => {
2✔
744
            try {
2✔
745
              while (!listener[DisposableLike_isDisposed] && !isPaused) {
2✔
746
                const next = await iterator[Iterator_next]();
5✔
747

748
                if (next[Iterator_done]) {
4✔
749
                  listener[DisposableLike_dispose]();
1✔
750
                  break;
1✔
751
                } else if (!listener[DisposableLike_isDisposed]) {
3✔
752
                  listener[EventListenerLike_notify](next[Iterator_value]);
3✔
753
                }
754
              }
755
            } catch (e) {
756
              listener[DisposableLike_dispose](error(e));
1✔
757
            }
758
          };
759

760
          pipe(
2✔
761
            modeObs,
762
            EventSource_addEventHandler(async (mode: boolean) => {
763
              const wasPaused = isPaused;
2✔
764
              isPaused = mode;
2✔
765

766
              if (!isPaused && wasPaused) {
2✔
767
                await continuation();
2✔
768
              }
769
            }),
770
            Disposable.bindTo(listener),
771
          );
772
        }),
773
        Disposable.addToContainer(modeObs),
774
      ),
775
    );
776

777
export const toPauseableObservable: Signature["toPauseableObservable"] =
3✔
778
  <T>(
3✔
779
    scheduler: SchedulerLike,
780
    options?: {
781
      readonly replay?: number;
782
    },
783
  ) =>
784
  (iterable: AsyncIterableLike<T>) =>
3✔
785
    PauseableObservable.create<T>((modeObs: EventSourceLike<boolean>) =>
3✔
786
      pipe(
3✔
787
        Observable_create((observer: ObserverLike<T>) => {
788
          const iterator = iterable[Symbol.asyncIterator]();
3✔
789
          const maxYieldInterval = observer[SchedulerLike_maxYieldInterval];
3✔
790

791
          let isPaused = true;
3✔
792

793
          const continuation = async () => {
3✔
794
            const startTime = observer[SchedulerLike_now];
5✔
795

796
            try {
5✔
797
              while (
5✔
798
                !observer[DisposableLike_isDisposed] &&
53✔
799
                !isPaused &&
800
                observer[SchedulerLike_now] - startTime < maxYieldInterval
801
              ) {
802
                const next = await iterator[Iterator_next]();
17✔
803

804
                if (next[Iterator_done]) {
15✔
805
                  observer[SinkLike_complete]();
2✔
806
                  break;
2✔
807
                } else if (
13!
808
                  (observer[EventListenerLike_notify](next[Iterator_value]),
809
                  !observer[QueueableLike_isReady])
810
                ) {
811
                  // An async iterable can produce resolved promises which are immediately
812
                  // scheduled on the microtask queue. This prevents the observer's scheduler
813
                  // from running and draining queued events.
814
                  //
815
                  // Check the observer's buffer size so we can avoid queueing forever
816
                  // in this situation.
UNCOV
817
                  break;
×
818
                }
819
              }
820
            } catch (e) {
821
              observer[DisposableLike_dispose](error(e));
1✔
822
            }
823

824
            if (!isPaused) {
4✔
825
              pipe(
3✔
826
                observer[SchedulerLike_schedule](continuation),
827
                Disposable.addTo(observer),
828
              );
829
            }
830
          };
831

832
          pipe(
3✔
833
            modeObs,
834
            EventSource_addEventHandler((mode: boolean) => {
835
              const wasPaused = isPaused;
5✔
836
              isPaused = mode;
5✔
837

838
              if (!isPaused && wasPaused) {
5✔
839
                pipe(
4✔
840
                  observer[SchedulerLike_schedule](continuation),
841
                  Disposable.addTo(observer),
842
                );
843
              }
844
            }),
845
            Disposable.addTo(observer),
846
            DisposableContainer.onComplete(
847
              bindMethod(observer, SinkLike_complete),
848
            ),
849
          );
850
        }),
851
        Observable_multicast(scheduler, options),
852
        Disposable.addToContainer(modeObs),
853
      ),
854
    );
855

856
export const toReadonlyArrayAsync: Signature["toReadonlyArrayAsync"] =
3✔
857
  /*@__PURE__*/
858
  returns(async (iter: AsyncIterableLike) => {
859
    const result: any[] = [];
56✔
860
    for await (const v of iter) {
56✔
861
      result[Array_push](v);
137✔
862
    }
863
    return result;
42✔
864
  });
865

866
class ZipAsyncIterable implements AsyncIterableLike {
867
  public readonly [ComputationLike_isPure]?: boolean;
868
  public readonly [ComputationLike_isSynchronous]: false = false as const;
2✔
869

870
  constructor(private readonly iters: readonly AsyncIterableLike<any>[]) {
2✔
871
    this[ComputationLike_isPure] = ComputationM.areAllPure(iters);
2✔
872
  }
873

874
  async *[Symbol.asyncIterator]() {
875
    const iterators = this.iters[Array_map](invoke(Symbol.asyncIterator));
2✔
876

877
    while (true) {
2✔
878
      const next = await Promise.all(
5✔
879
        iterators[Array_map](invoke(Iterator_next)),
880
      );
881

882
      if (next.some(x => x[Iterator_done] ?? false)) {
13!
883
        break;
2✔
884
      }
885
      yield next[Array_map](pick(Iterator_value));
3✔
886
    }
887
  }
888
}
889

890
export const zip: Signature["zip"] = ((
3✔
891
  ...iters: readonly AsyncIterableLike<any>[]
892
) => newInstance(ZipAsyncIterable, iters)) as unknown as Signature["zip"];
2✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc