• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bordoley / reactive-js / 14282301741

05 Apr 2025 01:19PM UTC coverage: 88.418% (-0.1%) from 88.531%
14282301741

push

github

bordoley
Set default backpressure strategy for streams.

932 of 1259 branches covered (74.03%)

Branch coverage included in aggregate %.

16 of 16 new or added lines in 6 files covered. (100.0%)

12 existing lines in 2 files now uncovered.

6000 of 6581 relevant lines covered (91.17%)

455.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.98
/src/utils/PauseableScheduler.ts
1
import { MAX_VALUE } from "../__internal__/constants.js";
7✔
2
import {
7✔
3
  Mutable,
4
  include,
5
  init,
6
  mixInstanceFactory,
7
  props,
8
  unsafeCast,
9
} from "../__internal__/mixins.js";
10
import * as WritableStore from "../computations/WritableStore.js";
7✔
11
import { StoreLike_value, WritableStoreLike } from "../computations.js";
7✔
12
import { Optional, bind, isNone, isSome, none } from "../functions.js";
7✔
13
import { clampPositiveInteger } from "../math.js";
7✔
14
import {
7✔
15
  CollectionEnumeratorLike_peek,
16
  DisposableContainerLike_add,
17
  DisposableLike,
18
  DisposableLike_dispose,
19
  DisposableLike_isDisposed,
20
  EnumeratorLike_current,
21
  EnumeratorLike_moveNext,
22
  PauseableLike_isPaused,
23
  PauseableLike_pause,
24
  PauseableLike_resume,
25
  PauseableSchedulerLike,
26
  QueueLike,
27
  QueueLike_enqueue,
28
  SchedulerLike,
29
  SchedulerLike_inContinuation,
30
  SchedulerLike_maxYieldInterval,
31
  SchedulerLike_now,
32
  SchedulerLike_schedule,
33
  SchedulerLike_shouldYield,
34
  delayMs,
35
} from "../utils.js";
36
import * as Disposable from "./Disposable.js";
7✔
37
import QueueMixin from "./__mixins__/QueueMixin.js";
7✔
38
import SchedulerMixin, {
7✔
39
  SchedulerContinuation,
40
  SchedulerContinuationLike,
41
  SchedulerContinuationLike_dueTime,
42
  SchedulerContinuationLike_run,
43
  SchedulerMixinHostLike,
44
  SchedulerMixinHostLike_schedule,
45
  SchedulerMixinHostLike_shouldYield,
46
} from "./__mixins__/SchedulerMixin.js";
47

48
interface Signature {
49
  create(hostScheduler: SchedulerLike): PauseableSchedulerLike & DisposableLike;
50
}
51

52
export const create: Signature["create"] = /*@PURE__*/ (() => {
7✔
53
  const PauseableScheduler_hostScheduler = Symbol(
7✔
54
    "PauseableScheduler_hostScheduler",
55
  );
56

57
  const PauseableScheduler_hostSchedulerContinuationDueTime = Symbol(
7✔
58
    "PauseableScheduler_hostSchedulerContinuationDueTime",
59
  );
60

61
  const PauseableScheduler_pausedTime = Symbol("PauseableScheduler_pausedTime");
7✔
62
  const PauseableScheduler_timeDrift = Symbol("PauseableScheduler_timeDrift");
7✔
63
  const PauseableScheduler_activeContinuation = Symbol(
7✔
64
    "PauseableScheduler_activeContinuation",
65
  );
66
  const PauseableScheduler_hostSchedulerSubscription = Symbol(
7✔
67
    "PauseableScheduler_hostSchedulerSubscription",
68
  );
69

70
  type TProperties = {
71
    readonly [PauseableLike_isPaused]: WritableStoreLike<boolean>;
72
    readonly [PauseableScheduler_hostScheduler]: SchedulerLike;
73
    [PauseableScheduler_hostSchedulerContinuationDueTime]: number;
74
    [PauseableScheduler_pausedTime]: number;
75
    [PauseableScheduler_timeDrift]: number;
76
    [PauseableScheduler_activeContinuation]: Optional<SchedulerContinuationLike>;
77
    [PauseableScheduler_hostSchedulerSubscription]: DisposableLike;
78
  };
79

80
  const peek = (
7✔
81
    instance: TProperties &
82
      SchedulerMixinHostLike &
83
      QueueLike<SchedulerContinuationLike>,
84
  ): Optional<SchedulerContinuationLike> => {
85
    let continuation: Optional<SchedulerContinuationLike> = none;
528✔
86
    while (true) {
528✔
87
      continuation = instance[CollectionEnumeratorLike_peek];
530✔
88

89
      if (isNone(continuation) || !continuation[DisposableLike_isDisposed]) {
530✔
90
        break;
528✔
91
      }
92

93
      instance[EnumeratorLike_moveNext]();
2✔
94
    }
95

96
    return continuation;
528✔
97
  };
98

99
  const scheduleOnHost = (
7✔
100
    instance: TProperties &
101
      SchedulerMixinHostLike &
102
      SchedulerLike &
103
      QueueLike<SchedulerContinuationLike>,
104
  ) => {
105
    const hostScheduler = instance[PauseableScheduler_hostScheduler];
266✔
106

107
    const hostSchedulerContinuationIsScheduled =
108
      !instance[PauseableScheduler_hostSchedulerSubscription][
266✔
109
        DisposableLike_isDisposed
110
      ];
111
    const hostSchedulerContinuationDueTime =
112
      instance[PauseableScheduler_hostSchedulerContinuationDueTime];
266✔
113
    const nextContinuation = peek(instance);
266✔
114
    const nextContinuationDueTime =
115
      nextContinuation?.[SchedulerContinuationLike_dueTime] ?? MAX_VALUE;
266✔
116
    const inContinuation = instance[SchedulerLike_inContinuation];
266✔
117
    const isPaused = instance[PauseableLike_isPaused][StoreLike_value];
266✔
118
    const hostContinuationAlreadyScheduled =
119
      hostSchedulerContinuationIsScheduled &&
266✔
120
      hostSchedulerContinuationDueTime <= nextContinuationDueTime;
121

122
    if (
266✔
123
      isNone(nextContinuation) ||
807✔
124
      inContinuation ||
125
      hostContinuationAlreadyScheduled ||
126
      isPaused
127
    ) {
128
      return;
262✔
129
    }
130

131
    const now = instance[SchedulerLike_now];
4✔
132
    const dueTime = nextContinuation[SchedulerContinuationLike_dueTime];
4✔
133
    const delay = clampPositiveInteger(dueTime - now);
4✔
134

135
    instance[PauseableScheduler_hostSchedulerContinuationDueTime] = dueTime;
4✔
136

137
    instance[PauseableScheduler_hostSchedulerSubscription] = hostScheduler[
4✔
138
      SchedulerLike_schedule
139
    ](bind(hostSchedulerContinuation, instance), { delay });
140
  };
141

142
  function* hostSchedulerContinuation(
143
    this: SchedulerMixinHostLike &
144
      SchedulerLike &
145
      TProperties &
146
      QueueLike<SchedulerContinuationLike> &
147
      DisposableLike,
148
    scheduler: SchedulerLike,
149
  ) {
150
    const isPausedStore = this[PauseableLike_isPaused];
4✔
151
    while (
4✔
152
      !this[DisposableLike_isDisposed] &&
526✔
153
      !isPausedStore[StoreLike_value]
154
    ) {
155
      const nextContinuationToRun = peek(this);
262✔
156

157
      if (isNone(nextContinuationToRun)) {
262✔
158
        break;
3✔
159
      }
160

161
      const dueTime = nextContinuationToRun[SchedulerContinuationLike_dueTime];
259✔
162
      const now = this[SchedulerLike_now];
259✔
163
      const t = clampPositiveInteger(dueTime - now);
259✔
164

165
      if (t > 0) {
259✔
166
        this[PauseableScheduler_hostSchedulerContinuationDueTime] = dueTime;
1✔
167
      } else {
168
        this[EnumeratorLike_moveNext]();
258✔
169
        const continuation = this[EnumeratorLike_current];
258✔
170

171
        this[PauseableScheduler_activeContinuation] = continuation;
258✔
172
        continuation?.[SchedulerContinuationLike_run]();
258✔
173
        this[PauseableScheduler_activeContinuation] = none;
258✔
174
      }
175

176
      if (t > 0 || scheduler[SchedulerLike_shouldYield]) {
259✔
177
        yield delayMs(t);
253✔
178
      }
179
    }
180
  }
181

182
  return mixInstanceFactory(
7✔
183
    include(SchedulerMixin, QueueMixin()),
184
    function PauseableScheduler(
185
      this: Pick<
186
        PauseableSchedulerLike,
187
        typeof PauseableLike_pause | typeof PauseableLike_resume
188
      > &
189
        SchedulerMixinHostLike &
190
        Mutable<TProperties>,
191
      host: SchedulerLike,
192
    ): PauseableSchedulerLike & DisposableLike {
193
      init(SchedulerMixin, this);
4✔
194
      init(QueueMixin<SchedulerContinuationLike>(), this, {
4✔
195
        comparator: SchedulerContinuation.compare,
196
      });
197

198
      this[PauseableScheduler_hostScheduler] = host;
4✔
199

200
      this[PauseableScheduler_pausedTime] = host[SchedulerLike_now];
4✔
201
      this[PauseableScheduler_timeDrift] = 0;
4✔
202

203
      this[PauseableLike_isPaused] = WritableStore.create(true);
4✔
204

205
      host[DisposableContainerLike_add](this);
4✔
206

207
      return this;
4✔
208
    },
209
    props<TProperties>({
210
      [PauseableLike_isPaused]: none,
211
      [PauseableScheduler_hostScheduler]: none,
212
      [PauseableScheduler_hostSchedulerContinuationDueTime]: 0,
213
      [PauseableScheduler_pausedTime]: 0,
214
      [PauseableScheduler_timeDrift]: 0,
215
      [PauseableScheduler_activeContinuation]: none,
216
      [PauseableScheduler_hostSchedulerSubscription]: Disposable.disposed,
217
    }),
218
    {
219
      get [SchedulerLike_maxYieldInterval](): number {
UNCOV
220
        unsafeCast<TProperties>(this);
×
UNCOV
221
        return this[PauseableScheduler_hostScheduler][
×
222
          SchedulerLike_maxYieldInterval
223
        ];
224
      },
225

226
      get [SchedulerLike_now](): number {
227
        unsafeCast<TProperties>(this);
1,296✔
228
        const hostNow =
229
          this[PauseableScheduler_hostScheduler][SchedulerLike_now];
1,296✔
230
        const isPaused = this[PauseableLike_isPaused][StoreLike_value];
1,296✔
231
        const pausedTime =
232
          this[PauseableScheduler_pausedTime] -
1,296✔
233
          this[PauseableScheduler_timeDrift];
234
        const activeTime = hostNow - this[PauseableScheduler_timeDrift];
1,296✔
235

236
        return isPaused ? pausedTime : activeTime;
1,296✔
237
      },
238
      get [SchedulerMixinHostLike_shouldYield](): boolean {
UNCOV
239
        unsafeCast<
×
240
          TProperties &
241
            DisposableLike &
242
            SchedulerLike &
243
            QueueLike<SchedulerContinuationLike>
244
        >(this);
245

UNCOV
246
        const now = this[SchedulerLike_now];
×
UNCOV
247
        const nextContinuation = peek(this);
×
248

249
        const yieldToNextContinuation =
UNCOV
250
          isSome(nextContinuation) &&
×
251
          this[PauseableScheduler_activeContinuation] !== nextContinuation &&
252
          nextContinuation[SchedulerContinuationLike_dueTime] <= now;
253

UNCOV
254
        return (
×
255
          this[PauseableLike_isPaused][StoreLike_value] ||
×
256
          yieldToNextContinuation ||
257
          this[PauseableScheduler_hostScheduler][SchedulerLike_shouldYield]
258
        );
259
      },
260
      [PauseableLike_pause](this: TProperties & SchedulerMixinHostLike) {
261
        const hostNow =
262
          this[PauseableScheduler_hostScheduler][SchedulerLike_now];
1✔
263
        this[PauseableScheduler_pausedTime] = hostNow;
1✔
264
        this[PauseableScheduler_hostSchedulerSubscription][
1✔
265
          DisposableLike_dispose
266
        ]();
267
        this[PauseableLike_isPaused][StoreLike_value] = true;
1✔
268
      },
269
      [PauseableLike_resume](
270
        this: TProperties &
271
          SchedulerMixinHostLike &
272
          SchedulerLike &
273
          QueueLike<SchedulerContinuationLike>,
274
      ) {
275
        const hostNow =
276
          this[PauseableScheduler_hostScheduler][SchedulerLike_now];
5✔
277
        this[PauseableScheduler_timeDrift] +=
5✔
278
          hostNow - this[PauseableScheduler_pausedTime];
279
        this[PauseableLike_isPaused][StoreLike_value] = false;
5✔
280
        scheduleOnHost(this);
5✔
281
      },
282
      [SchedulerMixinHostLike_schedule](
283
        this: TProperties &
284
          SchedulerMixinHostLike &
285
          SchedulerLike &
286
          QueueLike<SchedulerContinuationLike>,
287
        continuation: SchedulerContinuationLike,
288
      ) {
289
        this[QueueLike_enqueue](continuation);
261✔
290

291
        scheduleOnHost(this);
261✔
292
      },
293
    },
294
  );
295
})();
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc