• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 13878156314

16 Mar 2025 12:39AM UTC coverage: 96.096% (-0.7%) from 96.799%
13878156314

push

github

bordoley
fix example

1099 of 1227 branches covered (89.57%)

Branch coverage included in aggregate %.

6040 of 6202 relevant lines covered (97.39%)

2899.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.3
/src/computations/AsyncIterable.ts
1
import {
3✔
2
  Array_map,
3
  Array_push,
4
  Iterator_done,
5
  Iterator_next,
6
  Iterator_value,
7
} from "../__internal__/constants.js";
8
import parseArrayBounds from "../__internal__/parseArrayBounds.js";
3✔
9
import {
3✔
10
  AsyncIterableLike,
11
  AsyncIterableWithSideEffectsLike,
12
  ComputationLike_isDeferred,
13
  ComputationLike_isPure,
14
  ComputationLike_isSynchronous,
15
  ComputationModule,
16
  ComputationType,
17
  Computation_T,
18
  Computation_baseOfT,
19
  Computation_deferredWithSideEffectsOfT,
20
  Computation_pureDeferredOfT,
21
  ConcurrentDeferredComputationModule,
22
  DeferredComputationModule,
23
  EventSourceLike,
24
  HigherOrderInnerComputationLike,
25
  InteractiveComputationModule,
26
  PauseableEventSourceLike,
27
  PauseableObservableLike,
28
  PureAsyncIterableLike,
29
} from "../computations.js";
30
import {
3✔
31
  Factory,
32
  Function1,
33
  Optional,
34
  Predicate,
35
  Reducer,
36
  SideEffect,
37
  SideEffect1,
38
  alwaysTrue,
39
  bindMethod,
40
  error,
41
  invoke,
42
  isFunction,
43
  isNone,
44
  isSome,
45
  newInstance,
46
  none,
47
  pick,
48
  pipe,
49
  raiseError,
50
  returns,
51
} from "../functions.js";
52
import { clampPositiveInteger } from "../math.js";
3✔
53
import * as Disposable from "../utils/Disposable.js";
3✔
54
import * as DisposableContainer from "../utils/DisposableContainer.js";
3✔
55
import {
3✔
56
  DisposableLike,
57
  DisposableLike_dispose,
58
  DisposableLike_isDisposed,
59
  EventListenerLike,
60
  EventListenerLike_notify,
61
  ObserverLike,
62
  QueueableLike_isReady,
63
  SchedulerLike,
64
  SchedulerLike_maxYieldInterval,
65
  SchedulerLike_now,
66
  SchedulerLike_schedule,
67
  SinkLike_complete,
68
} from "../utils.js";
69
import * as ComputationM from "./Computation.js";
3✔
70
import EventSource_addEventHandler from "./EventSource/__private__/EventSource.addEventHandler.js";
3✔
71
import EventSource_create from "./EventSource/__private__/EventSource.create.js";
3✔
72
import EventSource_fromAsyncIterable from "./EventSource/__private__/EventSource.fromAsyncIterable.js";
3✔
73
import Observable_create from "./Observable/__private__/Observable.create.js";
3✔
74
import Observable_fromAsyncIterable from "./Observable/__private__/Observable.fromAsyncIterable.js";
3✔
75
import Observable_multicast from "./Observable/__private__/Observable.multicast.js";
3✔
76
import * as PauseableEventSource from "./PauseableEventSource.js";
3✔
77
import * as PauseableObservable from "./PauseableObservable.js";
3✔
78

79
/**
 * Computation type descriptor for this module: each optional slot maps one of
 * the generic computation keys to the matching AsyncIterable-based type,
 * parameterised by the element type stored under `Computation_T`.
 *
 * @noInheritDoc
 */
export interface AsyncIterableComputation extends ComputationType {
  /** Base type: any `AsyncIterableLike` of the element type. */
  readonly [Computation_baseOfT]?: AsyncIterableLike<
    this[typeof Computation_T]
  >;

  /** Deferred-with-side-effects type of the element type. */
  readonly [Computation_deferredWithSideEffectsOfT]?: AsyncIterableWithSideEffectsLike<
    this[typeof Computation_T]
  >;

  /** Pure deferred type of the element type. */
  readonly [Computation_pureDeferredOfT]?: PureAsyncIterableLike<
    this[typeof Computation_T]
  >;
}

/** Short alias of {@link AsyncIterableComputation}. */
export type Computation = AsyncIterableComputation;
96

97
/**
 * Public surface of the AsyncIterable module: the shared computation module
 * contracts it extends plus the AsyncIterable-specific conversions declared
 * below.
 */
export interface AsyncIterableModule
  extends ComputationModule<AsyncIterableComputation>,
    DeferredComputationModule<AsyncIterableComputation>,
    InteractiveComputationModule<AsyncIterableComputation>,
    ConcurrentDeferredComputationModule<AsyncIterableComputation> {
  /** Wraps a native `AsyncIterable` as an `AsyncIterableWithSideEffectsLike`. */
  of<T>(): Function1<AsyncIterable<T>, AsyncIterableWithSideEffectsLike<T>>;

  /** Converts an async iterable into a disposable event source. */
  toEventSource<T>(): Function1<
    AsyncIterableLike<T>,
    EventSourceLike<T> & DisposableLike
  >;

  /** Converts an async iterable into a disposable pauseable event source. */
  toPauseableEventSource<T>(): Function1<
    AsyncIterableLike<T>,
    PauseableEventSourceLike<T> & DisposableLike
  >;

  /**
   * Converts an async iterable into a disposable pauseable observable.
   *
   * @param scheduler - Scheduler used by the resulting observable.
   * @param options - Optional settings; `replay` is forwarded with the
   * options to the multicast step.
   */
  toPauseableObservable<T>(
    scheduler: SchedulerLike,
    options?: {
      readonly replay?: number;
    },
  ): Function1<
    AsyncIterableLike<T>,
    PauseableObservableLike<T> & DisposableLike
  >;
}

/** Short alias of {@link AsyncIterableModule}. */
export type Signature = AsyncIterableModule;
126

127
/**
 * Async iterable that re-yields a source and, if the source throws, passes
 * the error to `onError`. When the handler returns an async iterable, its
 * items are yielded next; when the handler itself throws, both errors are
 * combined and rethrown.
 */
class CatchErrorAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly s: AsyncIterableLike<T>,
    private readonly onError:
      | SideEffect1<Error>
      | Function1<Error, AsyncIterableLike<T>>,
    isPure: boolean,
  ) {
    // Pure only if the source is pure and the caller-declared flag is set.
    this[ComputationLike_isPure] = ComputationM.isPure(s) && isPure;
  }

  async *[Symbol.asyncIterator]() {
    try {
      yield* this.s;
    } catch (sourceFailure) {
      const sourceError = error(sourceFailure);
      let fallback: Optional<AsyncIterableLike<T>> = none;

      try {
        fallback = this.onError(sourceError) as Optional<AsyncIterableLike<T>>;
      } catch (handlerFailure) {
        // Report the handler failure together with the original error.
        throw error([error(handlerFailure), sourceError]);
      }

      if (isSome(fallback)) {
        yield* fallback;
      }
    }
  }
}
157

158
/**
 * Returns an operator that wraps an async iterable in a
 * `CatchErrorAsyncIterable` using `onError` as the error handler.
 *
 * The purity flag passed to the wrapper is read from `options.innerType`
 * and defaults to `true` when it is not provided.
 */
export const catchError: Signature["catchError"] = (<
    T,
    TInnerLike extends HigherOrderInnerComputationLike,
  >(
    onError: SideEffect1<Error> | Function1<Error, AsyncIterableLike<T>>,
    options?: {
      readonly innerType: TInnerLike;
    },
  ) =>
  (iter: AsyncIterableLike<T>) => {
    const isInnerPure = options?.innerType?.[ComputationLike_isPure] ?? true;
    return newInstance(CatchErrorAsyncIterable, iter, onError, isInnerPure);
  }) as Signature["catchError"];
174

175
/**
 * Flattens an async iterable of async iterables by yielding every inner
 * iterable to completion, one after another, in the order the outer source
 * produces them.
 */
class ConcatAllAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]?: boolean;

  constructor(
    private readonly s: AsyncIterableLike<AsyncIterableLike<T>>,
    isPure: boolean,
  ) {
    // Pure only if the outer source is pure and the caller-declared flag is set.
    this[ComputationLike_isPure] = ComputationM.isPure(s) && isPure;
  }

  async *[Symbol.asyncIterator]() {
    for await (const inner of this.s) {
      yield* inner;
    }
  }
}
192
/**
 * Returns an operator that flattens an async iterable of async iterables
 * sequentially. The purity flag is read from `options.innerType` and
 * defaults to `true` when it is not provided.
 */
export const concatAll: Signature["concatAll"] = (<
    T,
    TInnerLike extends HigherOrderInnerComputationLike,
  >(options?: {
    readonly innerType: TInnerLike;
  }) =>
  (iterable: AsyncIterableLike<AsyncIterableLike<T>>) => {
    const isInnerPure = options?.innerType?.[ComputationLike_isPure] ?? true;
    return newInstance(ConcatAllAsyncIterable, iterable, isInnerPure);
  }) as Signature["concatAll"];
204

205
/**
 * Yields the items of each async iterable in a fixed list, moving on to the
 * next one only after the previous one has completed.
 */
class ConcatAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isPure]?: boolean;

  constructor(private readonly s: ReadonlyArray<AsyncIterableLike<T>>) {
    // Purity is derived from all of the listed sources.
    this[ComputationLike_isPure] = ComputationM.areAllPure(s);
  }

  async *[Symbol.asyncIterator]() {
    for (const source of this.s) {
      yield* source;
    }
  }
}
219

220
/**
 * Creates an async iterable that yields the given iterables back to back,
 * in argument order.
 */
export const concat: Signature["concat"] = (<T>(
  ...iterables: ReadonlyArray<AsyncIterableLike<T>>
) => newInstance(ConcatAsyncIterable, iterables)) as unknown as Signature["concat"];
227

228
/**
 * Async iterable that, on each iteration, invokes an async factory once and
 * yields the single value it resolves to. The factory is called without
 * arguments, so no `signal` is supplied to it.
 */
class FromAsyncFactoryIterable<T>
  implements AsyncIterableWithSideEffectsLike<T>
{
  public [ComputationLike_isPure]: false = false as const;
  public [ComputationLike_isSynchronous]: false = false as const;

  constructor(private f: (options?: { signal: AbortSignal }) => Promise<T>) {}

  async *[Symbol.asyncIterator]() {
    const value = await this.f();
    yield value;
  }
}
241

242
/**
 * Returns a function that wraps an async factory in a
 * `FromAsyncFactoryIterable`.
 */
export const fromAsyncFactory: Signature["fromAsyncFactory"] = returns(
  asyncFactory => newInstance(FromAsyncFactoryIterable, asyncFactory),
);
245

246
/**
 * Async iterable over a slice of an array. Iteration starts at `start` and
 * produces `|count|` items: a positive `count` walks forward through the
 * array, a negative `count` walks backward.
 */
class FromReadonlyArrayAsyncIterable<T> implements PureAsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly arr: readonly T[],
    private readonly count: number,
    private readonly start: number,
  ) {}

  async *[Symbol.asyncIterator]() {
    const items = this.arr;
    let index = this.start;
    let remaining = this.count;

    while (remaining !== 0) {
      const item = items[index];
      yield item;

      // Step toward zero; the sign of the remaining count selects direction.
      if (remaining > 0) {
        index++;
        remaining--;
      } else {
        index--;
        remaining++;
      }
    }
  }
}
265

266
/**
 * Returns a function that turns an array into an async iterable. The
 * optional `start` / `count` options are resolved against the array by
 * `parseArrayBounds`.
 */
export const fromReadonlyArray: Signature["fromReadonlyArray"] = (<
    T,
  >(options?: {
    readonly count?: number;
    readonly start?: number;
  }) =>
  (arr: readonly T[]) => {
    const [start, count] = parseArrayBounds(arr, options);
    return newInstance(FromReadonlyArrayAsyncIterable, arr, count, start);
  }) as Signature["fromReadonlyArray"];
276

277
/**
 * Returns an async iterable with no items. A single instance is built once
 * at module evaluation and handed out on every call.
 */
export const empty: Signature["empty"] = (<T>() => {
  const emptyIterable = fromReadonlyArray<T>()([]);
  return returns(emptyIterable);
})() as Signature["empty"];
279

280
/**
 * Async iterable that encodes each string chunk of its source with a
 * `TextEncoder`, yielding one `Uint8Array` per chunk. A new encoder is
 * created for every iteration.
 */
class EncodeUtf8AsyncIterable
  implements AsyncIterableLike<Uint8Array<ArrayBufferLike>>
{
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(private readonly s: AsyncIterableLike<string>) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const encoder = newInstance(TextEncoder);

    for await (const text of this.s) {
      yield encoder.encode(text);
    }
  }
}
298

299
/**
 * Returns an operator that wraps an async iterable of strings in an
 * `EncodeUtf8AsyncIterable`.
 */
export const encodeUtf8: Signature["encodeUtf8"] = (() =>
  (source: AsyncIterableLike<string>) =>
    newInstance(EncodeUtf8AsyncIterable, source)) as Signature["encodeUtf8"];
302

303
/**
 * Returns an async function that resolves to the first item of an async
 * iterable, or to `none` when the iterable produces no items.
 */
export const firstAsync: Signature["firstAsync"] = /*@__PURE__*/ returns(
  async (source: AsyncIterableLike) => {
    // Returning from inside the loop stops iteration after the first item.
    for await (const first of source) {
      return first;
    }
    return none;
  },
) as Signature["firstAsync"];
311

312
/**
 * Async iterable that calls a side effect with each source item before
 * yielding that item unchanged.
 */
class ForEachAsyncIterable<T> implements AsyncIterableWithSideEffectsLike<T> {
  public [ComputationLike_isPure]: false = false as const;
  public [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly d: AsyncIterableLike<T>,
    private readonly ef: SideEffect1<T>,
  ) {}

  async *[Symbol.asyncIterator]() {
    const source = this.d;
    const sideEffect = this.ef;

    for await (const item of source) {
      sideEffect(item);
      yield item;
    }
  }
}
331

332
/**
 * Returns an operator that wraps an async iterable in a
 * `ForEachAsyncIterable` that runs `effect` for each item.
 */
export const forEach: Signature["forEach"] = (<T>(effect: SideEffect1<T>) =>
  (source: AsyncIterableLike<T>) =>
    newInstance(ForEachAsyncIterable, source, effect)) as Signature["forEach"];
335

336
/**
 * Returns a function that wraps a single value in an async iterable by
 * placing it in a one-element array.
 */
export const fromValue: Signature["fromValue"] =
  /*@__PURE__*/
  returns(value => fromReadonlyArray()([value])) as Signature["fromValue"];
339

340
/**
 * Pure async iterable backed by a generator factory: each iteration calls
 * the factory and yields everything the resulting generator produces.
 */
class GenAsyncIterable<T> implements PureAsyncIterableLike<T> {
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isPure]: true = true as const;

  constructor(readonly f: Factory<Generator<T>>) {}

  async *[Symbol.asyncIterator]() {
    const generator = this.f();
    yield* generator;
  }
}
352

353
/** Creates a pure async iterable from a generator factory. */
export const gen: Signature["gen"] = (<T>(factory: Factory<Generator<T>>) =>
  newInstance(GenAsyncIterable<T>, factory)) as Signature["gen"];
355

356
/**
 * Async iterable backed by a generator factory and flagged as not pure:
 * each iteration calls the factory and yields everything the resulting
 * generator produces.
 */
class GenWithSideEffectsAsyncIterable<T>
  implements AsyncIterableWithSideEffectsLike<T>
{
  public readonly [ComputationLike_isSynchronous]: false = false as const;
  public readonly [ComputationLike_isDeferred]: true = true as const;
  public readonly [ComputationLike_isPure]: false = false as const;

  constructor(readonly f: Factory<Generator<T>>) {}

  async *[Symbol.asyncIterator]() {
    const generator = this.f();
    yield* generator;
  }
}
370

371
/** Creates a non-pure async iterable from a generator factory. */
export const genWithSideEffects: Signature["genWithSideEffects"] = (<T>(
  factory: Factory<Generator<T>>,
) =>
  newInstance(
    GenWithSideEffectsAsyncIterable<T>,
    factory,
  )) as Signature["genWithSideEffects"];
378

379
/**
 * Async iterable that yields only the source items for which the predicate
 * returns a truthy value.
 */
class KeepAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly d: AsyncIterableLike<T>,
    private readonly p: Predicate<T>,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = d[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const source = this.d;
    const shouldKeep = this.p;

    for await (const item of source) {
      if (!shouldKeep(item)) {
        continue;
      }
      yield item;
    }
  }
}
401

402
/**
 * Returns an operator that wraps an async iterable in a
 * `KeepAsyncIterable` filtered by `predicate`.
 */
export const keep: Signature["keep"] = (<T>(predicate: Predicate<T>) =>
  (source: AsyncIterableLike<T>) =>
    newInstance(KeepAsyncIterable, source, predicate)) as Signature["keep"];
405

406
/**
 * Returns an async function that drains an async iterable and resolves to
 * the last item it produced, or to `none` when it produced nothing.
 */
export const lastAsync: Signature["lastAsync"] = /*@__PURE__*/ returns(
  async (source: AsyncIterableLike) => {
    let last: Optional<unknown> = none;
    for await (const item of source) {
      last = item;
    }
    return last;
  },
) as Signature["lastAsync"];
415

416
/**
 * Async iterable that yields the result of applying a mapping function to
 * each source item.
 */
class MapAsyncIterable<TA, TB> implements AsyncIterableLike<TB> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly d: AsyncIterableLike<TA>,
    private readonly m: Function1<TA, TB>,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = d[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const source = this.d;
    const project = this.m;

    for await (const item of source) {
      yield project(item);
    }
  }
}
436

437
/**
 * Returns an operator that wraps an async iterable in a `MapAsyncIterable`
 * applying `mapper` to each item.
 */
export const map: Signature["map"] = (<TA, TB>(mapper: Function1<TA, TB>) =>
  (source: AsyncIterableLike<TA>) =>
    newInstance(MapAsyncIterable, source, mapper)) as Signature["map"];
440

441
/**
 * Adapter that tags a native `AsyncIterable` as a non-pure, asynchronous
 * computation and delegates iteration directly to it.
 */
class AsyncIterableOf<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]: false = false as const;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(private d: AsyncIterable<T>) {}

  [Symbol.asyncIterator]() {
    // Hand back the wrapped iterable's own iterator; no extra layer is added.
    const wrapped = this.d;
    return wrapped[Symbol.asyncIterator]();
  }
}
451

452
/**
 * Returns a function that wraps a native `AsyncIterable` in an
 * `AsyncIterableOf` adapter.
 */
export const of: Signature["of"] = /*@__PURE__*/ returns(nativeIterable =>
  newInstance(AsyncIterableOf, nativeIterable),
);
455

456
/**
 * Async iterable that folds the source with a reducer and yields the
 * accumulator after every item. The initial accumulator is created by
 * calling the factory at the start of each iteration.
 */
class ScanAsyncIterable<T, TAcc> implements AsyncIterableLike<TAcc> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly s: AsyncIterableLike<T>,
    private readonly r: Reducer<T, TAcc>,
    private readonly iv: Factory<TAcc>,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const accumulate = this.r;
    let state = this.iv();

    for await (const item of this.s) {
      state = accumulate(state, item);
      yield state;
    }
  }
}
478

479
/**
 * Async iterable that never yields: starting iteration calls the stored
 * factory, converts its result with `error`, and raises it.
 */
class RaiseAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: true;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(private r: SideEffect) {}

  async *[Symbol.asyncIterator]() {
    const failure = error(this.r());
    raiseError(failure);
  }
}
489

490
/**
 * Creates an async iterable that raises an error as soon as it is iterated.
 *
 * @param options - Optional settings. `raise` is a callback invoked at
 * iteration time; whatever it returns (or throws) is converted with `error`
 * and raised.
 * @returns An async iterable that fails instead of producing items.
 *
 * When no `raise` callback is supplied, a no-op callback is used so that the
 * raised value is `error(undefined)`. Previously the default was the
 * identifier `raise`, which in this module resolves to this very function,
 * so the raised error wrapped a freshly created async iterable instead.
 * NOTE(review): assumes `error(undefined)` produces a plain `Error` — confirm
 * against `functions.js`.
 */
export const raise: Signature["raise"] = (<T>(options?: {
  readonly raise?: SideEffect;
}) => {
  const factory: SideEffect = options?.raise ?? (() => {});
  return newInstance(RaiseAsyncIterable<T>, factory);
}) as Signature["raise"];
496

497
/**
 * Returns an async function that folds an async iterable into one value.
 *
 * @param reducer - Combines the accumulator with the next item.
 * @param initialValue - Factory called once per reduction to create the
 * starting accumulator.
 */
export const reduceAsync: Signature["reduceAsync"] =
  <T, TAcc>(reducer: Reducer<T, TAcc>, initialValue: Factory<TAcc>) =>
  async (iterable: AsyncIterableLike<T>) => {
    let accumulated = initialValue();

    for await (const item of iterable) {
      accumulated = reducer(accumulated, item);
    }

    return accumulated;
  };
507

508
/**
 * Async iterable that replays its source. After each complete pass the
 * predicate is called with the number of passes finished so far; iteration
 * continues only while it returns a truthy value.
 */
class RepeatAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private i: AsyncIterableLike<T>,
    private p: Predicate<number>,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const source = this.i;
    const shouldRepeat = this.p;

    let completedPasses = 0;

    // The source is always iterated at least once.
    do {
      yield* source;
      completedPasses++;
    } while (shouldRepeat(completedPasses));
  }
}
535

536
/**
 * Returns an operator that repeats an async iterable.
 *
 * @param predicate - Either a function receiving the number of completed
 * passes and returning whether to repeat again, or a number used as an
 * upper bound (repeat while the pass count is below it). When omitted, the
 * source is repeated without limit.
 */
export const repeat: Signature["repeat"] = (<T>(
  predicate?: Predicate<number> | number,
) => {
  let repeatPredicate: Predicate<number>;

  if (isFunction(predicate)) {
    repeatPredicate = predicate;
  } else if (isNone(predicate)) {
    repeatPredicate = alwaysTrue;
  } else {
    const limit = predicate;
    repeatPredicate = (count: number) => count < limit;
  }

  return (src: AsyncIterableLike<T>) =>
    newInstance(RepeatAsyncIterable, src, repeatPredicate);
}) as Signature["repeat"];
548

549
/**
 * Async iterable that re-iterates its source when the source throws.
 *
 * After each failure the predicate is called with the number of failures so
 * far and the error (converted with `error`). A truthy result restarts the
 * source from the beginning; a falsy result ends iteration.
 *
 * Iteration also ends once the source completes without throwing.
 * Previously the loop had no exit on success, so a source that completed
 * normally was replayed forever.
 */
class RetryAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private i: AsyncIterableLike<T>,
    private p: (count: number, error: Error) => boolean,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const iterable = this.i;
    const predicate = this.p;

    let cnt = 0;

    while (true) {
      try {
        yield* iterable;
        // The source finished successfully: there is nothing left to retry.
        return;
      } catch (e) {
        cnt++;
        if (!predicate(cnt, error(e))) {
          // NOTE(review): the final error is dropped here and iteration ends
          // normally, as in the original code. Confirm whether callers
          // expect it to be rethrown instead.
          break;
        }
      }
    }
  }
}
578

579
/**
 * Returns an operator that wraps an async iterable in a
 * `RetryAsyncIterable`.
 *
 * @param shouldRetry - Called with the failure count and the error; when
 * omitted, `alwaysTrue` is used.
 */
export const retry: Signature["retry"] = (<T>(
    shouldRetry?: (count: number, error: Error) => boolean,
  ) =>
  (deferable: AsyncIterableLike<T>) => {
    const retryPredicate = shouldRetry ?? alwaysTrue;
    return newInstance(RetryAsyncIterable, deferable, retryPredicate);
  }) as Signature["retry"];
588

589
/**
 * Returns an operator that wraps an async iterable in a
 * `ScanAsyncIterable`.
 *
 * @param scanner - Combines the accumulator with the next item.
 * @param initialValue - Factory for the starting accumulator.
 */
export const scan: Signature["scan"] = (<T, TAcc>(
    scanner: Reducer<T, TAcc>,
    initialValue: Factory<TAcc>,
  ) =>
  (source: AsyncIterableLike<T>) =>
    newInstance(ScanAsyncIterable, source, scanner, initialValue)) as Signature["scan"];
600

601
/**
 * Async iterable that yields at most `c` leading items of its source.
 *
 * Iteration stops as soon as the quota has been yielded, without asking the
 * source for another item. Previously the limit was only checked after the
 * next item had been pulled, so the source was advanced one step too far.
 * That ran extra side effects and could stall on a source that never
 * produces another item. When the quota is not a positive number, the
 * source is not iterated at all.
 */
class TakeFirstAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private s: AsyncIterableLike<T>,
    private c: number,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const takeCount = this.c;

    // Written as a negated comparison so that NaN also yields nothing.
    if (!(takeCount > 0)) {
      return;
    }

    let count = 0;

    for await (const v of this.s) {
      yield v;

      count++;
      if (count >= takeCount) {
        // Leaving the loop lets `for await` close the source iterator.
        break;
      }
    }
  }
}
626

627
/**
 * Returns an operator that keeps only the leading items of an async
 * iterable.
 *
 * @param options - `count` is the number of items to keep; it defaults to 1
 * and is passed through `clampPositiveInteger` before use.
 */
export const takeFirst: Signature["takeFirst"] = (<T>(options?: {
    readonly count?: number;
  }) =>
  (iterable: AsyncIterableLike<T>) => {
    const takeCount = clampPositiveInteger(options?.count ?? 1);
    return newInstance(TakeFirstAsyncIterable, iterable, takeCount);
  }) as Signature["takeFirst"];
636

637
/**
 * Async iterable that yields source items while the predicate holds and
 * stops at the first item that fails it. When `i` (inclusive) is true, that
 * first failing item is yielded as well before stopping.
 */
class TakeWhileAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private s: AsyncIterableLike<T>,
    private p: Predicate<T>,
    private i: boolean,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = s[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    const holds = this.p;
    const includeFailing = this.i;

    for await (const item of this.s) {
      if (holds(item)) {
        yield item;
        continue;
      }

      // First failing item: optionally emit it, then stop.
      if (includeFailing) {
        yield item;
      }
      break;
    }
  }
}
666

667
/**
 * Returns an operator that wraps an async iterable in a
 * `TakeWhileAsyncIterable`.
 *
 * @param predicate - Items are kept while this returns a truthy value.
 * @param options - `inclusive` (default `false`) also emits the first
 * failing item.
 */
export const takeWhile: Signature["takeWhile"] = (<T>(
    predicate: Predicate<T>,
    options?: {
      readonly inclusive?: boolean;
    },
  ) =>
  (iterable: AsyncIterableLike<T>) => {
    const inclusive = options?.inclusive ?? false;
    return newInstance(TakeWhileAsyncIterable, iterable, predicate, inclusive);
  }) as Signature["takeWhile"];
680

681
/**
 * Async iterable that re-yields its source and, if the source completes
 * without producing any item, raises the error built from the factory's
 * result.
 */
class ThrowIfEmptyAsyncIterable<T> implements AsyncIterableLike<T> {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(
    private readonly i: AsyncIterableLike<T>,
    private readonly f: Factory<unknown>,
  ) {
    // Purity is copied from the source.
    this[ComputationLike_isPure] = i[ComputationLike_isPure];
  }

  async *[Symbol.asyncIterator]() {
    let sawItem = false;

    for await (const item of this.i) {
      sawItem = true;
      yield item;
    }

    if (!sawItem) {
      raiseError(error(this.f()));
    }
  }
}
704

705
/**
 * Returns an operator that wraps an async iterable in a
 * `ThrowIfEmptyAsyncIterable`.
 *
 * @param factory - Called when the source turns out to be empty; its result
 * is converted with `error` and raised.
 */
export const throwIfEmpty: Signature["throwIfEmpty"] = (<T>(
    factory: Factory<unknown>,
  ) =>
  (source: AsyncIterableLike<T>) =>
    newInstance(ThrowIfEmptyAsyncIterable, source, factory)) as Signature["throwIfEmpty"];
714

715
/**
 * Converts an async iterable to an event source. Re-exported from the
 * EventSource module's `fromAsyncIterable` implementation.
 */
export const toEventSource: Signature["toEventSource"] =
  EventSource_fromAsyncIterable;
717

718
/**
 * Converts an async iterable to an observable. Re-exported from the
 * Observable module's `fromAsyncIterable` implementation.
 */
export const toObservable: Signature["toObservable"] =
  Observable_fromAsyncIterable as Signature["toObservable"];
720

721
/**
 * Returns a function that converts an async iterable into a pauseable event
 * source.
 *
 * The source starts paused. Each time the mode changes from paused to
 * running, a pump loop pulls items from the iterator and notifies the
 * listener until the listener is disposed, the mode becomes paused again,
 * or the iterator finishes. Completion disposes the listener; an error
 * disposes it with that error.
 */
export const toPauseableEventSource: Signature["toPauseableEventSource"] =
  <T>() =>
  (iterable: AsyncIterableLike<T>) =>
    PauseableEventSource.create<T>((modeObs: EventSourceLike<boolean>) => {
      const events = EventSource_create((listener: EventListenerLike<T>) => {
        const iterator = iterable[Symbol.asyncIterator]();

        let paused = true;

        // Pulls items until disposed, paused, or the iterator is exhausted.
        const pump = async () => {
          try {
            while (!listener[DisposableLike_isDisposed] && !paused) {
              const step = await iterator[Iterator_next]();

              if (step[Iterator_done]) {
                listener[DisposableLike_dispose]();
                break;
              }

              // The listener may have been disposed while awaiting the step.
              if (!listener[DisposableLike_isDisposed]) {
                listener[EventListenerLike_notify](step[Iterator_value]);
              }
            }
          } catch (e) {
            listener[DisposableLike_dispose](error(e));
          }
        };

        // `mode` is the new paused state published by the mode source.
        const onModeChanged = async (mode: boolean) => {
          const wasPaused = paused;
          paused = mode;

          if (wasPaused && !paused) {
            await pump();
          }
        };

        pipe(
          modeObs,
          EventSource_addEventHandler(onModeChanged),
          Disposable.bindTo(listener),
        );
      });

      return pipe(events, Disposable.addToContainer(modeObs));
    });
764

765
/**
 * Returns a function that converts an async iterable into a pauseable
 * observable, multicast on `scheduler`.
 *
 * The source starts paused. When the mode changes from paused to running, a
 * continuation is scheduled on the observer. Each continuation pulls items
 * until the observer is disposed, the mode is paused, the observer's max
 * yield interval has elapsed, the observer reports it is not ready, or the
 * iterator finishes. While not paused, the continuation reschedules itself.
 *
 * @param scheduler - Scheduler passed to the multicast step.
 * @param options - Optional settings forwarded to the multicast step.
 */
export const toPauseableObservable: Signature["toPauseableObservable"] =
  <T>(
    scheduler: SchedulerLike,
    options?: {
      readonly replay?: number;
    },
  ) =>
  (iterable: AsyncIterableLike<T>) =>
    PauseableObservable.create<T>((modeObs: EventSourceLike<boolean>) => {
      const source = Observable_create((observer: ObserverLike<T>) => {
        const iterator = iterable[Symbol.asyncIterator]();
        const maxYieldInterval = observer[SchedulerLike_maxYieldInterval];

        let paused = true;

        // Schedules one run of the continuation, tied to the observer.
        const scheduleContinuation = (): void => {
          pipe(
            observer[SchedulerLike_schedule](continuation),
            Disposable.addTo(observer),
          );
        };

        const continuation = async () => {
          const startTime = observer[SchedulerLike_now];

          try {
            while (
              !observer[DisposableLike_isDisposed] &&
              !paused &&
              observer[SchedulerLike_now] - startTime < maxYieldInterval
            ) {
              const step = await iterator[Iterator_next]();

              if (step[Iterator_done]) {
                observer[SinkLike_complete]();
                break;
              }

              observer[EventListenerLike_notify](step[Iterator_value]);

              if (!observer[QueueableLike_isReady]) {
                // An async iterable can produce resolved promises which are
                // immediately scheduled on the microtask queue. This prevents
                // the observer's scheduler from running and draining queued
                // events.
                //
                // Stop this run when the observer is not ready so we avoid
                // queueing forever in this situation.
                break;
              }
            }
          } catch (e) {
            observer[DisposableLike_dispose](error(e));
          }

          if (!paused) {
            scheduleContinuation();
          }
        };

        pipe(
          modeObs,
          // `mode` is the new paused state published by the mode source.
          EventSource_addEventHandler((mode: boolean) => {
            const wasPaused = paused;
            paused = mode;

            if (wasPaused && !paused) {
              scheduleContinuation();
            }
          }),
          Disposable.addTo(observer),
          DisposableContainer.onComplete(
            bindMethod(observer, SinkLike_complete),
          ),
        );
      });

      return pipe(
        source,
        Observable_multicast(scheduler, options),
        Disposable.addToContainer(modeObs),
      );
    });
843

844
/**
 * Returns an async function that drains an async iterable and resolves to
 * an array of every item, in the order produced.
 */
export const toReadonlyArrayAsync: Signature["toReadonlyArrayAsync"] =
  /*@__PURE__*/
  returns(async (source: AsyncIterableLike) => {
    const collected: any[] = [];
    for await (const item of source) {
      collected[Array_push](item);
    }
    return collected;
  });
853

854
/**
 * Async iterable that advances several sources in lock step and yields one
 * array per round holding the current value of each source. Iteration stops
 * as soon as any source reports it is done.
 *
 * When zipping stops for any reason, every source iterator is closed through
 * its optional `return` method. Stopping covers a source finishing, the
 * consumer ending iteration early, and an error. Previously the iterators
 * were simply abandoned, so cleanup code in the longer sources (for example
 * a generator's `finally` block) never ran.
 */
class ZipAsyncIterable implements AsyncIterableLike {
  public readonly [ComputationLike_isPure]?: boolean;
  public readonly [ComputationLike_isSynchronous]: false = false as const;

  constructor(private readonly iters: readonly AsyncIterableLike<any>[]) {
    // Purity is derived from all of the zipped sources.
    this[ComputationLike_isPure] = ComputationM.areAllPure(iters);
  }

  async *[Symbol.asyncIterator]() {
    const iterators = this.iters[Array_map](invoke(Symbol.asyncIterator));

    let didThrow = false;

    try {
      while (true) {
        // Request the next step of every source concurrently.
        const next = await Promise.all(
          iterators[Array_map](invoke(Iterator_next)),
        );

        if (next.some(x => x[Iterator_done] ?? false)) {
          break;
        }
        yield next[Array_map](pick(Iterator_value));
      }
    } catch (e) {
      didThrow = true;
      throw e;
    } finally {
      // Close every source. Failures while closing are ignored so they
      // cannot replace an error that is already propagating.
      const closing = Promise.allSettled(
        iterators[Array_map](async (iterator: AsyncIterator<unknown>) => {
          await iterator.return?.();
        }),
      );

      // After an error, other sources may still have a step in flight; do
      // not hold the error back waiting for them to settle.
      if (!didThrow) {
        await closing;
      }
    }
  }
}
877

878
/**
 * Creates an async iterable that zips the given async iterables together,
 * yielding one array of values per round.
 */
export const zip: Signature["zip"] = ((
  ...sources: readonly AsyncIterableLike<any>[]
) => newInstance(ZipAsyncIterable, sources)) as unknown as Signature["zip"];
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc