• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 14278092116

05 Apr 2025 03:32AM UTC coverage: 88.48% (-6.7%) from 95.167%
14278092116

push

github

bordoley
tests

929 of 1256 branches covered (73.96%)

Branch coverage included in aggregate %.

2 of 2 new or added lines in 2 files covered. (100.0%)

229 existing lines in 27 files now uncovered.

6037 of 6617 relevant lines covered (91.23%)

421.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/src/node/NodeWritable.ts
1
import { Writable } from "stream";
2
import {
1✔
3
  include,
4
  init,
5
  mixInstanceFactory,
6
  props,
7
  proto,
8
  unsafeCast,
9
} from "../__internal__/mixins.js";
10
import * as Broadcaster from "../computations/Broadcaster.js";
1✔
11
import * as Publisher from "../computations/Publisher.js";
1✔
12
import { PublisherLike } from "../computations.js";
13
import {
1✔
14
  Function1,
15
  Optional,
16
  SideEffect1,
17
  bindMethod,
18
  newInstance,
19
  none,
20
  pipe,
21
  raise,
22
} from "../functions.js";
23
import * as Disposable from "../utils/Disposable.js";
1✔
24
import * as DisposableContainer from "../utils/DisposableContainer.js";
1✔
25
import DisposableMixin from "../utils/__mixins__/DisposableMixin.js";
1✔
26
import {
1✔
27
  BackPressureConfig_capacity,
28
  BackPressureConfig_strategy,
29
  BackPressureError,
30
  ConsumerLike,
31
  DisposableLike,
32
  DisposableLike_dispose,
33
  EventListenerLike_notify,
34
  FlowControllerLike,
35
  FlowControllerLike_addOnReadyListener,
36
  FlowControllerLike_isReady,
37
  SinkLike_complete,
38
  SinkLike_isCompleted,
39
  ThrowBackpressureStrategy,
40
} from "../utils.js";
41
import * as NodeStream from "./NodeStream.js";
1✔
42

43
interface NodeWritable {
44
  toConsumer(options?: {
45
    autoDispose?: boolean;
46
  }): Function1<Writable, ConsumerLike<Uint8Array>>;
47
}
48

49
type Signature = NodeWritable;
50

51
export const toConsumer: Signature["toConsumer"] = /*@__PURE__*/ (() => {
1✔
52
  const WritableConsumer_autoDispose = Symbol("WritableConsumer_autoDispose");
1✔
53
  const WritableConsumer_writable = Symbol("WritableConsumer_writable");
1✔
54
  const WritableConsumer_onReadyPublisher = Symbol(
1✔
55
    "WritableConsumer_onReadyPublisher",
56
  );
57

58
  type TProperties = {
59
    [WritableConsumer_autoDispose]: boolean;
60
    [WritableConsumer_writable]: Writable;
61
    [SinkLike_isCompleted]: boolean;
62
    [WritableConsumer_onReadyPublisher]: PublisherLike<void>;
63
  };
64

65
  const createNodeWritableConsumer = mixInstanceFactory(
1✔
66
    include(DisposableMixin),
67
    function WritableConsumer(
68
      this: Omit<ConsumerLike<Uint8Array>, keyof DisposableLike> & TProperties,
69
      writable: Writable,
70
      options: Optional<{ autoDispose?: boolean }>,
71
    ): ConsumerLike<Uint8Array> {
72
      init(DisposableMixin, this);
3✔
73

74
      this[WritableConsumer_writable] = writable;
3✔
75
      this[WritableConsumer_autoDispose] = options?.autoDispose ?? false;
3!
76

77
      writable.on("finish", () => {
3✔
78
        this[SinkLike_isCompleted] = true;
2✔
79

80
        if (this[WritableConsumer_autoDispose]) {
2✔
81
          this[DisposableLike_dispose]();
2✔
82
        }
83
      });
84

85
      pipe(
3✔
86
        this,
87
        NodeStream.addToNodeStream(writable),
88
        DisposableContainer.onDisposed(bindMethod(this, SinkLike_complete)),
89
      );
90

91
      return this;
3✔
92
    },
93
    props<TProperties>({
94
      [WritableConsumer_autoDispose]: false,
95
      [WritableConsumer_writable]: none,
96
      [SinkLike_isCompleted]: false,
97
      [WritableConsumer_onReadyPublisher]: none,
98
    }),
99
    proto({
100
      get [FlowControllerLike_isReady]() {
101
        unsafeCast<TProperties>(this);
21✔
102
        const writable = this[WritableConsumer_writable];
21✔
103
        const needsDrain = writable.writableNeedDrain;
21✔
104
        const result = !this[SinkLike_isCompleted] && !needsDrain;
21✔
105

106
        return result;
21✔
107
      },
108

109
      [FlowControllerLike_addOnReadyListener](
110
        this: TProperties & ConsumerLike,
111
        callback: SideEffect1<void>,
112
      ) {
113
        const publisher =
114
          this[WritableConsumer_onReadyPublisher] ??
3✔
115
          (() => {
116
            const writable = this[WritableConsumer_writable];
3✔
117
            const publisher = pipe(
3✔
118
              Publisher.create<void>(),
119
              Disposable.addTo(this),
120
            );
121

122
            const onDrain = bindMethod(publisher, EventListenerLike_notify);
3✔
123
            writable.on("drain", onDrain);
3✔
124

125
            this[WritableConsumer_onReadyPublisher] = publisher;
3✔
126
            return publisher;
3✔
127
          })();
128

129
        return pipe(
3✔
130
          publisher,
131
          Broadcaster.addEventHandler(callback),
132
          Disposable.addTo(this),
133
        );
134
      },
135

136
      [EventListenerLike_notify](
137
        this: TProperties & FlowControllerLike,
138
        data: Uint8Array,
139
      ) {
140
        const writable = this[WritableConsumer_writable];
6✔
141
        if (this[FlowControllerLike_isReady]) {
6!
142
          writable.write(Buffer.from(data));
6✔
143
        } else {
UNCOV
144
          raise(
×
145
            newInstance(BackPressureError, {
146
              [BackPressureConfig_strategy]: ThrowBackpressureStrategy,
147
              // FIXME: Not strictly correct, because bytes doesn't necessarily
148
              // map to event counts
149
              [BackPressureConfig_capacity]: writable.writableHighWaterMark,
150
            }),
151
          );
152
        }
153
      },
154

155
      [SinkLike_complete](this: TProperties & DisposableLike) {
156
        const isCompleted = this[SinkLike_isCompleted];
6✔
157
        const writable = this[WritableConsumer_writable];
6✔
158
        const ended = writable.writableEnded;
6✔
159

160
        this[SinkLike_isCompleted] = true;
6✔
161

162
        if (isCompleted || ended) {
6✔
163
          return;
3✔
164
        }
165

166
        writable.end();
3✔
167
      },
168
    }),
169
  );
170

171
  return options => writable => createNodeWritableConsumer(writable, options);
3✔
172
})();
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc