• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 14297555273

06 Apr 2025 11:12PM UTC coverage: 88.777% (+0.2%) from 88.543%
14297555273

push

github

bordoley
Add toAsyncIterable. Long term though Producer and AsyncIterator will probably be merged.

952 of 1275 branches covered (74.67%)

Branch coverage included in aggregate %.

110 of 112 new or added lines in 8 files covered. (98.21%)

17 existing lines in 8 files now uncovered.

6104 of 6673 relevant lines covered (91.47%)

542.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.15
/src/node/NodeWritable.ts
1
import { Writable } from "stream";
2
import {
1✔
3
  include,
4
  init,
5
  mixInstanceFactory,
6
  props,
7
  proto,
8
  unsafeCast,
9
} from "../__internal__/mixins.js";
10
import * as Broadcaster from "../computations/Broadcaster.js";
1✔
11
import * as Publisher from "../computations/Publisher.js";
1✔
12
import { PublisherLike } from "../computations.js";
13
import {
1✔
14
  Function1,
15
  SideEffect1,
16
  bindMethod,
17
  none,
18
  pipe,
19
  returns,
20
} from "../functions.js";
21
import * as Disposable from "../utils/Disposable.js";
1✔
22
import * as DisposableContainer from "../utils/DisposableContainer.js";
1✔
23
import DisposableMixin from "../utils/__mixins__/DisposableMixin.js";
1✔
24
import {
1✔
25
  ConsumerLike,
26
  DisposableLike,
27
  DisposableLike_dispose,
28
  EventListenerLike_notify,
29
  FlowControllerLike,
30
  FlowControllerLike_addOnReadyListener,
31
  FlowControllerLike_isReady,
32
  SinkLike_complete,
33
  SinkLike_isCompleted,
34
  raiseCapacityExceededError,
35
} from "../utils.js";
36
import * as NodeStream from "./NodeStream.js";
1✔
37

38
interface NodeWritable {
39
  toConsumer(): Function1<Writable, ConsumerLike<Uint8Array>>;
40
}
41

42
type Signature = NodeWritable;
43

44
export const toConsumer: Signature["toConsumer"] = /*@__PURE__*/ (() => {
1✔
45
  const WritableConsumer_writable = Symbol("WritableConsumer_writable");
1✔
46
  const WritableConsumer_onReadyPublisher = Symbol(
1✔
47
    "WritableConsumer_onReadyPublisher",
48
  );
49

50
  type TProperties = {
51
    [WritableConsumer_writable]: Writable;
52
    [SinkLike_isCompleted]: boolean;
53
    [WritableConsumer_onReadyPublisher]: PublisherLike<void>;
54
  };
55

56
  const createNodeWritableConsumer = mixInstanceFactory(
1✔
57
    include(DisposableMixin),
58
    function WritableConsumer(
59
      this: Omit<ConsumerLike<Uint8Array>, keyof DisposableLike> & TProperties,
60
      writable: Writable,
61
    ): ConsumerLike<Uint8Array> {
62
      init(DisposableMixin, this);
3✔
63

64
      this[WritableConsumer_writable] = writable;
3✔
65

66
      writable.on("finish", bindMethod(this, DisposableLike_dispose));
3✔
67

68
      pipe(
3✔
69
        this,
70
        NodeStream.addToNodeStream(writable),
71
        DisposableContainer.onDisposed(bindMethod(this, SinkLike_complete)),
72
      );
73

74
      return this;
3✔
75
    },
76
    props<TProperties>({
77
      [WritableConsumer_writable]: none,
78
      [SinkLike_isCompleted]: false,
79
      [WritableConsumer_onReadyPublisher]: none,
80
    }),
81
    proto({
82
      get [FlowControllerLike_isReady]() {
83
        unsafeCast<TProperties>(this);
30✔
84
        const writable = this[WritableConsumer_writable];
30✔
85
        const needsDrain = writable.writableNeedDrain;
30✔
86
        const result = !this[SinkLike_isCompleted] && !needsDrain;
30✔
87

88
        return result;
30✔
89
      },
90

91
      [FlowControllerLike_addOnReadyListener](
92
        this: TProperties & ConsumerLike,
93
        callback: SideEffect1<void>,
94
      ) {
95
        const publisher =
96
          this[WritableConsumer_onReadyPublisher] ??
3✔
97
          (() => {
98
            const writable = this[WritableConsumer_writable];
3✔
99
            const publisher = pipe(
3✔
100
              Publisher.create<void>(),
101
              Disposable.addTo(this),
102
            );
103

104
            const onDrain = bindMethod(publisher, EventListenerLike_notify);
3✔
105
            writable.on("drain", onDrain);
3✔
106

107
            this[WritableConsumer_onReadyPublisher] = publisher;
3✔
108
            return publisher;
3✔
109
          })();
110

111
        return pipe(
3✔
112
          publisher,
113
          Broadcaster.addEventHandler(callback),
114
          Disposable.addTo(this),
115
        );
116
      },
117

118
      [EventListenerLike_notify](
119
        this: TProperties & FlowControllerLike,
120
        data: Uint8Array,
121
      ) {
122
        const writable = this[WritableConsumer_writable];
6✔
123
        if (this[FlowControllerLike_isReady]) {
6!
124
          writable.write(Buffer.from(data));
6✔
125
        } else {
126
          // FIXME: Not strictly correct, because bytes doesn't necessarily
127
          // map to event counts
UNCOV
128
          raiseCapacityExceededError(writable.writableHighWaterMark);
×
129
        }
130
      },
131

132
      [SinkLike_complete](this: TProperties & DisposableLike) {
133
        const isCompleted = this[SinkLike_isCompleted];
6✔
134
        const writable = this[WritableConsumer_writable];
6✔
135
        const ended = writable.writableEnded;
6✔
136

137
        this[SinkLike_isCompleted] = true;
6✔
138

139
        if (isCompleted || ended) {
6✔
140
          return;
3✔
141
        }
142

143
        writable.end();
3✔
144
      },
145
    }),
146
  );
147

148
  return returns(createNodeWritableConsumer);
1✔
149
})();
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc