• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

node-opcua / node-opcua / 17921016002

22 Sep 2025 03:55PM UTC coverage: 90.756% (+0.01%) from 90.742%
17921016002

Pull #1458

github

web-flow
Merge 618d884c6 into 0a94e8bf3
Pull Request #1458: Fast restart leak

11092 of 14094 branches covered (78.7%)

44 of 53 new or added lines in 3 files covered. (83.02%)

4 existing lines in 3 files now uncovered.

30169 of 33242 relevant lines covered (90.76%)

367020.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.24
/packages/node-opcua-client/source/private/client_subscription_impl.ts
1
/**
2
 * @module node-opcua-client-private
3
 */
4
// tslint:disable:unified-signatures
5
import { EventEmitter } from "events";
6✔
6
import chalk from "chalk";
6✔
7

8
import { assert } from "node-opcua-assert";
6✔
9
import { AttributeIds } from "node-opcua-data-model";
6✔
10
import { checkDebugFlag, make_debugLog, make_warningLog } from "node-opcua-debug";
6✔
11
import { resolveNodeId } from "node-opcua-nodeid";
6✔
12

13
import { ReadValueIdOptions, TimestampsToReturn } from "node-opcua-service-read";
14
import {
6✔
15
    CreateSubscriptionRequest,
16
    CreateSubscriptionResponse,
17
    DataChangeNotification,
18
    DeleteMonitoredItemsResponse,
19
    DeleteSubscriptionsResponse,
20
    MonitoringParametersOptions,
21
    NotificationMessage,
22
    StatusChangeNotification,
23
    NotificationData,
24
    EventNotificationList,
25
    SetTriggeringResponse,
26
    SetTriggeringRequest,
27
    MonitoringMode,
28
    ModifySubscriptionRequestOptions,
29
    ModifySubscriptionResponse
30
} from "node-opcua-service-subscription";
31

32
import { StatusCode, StatusCodes } from "node-opcua-status-code";
6✔
33
import { Callback, ErrorCallback } from "node-opcua-status-code";
34
import { isNullOrUndefined } from "node-opcua-utils";
6✔
35
import { promoteOpaqueStructureInNotificationData } from "node-opcua-client-dynamic-extension-object";
6✔
36

37
import { ClientMonitoredItemBase } from "../client_monitored_item_base";
38
import { ClientMonitoredItemGroup } from "../client_monitored_item_group";
39
import { ClientSession, MonitoredItemData, SubscriptionId } from "../client_session";
40
import {
6✔
41
    ClientHandle,
42
    ClientMonitoredItemBaseMap,
43
    ClientSubscription,
44
    ClientSubscriptionOptions,
45
    ModifySubscriptionOptions,
46
    ModifySubscriptionResult
47
} from "../client_subscription";
48
import { ClientMonitoredItem } from "../client_monitored_item";
49
import { ClientMonitoredItemToolbox } from "../client_monitored_item_toolbox";
6✔
50
import { ClientMonitoredItemGroupImpl } from "./client_monitored_item_group_impl";
6✔
51
import { ClientMonitoredItemImpl } from "./client_monitored_item_impl";
6✔
52
import { ClientSidePublishEngine } from "./client_publish_engine";
53
import { ClientSessionImpl } from "./client_session_impl";
54
import { detectLongOperation } from "./performance";
6✔
55

56
const debugLog = make_debugLog("CLIENT_SUBSCRIPTION");
6✔
57
const doDebug = checkDebugFlag("CLIENT_SUBSCRIPTION");
6✔
58
const warningLog = make_warningLog("CLIENT_SUBSCRIPTION");
6✔
59

60
// Sentinel stored in `subscriptionId` by the constructor until the create-subscription
// round trip has completed (see _wait_for_subscription_to_be_ready, which polls on it).
export const PENDING_SUBSCRIPTION_ID = 0xc0cac01a;
6✔
61
// Sentinel meaning the subscription has been terminated; terminate() is a no-op in this state.
export const TERMINATED_SUBSCRIPTION_ID = 0xc0cac01b;
6✔
62
// Sentinel stored by terminate() while the subscription is being deleted.
export const TERMINATING_SUBSCRIPTION_ID = 0xc0cac01c;
6✔
63

64
const minimumMaxKeepAliveCount = 3;
6✔
65

66
/**
 * Checks that the keep-alive period of a subscription
 * (maxKeepAliveCount * publishingInterval) fits within the session timeout and,
 * when it does not, logs diagnostic messages with suggested values.
 *
 * @param sessionTimeout     the session timeout, in milliseconds
 * @param maxKeepAliveCount  the requested maximum keep-alive count
 * @param publishingInterval the requested publishing interval, in milliseconds
 * @returns false when the parameters are compatible; true when they are not and
 *          ClientSubscription.ignoreNextWarning is set
 * @throws Error when the parameters are not compatible and
 *         ClientSubscription.ignoreNextWarning is not set
 */
function displayKeepAliveWarning(sessionTimeout: number, maxKeepAliveCount: number, publishingInterval: number): boolean {
    const keepAliveInterval = maxKeepAliveCount * publishingInterval;

    // istanbul ignore next
    if (sessionTimeout < keepAliveInterval) {
        warningLog(
            chalk.yellowBright(
                `[NODE-OPCUA-W09] The subscription parameters are not compatible with the session timeout !
                  session timeout    = ${sessionTimeout}  milliseconds
                  maxKeepAliveCount  = ${maxKeepAliveCount}
                  publishingInterval = ${publishingInterval} milliseconds

                  It is important that the session timeout    ( ${chalk.red(sessionTimeout)} ms) is largely greater than :
                      (maxKeepAliveCount*publishingInterval  =  ${chalk.red(keepAliveInterval)} ms),
                  otherwise you may experience unexpected disconnection from the server if your monitored items are not
                  changing frequently.`
            )
        );

        if (sessionTimeout < 3000 && publishingInterval <= 1000) {
            warningLog(`[NODE-OPCUA-W10] You'll need to increase your sessionTimeout significantly.`);
        }
        if (
            sessionTimeout >= 3000 &&
            sessionTimeout < publishingInterval * minimumMaxKeepAliveCount &&
            maxKeepAliveCount <= minimumMaxKeepAliveCount + 2
        ) {
            warningLog(`[NODE-OPCUA-W11] your publishingInterval interval is probably too large, consider reducing it.`);
        }

        const idealMaxKeepAliveCount = Math.max(4, Math.floor((sessionTimeout * 0.8) / publishingInterval - 0.5));
        const idealPublishingInterval = Math.min(publishingInterval, sessionTimeout / (idealMaxKeepAliveCount + 3));
        // The advertised keep-alive period must be derived from the *suggested* publishing
        // interval; using the original one can report a period larger than the session timeout.
        const idealKeepAliveInterval = idealMaxKeepAliveCount * idealPublishingInterval;
        warningLog(
            `[NODE-OPCUA-W12]  An ideal value for maxKeepAliveCount could be ${idealMaxKeepAliveCount}.
                  An ideal value for publishingInterval could be ${idealPublishingInterval} ms.
                  This will make  your subscription emit a keep alive signal every ${idealKeepAliveInterval} ms
                  if no monitored items are generating notifications.
                  for instance:
                    const  client = OPCUAClient.create({
                        requestedSessionTimeout: 30* 60* 1000, // 30 minutes
                    });
`
        );

        // unless explicitly silenced, incompatible parameters are treated as a hard error
        if (!ClientSubscription.ignoreNextWarning) {
            throw new Error("[NODE-OPCUA-W09] The subscription parameters are not compatible with the session timeout ");
        }
        return true;
    }
    return false;
}
118

119
export class ClientSubscriptionImpl extends EventEmitter implements ClientSubscription {
6✔
120
    /**
     * The session this subscription is attached to, as currently held by the publish engine.
     * Asserts that the publish engine has a session.
     */
    public get session(): ClientSessionImpl {
        const attachedSession = this.publishEngine.session;
        assert(attachedSession, "expecting a valid session here");
        return attachedSession! as ClientSessionImpl;
    }
129
    /**
     * True when the publish engine currently holds a session.
     */
    public get hasSession(): boolean {
        return Boolean(this.publishEngine.session);
    }
132
    /**
     * True when `subscriptionId` is none of the pending / terminating / terminated sentinels.
     */
    public get isActive(): boolean {
        const id = this.subscriptionId;
        return id !== PENDING_SUBSCRIPTION_ID && id !== TERMINATED_SUBSCRIPTION_ID && id !== TERMINATING_SUBSCRIPTION_ID;
    }
139

140
    public subscriptionId: SubscriptionId;
141
    public publishingInterval: number;
142
    public lifetimeCount: number;
143
    public maxKeepAliveCount: number;
144
    public maxNotificationsPerPublish: number;
145
    public publishingEnabled: boolean;
146
    public priority: number;
147
    public monitoredItems: ClientMonitoredItemBaseMap;
148
    public monitoredItemGroups: ClientMonitoredItemGroup[] = [];
407✔
149

150
    public timeoutHint = 0;
407✔
151
    public publishEngine: ClientSidePublishEngine;
152

153
    public lastSequenceNumber: number;
154
    private _nextClientHandle = 0;
407✔
155
    public hasTimedOut: boolean;
156

157
    /**
     * Creates the client-side subscription object and schedules its creation on the server.
     *
     * The requested parameters are normalised in place on `options` (missing values receive
     * defaults, `requestedMaxKeepAliveCount` is raised to `minimumMaxKeepAliveCount`).
     * The subscription id stays at PENDING_SUBSCRIPTION_ID until the asynchronous creation
     * completes; the outcome is then reported through the "started" or "error" event.
     *
     * @param session the session that owns the publish engine this subscription registers with
     * @param options the requested subscription parameters (mutated, see above)
     * @throws Error from displayKeepAliveWarning when the keep-alive period does not fit in
     *         the session timeout and the check has not been silenced
     */
    constructor(session: ClientSession, options: ClientSubscriptionOptions) {
        super();

        this.publishEngine = (session as ClientSessionImpl).getPublishEngine();
        this.lastSequenceNumber = -1;

        options = options || {};

        // defaults for the timing parameters, written back to the caller's object
        const publishingInterval = options.requestedPublishingInterval || 100;
        const lifetimeCount = options.requestedLifetimeCount || 60;
        const maxKeepAliveCount = Math.max(options.requestedMaxKeepAliveCount || 10, minimumMaxKeepAliveCount);
        options.requestedPublishingInterval = publishingInterval;
        options.requestedLifetimeCount = lifetimeCount;
        options.requestedMaxKeepAliveCount = maxKeepAliveCount;

        // perform some verification
        // istanbul ignore next
        if (displayKeepAliveWarning(session.timeout, maxKeepAliveCount, publishingInterval)) {
            warningLog(JSON.stringify({ ...options }, null, " "));
        }

        options.maxNotificationsPerPublish = isNullOrUndefined(options.maxNotificationsPerPublish)
            ? 0
            : options.maxNotificationsPerPublish;

        // note: an omitted publishingEnabled is coerced to false here
        const publishingEnabled = Boolean(options.publishingEnabled);
        options.publishingEnabled = publishingEnabled;

        const priority = options.priority || 1;
        options.priority = priority;

        this.publishingInterval = publishingInterval;
        this.lifetimeCount = lifetimeCount;
        this.maxKeepAliveCount = maxKeepAliveCount;
        this.maxNotificationsPerPublish = options.maxNotificationsPerPublish || 0;
        this.publishingEnabled = publishingEnabled;
        this.priority = priority;

        this.subscriptionId = PENDING_SUBSCRIPTION_ID;

        this._nextClientHandle = 0;
        this.monitoredItems = {};

        /**
         * set to True when the server has notified us that this subscription has timed out
         * ( maxLifeCounter x published interval without being able to process a PublishRequest
         * @property hasTimedOut
         * @type {boolean}
         */
        this.hasTimedOut = false;

        // the creation is deferred, and its outcome is announced on a further tick
        setImmediate(() => {
            __create_subscription(this, (err?: Error) => {
                setImmediate(() => {
                    if (err) {
                        /**
                         * notify the observers that the subscription could not be created
                         * @event error
                         */
                        this.emit("error", err);
                    } else {
                        /**
                         * notify the observers that the subscription has now started
                         * @event started
                         */
                        this.emit("started", this.subscriptionId);
                    }
                });
            });
        });
    }
239

240
    /**
     * Terminates the subscription: unregisters it from the publish engine, asks the server
     * to delete it when a session is available, then runs `_terminate_step2`.
     *
     * Calling it again while terminating or once terminated only invokes the callback.
     *
     * @param args `args[0]` is the completion callback (required)
     */
    public terminate(...args: any[]): any {
        debugLog("Terminating client subscription ", this.subscriptionId);
        const callback = args[0];
        assert(typeof callback === "function", "expecting a callback function");

        if (this.subscriptionId === TERMINATED_SUBSCRIPTION_ID || this.subscriptionId === TERMINATING_SUBSCRIPTION_ID) {
            // already terminated... just ignore
            return callback();
        }

        if (isFinite(this.subscriptionId)) {
            const subscriptionId = this.subscriptionId;
            this.subscriptionId = TERMINATING_SUBSCRIPTION_ID;
            this.publishEngine.unregisterSubscription(subscriptionId);

            if (!this.hasSession) {
                // no session to talk to: only the local clean-up can be done
                return this._terminate_step2(callback);
            }
            const session = this.session;
            if (!session) {
                return callback(new Error("no session"));
            }
            session.deleteSubscriptions(
                {
                    subscriptionIds: [subscriptionId]
                },
                (err: Error | null, response?: DeleteSubscriptionsResponse) => {
                    // `results` may be missing on a response; dereferencing it blindly would
                    // throw inside this callback and the termination would never complete.
                    const results = response ? response.results : null;
                    if (results && results[0] !== StatusCodes.Good) {
                        debugLog("warning: deleteSubscription returned ", results);
                    }
                    if (err) {
                        /**
                         * notify the observers that an error has occurred
                         * @event internal_error
                         * @param err the error
                         */
                        this.emit("internal_error", err);
                    }
                    // the local clean-up is performed whatever the server answered
                    this._terminate_step2(callback);
                }
            );
        } else {
            debugLog("subscriptionId is not value ", this.subscriptionId);
            assert(this.subscriptionId === PENDING_SUBSCRIPTION_ID);
            this._terminate_step2(callback);
        }
    }
287

288
    /**
     * Increments the internal client-handle counter and returns the new value.
     */
    public nextClientHandle(): number {
        return ++this._nextClientHandle;
    }
295

296
    public async monitor(
        itemToMonitor: ReadValueIdOptions,
        requestedParameters: MonitoringParametersOptions,
        timestampsToReturn: TimestampsToReturn,
        monitoringMode: MonitoringMode
    ): Promise<ClientMonitoredItemBase>;
    public monitor(
        itemToMonitor: ReadValueIdOptions,
        requestedParameters: MonitoringParametersOptions,
        timestampsToReturn: TimestampsToReturn,
        monitoringMode: MonitoringMode,
        done: Callback<ClientMonitoredItemBase>
    ): void;
    /**
     * Callback implementation behind the overloads above: creates a monitored item on this
     * subscription and hands it to `done` once the creation has been reported.
     *
     * The monitoring mode may be omitted, in which case the 4th argument is the callback and
     * MonitoringMode.Reporting is used.
     */
    public monitor(...args: any[]): any {
        const [itemToMonitor, requestedParameters, timestampsToReturn] = args as [
            ReadValueIdOptions,
            MonitoringParametersOptions,
            TimestampsToReturn
        ];
        const modeOmitted = typeof args[3] === "function";
        const monitoringMode: MonitoringMode = modeOmitted ? MonitoringMode.Reporting : (args[3] as MonitoringMode);
        const done = (modeOmitted ? args[3] : args[4]) as Callback<ClientMonitoredItemBase>;

        assert(typeof done === "function", "expecting a function here");

        // the nodeId is resolved in place on the caller's object
        itemToMonitor.nodeId = resolveNodeId(itemToMonitor.nodeId!);

        const monitoredItem = ClientMonitoredItem_create(
            this,
            itemToMonitor,
            requestedParameters,
            timestampsToReturn,
            monitoringMode,
            (err1?: Error | null, _createdItem?: ClientMonitoredItem) => {
                if (!err1) {
                    // the item returned by the factory call is the one reported to the caller
                    done(null, monitoredItem);
                    return;
                }
                return done && done(err1);
            }
        );
    }
334

335
    public async monitorItems(
        itemsToMonitor: ReadValueIdOptions[],
        requestedParameters: MonitoringParametersOptions,
        timestampsToReturn: TimestampsToReturn
    ): Promise<ClientMonitoredItemGroup>;

    public monitorItems(
        itemsToMonitor: ReadValueIdOptions[],
        requestedParameters: MonitoringParametersOptions,
        timestampsToReturn: TimestampsToReturn,
        done: Callback<ClientMonitoredItemGroup>
    ): void;
    /**
     * Callback implementation behind the overloads above: builds a monitored item group,
     * waits for the subscription to be ready and then starts monitoring the group.
     *
     * `done` receives either an error, or `null` and the group (same convention as `monitor`).
     */
    public monitorItems(...args: any[]): any {
        const itemsToMonitor = args[0] as ReadValueIdOptions[];
        const requestedParameters = args[1] as MonitoringParametersOptions;
        const timestampsToReturn = args[2] as TimestampsToReturn;
        const done = args[3] as Callback<ClientMonitoredItemGroup>;

        // fail immediately rather than from a later tick when the callback is missing
        assert(typeof done === "function", "expecting a function here");

        const monitoredItemGroup = new ClientMonitoredItemGroupImpl(this, itemsToMonitor, requestedParameters, timestampsToReturn);

        this._wait_for_subscription_to_be_ready((err?: Error) => {
            if (err) {
                return done(err);
            }
            monitoredItemGroup._monitor((err1?: Error) => {
                if (err1) {
                    return done(err1);
                }
                // report success with an explicit null error, as `monitor` does
                done(null, monitoredItemGroup);
            });
        });
    }
367

368
    /**
     * Removes the given monitored items from this subscription (via `_remove`) and then asks
     * the session to delete them.
     *
     * @param monitoredItems the items to delete
     * @param callback       invoked with the error, if any, reported by the session call
     */
    public _delete_monitored_items(monitoredItems: ClientMonitoredItemBase[], callback: ErrorCallback): void {
        assert(typeof callback === "function");
        assert(Array.isArray(monitoredItems));
        assert(this.isActive);

        // local removal happens first, before the request is sent
        monitoredItems.forEach((item) => {
            this._remove(item);
        });

        const session = this.session as ClientSessionImpl;
        session.deleteMonitoredItems(
            {
                monitoredItemIds: monitoredItems.map((item) => item.monitoredItemId),
                subscriptionId: this.subscriptionId
            },
            (err: Error | null) => {
                callback(err!);
            }
        );
    }
388

389
    public async setPublishingMode(publishingEnabled: boolean): Promise<StatusCode>;
    public setPublishingMode(publishingEnabled: boolean, callback: Callback<StatusCode>): void;
    /**
     * Callback implementation behind the overloads above: forwards the requested publishing
     * mode for this subscription to the session and reports the resulting status code.
     */
    public setPublishingMode(...args: any[]): any {
        const [publishingEnabled, callback] = args as [boolean, Callback<StatusCode>];
        assert(typeof callback === "function");

        const session = this.session as ClientSessionImpl;
        if (!session) {
            return callback(new Error("no session"));
        }

        session.setPublishingMode(
            publishingEnabled,
            this.subscriptionId as SubscriptionId,
            (err: Error | null, statusCode?: StatusCode) => {
                if (err) {
                    return callback(err);
                }
                /* istanbul ignore next */
                if (!statusCode) {
                    return callback(new Error("Internal Error"));
                }
                if (!statusCode.isNotGood()) {
                    callback(null, StatusCodes.Good);
                    return;
                }
                // a non-good status is handed over as a result, not as an error
                return callback(null, statusCode);
            }
        );
    }
415

416
    /**
     * Sends a SetTriggering request for this subscription.
     *
     * @param triggeringItem the monitored item acting as the trigger
     * @param linksToAdd     monitored items to link to the trigger, or null
     * @param linksToRemove  monitored items to unlink from the trigger, or null
     */
    public setTriggering(
        triggeringItem: ClientMonitoredItemBase,
        linksToAdd: ClientMonitoredItemBase[] | null,
        linksToRemove?: ClientMonitoredItemBase[] | null
    ): Promise<SetTriggeringResponse>;
    public setTriggering(
        triggeringItem: ClientMonitoredItemBase,
        linksToAdd: ClientMonitoredItemBase[] | null,
        linksToRemove: ClientMonitoredItemBase[] | null,
        callback: Callback<SetTriggeringResponse>
    ): void;
    public setTriggering(...args: any[]): any {
        const [triggeringItem, linksToAdd, linksToRemove, callback] = args as [
            ClientMonitoredItemBase,
            ClientMonitoredItemBase[] | null,
            ClientMonitoredItemBase[] | null,
            Callback<SetTriggeringResponse>
        ];
        assert(typeof callback === "function");

        const session = this.session as ClientSessionImpl;
        if (!session) {
            return callback(new Error("no session"));
        }

        const subscriptionId = this.subscriptionId;
        const triggeringItemId = triggeringItem.monitoredItemId!;

        // converts an optional list of monitored items into the list of their ids
        const idsOf = (items: ClientMonitoredItemBase[] | null) => (items ? items.map((item) => item.monitoredItemId!) : null);

        const setTriggeringRequest = new SetTriggeringRequest({
            linksToAdd: idsOf(linksToAdd),
            linksToRemove: idsOf(linksToRemove),
            subscriptionId,
            triggeringItemId
        });

        session.setTriggering(setTriggeringRequest, (err: Error | null, response?: SetTriggeringResponse) => {
            if (err) {
                // use soft error, no exceptions: a response, when present, wins over the error
                return response ? callback(null, response) : callback(err);
            }
            // istanbul ignore next
            if (!response) {
                return callback(new Error("Internal Error"));
            }
            callback(null, response);
        });
    }
466

467
    // public subscription service
468
    public modify(options: ModifySubscriptionOptions, callback: Callback<ModifySubscriptionResult>): void;
    public modify(options: ModifySubscriptionOptions): Promise<ModifySubscriptionResult>;
    /**
     * Callback implementation behind the overloads above: sends a ModifySubscription request
     * for this subscription.
     *
     * Parameters left undefined in `options` are filled in (on the caller's object) with the
     * values currently held by this subscription. On success the revised values returned by
     * the server, as well as the requested priority and maxNotificationsPerPublish, are
     * stored on this subscription so that they serve as defaults for the next call.
     */
    public modify(...args: any[]): any {
        const modifySubscriptionRequest = args[0] as ModifySubscriptionRequestOptions;
        const callback = args[1] as Callback<ModifySubscriptionResult>;
        const session = this.session as ClientSessionImpl;
        if (!session) {
            return callback(new Error("no session"));
        }

        modifySubscriptionRequest.subscriptionId = this.subscriptionId;

        const requestedPriority =
            modifySubscriptionRequest.priority === undefined ? this.priority : modifySubscriptionRequest.priority;
        const requestedMaxNotificationsPerPublish =
            modifySubscriptionRequest.maxNotificationsPerPublish === undefined
                ? this.maxNotificationsPerPublish
                : modifySubscriptionRequest.maxNotificationsPerPublish;

        modifySubscriptionRequest.priority = requestedPriority;
        modifySubscriptionRequest.requestedLifetimeCount =
            modifySubscriptionRequest.requestedLifetimeCount === undefined
                ? this.lifetimeCount
                : modifySubscriptionRequest.requestedLifetimeCount;
        modifySubscriptionRequest.requestedMaxKeepAliveCount =
            modifySubscriptionRequest.requestedMaxKeepAliveCount === undefined
                ? this.maxKeepAliveCount
                : modifySubscriptionRequest.requestedMaxKeepAliveCount;
        modifySubscriptionRequest.requestedPublishingInterval =
            modifySubscriptionRequest.requestedPublishingInterval === undefined
                ? this.publishingInterval
                : modifySubscriptionRequest.requestedPublishingInterval;
        modifySubscriptionRequest.maxNotificationsPerPublish = requestedMaxNotificationsPerPublish;

        session.modifySubscription(modifySubscriptionRequest, (err: Error | null, response?: ModifySubscriptionResponse) => {
            if (err) {
                return callback(err);
            }
            if (!response) {
                // never complete with neither an error nor a result
                return callback(new Error("Internal Error"));
            }
            this.publishingInterval = response.revisedPublishingInterval;
            this.lifetimeCount = response.revisedLifetimeCount;
            this.maxKeepAliveCount = response.revisedMaxKeepAliveCount;
            // These two are not part of the revised values read from the response above;
            // remember what was requested, otherwise a later modify() would default to stale
            // values and silently revert them.
            this.priority = requestedPriority;
            this.maxNotificationsPerPublish = requestedMaxNotificationsPerPublish;
            callback(null, response);
        });
    }
509

510
    public getMonitoredItems(): Promise<MonitoredItemData>;
    public getMonitoredItems(callback: Callback<MonitoredItemData>): void;
    /**
     * Callback implementation behind the overloads above: delegates to the session, passing
     * this subscription's id and the supplied callback.
     */
    public getMonitoredItems(...args: any[]): any {
        const [callback] = args;
        this.session.getMonitoredItems(this.subscriptionId, callback);
    }
515

516
    /**
     * Builds a multi-line, human readable description of the subscription: its parameters and
     * an estimate of its expiry computed from the time the publish engine last sent a request.
     */
    public toString(): string {
        const timeToLive = this.lifetimeCount * this.publishingInterval;
        const lines: string[] = [];

        lines.push("subscriptionId      : " + this.subscriptionId + "\n");
        lines.push("publishingInterval  : " + this.publishingInterval + "\n");
        lines.push("lifetimeCount       : " + this.lifetimeCount + "\n");
        lines.push("maxKeepAliveCount   : " + this.maxKeepAliveCount + "\n");
        lines.push("hasTimedOut         : " + this.hasTimedOut + "\n");

        lines.push("(maxKeepAliveCount*publishingInterval: " + this.publishingInterval * this.maxKeepAliveCount + " ms)\n");
        lines.push("(maxLifetimeCount*publishingInterval: " + timeToLive + " ms)\n");

        const lastRequestSentTime = this.publishEngine.lastRequestSentTime;
        lines.push("lastRequestSentTime : " + lastRequestSentTime.toString() + "\n");

        // time elapsed since the last request, compared with the subscription lifetime
        const duration = Date.now() - lastRequestSentTime.getTime();
        const overdue = duration - timeToLive;
        let extra: string;
        if (overdue > 0) {
            extra = chalk.red(" expired since " + overdue / 1000 + " seconds");
        } else {
            extra = chalk.green(" valid for " + -overdue / 1000 + " seconds");
        }

        lines.push("timeSinceLast PR    : " + duration + "ms" + extra + "\n");
        lines.push("has expired         : " + (duration > timeToLive) + "\n");

        lines.push("(session timeout    : " + this.session.timeout + " ms)\n");

        return lines.join("");
    }
543

544
    /**
     * Estimates, in milliseconds, how long this subscription still has to live:
     * (time of the last request sent + publishingInterval * lifetimeCount) - now,
     * floored at 0.
     */
    public evaluateRemainingLifetime(): number {
        const now = Date.now();
        const lifetime = this.publishingInterval * this.lifetimeCount;
        const expiresAt = this.publishEngine.lastRequestSentTime.getTime() + lifetime;
        return Math.max(0, expiresAt - now);
    }
554

555
    /**
     * Registers a monitored item under its client handle and emits "item_added".
     *
     * @param clientHandle  the handle the item is stored under; must match the handle found
     *                      in the item's monitoring parameters
     * @param monitoredItem the item to register
     */
    public _add_monitored_item(clientHandle: ClientHandle, monitoredItem: ClientMonitoredItemBase): void {
        assert(this.isActive, "subscription must be active and not terminated");

        const declaredHandle = monitoredItem.monitoringParameters.clientHandle;
        assert(declaredHandle === clientHandle);

        this.monitoredItems[clientHandle] = monitoredItem;

        /**
         * notify the observers that a new monitored item has been added to the subscription.
         * @event item_added
         * @param the monitored item.
         */
        this.emit("item_added", monitoredItem);
    }
567

568
    /**
     * Appends a monitored item group to `monitoredItemGroups`.
     */
    public _add_monitored_items_group(monitoredItemGroup: ClientMonitoredItemGroupImpl): void {
        const groups = this.monitoredItemGroups;
        groups.push(monitoredItemGroup);
    }
571

572
    /**
     * Polls (one check per `setImmediate` tick) until the subscription id is no longer
     * PENDING_SUBSCRIPTION_ID, then calls `done`.
     *
     * @param done called without argument once the subscription has a real id, or with an
     *             Error when the subscription is being, or has been, terminated
     */
    public _wait_for_subscription_to_be_ready(done: ErrorCallback): void {
        const waitForSubscriptionAndMonitor = (): void => {
            switch (this.subscriptionId) {
                case PENDING_SUBSCRIPTION_ID:
                    // the subscriptionID is not yet known because the server hasn't replied yet
                    // let postpone this call, a little bit, to let things happen
                    setImmediate(waitForSubscriptionAndMonitor);
                    break;

                case TERMINATING_SUBSCRIPTION_ID:
                case TERMINATED_SUBSCRIPTION_ID:
                    // the subscription has been (or is being) terminated in the meantime
                    // this indicates a potential issue in the code using this api.
                    // A terminating subscription must not be reported as ready.
                    if (typeof done === "function") {
                        done(new Error("subscription has been deleted"));
                    }
                    break;

                default:
                    done();
            }
        };

        setImmediate(waitForSubscriptionAndMonitor);
    }
595

596
    /**
     * Dispatches each value of a DataChangeNotification to the monitored item registered
     * under the matching client handle.
     *
     * Entries targeting an EventNotifier item, or an unknown client handle, are not
     * dispatched; a warning is logged instead (only once per notification for unknown
     * handles, followed by a repeat count).
     */
    private __on_publish_response_DataChangeNotification(notification: DataChangeNotification) {
        assert(notification.schema.name === "DataChangeNotification");

        let unknownHandleCount = 0;

        for (const entry of notification.monitoredItems || []) {
            const target = this.monitoredItems[entry.clientHandle];

            if (!target) {
                unknownHandleCount += 1;
                if (unknownHandleCount === 1) {
                    warningLog("Receiving a notification for a unknown monitoredItem with clientHandle ", entry.clientHandle);
                }
                continue;
            }

            if (target.itemToMonitor.attributeId === AttributeIds.EventNotifier) {
                warningLog(
                    chalk.yellow("Warning"),
                    chalk.cyan(" Server send a DataChangeNotification for an EventNotifier." + " EventNotificationList was expected")
                );
                warningLog(
                    chalk.cyan("         the Server may not be fully OPCUA compliant"),
                    chalk.yellow(". This notification will be ignored.")
                );
                continue;
            }

            (target as ClientMonitoredItemImpl)._notify_value_change(entry.value);
        }

        // istanbul ignore next
        if (unknownHandleCount > 1) {
            warningLog("previous message repeated", unknownHandleCount, "times");
        }
    }
635

636
    /**
     * Handles a StatusChangeNotification: on BadTimeout the subscription is flagged as timed
     * out and terminated; in every case the "status_changed" event is emitted afterwards.
     */
    private __on_publish_response_StatusChangeNotification(notification: StatusChangeNotification) {
        assert(notification.schema.name === "StatusChangeNotification");

        const status = notification.status;
        debugLog("Client has received a Status Change Notification ", status.toString());

        switch (status) {
            case StatusCodes.GoodSubscriptionTransferred:
                // OPCUA UA Spec 1.0.3 : part 3 - page 82 - 5.13.7 TransferSubscriptions:
                // If the Server transfers the Subscription to the new Session, the Server shall issue
                // a StatusChangeNotification  notificationMessage with the status code
                // Good_SubscriptionTransferred to the old Session.
                debugLog("ClientSubscription#__on_publish_response_StatusChangeNotification : GoodSubscriptionTransferred");

                // it may have been transferred after a reconnection: nothing else is done here
                break;

            case StatusCodes.BadTimeout:
                // The server tells us that the subscription has timed out: it has been closed
                // on the server side and cannot process any new PublishRequest.
                //
                // from Spec OPCUA Version 1.03 Part 4 - 5.13.1.1 Description : Page 69:
                //
                // h. Subscriptions have a lifetime counter that counts the number of consecutive publishing cycles in
                //    which there have been no Publish requests available to send a Publish response for the
                //    Subscription. Any Service call that uses the SubscriptionId or the processing of a Publish
                //    response resets the lifetime counter of this Subscription. When this counter reaches the value
                //    calculated for the lifetime of a Subscription based on the MaxKeepAliveCount parameter in the
                //    CreateSubscription Service (5.13.2), the Subscription is closed. Closing the Subscription causes
                //    its MonitoredItems to be deleted. In addition the Server shall issue a StatusChangeNotification
                //    notificationMessage with the status code BadTimeout.
                //
                this.hasTimedOut = true;
                this.terminate(() => {
                    /* empty */
                });
                break;

            default:
                break;
        }

        /**
         * notify the observers that the server has send a status changed notification (such as BadTimeout )
         * @event status_changed
         */
        this.emit("status_changed", status, notification.diagnosticInfo);
    }
677

678
    /**
     * Dispatch every event of an EventNotificationList to the monitored item
     * registered under the event's clientHandle.
     *
     * An event whose clientHandle matches no registered monitored item is skipped
     * with a warning instead of raising an assertion, mirroring the handling of
     * unknown monitored items in data change notifications. The warning is logged
     * once, followed by a repeat count when it happened several times.
     *
     * @param notification the event notification list to process
     */
    private __on_publish_response_EventNotificationList(notification: EventNotificationList) {
        assert(notification.schema.name === "EventNotificationList");

        const events = notification.events || [];
        let unknownCount = 0;

        for (const event of events) {
            const monitoredItem = this.monitoredItems[event.clientHandle] as ClientMonitoredItemImpl | undefined;
            if (!monitoredItem) {
                // Throwing here would reject the promise chain that drives notification
                // processing and drop the remaining events of the list.
                unknownCount++;
                if (unknownCount === 1) {
                    warningLog(
                        "Receiving an event notification for a unknown monitoredItem with clientHandle ",
                        event.clientHandle
                    );
                }
                continue;
            }
            monitoredItem._notify_event(event.eventFields || []);
        }

        if (unknownCount > 1) {
            warningLog("previous message repeated", unknownCount, "times");
        }
    }
689

690
    /**
     * Entry point for a NotificationMessage received in a publish response.
     *
     * Records the sequence number, emits "raw_notification", then either emits
     * "keepalive" (empty notification data) or emits "received_notifications" and
     * dispatches each notification to its dedicated handler once opaque structures
     * have been promoted.
     *
     * Handlers that take too long are reported through a throttled warning: on the
     * first occurrence and then on every 1000th occurrence.
     *
     * @param notificationMessage the notification message received from the server
     */
    public onNotificationMessage(notificationMessage: NotificationMessage): void {
        assert(Object.prototype.hasOwnProperty.call(notificationMessage, "sequenceNumber"));

        this.lastSequenceNumber = notificationMessage.sequenceNumber;

        this.emit("raw_notification", notificationMessage);

        const notificationData = (notificationMessage.notificationData || []) as NotificationData[];

        if (notificationData.length === 0) {
            // this is a keep alive message
            debugLog(chalk.yellow("Client : received a keep alive notification from client"));
            /**
             * notify the observers that a keep alive Publish Response has been received from the server.
             * @event keepalive
             */
            this.emit("keepalive");
            return;
        }

        /**
         * notify the observers that some notifications has been received from the server in  a PublishResponse
         * each modified monitored Item
         * @event  received_notifications
         */
        this.emit("received_notifications", notificationMessage);

        promoteOpaqueStructureInNotificationData(this.session, notificationData)
            .then(() => {
                detectLongOperation(
                    () => {
                        // now process all notifications
                        for (const notification of notificationData) {
                            // istanbul ignore next
                            if (!notification) {
                                continue;
                            }

                            // DataChangeNotification / StatusChangeNotification / EventNotification
                            switch (notification.schema.name) {
                                case "DataChangeNotification":
                                    // now inform each individual monitored item
                                    this.__on_publish_response_DataChangeNotification(notification as DataChangeNotification);
                                    break;
                                case "StatusChangeNotification":
                                    this.__on_publish_response_StatusChangeNotification(notification as StatusChangeNotification);
                                    break;
                                case "EventNotificationList":
                                    this.__on_publish_response_EventNotificationList(notification as EventNotificationList);
                                    break;
                                default:
                                    warningLog(" Invalid notification :", notification.toString());
                            }
                        }
                    },
                    (duration: number) => {
                        // lazily attach the slow-notification statistics to this instance
                        const s = (a: unknown) => {
                            const b = a as { $_slowNotifCount: number; $_maxDuration: number };
                            b.$_slowNotifCount = b.$_slowNotifCount || 0;
                            b.$_maxDuration = b.$_maxDuration || 0;
                            return b;
                        };
                        const stats = s(this);
                        stats.$_maxDuration = Math.max(stats.$_maxDuration, duration);

                        // Count every slow notification *before* throttling; counting after
                        // the early return froze the counter at 1 and silenced all later warnings.
                        stats.$_slowNotifCount++;
                        const count = stats.$_slowNotifCount;
                        if (count !== 1 && count % 1000 !== 0) {
                            return;
                        }
                        warningLog(
                            `[NODE-OPCUA-W32]: monitored.item event handler takes too much time : operation duration ${duration} ms [repeated ${count} times]\n         please ensure that your monitoredItem event handler is not blocking the event loop.`
                        );
                    }
                );
            })
            .catch((err: unknown) => {
                // Without this handler a failure while promoting or dispatching the
                // notifications would surface as an unhandled promise rejection.
                warningLog("ClientSubscription#onNotificationMessage: error while processing notifications", err);
                this.emit("internal_error", err);
            });
    }
763

764
    /**
     * Second stage of the termination sequence: removes every monitored item and
     * every monitored item group, then, on the next turn of the event loop, marks
     * the subscription as terminated, emits "terminated" and invokes the callback.
     *
     * @param callback invoked (without error) once "terminated" has been emitted
     */
    private _terminate_step2(callback: (err?: Error) => void) {
        Object.values(this.monitoredItems).forEach((item) => {
            this._remove(item);
        });

        // iterate over the array captured now: _removeGroup replaces this.monitoredItemGroups
        const groupsToRemove = this.monitoredItemGroups;
        for (const group of groupsToRemove) {
            this._removeGroup(group);
        }

        assert(Object.keys(this.monitoredItems).length === 0);

        const finalize = () => {
            /**
             * notify the observers that the client subscription has terminated
             * @event  terminated
             */
            this.subscriptionId = TERMINATED_SUBSCRIPTION_ID;
            this.emit("terminated");
            callback();
        };
        setImmediate(finalize);
    }
787

788
    /**
     * Terminate a monitored item if it is registered in this subscription.
     *
     * A monitored item whose clientHandle is not a key of `monitoredItems`
     * (e.g. because it failed to be created) is left untouched.
     *
     * @param monitoredItem the monitored item to terminate
     */
    private _remove(monitoredItem: ClientMonitoredItemBase) {
        const { clientHandle } = monitoredItem.monitoringParameters;
        assert(clientHandle > 0);

        const isRegistered = Object.prototype.hasOwnProperty.call(this.monitoredItems, clientHandle);
        if (isRegistered) {
            (monitoredItem as ClientMonitoredItemImpl)._terminate_and_emit();
        }
        // otherwise: may be monitoredItem failed to be created  ....
    }
799

800
    /**
     * Terminate a monitored item group and drop it from the list of groups
     * tracked by this subscription.
     *
     * @param monitoredItemGroup the group to terminate and forget
     */
    public _removeGroup(monitoredItemGroup: ClientMonitoredItemGroup): void {
        (monitoredItemGroup as any)._terminate_and_emit();

        // build a new array rather than mutating in place
        const remainingGroups = this.monitoredItemGroups.filter((group) => group !== monitoredItemGroup);
        this.monitoredItemGroups = remainingGroups;
    }
804
    /**
     * Build a ClientMonitoredItemImpl bound to this subscription.
     *
     * @private
     * @param itemToMonitor        the node/attribute to monitor
     * @param monitoringParameters the requested monitoring parameters
     * @param timestampsToReturn   which timestamps to return with notifications
     * @param monitoringMode       the monitoring mode (defaults to MonitoringMode.Reporting)
     * @returns the newly constructed monitored item
     */
    public _createMonitoredItem(
        itemToMonitor: ReadValueIdOptions,
        monitoringParameters: MonitoringParametersOptions,
        timestampsToReturn: TimestampsToReturn,
        monitoringMode: MonitoringMode = MonitoringMode.Reporting
    ): ClientMonitoredItem {
        /* istanbul ignore next*/
        return new ClientMonitoredItemImpl(this, itemToMonitor, monitoringParameters, timestampsToReturn, monitoringMode);
    }
826
}
827

828
/**
 * Create a monitored item and asynchronously register it with the server.
 *
 * The monitored item is constructed and returned immediately. On the next turn of
 * the event loop the function waits for the subscription to be ready and then
 * starts monitoring. If either step fails, the monitored item is terminated with
 * the error so that the failure is never silently dropped, even when no callback
 * is supplied.
 *
 * @param subscription         the subscription that will own the monitored item
 * @param itemToMonitor        the node/attribute to monitor
 * @param monitoringParameters the requested monitoring parameters
 * @param timestampsToReturn   which timestamps to return with notifications
 * @param monitoringMode       the monitoring mode (defaults to MonitoringMode.Reporting)
 * @param callback             optional; called with the error when waiting for the
 *                             subscription fails, otherwise with the (possibly
 *                             undefined) monitoring error and the monitored item
 * @returns the monitored item (not yet registered on the server when returned)
 */
export function ClientMonitoredItem_create(
    subscription: ClientSubscription,
    itemToMonitor: ReadValueIdOptions,
    monitoringParameters: MonitoringParametersOptions,
    timestampsToReturn: TimestampsToReturn,
    monitoringMode: MonitoringMode = MonitoringMode.Reporting,
    callback?: (err3?: Error | null, monitoredItem?: ClientMonitoredItem) => void
): ClientMonitoredItem {
    const monitoredItem = new ClientMonitoredItemImpl(
        subscription,
        itemToMonitor,
        monitoringParameters,
        timestampsToReturn,
        monitoringMode
    );

    setImmediate(() => {
        (subscription as ClientSubscriptionImpl)._wait_for_subscription_to_be_ready((err?: Error) => {
            if (err) {
                // The subscription never became ready: terminate the item, as is done
                // when monitoring itself fails, instead of leaving it pending forever.
                monitoredItem._terminate_and_emit(err);
                if (callback) {
                    callback(err);
                }
                return;
            }
            ClientMonitoredItemToolbox._toolbox_monitor(subscription, timestampsToReturn, [monitoredItem], (err1?: Error) => {
                if (err1) {
                    monitoredItem._terminate_and_emit(err1);
                }
                if (callback) {
                    callback(err1, monitoredItem);
                }
            });
        });
    });
    return monitoredItem;
}
864
// tslint:disable:no-var-requires
865
// tslint:disable:max-line-length
866
import { withCallback } from "thenify-ex";
6✔
867
const opts = { multiArgs: false };
6✔
868

869
// Replace each callback-style method of ClientSubscriptionImpl with its
// `withCallback` wrapper (from thenify-ex).
// NOTE(review): presumably this lets each method be called either with a trailing
// callback or without one, returning a Promise - confirm against thenify-ex.
{
    const proto = ClientSubscriptionImpl.prototype;
    proto.setPublishingMode = withCallback(proto.setPublishingMode);
    proto.monitor = withCallback(proto.monitor);
    proto.monitorItems = withCallback(proto.monitorItems);
    proto.setTriggering = withCallback(proto.setTriggering);
    proto.modify = withCallback(proto.modify);
    proto.terminate = withCallback(proto.terminate);
    proto.getMonitoredItems = withCallback(proto.getMonitoredItems);
}

// Install the concrete factory behind ClientSubscription.create.
ClientSubscription.create = (clientSession: ClientSession, options: ClientSubscriptionOptions) =>
    new ClientSubscriptionImpl(clientSession, options);
880

881
/**
 * Send a CreateSubscriptionRequest for the given subscription and, on success,
 * copy the revised values returned by the server onto it, compute its publish
 * timeout hint and register it with the publish engine.
 *
 * On a service error the subscription emits "internal_error" before the
 * callback is invoked with that error.
 *
 * @param subscription the client-side subscription to create on the server
 * @param callback     invoked with an error on failure, without argument on success
 */
export function __create_subscription(subscription: ClientSubscriptionImpl, callback: ErrorCallback) {
    // istanbul ignore next
    if (!subscription.hasSession) {
        return callback(new Error("__create_subscription: subscription has no Session"));
    }
    const session = subscription.session;

    debugLog(chalk.yellow.bold("ClientSubscription created "));

    const createRequest = new CreateSubscriptionRequest({
        maxNotificationsPerPublish: subscription.maxNotificationsPerPublish,
        priority: subscription.priority,
        publishingEnabled: subscription.publishingEnabled,
        requestedLifetimeCount: subscription.lifetimeCount,
        requestedMaxKeepAliveCount: subscription.maxKeepAliveCount,
        requestedPublishingInterval: subscription.publishingInterval
    });

    const onResponse = (err: Error | null, response?: CreateSubscriptionResponse) => {
        if (err) {
            /* istanbul ignore next */
            subscription.emit("internal_error", err);
            if (!callback) {
                return;
            }
            return callback(err);
        }

        /* istanbul ignore next */
        if (!response) {
            return callback(new Error("internal error"));
        }

        // the session may have gone away while the request was in flight
        if (!subscription.hasSession) {
            return callback(new Error("createSubscription has failed = > no session"));
        }
        assert(subscription.hasSession);

        // adopt the values revised by the server
        subscription.subscriptionId = response.subscriptionId;
        subscription.publishingInterval = response.revisedPublishingInterval;
        subscription.lifetimeCount = response.revisedLifetimeCount;
        subscription.maxKeepAliveCount = response.revisedMaxKeepAliveCount;

        // publish timeout hint, capped at 0x7ffff
        const uncappedTimeoutHint = (subscription.maxKeepAliveCount + 10) * subscription.publishingInterval * 2;
        subscription.timeoutHint = Math.min(uncappedTimeoutHint, 0x7ffff);

        displayKeepAliveWarning(subscription.session.timeout, subscription.maxKeepAliveCount, subscription.publishingInterval);
        ClientSubscription.ignoreNextWarning = false;

        // istanbul ignore next
        if (doDebug) {
            debugLog(chalk.yellow.bold("registering callback"));
            debugLog(chalk.yellow.bold("publishingInterval               "), subscription.publishingInterval);
            debugLog(chalk.yellow.bold("lifetimeCount                    "), subscription.lifetimeCount);
            debugLog(chalk.yellow.bold("maxKeepAliveCount                "), subscription.maxKeepAliveCount);
            debugLog(chalk.yellow.bold("publish request timeout hint =   "), subscription.timeoutHint);
            debugLog(chalk.yellow.bold("hasTimedOut                      "), subscription.hasTimedOut);
            debugLog(chalk.yellow.bold("timeoutHint for publish request  "), subscription.timeoutHint);
        }

        subscription.publishEngine.registerSubscription(subscription);

        if (callback) {
            callback();
        }
    };

    session.createSubscription(createRequest, onResponse);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc