• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 14282590739

05 Apr 2025 01:59PM UTC coverage: 88.385% (-0.03%) from 88.418%
14282590739

push

github

bordoley
cleanup backpressure api

932 of 1260 branches covered (73.97%)

Branch coverage included in aggregate %.

6 of 11 new or added lines in 3 files covered. (54.55%)

2 existing lines in 1 file now uncovered.

6000 of 6583 relevant lines covered (91.14%)

458.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/src/node/NodeWritable.ts
1
import { Writable } from "stream";
2
import {
1✔
3
  include,
4
  init,
5
  mixInstanceFactory,
6
  props,
7
  proto,
8
  unsafeCast,
9
} from "../__internal__/mixins.js";
10
import * as Broadcaster from "../computations/Broadcaster.js";
1✔
11
import * as Publisher from "../computations/Publisher.js";
1✔
12
import { PublisherLike } from "../computations.js";
13
import {
1✔
14
  Function1,
15
  Optional,
16
  SideEffect1,
17
  bindMethod,
18
  none,
19
  pipe,
20
} from "../functions.js";
21
import * as Disposable from "../utils/Disposable.js";
1✔
22
import * as DisposableContainer from "../utils/DisposableContainer.js";
1✔
23
import DisposableMixin from "../utils/__mixins__/DisposableMixin.js";
1✔
24
import {
1✔
25
  ConsumerLike,
26
  DisposableLike,
27
  DisposableLike_dispose,
28
  EventListenerLike_notify,
29
  FlowControllerLike,
30
  FlowControllerLike_addOnReadyListener,
31
  FlowControllerLike_isReady,
32
  SinkLike_complete,
33
  SinkLike_isCompleted,
34
  raiseBackpressureError,
35
} from "../utils.js";
36
import * as NodeStream from "./NodeStream.js";
1✔
37

38
/**
 * Describes the signature of the `toConsumer` export declared in this file.
 */
interface NodeWritable {
39
  /**
   * Given optional `options`, returns a function that maps a Node.js
   * `Writable` to a `ConsumerLike<Uint8Array>`.
   *
   * @param options - Optional settings; `autoDispose` is itself optional.
   * @returns A `Function1` from `Writable` to `ConsumerLike<Uint8Array>`.
   */
  toConsumer(options?: {
40
    autoDispose?: boolean;
41
  }): Function1<Writable, ConsumerLike<Uint8Array>>;
42
}
43

44
// Alias of NodeWritable; `Signature["toConsumer"]` is used to type the
// exported `toConsumer` constant.
type Signature = NodeWritable;
45

46
/**
 * Builds the `toConsumer` function: given optional `options`, it returns a
 * function that wraps a Node.js `Writable` in a `ConsumerLike<Uint8Array>`.
 *
 * The consumer created here:
 * - reports readiness from the writable's `writableNeedDrain` flag and its
 *   own completed flag,
 * - forwards notified chunks to `writable.write`,
 * - calls `writable.end()` on completion unless already completed/ended,
 * - when `options.autoDispose` is true (default `false`), disposes itself
 *   on the writable's "finish" event.
 *
 * The factory is built once inside an immediately-invoked function so the
 * private symbols below stay local to it.
 */
export const toConsumer: Signature["toConsumer"] = /*@__PURE__*/ (() => {
1✔
47
  // Private symbol keys for the consumer's internal state.
  const WritableConsumer_autoDispose = Symbol("WritableConsumer_autoDispose");
1✔
48
  const WritableConsumer_writable = Symbol("WritableConsumer_writable");
1✔
49
  const WritableConsumer_onReadyPublisher = Symbol(
1✔
50
    "WritableConsumer_onReadyPublisher",
51
  );
52

53
  // Instance state stored on each consumer.
  type TProperties = {
54
    // Whether to dispose the consumer when the writable emits "finish".
    [WritableConsumer_autoDispose]: boolean;
55
    // The wrapped Node.js stream.
    [WritableConsumer_writable]: Writable;
56
    // Set once the writable finished or SinkLike_complete was called.
    [SinkLike_isCompleted]: boolean;
57
    // Lazily created in FlowControllerLike_addOnReadyListener; initialised
    // to `none` in props below.
    [WritableConsumer_onReadyPublisher]: PublisherLike<void>;
58
  };
59

60
  const createNodeWritableConsumer = mixInstanceFactory(
1✔
61
    include(DisposableMixin),
62
    /**
     * Instance initializer.
     *
     * @param writable - The Node.js stream to write into.
     * @param options - Optional; `autoDispose` defaults to `false`.
     * @returns `this`, typed as `ConsumerLike<Uint8Array>`.
     */
    function WritableConsumer(
63
      this: Omit<ConsumerLike<Uint8Array>, keyof DisposableLike> & TProperties,
64
      writable: Writable,
65
      options: Optional<{ autoDispose?: boolean }>,
66
    ): ConsumerLike<Uint8Array> {
67
      init(DisposableMixin, this);
3✔
68

69
      this[WritableConsumer_writable] = writable;
3✔
70
      this[WritableConsumer_autoDispose] = options?.autoDispose ?? false;
3!
71

72
      // When the stream finishes, mark the consumer completed and, if
      // requested, dispose it.
      writable.on("finish", () => {
3✔
73
        this[SinkLike_isCompleted] = true;
2✔
74

75
        if (this[WritableConsumer_autoDispose]) {
2✔
76
          this[DisposableLike_dispose]();
2✔
77
        }
78
      });
79

80
      // NOTE(review): addToNodeStream presumably links the consumer's
      // lifetime to the stream, and onDisposed presumably runs the bound
      // SinkLike_complete when the consumer is disposed — confirm in
      // NodeStream / DisposableContainer.
      pipe(
3✔
81
        this,
82
        NodeStream.addToNodeStream(writable),
83
        DisposableContainer.onDisposed(bindMethod(this, SinkLike_complete)),
84
      );
85

86
      return this;
3✔
87
    },
88
    // Initial values for the instance state.
    props<TProperties>({
89
      [WritableConsumer_autoDispose]: false,
90
      [WritableConsumer_writable]: none,
91
      [SinkLike_isCompleted]: false,
92
      [WritableConsumer_onReadyPublisher]: none,
93
    }),
94
    proto({
95
      /**
       * True only while the consumer is not completed and the writable's
       * `writableNeedDrain` flag is not set.
       */
      get [FlowControllerLike_isReady]() {
96
        unsafeCast<TProperties>(this);
21✔
97
        const writable = this[WritableConsumer_writable];
21✔
98
        const needsDrain = writable.writableNeedDrain;
21✔
99
        const result = !this[SinkLike_isCompleted] && !needsDrain;
21✔
100

101
        return result;
21✔
102
      },
103

104
      /**
       * Registers `callback` as an event handler on a publisher that is
       * notified on the writable's "drain" event.
       *
       * @param callback - Handler to attach to the drain publisher.
       * @returns The result of the `Broadcaster.addEventHandler` /
       *   `Disposable.addTo(this)` pipeline.
       */
      [FlowControllerLike_addOnReadyListener](
105
        this: TProperties & ConsumerLike,
106
        callback: SideEffect1<void>,
107
      ) {
108
        // Reuse the cached publisher, or create it on first use.
        const publisher =
109
          this[WritableConsumer_onReadyPublisher] ??
3✔
110
          (() => {
111
            const writable = this[WritableConsumer_writable];
3✔
112
            const publisher = pipe(
3✔
113
              Publisher.create<void>(),
114
              Disposable.addTo(this),
115
            );
116

117
            // Forward each "drain" event to the publisher.
            // NOTE(review): no matching listener removal is visible in
            // this block.
            const onDrain = bindMethod(publisher, EventListenerLike_notify);
3✔
118
            writable.on("drain", onDrain);
3✔
119

120
            // Cache so later calls share one "drain" listener.
            this[WritableConsumer_onReadyPublisher] = publisher;
3✔
121
            return publisher;
3✔
122
          })();
123

124
        return pipe(
3✔
125
          publisher,
126
          Broadcaster.addEventHandler(callback),
127
          Disposable.addTo(this),
128
        );
129
      },
130

131
      /**
       * Writes `data` to the writable when the consumer is ready;
       * otherwise calls `raiseBackpressureError` with the writable's
       * `writableHighWaterMark`.
       *
       * @param data - Bytes to write; passed through `Buffer.from` first.
       */
      [EventListenerLike_notify](
132
        this: TProperties & FlowControllerLike,
133
        data: Uint8Array,
134
      ) {
135
        const writable = this[WritableConsumer_writable];
6✔
136
        if (this[FlowControllerLike_isReady]) {
6!
137
          writable.write(Buffer.from(data));
6✔
138
        } else {
139
          // FIXME: Not strictly correct, because a byte count doesn't
140
          // necessarily map to an event count.
NEW
141
          raiseBackpressureError(writable.writableHighWaterMark);
×
142
        }
143
      },
144

145
      /**
       * Marks the consumer completed and ends the writable, unless the
       * consumer was already completed or the writable was already ended.
       */
      [SinkLike_complete](this: TProperties & DisposableLike) {
146
        // Snapshot the prior state before the flag is overwritten below.
        const isCompleted = this[SinkLike_isCompleted];
6✔
147
        const writable = this[WritableConsumer_writable];
6✔
148
        const ended = writable.writableEnded;
6✔
149

150
        this[SinkLike_isCompleted] = true;
6✔
151

152
        // Avoid calling end() a second time.
        if (isCompleted || ended) {
6✔
153
          return;
3✔
154
        }
155

156
        writable.end();
3✔
157
      },
158
    }),
159
  );
160

161
  // Curried public form: options first, then the writable.
  return options => writable => createNodeWritableConsumer(writable, options);
3✔
162
})();
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc