• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

node-opcua / node-opcua / 23974043205

04 Apr 2026 07:17AM UTC coverage: 92.589% (+0.01%) from 92.576%
23974043205

push

github

erossignon
chore: fix Mocha.Suite.settimeout misused

18408 of 21832 branches covered (84.32%)

161708 of 174651 relevant lines covered (92.59%)

461089.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.04
/packages/node-opcua-server/source/server_publish_engine.ts
1
/**
103✔
2
 * @module node-opcua-server
2✔
3
 */
2✔
4
// tslint:disable:no-console
2✔
5

2✔
6
import { EventEmitter } from "node:events";
2✔
7
import chalk from "chalk";
2✔
8
import partition from "lodash.partition";
2✔
9
import sortBy from "lodash.sortby";
2✔
10

2✔
11
import { assert } from "node-opcua-assert";
2✔
12
import { checkDebugFlag, make_debugLog } from "node-opcua-debug";
2✔
13
import { ObjectRegistry } from "node-opcua-object-registry";
2✔
14
import { type StatusCode, StatusCodes } from "node-opcua-status-code";
2✔
15

2✔
16
import { PublishRequest, PublishResponse, ServiceFault, type SubscriptionAcknowledgement } from "node-opcua-types";
2✔
17
import type { IClosedOrTransferredSubscription, IServerSidePublishEngine } from "./i_server_side_publish_engine";
2✔
18
import { Subscription, SubscriptionState } from "./server_subscription";
2✔
19

2✔
20
// File-scoped debug helpers built from this module's __filename.
// NOTE(review): presumably doDebug reflects whether debug output is enabled for this
// file and debugLog writes only in that case — confirm against node-opcua-debug.
const debugLog = make_debugLog(__filename);
2✔
21
const doDebug = checkDebugFlag(__filename);
2✔
22

2✔
23
/**
 * Emit a " TRACE "-prefixed debug line made of the stringified arguments.
 * Does nothing when the debug flag of this file is off.
 */
function traceLog(...args: [unknown?, ...unknown[]]) {
    if (doDebug) {
        const parts: string[] = args.map((x?: unknown) => String(x));
        debugLog(chalk.yellow(" TRACE "), ...parts);
    }
}
43✔
31

2✔
32
/**
 * Construction options of ServerSidePublishEngine.
 *
 * maxPublishRequestInQueue: maximum number of PublishRequests kept waiting in the
 * engine queue; the constructor falls back to 100 when the value is missing or 0.
 */
export interface ServerSidePublishEngineOptions {
2✔
33
    maxPublishRequestInQueue?: number;
2✔
34
}
2✔
35

2✔
36
/**
 * A PublishRequest waiting in the engine queue, with what is needed to answer it.
 *
 * request:                the PublishRequest received from the client
 * serverTimeWhenReceived: server clock value (Date.now()) taken when the request was received
 * results:                status codes produced by processing the request's subscription acknowledgements
 * callback:               invoked with the request and its PublishResponse or ServiceFault
 */
interface PublishData {
2✔
37
    request: PublishRequest;
2✔
38
    serverTimeWhenReceived: number;
2✔
39
    results: StatusCode[];
2✔
40
    callback: (request: PublishRequest, response: PublishResponse | ServiceFault) => void;
2✔
41
}
2✔
42

2✔
43
/**
 * Sanity-check the shape of a queued publish entry: a PublishRequest instance,
 * a numeric reception time, an array of results and a callback function.
 */
function _assertValidPublishData(publishData: PublishData) {
    const { request, serverTimeWhenReceived, results, callback } = publishData;
    assert(request instanceof PublishRequest);
    assert(typeof serverTimeWhenReceived === "number");
    assert(Array.isArray(results));
    assert(typeof callback === "function");
}
3,394✔
49

2✔
50
/**
 * No-op, used as the default callback when a caller does not supply one.
 */
function dummy_function() {
    return;
}
62✔
53

2✔
54
/**
 * Return a new Date shifted by `delta` milliseconds; `date` itself is left untouched.
 */
function addDate(date: Date, delta: number) {
    const shiftedTimestamp = delta + date.getTime();
    return new Date(shiftedTimestamp);
}
16,449✔
57

2✔
58
/**
 * Tell whether a queued PublishRequest has outlived its timeoutHint.
 *
 * @param publishData the queued request together with its server reception time
 * @return true when reception time + timeoutHint lies in the past;
 *         false when it does not, or when the request carries no timeoutHint
 *         (0 / undefined), which means "no limit".
 */
function timeout_filter(publishData: PublishData): boolean {
    const timeoutHint = publishData.request.requestHeader.timeoutHint;
    if (!timeoutHint) {
        // no limits
        return false;
    }
    // remark : do not use request.requestHeader.timestamp! here as this is a client date
    // and server and client clocks might differ
    const expected_timeout_time = addDate(new Date(publishData.serverTimeWhenReceived), timeoutHint);
    return expected_timeout_time.getTime() < Date.now();
}
17,177✔
70

2✔
71
/**
 * A publish engine for a given session.
 *
 * The engine queues the PublishRequests received from the client and hands
 * them out to the subscriptions it owns, and to closed subscriptions that
 * still have pending notifications. Queued requests are answered with a
 * ServiceFault when they are cancelled, time out or overflow the queue.
 */
export class ServerSidePublishEngine extends EventEmitter implements IServerSidePublishEngine {
    public static registry = new ObjectRegistry();

    /**
     * Transfer every subscription of `srcPublishEngine` to `destPublishEngine`,
     * without resending initial values.
     *
     * @param srcPublishEngine  engine that currently owns the subscriptions
     * @param destPublishEngine engine that receives them
     * @private
     */
    public static transferSubscriptionsToOrphan(
        srcPublishEngine: ServerSidePublishEngine,
        destPublishEngine: ServerSidePublishEngine
    ): void {
        debugLog(
            chalk.yellow(
                "ServerSidePublishEngine#transferSubscriptionsToOrphan! " + "start transferring long live subscriptions to orphan"
            )
        );

        for (const subscription of Object.values(srcPublishEngine._subscriptions)) {
            assert(subscription.publishEngine === srcPublishEngine);

            if (subscription.$session) {
                subscription.$session._unexposeSubscriptionDiagnostics(subscription);
            } else {
                console.warn("Warning:  subscription", subscription.id, " has no session attached!!!");
            }

            // The returned promise is deliberately not awaited: with sendInitialValues === false
            // transferSubscription never reaches its only `await`, so the subscription has
            // already changed engine when the call returns (see the assert below).
            ServerSidePublishEngine.transferSubscription(subscription, destPublishEngine, false);
        }
        assert(srcPublishEngine.subscriptionCount === 0);

        debugLog(
            chalk.yellow(
                "ServerSidePublishEngine#transferSubscriptionsToOrphan! " + "end transferring long lived subscriptions to orphan"
            )
        );
    }

    /**
     * Move one subscription from the engine it currently belongs to into `destPublishEngine`.
     *
     * @param subscription      the subscription to move; it must be registered in its current
     *                          engine and must not already exist in the destination
     * @param destPublishEngine the engine that takes ownership
     * @param sendInitialValues true if initial values should be sent
     * @return the transferred subscription
     * @private
     */
    public static async transferSubscription(
        subscription: Subscription,
        destPublishEngine: ServerSidePublishEngine,
        sendInitialValues: boolean
    ): Promise<Subscription> {
        const srcPublishEngine = subscription.publishEngine as ServerSidePublishEngine;

        assert(!destPublishEngine.getSubscriptionById(subscription.id));
        assert(srcPublishEngine.getSubscriptionById(subscription.id));

        // remove pending StatusChangeNotification on the same session that may exist already
        destPublishEngine._purge_dangling_subscription(subscription.id);

        debugLog(chalk.cyan("ServerSidePublishEngine.transferSubscription live subscriptionId ="), subscription.subscriptionId);

        // xx const internalNotification = subscription._flushSentNotifications();
        debugLog(chalk.cyan("ServerSidePublishEngine.transferSubscription with  = "), subscription.getAvailableSequenceNumbers());

        //  If the Server transfers the Subscription to the new Session, the Server shall issue a
        //  StatusChangeNotification notificationMessage with the status code Good_SubscriptionTransferred
        //  to the old Session.
        subscription.notifyTransfer();

        const detached = srcPublishEngine.detach_subscription(subscription);
        destPublishEngine.add_subscription(detached);

        subscription.resetLifeTimeCounter();
        if (sendInitialValues) {
            /*  A Boolean parameter with the following values:
                TRUE  the first Publish response(s) after the TransferSubscriptions call
                      shall contain the current values of all Monitored Items in the
                      Subscription where the Monitoring Mode is set to Reporting.
                      If a value is queued for a data MonitoredItem, the next value in
                      the queue is sent in the Publish response. If no value is queued
                      for a data MonitoredItem, the last value sent is repeated in the
                      Publish response.
                FALSE the first Publish response after the TransferSubscriptions call
                      shall contain only the value changes since the last Publish
                      response was sent.
                This parameter only applies to MonitoredItems used for monitoring Attribute
                changes
            */
            debugLog("Resending initial values");
            await subscription.resendInitialValues();
        }

        assert(destPublishEngine.getSubscriptionById(subscription.id));
        assert(!srcPublishEngine.getSubscriptionById(subscription.id));

        return subscription;
    }

    /** maximum number of PublishRequests kept in the queue (set by the constructor, 100 by default) */
    public maxPublishRequestInQueue = 0;
    /** set by onSessionClose(); later PublishRequests are answered with BadSessionClosed */
    public isSessionClosed = false;

    private _publish_request_queue: PublishData[] = [];
    private _subscriptions: { [key: string]: Subscription };
    private _closed_subscriptions: IClosedOrTransferredSubscription[] = [];

    /**
     * @param options optional settings; `maxPublishRequestInQueue` falls back to 100
     *                when missing or 0
     */
    constructor(options?: ServerSidePublishEngineOptions) {
        super();

        options = options || {};

        ServerSidePublishEngine.registry.register(this);

        // a queue of pending publish request send by the client
        // waiting to be used by the server to send notification
        this._publish_request_queue = []; // { request :/*PublishRequest*/{},

        this._subscriptions = {};

        // _closed_subscriptions contains a collection of Subscription that
        // have  expired but that still need to send some pending notification
        // to the client.
        // Once publish requests will be received from the  client
        // the notifications of those subscriptions will be processed so that
        // they can be properly disposed.
        this._closed_subscriptions = [];

        this.maxPublishRequestInQueue = options.maxPublishRequestInQueue || 100;

        this.isSessionClosed = false;
    }

    /**
     * Multi-line summary: queue limit, ids of live subscriptions, ids of closed subscriptions.
     */
    public toString(): string {
        let str = "";
        str += `maxPublishRequestInQueue ${this.maxPublishRequestInQueue}\n`;
        str += `subscriptions ${Object.keys(this._subscriptions).join()}\n`;
        str += `closed subscriptions ${this._closed_subscriptions.map((s) => s.id).join()}\n`;
        return str;
    }

    /**
     * Unregister the engine from the registry.
     * Asserts that no live or closed subscription is left.
     */
    public dispose(): void {
        debugLog("ServerSidePublishEngine#dispose");

        assert(Object.keys(this._subscriptions).length === 0, "self._subscriptions count!=0");
        this._subscriptions = {};

        assert(this._closed_subscriptions.length === 0, "self._closed_subscriptions count!=0");
        this._closed_subscriptions = [];

        ServerSidePublishEngine.registry.unregister(this);
    }

    /**
     * Apply the acknowledgements carried by a PublishRequest.
     *
     * @param subscriptionAcknowledgements acknowledgements to process (a falsy value is treated as empty)
     * @return one status code per acknowledgement, in the same order:
     *         BadSubscriptionIdInvalid when the subscription is not owned by this engine,
     *         otherwise whatever the subscription's acknowledgeNotification returns
     */
    public process_subscriptionAcknowledgements(subscriptionAcknowledgements: SubscriptionAcknowledgement[]): StatusCode[] {
        // process acknowledgements
        subscriptionAcknowledgements = subscriptionAcknowledgements || [];
        debugLog("process_subscriptionAcknowledgements = ", subscriptionAcknowledgements);
        const results = subscriptionAcknowledgements.map((subscriptionAcknowledgement: SubscriptionAcknowledgement) => {
            const subscription = this.getSubscriptionById(subscriptionAcknowledgement.subscriptionId);
            if (!subscription) {
                // // try to find the session
                // const transferredSubscription = this._transferred_subscriptions.find(
                //   (s) => s.subscriptionId === subscriptionAcknowledgement.subscriptionId
                // );
                // if (transferredSubscription) {
                //   debugLog("Subscription acknowledgeNotification done in transferred subscription ");
                //   return transferredSubscription.acknowledgeNotification(subscriptionAcknowledgement.sequenceNumber);
                // }
                return StatusCodes.BadSubscriptionIdInvalid;
            }
            return subscription.acknowledgeNotification(subscriptionAcknowledgement.sequenceNumber);
        });

        return results;
    }

    /**
     * get a array of subscription handled by the publish engine.
     */
    public get subscriptions(): Subscription[] {
        return Object.values(this._subscriptions);
    }

    /**
     * Register a subscription with this engine.
     * A subscription without engine is bound to this one; a subscription bound
     * to another engine, or whose id is already registered, fails an assertion.
     *
     * @return the subscription passed in
     */
    public add_subscription(subscription: Subscription): Subscription {
        assert(subscription instanceof Subscription);
        assert(Number.isFinite(subscription.id));
        subscription.publishEngine = subscription.publishEngine || this;
        assert(subscription.publishEngine === this);
        assert(!this._subscriptions[subscription.id]);

        debugLog("ServerSidePublishEngine#add_subscription -  adding subscription with Id:", subscription.id);
        this._subscriptions[subscription.id] = subscription;
        // xx subscription._flushSentNotifications();
        return subscription;
    }

    /**
     * Remove a subscription from this engine and clear its `publishEngine`
     * reference, without terminating it.
     *
     * @return the detached subscription
     */
    public detach_subscription(subscription: Subscription): Subscription {
        assert(subscription instanceof Subscription);
        assert(Number.isFinite(subscription.id));
        assert(subscription.publishEngine === this);
        assert(this._subscriptions[subscription.id] === subscription);

        delete this._subscriptions[subscription.id];
        subscription.publishEngine = null as unknown as ServerSidePublishEngine;
        debugLog("ServerSidePublishEngine#detach_subscription detaching subscription with Id:", subscription.id);
        return subscription;
    }

    /**
     * Terminate the remaining subscriptions, drop the queued PublishRequests
     * (no response is sent for them here) and dispose the closed subscriptions
     * still held by the engine.
     */
    public shutdown(): void {
        if (this.subscriptionCount !== 0) {
            debugLog(chalk.red("Shutting down pending subscription"));
            // `subscriptions` returns a snapshot array, so entries may be removed while iterating
            for (const subscription of this.subscriptions) {
                subscription.terminate();
            }
        }

        assert(this.subscriptionCount === 0, "subscription shall be removed first before you can shutdown a publish engine");

        debugLog("ServerSidePublishEngine#shutdown");

        // purge _publish_request_queue
        this._publish_request_queue = [];

        // purge self._closed_subscriptions
        for (const closedSubscription of this._closed_subscriptions) {
            closedSubscription.dispose();
        }
        this._closed_subscriptions = [];
    }

    /**
     * number of pending PublishRequest available in queue
     */
    public get pendingPublishRequestCount(): number {
        return this._publish_request_queue.length;
    }

    /**
     * number of subscriptions
     */
    public get subscriptionCount(): number {
        return Object.keys(this._subscriptions).length;
    }

    /**
     * number of closed subscriptions still kept because they have pending notifications
     */
    public get pendingClosedSubscriptionCount(): number {
        return this._closed_subscriptions.length;
    }

    /**
     * sum of `monitoredItemCount` over all live subscriptions
     */
    public get currentMonitoredItemCount(): number {
        const subscriptions = Object.values(this._subscriptions);
        const result = subscriptions.reduce((sum: number, subscription: Subscription) => {
            return sum + subscription.monitoredItemCount;
        }, 0);
        assert(Number.isFinite(result));
        return result;
    }

    /**
     * Forget any closed subscription carrying the given id.
     */
    public _purge_dangling_subscription(subscriptionId: number): void {
        this._closed_subscriptions = this._closed_subscriptions.filter((s) => s.id !== subscriptionId);
    }

    /**
     * Called when a subscription closes.
     *
     * A subscription with pending notifications is parked in the closed list so the
     * notifications can still be published; otherwise it is disposed right away.
     * Pending PublishRequests are then fed to closed subscriptions, and when nothing
     * is left (no live and no closed subscription) the remaining requests are cancelled.
     */
    public on_close_subscription(subscription: IClosedOrTransferredSubscription): void {
        doDebug && debugLog("ServerSidePublishEngine#on_close_subscription", subscription.id);
        if (subscription.hasPendingNotifications) {
            doDebug &&
                debugLog(
                    "ServerSidePublishEngine#on_close_subscription storing subscription",
                    subscription.id,
                    " to _closed_subscriptions because it has pending notification"
                );
            this._closed_subscriptions.push(subscription);
        } else {
            doDebug && debugLog("ServerSidePublishEngine#on_close_subscription disposing subscription", subscription.id);
            // subscription is no longer needed
            subscription.dispose();
        }

        delete this._subscriptions[subscription.id];

        while (this.#_feed_closed_subscription()) {
            /* keep looping */
        }
        if (this.subscriptionCount === 0 && this._closed_subscriptions.length === 0) {
            this.cancelPendingPublishRequest();
        }
    }

    /**
     * retrieve a subscription by id.
     * @param subscriptionId
     * @return Subscription (undefined at runtime when the id is not registered;
     *         callers in this class test the result before using it)
     */
    public getSubscriptionById(subscriptionId: number | string): Subscription {
        return this._subscriptions[subscriptionId.toString()];
    }

    /**
     * Subscriptions with publishing enabled that are either in the LATE state
     * or have not sent any message yet.
     */
    public findLateSubscriptions(): Subscription[] {
        const subscriptions = Object.values(this._subscriptions);
        return subscriptions.filter((subscription: Subscription) => {
            return (subscription.state === SubscriptionState.LATE || !subscription.messageSent) && subscription.publishingEnabled;
        });
    }

    /**
     * true when findLateSubscriptions() returns at least one subscription
     */
    public get hasLateSubscriptions(): boolean {
        return this.findLateSubscriptions().length > 0;
    }

    /**
     * Late subscriptions sorted by ascending `timeToExpiration`.
     */
    public findLateSubscriptionsSortedByAge(): Subscription[] {
        let late_subscriptions = this.findLateSubscriptions();
        late_subscriptions = sortBy(late_subscriptions, "timeToExpiration");

        return late_subscriptions;
    }

    /**
     * Answer every queued PublishRequest with BadSecureChannelClosed.
     */
    public cancelPendingPublishRequestBeforeChannelChange(): void {
        this._cancelPendingPublishRequest(StatusCodes.BadSecureChannelClosed);
    }

    /**
     * Mark the session as closed and answer every queued PublishRequest with BadSessionClosed.
     */
    public onSessionClose(): void {
        this.isSessionClosed = true;
        this._cancelPendingPublishRequest(StatusCodes.BadSessionClosed);
    }

    /**
     * Answer every queued PublishRequest with BadNoSubscription.
     * Asserts that the engine owns no subscription.
     * @private
     */
    public cancelPendingPublishRequest(): void {
        assert(this.subscriptionCount === 0);
        this._cancelPendingPublishRequest(StatusCodes.BadNoSubscription);
    }

    /**
     * Entry point for a PublishRequest received from the client.
     *
     * The acknowledgements are processed first. Then the request is either
     *  - refused with BadSessionClosed when the session is closed,
     *  - refused with BadNoSubscription when there is no live subscription and no
     *    closed subscription with pending notifications,
     *  - or queued, after which closed and late subscriptions get a chance to use it
     *    and the queue size limit is enforced.
     *
     * @param request
     * @param callback invoked with the request and its response; defaults to a no-op
     * @private
     * @internal
     */
    public _on_PublishRequest(
        request: PublishRequest,
        callback?: (request1: PublishRequest, response: PublishResponse | ServiceFault) => void
    ): void {
        callback = callback || dummy_function;
        assert(typeof callback === "function");

        // c8 ignore next
        if (!(request instanceof PublishRequest)) {
            throw new Error("Internal error : expecting a Publish Request here");
        }

        const subscriptionAckResults = this.process_subscriptionAcknowledgements(request.subscriptionAcknowledgements || []);

        const currentTime = Date.now();
        const publishData: PublishData = {
            callback,
            request,
            results: subscriptionAckResults,
            serverTimeWhenReceived: currentTime
        };

        if (this.isSessionClosed) {
            traceLog("server has received a PublishRequest but session is Closed");
            this._send_error_for_request(publishData, StatusCodes.BadSessionClosed);
        } else if (this.subscriptionCount === 0) {
            if (this._closed_subscriptions.length > 0 && this._closed_subscriptions[0].hasPendingNotifications) {
                // add the publish request to the queue for later processing
                this._publish_request_queue.push(publishData);

                // The outcome is intentionally not asserted: the closed subscription may have
                // expired by itself, so the request is not guaranteed to be consumed here.
                this.#_feed_closed_subscription();
                return;
            }
            traceLog("server has received a PublishRequest but has no subscription opened");
            this._send_error_for_request(publishData, StatusCodes.BadNoSubscription);
        } else {
            // add the publish request to the queue for later processing
            this._publish_request_queue.push(publishData);
            assert(this.pendingPublishRequestCount > 0);

            debugLog(chalk.bgWhite.red("Adding a PublishRequest to the queue "), this._publish_request_queue.length);

            this.#_feed_closed_subscription();

            this.#_feed_late_subscription();

            this.#_handle_too_many_requests();
        }
    }

    /**
     * Pick the late subscription that should be served first: the one with the
     * highest priority and, among equal priorities, the smallest timeToExpiration.
     *
     * @return that subscription, or null when no subscription is late
     */
    #_find_starving_subscription(): Subscription | null {
        const late_subscriptions = this.findLateSubscriptions();
        if (late_subscriptions.length === 0) {
            return null;
        }

        // Orders by ascending priority, then by descending timeToExpiration, so that the
        // most urgent subscription ends up LAST. The comparator returns negative, zero and
        // positive values as Array.prototype.sort requires; returning only 0 or 1 is an
        // inconsistent comparator and leaves the resulting order implementation-defined.
        function compare_subscriptions(s1: Subscription, s2: Subscription): number {
            if (s1.priority !== s2.priority) {
                return s1.priority < s2.priority ? -1 : 1;
            }
            if (s1.timeToExpiration === s2.timeToExpiration) {
                return 0;
            }
            return s1.timeToExpiration < s2.timeToExpiration ? 1 : -1;
        }
        late_subscriptions.sort(compare_subscriptions);

        // c8 ignore next
        if (doDebug) {
            debugLog(
                late_subscriptions
                    .map(
                        (s: Subscription) =>
                            "[ id = " +
                            s.id +
                            " prio=" +
                            s.priority +
                            " t=" +
                            s.timeToExpiration +
                            " ka=" +
                            s.timeToKeepAlive +
                            " m?=" +
                            s.hasUncollectedMonitoredItemNotifications +
                            " " +
                            SubscriptionState[s.state] +
                            " " +
                            s.messageSent +
                            "]"
                    )
                    .join(" \n")
            );
        }
        return late_subscriptions[late_subscriptions.length - 1];
    }

    /**
     * On the next setImmediate turn, if a PublishRequest is still pending, let the
     * most starving late subscription process itself.
     */
    #_feed_late_subscription() {
        setImmediate(() => {
            if (!this.pendingPublishRequestCount) {
                return;
            }
            const starving_subscription = this.#_find_starving_subscription();
            if (starving_subscription) {
                doDebug &&
                    debugLog(chalk.bgWhite.red("feeding most late subscription subscriptionId  = "), starving_subscription.id);
                starving_subscription.process_subscription();
            }
        });
    }

    /**
     * Let the oldest closed subscription publish its pending notifications, and
     * dispose it once it has none left.
     *
     * @return false when there is no pending PublishRequest or no closed subscription
     *         (nothing was done), true otherwise
     */
    #_feed_closed_subscription() {
        if (!this.pendingPublishRequestCount) {
            return false;
        }

        if (this._closed_subscriptions.length === 0) {
            debugLog("ServerSidePublishEngine#_feed_closed_subscription  -> nothing to do");
            return false;
        }
        // process closed subscription
        const closed_subscription = this._closed_subscriptions[0];
        assert(closed_subscription.hasPendingNotifications);
        debugLog("ServerSidePublishEngine#_feed_closed_subscription for closed_subscription ", closed_subscription.id);
        closed_subscription._publish_pending_notifications();
        if (!closed_subscription.hasPendingNotifications) {
            closed_subscription.dispose();
            this._closed_subscriptions.shift();
        }
        return true;
    }

    /**
     * Answer a queued request with a ServiceFault carrying `statusCode`.
     */
    private _send_error_for_request(publishData: PublishData, statusCode: StatusCode): void {
        _assertValidPublishData(publishData);
        const response = new ServiceFault({
            responseHeader: { serviceResult: statusCode }
        });
        this._send_response_for_request(publishData, response);
    }

    /**
     * Answer every queued PublishRequest with a ServiceFault carrying `statusCode`,
     * then empty the queue.
     */
    private _cancelPendingPublishRequest(statusCode: StatusCode): void {
        debugLog(
            chalk.red("Cancelling pending PublishRequest with statusCode  "),
            statusCode.toString(),
            " length =",
            this._publish_request_queue.length
        );

        for (const publishData of this._publish_request_queue) {
            this._send_error_for_request(publishData, statusCode);
        }
        this._publish_request_queue = [];
    }

    /**
     * Enforce the queue limit: when the queue holds more than maxPublishRequestInQueue
     * requests, the oldest one is removed and answered with BadTooManyPublishRequests.
     */
    #_handle_too_many_requests() {
        if (this.pendingPublishRequestCount > this.maxPublishRequestInQueue) {
            traceLog(
                "server has received too many PublishRequest",
                this.pendingPublishRequestCount,
                "/",
                this.maxPublishRequestInQueue
            );
            assert(this.pendingPublishRequestCount === this.maxPublishRequestInQueue + 1);
            // When a Server receives a new Publish request that exceeds its limit it shall de-queue the oldest Publish
            // request and return a response with the result set to Bad_TooManyPublishRequests.

            // dequeue oldest request
            const publishData = this._publish_request_queue.shift();
            if (!publishData) {
                return;
            }
            this._send_error_for_request(publishData, StatusCodes.BadTooManyPublishRequests);
        }
    }

    /**
     * call by a subscription when no notification message is available after the keep alive delay has
     * expired.
     *
     * @param subscriptionId
     * @param future_sequence_number sequence number placed in the (empty) notification message
     * @return true if a publish response has been sent; false when the subscription is unknown,
     *         when no PublishRequest is queued, or when the subscription has pending notifications
     */
    public send_keep_alive_response(subscriptionId: number, future_sequence_number: number): boolean {
        //  this keep-alive Message informs the Client that the Subscription is still active.
        //  Each keep-alive Message is a response to a Publish request in which the  notification Message
        //  parameter does not contain any Notifications and that contains the sequence number of the next
        //  Notification Message that is to be sent.

        const subscription = this.getSubscriptionById(subscriptionId);
        /* c8 ignore next */
        if (!subscription) {
            traceLog("send_keep_alive_response  => invalid subscriptionId = ", subscriptionId);
            return false;
        }
        // let check if we have available PublishRequest to send the keep alive
        if (this.pendingPublishRequestCount === 0 || subscription.hasPendingNotifications) {
            // we cannot send the keep alive PublishResponse
            traceLog(
                "send_keep_alive_response  => cannot send keep-alive  (no PublishRequest left) subscriptionId = ",
                subscriptionId
            );
            return false;
        }
        debugLog(
            `Sending keep alive response for subscription id ${subscription.id} ${subscription.publishingInterval} ${subscription.maxKeepAliveCount}`
        );
        this._send_response(
            subscription,
            new PublishResponse({
                availableSequenceNumbers: subscription.getAvailableSequenceNumbers(),
                moreNotifications: false,
                notificationMessage: {
                    sequenceNumber: future_sequence_number
                },
                subscriptionId
            })
        );
        return true;
    }

    /**
     * Use the oldest queued PublishRequest to send `response`.
     * Asserts that at least one request is queued.
     */
    public _send_response(_subscription: Subscription, response: PublishResponse): void {
        assert(this.pendingPublishRequestCount > 0);
        assert(response.subscriptionId !== 0xffffff);
        const publishData = this._publish_request_queue.shift();
        if (!publishData) {
            return;
        }
        this._send_valid_response_for_request(publishData, response);
    }

    /**
     * Periodic hook: cancels the queued PublishRequests that have timed out.
     */
    public _on_tick(): void {
        this._cancelTimeoutRequests();
    }

    /**
     * Remove from the queue the requests for which timeout_filter is true and
     * answer each of them with BadTimeout.
     */
    private _cancelTimeoutRequests(): void {
        if (this._publish_request_queue.length === 0) {
            return;
        }

        // filter out timeout requests
        const parts = partition(this._publish_request_queue, timeout_filter);

        this._publish_request_queue = parts[1]; // still valid

        const invalid_published_request = parts[0];
        for (const publishData of invalid_published_request) {
            // c8 ignore next
            if (doDebug) {
                debugLog(chalk.cyan(" CANCELING TIMEOUT PUBLISH REQUEST "));
            }
            this._send_error_for_request(publishData, StatusCodes.BadTimeout);
        }
    }

    /**
     * Copy the request handle of the request into the response header and
     * deliver the response through the request's callback.
     */
    public _send_response_for_request(publishData: PublishData, response: PublishResponse | ServiceFault): void {
        response.responseHeader.requestHandle = publishData.request.requestHeader.requestHandle;
        publishData.callback(publishData.request, response);
    }

    /**
     * Attach the acknowledgement results of the request to the PublishResponse and send it.
     */
    public _send_valid_response_for_request(publishData: PublishData, response: PublishResponse): void {
        // c8 ignore next
        if (doDebug) {
            debugLog("_send_response_for_request ", response.toString());
        }
        _assertValidPublishData(publishData);
        // xx assert(response.responseHeader.requestHandle !== 0,"expecting a valid requestHandle");
        response.results = publishData.results;
        this._send_response_for_request(publishData, response);
    }
}
91!
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc