• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 13892587551

17 Mar 2025 05:57AM UTC coverage: 95.01% (-0.7%) from 95.738%
13892587551

push

github

bordoley
first steps towards removing multicast observable

1094 of 1236 branches covered (88.51%)

Branch coverage included in aggregate %.

25 of 73 new or added lines in 7 files covered. (34.25%)

7 existing lines in 1 file now uncovered.

6180 of 6420 relevant lines covered (96.26%)

3142.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.16
/src/node/NodeReadable.ts
1
import { Readable } from "stream";
2
import * as EventSource from "../computations/EventSource.js";
1✔
3
import * as PauseableEventSource from "../computations/PauseableEventSource.js";
1✔
4
import * as Producer from "../computations/Producer.js";
1✔
5
import {
6
  PauseableEventSourceLike,
7
  ProducerWithSideEffectsLike,
8
} from "../computations.js";
9
import { bindMethod, pipe } from "../functions.js";
1✔
10
import * as Disposable from "../utils/Disposable.js";
1✔
11
import {
1✔
12
  ConsumerLike_addOnReadyListener,
13
  ConsumerLike_isReady,
14
  DisposableLike,
15
  DisposableLike_dispose,
16
  EventListenerLike_notify,
17
  SinkLike_complete,
18
} from "../utils.js";
19
import * as NodeStream from "./NodeStream.js";
1✔
20

21
/**
 * Describes the call signatures of this module's two adapters for Node.js
 * `Readable` streams. Both are typed as yielding `Uint8Array` values.
 */
interface NodeReadable {
22
  /**
   * Adapts `readable` into a pauseable event source that is also
   * disposable.
   *
   * @param readable - The Node.js stream to adapt.
   */
  toEventSource(
23
    readable: Readable,
24
  ): PauseableEventSourceLike<Uint8Array> & DisposableLike;
25

26
  /**
   * Adapts `readable` into a producer (typed as having side effects).
   *
   * @param readable - The Node.js stream to adapt.
   */
  toProducer(readable: Readable): ProducerWithSideEffectsLike<Uint8Array>;
27
}
28

29
// Alias for the module's interface; exports are typed as Signature["<name>"].
type Signature = NodeReadable;
30

31
// FIXME: Ideally this would be implemented as a wrapper around toProducer
32
/**
 * Wraps a Node.js `Readable` in a pauseable event source.
 *
 * The stream is paused up front and afterwards follows the `mode` event
 * source handed to the `PauseableEventSource.create` callback: a truthy
 * event pauses the stream, a falsy one resumes it. Each `data` chunk is
 * forwarded to the listener via `EventListenerLike_notify`, and `end`
 * disposes the listener.
 *
 * @param readable - The Node.js stream to adapt.
 */
export const toEventSource: Signature["toEventSource"] = readable =>
1✔
33
  PauseableEventSource.create(mode =>
3✔
34
    pipe(
3✔
35
      EventSource.create<Uint8Array>(listener => {
36
        // NOTE(review): presumably ties the lifetimes of `readable` and
        // `listener` together in both directions — confirm in NodeStream.
        pipe(readable, NodeStream.addTo(listener), NodeStream.add(listener));
3✔
37

38
        // Pause before any "data" listener is attached; flow only starts
        // once `mode` emits a falsy (not paused) value.
        readable.pause();
3✔
39

40
        pipe(
3✔
41
          mode,
42
          // Mirror each pause/resume request from `mode` onto the stream.
          EventSource.addEventHandler(isPaused => {
43
            if (isPaused) {
5✔
44
              readable.pause();
1✔
45
            } else {
46
              readable.resume();
4✔
47
            }
48
          }),
49
          // NOTE(review): presumably scopes the handler subscription to the
          // stream's lifetime — confirm in NodeStream.addToNodeStream.
          NodeStream.addToNodeStream(readable),
50
        );
51

52
        // Bound methods so they can be passed directly as stream listeners.
        const onData = bindMethod(listener, EventListenerLike_notify);
3✔
53
        const onEnd = bindMethod(listener, DisposableLike_dispose);
3✔
54

55
        readable.on("data", onData);
3✔
56
        // End of stream disposes the listener (no separate completion call).
        readable.on("end", onEnd);
3✔
57
      }),
58
      // NOTE(review): presumably links disposal of the created event source
      // with `mode` — confirm in Disposable.bindTo.
      Disposable.bindTo(mode),
59
    ),
60
  );
61

62
/**
 * Wraps a Node.js `Readable` in a producer whose flow is driven by the
 * consumer's readiness.
 *
 * The stream is paused up front. Each `data` chunk is passed to the consumer
 * via `EventListenerLike_notify`; if the consumer then reports it is not
 * ready, the stream is paused again. The consumer's on-ready listener
 * resumes the stream, and `end` calls the consumer's `SinkLike_complete`.
 *
 * @param readable - The Node.js stream to adapt.
 */
export const toProducer: Signature["toProducer"] = readable =>
1✔
NEW
63
  Producer.create(consumer => {
×
NEW
64
    // NOTE(review): presumably ties the lifetimes of `readable` and
    // `consumer` together in both directions — confirm in NodeStream.
    pipe(readable, NodeStream.addTo(consumer), NodeStream.add(consumer));
×
65

NEW
66
    // Pause before attaching the "data" listener below.
    readable.pause();
×
67

NEW
68
    // Resume the stream whenever the consumer signals it is ready again.
    // NOTE(review): any value returned by the registration is ignored here —
    // confirm nothing needs to be disposed.
    consumer[ConsumerLike_addOnReadyListener](bindMethod(readable, "resume"));
×
69

NEW
70
    const onData = (data: Uint8Array) => {
×
NEW
71
      consumer[EventListenerLike_notify](data);
×
72

NEW
73
      // Readiness is checked after delivery: stop the flow if the consumer
      // is no longer ready.
      if (!consumer[ConsumerLike_isReady]) {
×
NEW
74
        readable.pause();
×
75
      }
76
    };
NEW
77
    readable.on("data", onData);
×
78

NEW
79
    // End of stream completes the consumer.
    readable.on("end", bindMethod(consumer, SinkLike_complete));
×
80

NEW
81
    // Start flowing immediately only if the consumer is already ready;
    // otherwise the on-ready listener above starts the flow later.
    if (consumer[ConsumerLike_isReady]) {
×
NEW
82
      readable.resume();
×
83
    }
84
  });
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc