• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

node-opcua / node-opcua / 23974043205

04 Apr 2026 07:17AM UTC coverage: 92.589% (+0.01%) from 92.576%
23974043205

push

github

erossignon
chore: fix Mocha.Suite.settimeout misused

18408 of 21832 branches covered (84.32%)

161708 of 174651 relevant lines covered (92.59%)

461089.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.08
/packages/node-opcua-server/source/server_subscription.ts
1
/**
337✔
2
 * @module node-opcua-server
2✔
3
 */
2✔
4
// tslint:disable:no-console
2✔
5

2✔
6
import { EventEmitter } from "node:events";
2✔
7
import chalk from "chalk";
2✔
8

2✔
9
import {
2✔
10
    type BaseNode,
2✔
11
    type Duration,
2✔
12
    type IAddressSpace,
2✔
13
    type ISessionContext,
2✔
14
    SessionContext,
2✔
15
    type UAMethod,
2✔
16
    type UAObject,
2✔
17
    type UAObjectType,
2✔
18
    type UAVariable
2✔
19
} from "node-opcua-address-space";
2✔
20
import { assert } from "node-opcua-assert";
2✔
21
import type { Byte, UInt32 } from "node-opcua-basic-types";
2✔
22
import { SubscriptionDiagnosticsDataType } from "node-opcua-common";
2✔
23
import { AttributeIds, isValidDataEncoding, NodeClass, type QualifiedNameLike } from "node-opcua-data-model";
2✔
24
import type { DataValue, TimestampsToReturn } from "node-opcua-data-value";
2✔
25
import { checkDebugFlag, make_debugLog, make_warningLog } from "node-opcua-debug";
2✔
26
import { NodeId } from "node-opcua-nodeid";
2✔
27
import type { NumericRange } from "node-opcua-numeric-range";
2✔
28
import { ObjectRegistry } from "node-opcua-object-registry";
2✔
29
import { SequenceNumberGenerator } from "node-opcua-secure-channel";
2✔
30
import { checkSelectClauses, EventFilter } from "node-opcua-service-filter";
2✔
31
import {
2✔
32
    AggregateFilter,
2✔
33
    DataChangeFilter,
2✔
34
    DataChangeNotification,
2✔
35
    EventNotificationList,
2✔
36
    MonitoredItemCreateRequest,
2✔
37
    MonitoredItemCreateResult,
2✔
38
    MonitoredItemNotification,
2✔
39
    MonitoringMode,
2✔
40
    NotificationMessage,
2✔
41
    PublishResponse,
2✔
42
    StatusChangeNotification
2✔
43
} from "node-opcua-service-subscription";
2✔
44
import { type StatusCode, StatusCodes } from "node-opcua-status-code";
2✔
45
import {
2✔
46
    AggregateFilterResult,
2✔
47
    ContentFilterResult,
2✔
48
    EventFieldList,
2✔
49
    EventFilterResult,
2✔
50
    type MonitoringFilter
2✔
51
} from "node-opcua-types";
2✔
52
import { type IServerSidePublishEngine, TransferredSubscription } from "./i_server_side_publish_engine";
2✔
53

2✔
54
import { MonitoredItem, type MonitoredItemOptions, type QueueItem } from "./monitored_item";
2✔
55
import { Queue } from "./queue";
2✔
56
import type { ServerSession } from "./server_session";
2✔
57
import { validateFilter } from "./validate_filter";
2✔
58

2✔
59
const debugLog = make_debugLog(__filename);
2✔
60
const doDebug = checkDebugFlag(__filename);
2✔
61
const warningLog = make_warningLog(__filename);
2✔
62
const maxNotificationMessagesInQueue = 100;
2✔
63

2✔
64
export interface SubscriptionDiagnosticsDataTypePriv extends SubscriptionDiagnosticsDataType {
2✔
65
    $subscription: Subscription;
2✔
66
}
2✔
67

2✔
68
/**
 * The states a server side Subscription can be in.
 */
export enum SubscriptionState {
    /** The Subscription has not yet been created or has terminated. */
    CLOSED = 1,
    /** The Subscription is being created. */
    CREATING = 2,
    /**
     * The Subscription is cyclically checking for Notifications from its MonitoredItems.
     * The keep-alive counter is not used in this state.
     */
    NORMAL = 3,
    /**
     * The publishing timer has expired and there are Notifications available or a keep-alive
     * Message is ready to be sent, but there are no Publish requests queued. When in this state,
     * the next Publish request is processed when it is received.
     * The keep-alive counter is not used in this state.
     */
    LATE = 4,
    /**
     * The Subscription is cyclically checking for Notifications or for the keep-alive counter
     * to count down to 0 from its maximum.
     */
    KEEPALIVE = 5,
    TERMINATED = 6
}
2✔
80

2✔
81
/**
 * Bring a requested publishing interval into the range supported by the server.
 *
 * @param publishingInterval the requested interval; `undefined` or `NaN` selects
 *        `Subscription.defaultPublishingInterval`
 * @returns the interval clamped to
 *          [`Subscription.minimumPublishingInterval`, `Subscription.maximumPublishingInterval`]
 */
function _adjust_publishing_interval(publishingInterval?: number): number {
    let interval = Subscription.defaultPublishingInterval;
    if (publishingInterval !== undefined && !Number.isNaN(publishingInterval)) {
        interval = publishingInterval;
    }
    const notBelowMinimum = Math.max(interval, Subscription.minimumPublishingInterval);
    return Math.min(notBelowMinimum, Subscription.maximumPublishingInterval);
}
578✔
90

2✔
91
/** Smallest keep-alive count a subscription may use; also the fallback when none is requested. */
const minimumMaxKeepAliveCount = 2;
/** Largest keep-alive count a subscription may use. */
const maximumMaxKeepAliveCount = 12000;

/**
 * Bring a requested keep-alive count into the supported range.
 *
 * @param maxKeepAliveCount the requested count; a missing or zero value selects the minimum
 * @returns the count clamped to [minimumMaxKeepAliveCount, maximumMaxKeepAliveCount]
 */
function _adjust_maxKeepAliveCount(maxKeepAliveCount?: number): number {
    const requested = maxKeepAliveCount || minimumMaxKeepAliveCount;
    return Math.min(Math.max(requested, minimumMaxKeepAliveCount), maximumMaxKeepAliveCount);
}
578✔
100

2✔
101
const MaxUint32 = 0xffffffff;
2✔
102

2✔
103
/**
 * Compute the effective lifetime count of a subscription.
 *
 * The requested count is first bounded so that the lifetime, expressed in publishing
 * ticks, stays between `Subscription.minimumLifetimeDuration` and
 * `Subscription.maximumLifetimeDuration`. The result is then raised, if needed, to three
 * times the keep-alive count (capped at the largest UInt32), because the OPC UA
 * specification requires the lifetime count to be at least three times the keep-alive
 * count. This last rule is applied after the duration bounds, so it takes precedence
 * over the maximum duration.
 *
 * @param lifeTimeCount the requested lifetime count; zero is treated as 1
 * @param maxKeepAliveCount the (already adjusted) keep-alive count
 * @param publishingInterval the (already adjusted) publishing interval
 * @returns the adjusted lifetime count
 */
function _adjust_lifeTimeCount(lifeTimeCount: number, maxKeepAliveCount: number, publishingInterval: number): number {
    const fewestTicks = Math.ceil(Subscription.minimumLifetimeDuration / publishingInterval);
    const mostTicks = Math.floor(Subscription.maximumLifetimeDuration / publishingInterval);

    const requested = lifeTimeCount || 1;
    const withinDuration = Math.min(mostTicks, Math.max(fewestTicks, requested));

    // "The lifetime count shall be a minimum of three times the keep-alive count."
    const threeKeepAlives = Math.min(maxKeepAliveCount * 3, MaxUint32);
    return Math.max(withinDuration, threeKeepAlives);
}
578✔
119

2✔
120
/**
 * Resolve the `publishingEnabled` option.
 *
 * @param publishingEnabled the requested value
 * @returns `true` when the value is `null` or `undefined`, otherwise its boolean value
 */
function _adjust_publishingEnable(publishingEnabled?: boolean | null): boolean {
    if (publishingEnabled === null || publishingEnabled === undefined) {
        return true;
    }
    return Boolean(publishingEnabled);
}
570✔
123

2✔
124
/**
 * Resolve the maximum number of notifications allowed in one publish response.
 *
 * @param maxNotificationsPerPublish the requested limit; a missing, zero or negative value
 *        means "no client limit" and selects `Subscription.maxNotificationPerPublishHighLimit`
 * @returns a non-zero limit that never exceeds `Subscription.maxNotificationPerPublishHighLimit`
 */
function _adjust_maxNotificationsPerPublish(maxNotificationsPerPublish?: number): number {
    const highLimit = Subscription.maxNotificationPerPublishHighLimit;
    assert(highLimit > 0, "Subscription.maxNotificationPerPublishHighLimit must be positive");

    const requested = maxNotificationsPerPublish || 0;
    assert(typeof requested === "number");

    // a positive request is capped by the high limit; anything else uses the high limit itself
    const adjusted = requested > 0 ? Math.min(highLimit, requested) : highLimit;

    assert(adjusted !== 0 && adjusted <= highLimit);
    return adjusted;
}
578✔
144

2✔
145
/**
 * Left-pad a value with zeros.
 *
 * @param s the value to format
 * @param length the minimum width of the result
 * @returns the text of `s`, padded on the left with "0" up to `length` characters;
 *          values already at least `length` characters long are returned unchanged
 */
function w(s: string | number, length: number): string {
    // padStart pads with spaces by default and never truncates, so prefixing "000"
    // (as was done before) always produced an over-long string such as "0005".
    return `${s}`.padStart(length, "0");
}
×
148

2✔
149
/**
 * Format the local time of day of a date as hours:minutes:seconds:milliseconds.
 *
 * @param d the date to format
 * @returns the four fields, each formatted with `w`, joined by ":"
 */
function t(d: Date): string {
    const fields = [w(d.getHours(), 2), w(d.getMinutes(), 2), w(d.getSeconds(), 2), w(d.getMilliseconds(), 3)];
    return fields.join(":");
}
×
152

2✔
153
/**
 * Extract the sequence number of each notification message, in order.
 *
 * @param arr the notification messages
 * @returns one sequence number per message
 */
function _getSequenceNumbers(arr: NotificationMessage[]): number[] {
    return arr.map(({ sequenceNumber }) => sequenceNumber);
}
1,546✔
156

2✔
157
/**
 * Build the filter result returned to the client for an event filter.
 *
 * Only the select clauses are checked (against `node`); the where clause is not analysed
 * and an empty ContentFilterResult is returned for it.
 *
 * @param node the node the monitored item is attached to
 * @param eventFilter the event filter supplied by the client
 * @returns the EventFilterResult holding one result per select clause
 * @throws Error when `eventFilter` is not an EventFilter instance
 */
function analyzeEventFilterResult(node: BaseNode, eventFilter: EventFilter): EventFilterResult {
    /* c8 ignore next */
    if (!(eventFilter instanceof EventFilter)) {
        throw new Error("Internal Error");
    }

    const selectClauses = eventFilter.selectClauses || [];

    return new EventFilterResult({
        selectClauseDiagnosticInfos: [],
        selectClauseResults: checkSelectClauses(node as UAObjectType, selectClauses),
        whereClauseResult: new ContentFilterResult()
    });
}
20✔
173

2✔
174
/**
 * Build the filter result for a data change filter.
 *
 * @param _node unused
 * @param dataChangeFilter the filter supplied by the client; asserted to be a DataChangeFilter
 * @returns always `null`: the OPC UA specification defines no result structure for this filter
 */
function analyzeDataChangeFilterResult(_node: BaseNode, dataChangeFilter: DataChangeFilter): null {
    const isDataChangeFilter = dataChangeFilter instanceof DataChangeFilter;
    assert(isDataChangeFilter);
    return null;
}
39✔
179

2✔
180
/**
 * Build the filter result for an aggregate filter.
 *
 * @param _node unused
 * @param aggregateFilter the filter supplied by the client; asserted to be an AggregateFilter
 * @returns a default-constructed AggregateFilterResult
 */
function analyzeAggregateFilterResult(_node: BaseNode, aggregateFilter: AggregateFilter): AggregateFilterResult {
    const isAggregateFilter = aggregateFilter instanceof AggregateFilter;
    assert(isAggregateFilter);
    return new AggregateFilterResult({});
}
×
184

2✔
185
/**
 * Produce the filter result matching the kind of monitoring filter supplied.
 *
 * @param node the node the monitored item is attached to
 * @param filter the monitoring filter, or `null` when none was supplied
 * @returns `null` when there is no filter or for a DataChangeFilter, otherwise the
 *          result built for the event or aggregate filter
 * @throws Error("invalid filter") when the filter is of none of the supported kinds
 */
function _process_filter(node: BaseNode, filter: MonitoringFilter | null): EventFilterResult | AggregateFilterResult | null {
    if (!filter) {
        return null;
    }
    if (filter instanceof EventFilter) {
        return analyzeEventFilterResult(node, filter);
    }
    if (filter instanceof DataChangeFilter) {
        return analyzeDataChangeFilterResult(node, filter);
    }
    if (filter instanceof AggregateFilter) {
        return analyzeAggregateFilterResult(node, filter);
    }
    // c8 ignore next
    throw new Error("invalid filter");
}
92✔
200

2✔
201
/**
 * Build the SubscriptionDiagnosticsDataType object that mirrors a subscription.
 *
 * The returned object keeps a back reference to the subscription in `$subscription` and
 * exposes the subscription's current parameters through getters, so each value is read
 * from the subscription at access time rather than copied once. Every getter returns a
 * neutral value (0, false or the null NodeId) when `$subscription` is not set.
 *
 * @param subscription the subscription to expose
 * @returns the diagnostics object bound to `subscription`
 * @private
 */
function createSubscriptionDiagnostics(subscription: Subscription): SubscriptionDiagnosticsDataTypePriv {
    assert(subscription instanceof Subscription);

    const sd = new SubscriptionDiagnosticsDataType({}) as SubscriptionDiagnosticsDataTypePriv;
    sd.$subscription = subscription;

    // Installs a configurable getter on the diagnostics object. The guard on $subscription
    // is shared by all getters so that none of them can throw once the back reference is
    // gone (previously "maxLifetimeCount" was the only getter without that guard).
    const defineLiveGetter = <T>(name: string, fallback: T, read: (owner: Subscription) => T): void => {
        Object.defineProperty(sd, name, {
            configurable: true,
            get(this: SubscriptionDiagnosticsDataTypePriv): T {
                return this.$subscription ? read(this.$subscription) : fallback;
            }
        });
    };

    defineLiveGetter("sessionId", NodeId.nullNodeId, (owner) => owner.getSessionId());
    defineLiveGetter("subscriptionId", 0, (owner) => owner.id);
    defineLiveGetter("priority", 0, (owner) => owner.priority);
    defineLiveGetter("publishingInterval", 0, (owner) => owner.publishingInterval);
    defineLiveGetter("maxLifetimeCount", 0, (owner) => owner.lifeTimeCount);
    defineLiveGetter("maxKeepAliveCount", 0, (owner) => owner.maxKeepAliveCount);
    defineLiveGetter("maxNotificationsPerPublish", 0, (owner) => owner.maxNotificationsPerPublish);
    defineLiveGetter("publishingEnabled", false, (owner) => owner.publishingEnabled);
    defineLiveGetter("monitoredItemCount", 0, (owner) => owner.monitoredItemCount);
    defineLiveGetter("nextSequenceNumber", 0, (owner) => owner.futureSequenceNumber);
    defineLiveGetter("disabledMonitoredItemCount", 0, (owner) => owner.disabledMonitoredItemCount);

    /* those member of self.subscriptionDiagnostics are handled directly

   modifyCount
   enableCount,
   disableCount,
   republishRequestCount,
   notificationsCount,
   publishRequestCount,
   dataChangeNotificationsCount,
   eventNotificationsCount,
  */

    /*
   those members are not updated yet in the code :
   "republishMessageRequestCount",
   "republishMessageCount",
   "transferRequestCount",
   "transferredToAltClientCount",
   "transferredToSameClientCount",
   "latePublishRequestCount",
   "unacknowledgedMessageCount",
   "discardedMessageCount",
   "monitoringQueueOverflowCount",
   "eventQueueOverflowCount"
   */
    defineLiveGetter("currentKeepAliveCount", 0, (owner) => owner.currentKeepAliveCount);
    defineLiveGetter("currentLifetimeCount", 0, (owner) => owner.currentLifetimeCount);

    // add object in Variable SubscriptionDiagnosticArray (i=2290) ( Array of SubscriptionDiagnostics)
    // add properties in Variable to reflect
    return sd;
}
570✔
322

2✔
323
interface IGlobalMonitoredItemCounter {
2✔
324
    totalMonitoredItemCount: number;
2✔
325
}
2✔
326

2✔
327
export interface SubscriptionOptions {
2✔
328
    sessionId?: NodeId;
2✔
329
    /**
2✔
330
     * (default:1000) the publishing interval.
2✔
331
     */
2✔
332
    publishingInterval?: number;
2✔
333
    /**
2✔
334
     * (default:2, the minimum allowed) the max keep-alive count
2✔
335
     */
2✔
336
    maxKeepAliveCount?: number;
2✔
337

2✔
338
    lifeTimeCount?: number;
2✔
339
    /**
2✔
340
     * (default:true)
2✔
341
     */
2✔
342
    publishingEnabled?: boolean;
2✔
343
    /**
2✔
344
     * (default:0)
2✔
345
     */
2✔
346
    maxNotificationsPerPublish?: number;
2✔
347
    /**
2✔
348
     * subscription priority Byte:(0-255)
2✔
349
     */
2✔
350
    priority?: number;
2✔
351

2✔
352
    publishEngine?: IServerSidePublishEngine;
2✔
353
    /**
2✔
354
     *  a unique identifier
2✔
355
     */
2✔
356
    id?: number;
2✔
357

2✔
358
    serverCapabilities: ServerCapabilitiesPartial;
2✔
359
    globalCounter: IGlobalMonitoredItemCounter;
2✔
360
}
2✔
361

2✔
362
// Module-wide counter for monitored item ids; it starts at a random value
// between 0 and 100000.
let g_monitoredItemId = Math.ceil(Math.random() * 100000);

/**
 * Allocate the next monitored item id.
 *
 * @returns the current counter value; the counter is then advanced by one
 */
function getNextMonitoredItemId(): number {
    const allocated = g_monitoredItemId;
    g_monitoredItemId += 1;
    return allocated;
}
14,778✔
367

2✔
368
// function myFilter<T>(t1: any, chunk: any[]): T[] {
2✔
369
//     return chunk.filter(filter_instanceof.bind(null, t1));
2✔
370
// }
2✔
371

2✔
372
// function makeNotificationData(notifications_chunk: QueueItem): NotificationData {
2✔
373
//     const dataChangedNotificationData = myFilter<MonitoredItemNotification>(MonitoredItemNotification, notifications_chunk);
2✔
374
//     const eventNotificationListData = myFilter<EventFieldList>(EventFieldList, notifications_chunk);
2✔
375

2✔
376
//     assert(notifications_chunk.length === dataChangedNotificationData.length + eventNotificationListData.length);
2✔
377

2✔
378
//     const notifications: (DataChangeNotification | EventNotificationList)[] = [];
2✔
379

2✔
380
//     // add dataChangeNotification
2✔
381
//     if (dataChangedNotificationData.length) {
2✔
382
//         const dataChangeNotification = new DataChangeNotification({
2✔
383
//             diagnosticInfos: [],
2✔
384
//             monitoredItems: dataChangedNotificationData
2✔
385
//         });
2✔
386
//         notifications.push(dataChangeNotification);
2✔
387
//     }
2✔
388

2✔
389
//     // add dataChangeNotification
2✔
390
//     if (eventNotificationListData.length) {
2✔
391
//         const eventNotificationList = new EventNotificationList({
2✔
392
//             events: eventNotificationListData
2✔
393
//         });
2✔
394
//         notifications.push(eventNotificationList);
2✔
395
//     }
2✔
396
//     return notifications.length === 0 ? null : notifications;
2✔
397
// }
2✔
398
const INVALID_ID = -1;
2✔
399

2✔
400
export type Notification = DataChangeNotification | EventNotificationList | StatusChangeNotification;
2✔
401
export type Counter = number;
2✔
402

2✔
403
export interface ModifySubscriptionParameters {
2✔
404
    /**
2✔
405
     *     requestedPublishingInterval =0 means fastest possible
2✔
406
     */
2✔
407
    requestedPublishingInterval?: Duration;
2✔
408
    /**
2✔
409
     * requestedLifetimeCount=0 means no change
2✔
410
     */
2✔
411
    requestedLifetimeCount?: Counter;
2✔
412
    /**
2✔
413
     * requestedMaxKeepAliveCount  ===0 means no change
2✔
414
     */
2✔
415
    requestedMaxKeepAliveCount?: Counter;
2✔
416
    maxNotificationsPerPublish?: Counter;
2✔
417
    priority?: Byte;
2✔
418
}
2✔
419

2✔
420
export interface GetMonitoredItemsResult {
2✔
421
    /**
2✔
422
     * array of serverHandles for all MonitoredItems of the subscription
2✔
423
     * identified by subscriptionId.
2✔
424
     */
2✔
425
    serverHandles: Uint32Array;
2✔
426
    /**
2✔
427
     *  array of clientHandles for all MonitoredItems of the subscription
2✔
428
     *  identified by subscriptionId.
2✔
429
     */
2✔
430
    clientHandles: Uint32Array;
2✔
431
    statusCode: StatusCode;
2✔
432
}
2✔
433

2✔
434
export interface InternalNotification {
2✔
435
    monitoredItemId?: number;
2✔
436
    notification: QueueItem | StatusChangeNotification;
2✔
437
    publishTime: Date;
2✔
438
    start_tick: number;
2✔
439
}
2✔
440

2✔
441
export interface InternalCreateMonitoredItemResult {
2✔
442
    monitoredItem?: MonitoredItem;
2✔
443
    monitoredItemCreateRequest: MonitoredItemCreateRequest;
2✔
444
    createResult: MonitoredItemCreateResult;
2✔
445
}
2✔
446

2✔
447
export interface MonitoredItemBase {
2✔
448
    node: UAVariable | UAObject | UAMethod | null;
2✔
449
    // from monitoring parameters
2✔
450
    filter: MonitoringFilter | null;
2✔
451
    monitoringMode: MonitoringMode;
2✔
452
    timestampsToReturn: TimestampsToReturn;
2✔
453
    discardOldest: boolean;
2✔
454
    queueSize: number;
2✔
455
    clientHandle: UInt32;
2✔
456
}
2✔
457
export type CreateMonitoredItemHook = (subscription: Subscription, monitoredItem: MonitoredItemBase) => Promise<StatusCode>;
2✔
458
export type DeleteMonitoredItemHook = (subscription: Subscription, monitoredItem: MonitoredItemBase) => Promise<StatusCode>;
2✔
459

2✔
460
export interface ServerCapabilitiesPartial {
2✔
461
    maxMonitoredItems: UInt32;
2✔
462
    maxMonitoredItemsPerSubscription: UInt32;
2✔
463
}
2✔
464

2✔
465
export interface IReadAttributeCapable {
2✔
466
    readAttribute(
2✔
467
        context: ISessionContext | null,
2✔
468
        attributeId: AttributeIds,
2✔
469
        indexRange?: NumericRange,
2✔
470
        dataEncoding?: QualifiedNameLike | null
2✔
471
    ): DataValue;
2✔
472
}
2✔
473

2✔
474
/**
2✔
475
 * The Subscription class used in the OPCUA server side.
2✔
476
 */
2✔
477
export class Subscription extends EventEmitter {
2✔
478
    public static minimumPublishingInterval = 50; // fastest possible
137✔
479
    public static defaultPublishingInterval = 1000; // one second
137✔
480
    public static maximumPublishingInterval: number = 1000 * 60; // one minute
137✔
481
    public static maxNotificationPerPublishHighLimit = 1000;
137✔
482
    public static minimumLifetimeDuration = 5 * 1000; // we want 5 seconds minimum lifetime for any subscription
137✔
483
    public static maximumLifetimeDuration = 60 * 60 * 1000; // 1 hour
137✔
484

137✔
485
    /**
137✔
486
     * maximum number of monitored item in a subscription to be used
137✔
487
     * when serverCapabilities.maxMonitoredItems and serverCapabilities.maxMonitoredItemsPerSubscription are not set.
137✔
488
     */
137✔
489
    public static defaultMaxMonitoredItemCount = 20000;
137✔
490

137✔
491
    /**
     * Legacy accessor for the default monitored item limit.
     *
     * @returns `Subscription.defaultMaxMonitoredItemCount`
     * @deprecated use serverCapabilities.maxMonitoredItems and serverCapabilities.maxMonitoredItemsPerSubscription instead
     */
    protected static get maxMonitoredItemCount(): number {
        return Subscription.defaultMaxMonitoredItemCount;
    }
×
497

137✔
498
    public static registry = new ObjectRegistry();
137✔
499

137✔
500
    public publishEngine?: IServerSidePublishEngine;
137✔
501
    public id: number;
137✔
502
    public priority: number;
137✔
503
    /**
137✔
504
     * the Subscription publishing interval
137✔
505
     * @default 1000
137✔
506
     */
137✔
507
    public publishingInterval: number;
137✔
508
    /**
137✔
509
     * The keep alive count defines how many times the publishing interval needs to
     * expire without having notifications available before the server sends an
     * empty message.
137✔
512
     * OPCUA Spec says: a value of 0 is invalid.
137✔
513
     * @default 2
137✔
514
     *
137✔
515
     */
137✔
516
    public maxKeepAliveCount: number;
137✔
517
    /**
137✔
518
     * The life time count defines how many times the publish interval expires without
137✔
519
     * having a connection to the client to deliver data.
137✔
520
     * If the life time counter reaches lifeTimeCount, the subscription will
137✔
521
     * automatically terminate.
137✔
522
     * OPCUA Spec: The life-time count shall be a minimum of three times the keep-alive count.
137✔
523
     *
137✔
524
     * Note: this has to be interpreted as without having a PublishRequest available
137✔
525
     * @default 1
137✔
526
     */
137✔
527
    public lifeTimeCount: number;
137✔
528
    /**
137✔
529
     * The maximum number of notifications that the Client wishes to receive in a
137✔
530
     * single Publish response. A value of zero indicates that there is no limit.
137✔
531
     * The number of notifications per Publish is the sum of monitoredItems in the
137✔
532
     * DataChangeNotification and events in the EventNotificationList.
137✔
533
     *
137✔
534
     * @property maxNotificationsPerPublish
137✔
535
     * @default 0
137✔
536
     */
137✔
537
    public maxNotificationsPerPublish: number;
137✔
538
    public publishingEnabled: boolean;
137✔
539
    public subscriptionDiagnostics: SubscriptionDiagnosticsDataTypePriv;
137✔
540
    public publishIntervalCount: number;
137✔
541
    /**
137✔
542
     *  number of monitored Item
137✔
543
     */
137✔
544
    public monitoredItemIdCounter: number;
137✔
545

137✔
546
    private _state: SubscriptionState = -1 as SubscriptionState;
137✔
547
    /**
     * Current state of the subscription.
     * Assigning a value different from the current one emits "stateChanged" with the
     * new state; assigning the current value again does nothing.
     */
    public set state(value: SubscriptionState) {
        if (this._state === value) {
            return;
        }
        this._state = value;
        this.emit("stateChanged", value);
    }
    public get state(): SubscriptionState {
        return this._state;
    }
30,406✔
556

137✔
557
    public messageSent: boolean;
137✔
558
    public $session?: ServerSession;
137✔
559

137✔
560
    /**
     * NodeId of the session attached to this subscription (`$session`),
     * or the null NodeId when no session is attached.
     */
    public get sessionId(): NodeId {
        if (this.$session) {
            return this.$session.nodeId;
        }
        return NodeId.nullNodeId;
    }
2,938✔
563

137✔
564
    /**
     * Current value of the life-time counter, i.e. the counter advanced by
     * `increaseLifeTimeCounter` and cleared by `resetLifeTimeCounter`.
     */
    public get currentLifetimeCount(): number {
        const { _life_time_counter: ticks } = this;
        return ticks;
    }
997✔
567
    /**
     * Current value of the keep-alive counter.
     */
    public get currentKeepAliveCount(): number {
        const { _keep_alive_counter: ticks } = this;
        return ticks;
    }
1,000✔
570

137✔
571
    private _life_time_counter: number;
137✔
572
    protected _keep_alive_counter = 0;
137✔
573
    public _pending_notifications: Queue<InternalNotification>;
137✔
574
    private _sent_notification_messages: NotificationMessage[];
137✔
575
    private readonly _sequence_number_generator: SequenceNumberGenerator;
137✔
576
    private readonly monitoredItems: { [key: number]: MonitoredItem };
137✔
577
    private timerId: ReturnType<typeof setTimeout> | null;
137✔
578
    private _hasUncollectedMonitoredItemNotifications = false;
137✔
579

137✔
580
    private globalCounter: IGlobalMonitoredItemCounter;
137✔
581
    private serverCapabilities: ServerCapabilitiesPartial;
137✔
582

137✔
583
    /**
     * Create a subscription in the CREATING state and start its publishing timer.
     *
     * The instance registers itself in `Subscription.registry`. The publishing interval,
     * keep-alive count, lifetime count and notifications-per-publish limit taken from
     * `options` are adjusted to the limits supported by the server. Missing
     * `maxMonitoredItems` / `maxMonitoredItemsPerSubscription` values are filled in,
     * directly on the `options.serverCapabilities` object, with
     * `Subscription.defaultMaxMonitoredItemCount`.
     *
     * @param options the subscription parameters
     */
    constructor(options: SubscriptionOptions) {
        super();

        options = options || {};

        Subscription.registry.register(this);

        assert(this.sessionId instanceof NodeId, "expecting a sessionId NodeId");

        // identity
        this.publishEngine = options.publishEngine;
        this.id = options.id || INVALID_ID;
        this.priority = options.priority || 0;

        // timing parameters, adjusted to the server limits
        this.publishingInterval = _adjust_publishing_interval(options.publishingInterval);
        this.maxKeepAliveCount = _adjust_maxKeepAliveCount(options.maxKeepAliveCount);
        this.resetKeepAliveCounter();
        this.lifeTimeCount = _adjust_lifeTimeCount(options.lifeTimeCount || 0, this.maxKeepAliveCount, this.publishingInterval);
        this.maxNotificationsPerPublish = _adjust_maxNotificationsPerPublish(options.maxNotificationsPerPublish);

        this._life_time_counter = 0;
        this.resetLifeTimeCounter();

        // notifications waiting to be sent to the client
        this._pending_notifications = new Queue<InternalNotification>();
        this._sent_notification_messages = [];
        this._sequence_number_generator = new SequenceNumberGenerator();

        // initial state of the subscription
        this.state = SubscriptionState.CREATING;

        this.publishIntervalCount = 0;

        // monitored item map
        this.monitoredItems = {};
        this.monitoredItemIdCounter = 0;

        this.publishingEnabled = _adjust_publishingEnable(options.publishingEnabled);

        this.subscriptionDiagnostics = createSubscriptionDiagnostics(this);

        // messageSent is TRUE once either a NotificationMessage or a keep-alive Message has
        // been sent on the Subscription. The flag ensures that one of the two is sent out
        // the first time the publishing timer expires.
        this.messageSent = false;

        this.timerId = null;
        this._start_timer({ firstTime: true });

        debugLog(chalk.green(`creating subscription ${this.id}`));

        // fill in the capability limits that were left unset (the caller's object is updated)
        const capabilities = options.serverCapabilities;
        this.serverCapabilities = capabilities;
        capabilities.maxMonitoredItems = capabilities.maxMonitoredItems || Subscription.defaultMaxMonitoredItemCount;
        capabilities.maxMonitoredItemsPerSubscription =
            capabilities.maxMonitoredItemsPerSubscription || Subscription.defaultMaxMonitoredItemCount;

        this.globalCounter = options.globalCounter;
    }
570✔
649

137✔
650
    /**
     * Method form of the `sessionId` accessor.
     *
     * @returns the value of `this.sessionId`
     */
    public getSessionId(): NodeId {
        const { sessionId } = this;
        return sessionId;
    }
997✔
653

137✔
654
    /**
     * Multi-line, human readable summary of the subscription's main parameters.
     *
     * @returns one "name value" line per parameter, each terminated by a newline
     */
    public toString(): string {
        // maxKeepAliveCount used to be listed twice; it is now reported once.
        const lines = [
            "Subscription:",
            `  subscriptionId          ${this.id}`,
            `  sessionId          ${this.getSessionId()?.toString()}`,
            `  publishingEnabled  ${this.publishingEnabled}`,
            `  maxKeepAliveCount  ${this.maxKeepAliveCount}`,
            `  publishingInterval ${this.publishingInterval}`,
            `  lifeTimeCount      ${this.lifeTimeCount}`
        ];
        return `${lines.join("\n")}\n`;
    }
×
666

137✔
667
    /**
     * Apply new parameters to the subscription.
     *
     * Missing or zero values are replaced before being adjusted to the server limits:
     * a zero publishing interval ends up as the smallest supported interval, while a zero
     * keep-alive count or lifetime count falls back to the subscription's current value.
     * The substituted values are written back onto `param`. The method then calls
     * `resetLifeTimeAndKeepAliveCounters()` and restarts the publishing timer.
     *
     * @param param the requested parameters (updated in place)
     */
    public modify(param: ModifySubscriptionParameters): void {
        // update diagnostic counter
        this.subscriptionDiagnostics.modifyCount += 1;

        const requestedPublishingInterval = param.requestedPublishingInterval || 0;
        const requestedMaxKeepAliveCount = param.requestedMaxKeepAliveCount || this.maxKeepAliveCount;
        const requestedLifetimeCount = param.requestedLifetimeCount || this.lifeTimeCount;

        param.requestedPublishingInterval = requestedPublishingInterval;
        param.requestedMaxKeepAliveCount = requestedMaxKeepAliveCount;
        param.requestedLifetimeCount = requestedLifetimeCount;

        this.publishingInterval = _adjust_publishing_interval(requestedPublishingInterval);
        this.maxKeepAliveCount = _adjust_maxKeepAliveCount(requestedMaxKeepAliveCount);
        this.lifeTimeCount = _adjust_lifeTimeCount(requestedLifetimeCount, this.maxKeepAliveCount, this.publishingInterval);
        this.maxNotificationsPerPublish = _adjust_maxNotificationsPerPublish(param.maxNotificationsPerPublish || 0);
        this.priority = param.priority || 0;

        this.resetLifeTimeAndKeepAliveCounters();

        // TODO: nothing specific is done yet when the publishing interval actually changes;
        // the timer is restarted unconditionally.
        this._stop_timer();
        this._start_timer({ firstTime: false });
    }
8✔
698

137✔
699
    /**
     * Enable or disable publishing on this subscription.
     *
     * Updates the enable/disable diagnostic counter and resets the life-time counter.
     * When publishing is being disabled and the subscription is not CLOSED, its state is
     * set to NORMAL.
     *
     * @param publishingEnabled true to enable publishing, false to disable it
     * @returns always `StatusCodes.Good`
     */
    public setPublishingMode(publishingEnabled: boolean): StatusCode {
        const enabled = !!publishingEnabled;
        this.publishingEnabled = enabled;

        // update diagnostics
        if (enabled) {
            this.subscriptionDiagnostics.enableCount += 1;
        } else {
            this.subscriptionDiagnostics.disableCount += 1;
        }

        this.resetLifeTimeCounter();

        if (!enabled && this.state !== SubscriptionState.CLOSED) {
            this.state = SubscriptionState.NORMAL;
        }
        return StatusCodes.Good;
    }
6✔
719

137✔
720
    /**
     * True when the keep-alive counter has reached `maxKeepAliveCount`,
     * or when the subscription is in the LATE state.
     * @private
     */
    public get keepAliveCounterHasExpired(): boolean {
        if (this._keep_alive_counter >= this.maxKeepAliveCount) {
            return true;
        }
        return this.state === SubscriptionState.LATE;
    }
4,878✔
726

137✔
727
    /**
     * Restart the lifetime counter of the subscription (CreateSubscription Service, 5.13.2).
     *
     * The counter is set back to 0; it is incremented by `increaseLifeTimeCounter`
     * and compared against `lifeTimeCount` to detect expiration.
     * @private
     */
    public resetLifeTimeCounter(): void {
        this._life_time_counter = 0;
    }
3,009✔
735

137✔
736
    /**
     * Increment the lifetime counter by one.
     *
     * Emits "lifeTimeExpired" when the counter reaches `lifeTimeCount`, then
     * always emits "lifeTimeCounterChanged" with the current counter value.
     * @private
     */
    public increaseLifeTimeCounter(): void {
        this._life_time_counter += 1;

        const expired = this._life_time_counter >= this.lifeTimeCount;
        if (expired) {
            this.emit("lifeTimeExpired");
        }

        // the counter is read again here, after any "lifeTimeExpired" listener has run
        this.emit("lifeTimeCounterChanged", this._life_time_counter);
    }
1,677✔
746

137✔
747
    /**
     * True if the subscription lifetime has expired, i.e. the lifetime counter
     * has reached `lifeTimeCount` (which is asserted to be strictly positive).
     */
    public get lifeTimeHasExpired(): boolean {
        const limit = this.lifeTimeCount;
        assert(limit > 0);
        return this._life_time_counter >= limit;
    }
6,956✔
755

137✔
756
    /**
     * Number of milliseconds before this subscription times out
     * (lifeTimeHasExpired === true): remaining lifetime ticks multiplied
     * by the publishing interval.
     */
    public get timeToExpiration(): number {
        const remainingTicks = this.lifeTimeCount - this._life_time_counter;
        return remainingTicks * this.publishingInterval;
    }
22✔
762

137✔
763
    /**
     * Remaining keep-alive ticks (`maxKeepAliveCount` minus the keep-alive
     * counter) multiplied by the publishing interval.
     */
    public get timeToKeepAlive(): number {
        const remainingTicks = this.maxKeepAliveCount - this._keep_alive_counter;
        return remainingTicks * this.publishingInterval;
    }
×
766

137✔
767
    /**
     * Terminates the subscription.
     *
     * Stops the publishing timer, removes every monitored item, un-exposes the
     * subscription diagnostics on the session (when the session supports it),
     * moves the subscription to the CLOSED state, emits "terminated" and
     * finally informs the publish engine. Calling it on an already CLOSED
     * subscription does nothing.
     */
    public terminate(): void {
        debugLog("Subscription#terminate status", SubscriptionState[this.state]);

        // todo verify if asserting is required here
        if (this.state === SubscriptionState.CLOSED) {
            return;
        }

        // stop timer
        this._stop_timer();

        debugLog("terminating Subscription  ", this.id, " with ", this.monitoredItemCount, " monitored items");

        // dispose all monitoredItem (ids are snapshotted first because removal mutates the map)
        const monitoredItemIds = Object.keys(this.monitoredItems).map((key) => parseInt(key, 10));
        monitoredItemIds.forEach((monitoredItemId) => {
            const status = this.removeMonitoredItem(monitoredItemId);
            assert(status === StatusCodes.Good);
        });
        assert(this.monitoredItemCount === 0);

        const session = this.$session;
        if (session?._unexposeSubscriptionDiagnostics) {
            session._unexposeSubscriptionDiagnostics(this);
        }
        this.state = SubscriptionState.CLOSED;

        // event "terminated": notify the subscription owner that the subscription has been terminated.
        this.emit("terminated");

        const publishEngine = this.publishEngine;
        if (publishEngine) {
            publishEngine.on_close_subscription(this);
        }
    }
576✔
808

137✔
809
    /**
     * Add and/or remove triggering links on a monitored item of this subscription.
     *
     * @param triggeringItemId - id of the monitored item that acts as the trigger
     * @param linksToAdd - ids of monitored items to link to the trigger (null is treated as empty)
     * @param linksToRemove - ids of monitored items to unlink from the trigger (null is treated as empty)
     * @returns the overall status code plus one status code per requested add / remove:
     *          - `BadNothingToDo` (with empty result arrays) when both lists are empty,
     *          - `BadMonitoredItemIdInvalid` when the triggering item does not exist; each per-link
     *            result then only tells whether that linked item exists,
     *          - `Good` otherwise, with the per-link results returned by the triggering item.
     */
    public setTriggering(
        triggeringItemId: number,
        linksToAdd: number[] | null,
        linksToRemove: number[] | null
    ): { statusCode: StatusCode; addResults: StatusCode[]; removeResults: StatusCode[] } {
        /** Bad_NothingToDo, Bad_TooManyOperations,Bad_SubscriptionIdInvalid, Bad_MonitoredItemIdInvalid */
        const idsToAdd = linksToAdd || [];
        const idsToRemove = linksToRemove || [];

        if (idsToAdd.length === 0 && idsToRemove.length === 0) {
            return { statusCode: StatusCodes.BadNothingToDo, addResults: [], removeResults: [] };
        }

        const triggeringItem = this.getMonitoredItem(triggeringItemId);
        const itemsToAdd = idsToAdd.map((id) => this.getMonitoredItem(id));
        const itemsToRemove = idsToRemove.map((id) => this.getMonitoredItem(id));

        if (!triggeringItem) {
            // no trigger: only report whether each linked item exists
            const existence = (m: MonitoredItem | null): StatusCode =>
                m ? StatusCodes.Good : StatusCodes.BadMonitoredItemIdInvalid;
            return {
                statusCode: StatusCodes.BadMonitoredItemIdInvalid,
                addResults: itemsToAdd.map(existence),
                removeResults: itemsToRemove.map(existence)
            };
        }

        // note: it seems that CTT imposed that we do remove before add
        const removeResults = itemsToRemove.map((m) =>
            m ? triggeringItem.removeLinkItem(m.monitoredItemId) : StatusCodes.BadMonitoredItemIdInvalid
        );
        const addResults = itemsToAdd.map((m) =>
            m ? triggeringItem.addLinkItem(m.monitoredItemId) : StatusCodes.BadMonitoredItemIdInvalid
        );

        return { statusCode: StatusCodes.Good, addResults, removeResults };
    }
23✔
860
    /**
     * Release the references held by this subscription.
     *
     * Asserts that no monitored item remains and that the timer has been
     * stopped, then detaches the diagnostics object, the publish engine, the
     * pending and sent notifications, the session and all event listeners,
     * and finally unregisters the subscription from `Subscription.registry`.
     */
    public dispose(): void {
        // c8 ignore next
        if (doDebug) {
            debugLog("Subscription#dispose", this.id, this.monitoredItemCount);
        }

        // preconditions
        assert(this.monitoredItemCount === 0, "MonitoredItems haven't been  deleted first !!!");
        assert(this.timerId === null, "Subscription timer haven't been terminated");

        // break the back-reference from the diagnostics object to this subscription
        if (this.subscriptionDiagnostics) {
            const diagnostics = this.subscriptionDiagnostics as SubscriptionDiagnosticsDataTypePriv;
            diagnostics.$subscription = null as unknown as Subscription;
        }

        this.publishEngine = undefined;

        this._pending_notifications.clear();
        this._sent_notification_messages = [];

        this.$session = undefined;

        this.removeAllListeners();
        Subscription.registry.unregister(this);
    }
614✔
882

137✔
883
    /**
     * True when the subscription has no session attached, otherwise the
     * `aborted` flag of its session.
     */
    public get aborted(): boolean {
        const session = this.$session;
        return session ? session.aborted : true;
    }
×
890

137✔
891
    /**
     * Number of pending notifications (0 when the pending collection is not set).
     */
    public get pendingNotificationsCount(): number {
        const pending = this._pending_notifications;
        if (!pending) {
            return 0;
        }
        return pending.size;
    }
12,591✔
897

137✔
898
    /**
     * Is 'true' if there is at least one pending notification for this
     * subscription (i.e. moreNotifications).
     */
    public get hasPendingNotifications(): boolean {
        const pendingCount = this.pendingNotificationsCount;
        return pendingCount > 0;
    }
12,584✔
904

137✔
905
    /**
     * Number of notification messages currently kept in the sent-messages list.
     */
    public get sentNotificationMessageCount(): number {
        const { length } = this._sent_notification_messages;
        return length;
    }
11✔
911

137✔
912
    /**
     * Detach and return the list of sent notification messages, leaving the
     * subscription with a fresh empty list.
     * @internal
     * @returns the previous sent-messages array (the same array instance, not a copy)
     */
    public _flushSentNotifications(): NotificationMessage[] {
        const flushed = this._sent_notification_messages;
        this._sent_notification_messages = [];
        return flushed;
    }
×
920
    /**
     * Number of monitored items handled by this subscription
     * (number of keys in the `monitoredItems` map).
     */
    public get monitoredItemCount(): number {
        const monitoredItemIds = Object.keys(this.monitoredItems);
        return monitoredItemIds.length;
    }
31,273✔
926

137✔
927
    /**
     * Number of monitored items whose monitoring mode is `MonitoringMode.Disabled`.
     */
    public get disabledMonitoredItemCount(): number {
        let disabledCount = 0;
        for (const monitoredItem of Object.values(this.monitoredItems)) {
            if (monitoredItem.monitoringMode === MonitoringMode.Disabled) {
                disabledCount += 1;
            }
        }
        return disabledCount;
    }
998✔
935

137✔
936
    /**
     * The number of unacknowledged messages saved in the republish queue,
     * as recorded in the subscription diagnostics.
     */
    public get unacknowledgedMessageCount(): number {
        const { unacknowledgedMessageCount } = this.subscriptionDiagnostics;
        return unacknowledgedMessageCount;
    }
×
942

137✔
943
    /**
     * Revise a requested monitored-item sampling interval.
     *
     *  - a negative value is replaced by the publishing interval of the subscription,
     *  - 0 asks for the fastest practical rate: the node's MinimumSamplingInterval
     *    attribute is used when it can be read; a resulting 0 means an
     *    event-based model (no sampling),
     *  - other values are raised to `MonitoredItem.minimumSamplingInterval` or
     *    lowered to `MonitoredItem.maximumSamplingInterval` when out of range.
     *
     * The result is never smaller than the node's own `minimumSamplingInterval`, when it has one.
     *
     * @param samplingInterval - the requested sampling interval
     * @param node - the node being monitored; required when `samplingInterval` is 0
     * @returns the revised sampling interval
     * @private
     */
    public adjustSamplingInterval(samplingInterval: number, node?: IReadAttributeCapable): number {
        let revisedInterval = samplingInterval;

        if (samplingInterval < 0) {
            // - The value -1 indicates that the default sampling interval defined by the publishing
            //   interval of the Subscription is requested.
            // - Any negative number is interpreted as -1.
            revisedInterval = this.publishingInterval;
        } else if (samplingInterval === 0) {
            // c8 ignore next
            if (!node) throw new Error("Internal Error");

            // OPCUA 1.0.3 Part 4 - 5.12.1.2
            // The value 0 indicates that the Server should use the fastest practical rate.
            //
            // The fastest supported sampling interval may be equal to 0, which indicates
            // that the data item is exception-based rather than being sampled at some period.
            // An exception-based model means that the underlying system does not require
            // sampling and reports data changes.
            const minimumSamplingIntervalValue = node.readAttribute(
                SessionContext.defaultContext,
                AttributeIds.MinimumSamplingInterval
            );

            // TODO if attributeId === AttributeIds.Value : sampling interval required here
            if (minimumSamplingIntervalValue.statusCode.isGood()) {
                // node provides a Minimum sampling interval ...
                revisedInterval = minimumSamplingIntervalValue.value.value;
                assert(revisedInterval >= 0 && revisedInterval <= MonitoredItem.maximumSamplingInterval);
                // note : at this stage, a revisedInterval===0 means that the data item is really exception-based
            }
        } else if (samplingInterval < MonitoredItem.minimumSamplingInterval) {
            revisedInterval = MonitoredItem.minimumSamplingInterval;
        } else if (samplingInterval > MonitoredItem.maximumSamplingInterval) {
            // If the requested samplingInterval is higher than the
            // maximum sampling interval supported by the Server, the maximum sampling
            // interval is returned.
            revisedInterval = MonitoredItem.maximumSamplingInterval;
        }

        // a node-level minimum (absent or falsy counts as 0) acts as a floor
        const nodeFloor = (node as UAVariable | undefined)?.minimumSamplingInterval || 0;
        return Math.max(revisedInterval, nodeFloor);
    }
14,786✔
996

137✔
997
    /**
     * Validate a monitored item creation request and, when valid, create the monitored item.
     *
     * Checks are made in this order, the first failing one decides the status code:
     * node existence and class, attribute id, index range, data encoding, filter,
     * then the per-subscription and global monitored item limits.
     * On success the monitored item is attached to its node and a "monitoredItem"
     * event is emitted.
     *
     * @param addressSpace - address space used to resolve the node to monitor
     * @param timestampsToReturn  - the timestamp to return
     * @param monitoredItemCreateRequest - the parameters describing the monitored Item to create
     * @returns the request and its create result, plus the monitored item when creation succeeded
     */
    public preCreateMonitoredItem(
        addressSpace: IAddressSpace,
        timestampsToReturn: TimestampsToReturn,
        monitoredItemCreateRequest: MonitoredItemCreateRequest
    ): InternalCreateMonitoredItemResult {
        assert(monitoredItemCreateRequest instanceof MonitoredItemCreateRequest);

        // builds the result returned whenever the request is rejected
        const reject = (statusCode: StatusCode): InternalCreateMonitoredItemResult => ({
            createResult: new MonitoredItemCreateResult({ statusCode }),
            monitoredItemCreateRequest
        });

        const itemToMonitor = monitoredItemCreateRequest.itemToMonitor;
        const { nodeId, attributeId, indexRange, dataEncoding } = itemToMonitor;

        // only Variables, Objects and Methods can be monitored
        const node = addressSpace.findNode(nodeId) as UAObject | UAVariable | UAMethod;
        const isMonitorable =
            !!node &&
            (node.nodeClass === NodeClass.Variable ||
                node.nodeClass === NodeClass.Object ||
                node.nodeClass === NodeClass.Method);
        if (!isMonitorable) {
            return reject(StatusCodes.BadNodeIdUnknown);
        }

        const monitorsValue = attributeId === AttributeIds.Value;
        if (monitorsValue && node.nodeClass !== NodeClass.Variable) {
            // AttributeIds.Value is only valid for monitoring value of UAVariables.
            return reject(StatusCodes.BadAttributeIdInvalid);
        }

        if (attributeId === AttributeIds.INVALID) {
            return reject(StatusCodes.BadAttributeIdInvalid);
        }

        if (!indexRange.isValid()) {
            return reject(StatusCodes.BadIndexRangeInvalid);
        }

        // check dataEncoding applies only on Values
        if (dataEncoding.name && !monitorsValue) {
            return reject(StatusCodes.BadDataEncodingInvalid);
        }

        // check dataEncoding
        if (!isValidDataEncoding(dataEncoding)) {
            return reject(StatusCodes.BadDataEncodingUnsupported);
        }

        // check that item can be read by current user session

        // filter
        const filterStatus = validateFilter(monitoredItemCreateRequest.requestedParameters.filter, itemToMonitor, node);
        if (filterStatus !== StatusCodes.Good) {
            return reject(filterStatus);
        }

        // do we have enough room for new monitored items ? (per subscription first, then server wide)
        const capabilities = this.serverCapabilities;
        if (
            this.monitoredItemCount >= capabilities.maxMonitoredItemsPerSubscription ||
            this.globalCounter.totalMonitoredItemCount >= capabilities.maxMonitoredItems
        ) {
            return reject(StatusCodes.BadTooManyMonitoredItems);
        }

        const createResult = this._createMonitoredItemStep2(timestampsToReturn, monitoredItemCreateRequest, node);
        assert(createResult.statusCode.isGood());

        const monitoredItem = this.getMonitoredItem(createResult.monitoredItemId);
        // c8 ignore next
        if (!monitoredItem) {
            throw new Error("internal error");
        }

        // TODO: fix old way to set node. !!!!
        monitoredItem.setNode(node);

        this.emit("monitoredItem", monitoredItem, itemToMonitor);

        return { monitoredItem, monitoredItemCreateRequest, createResult };
    }
14,794✔
1086

137✔
1087
    /**
     * Call `functor` on every monitored item of the subscription, one after the
     * other: each call is awaited before the next item is processed.
     *
     * @param functor - async callback invoked with each monitored item
     */
    public async applyOnMonitoredItem(functor: (monitoredItem: MonitoredItem) => Promise<void>): Promise<void> {
        // snapshot of the items taken before the first call
        const items = Object.values(this.monitoredItems);
        for (let index = 0; index < items.length; index++) {
            await functor(items[index]);
        }
    }
4✔
1092

137✔
1093
    /**
     * Second phase of the monitored item creation: delegates to
     * `_createMonitoredItemStep3` with the item and its creation request.
     *
     * @param monitoredItem - the monitored item returned by `preCreateMonitoredItem`
     * @param monitoredItemCreateRequest - the request the item was created from
     * @param _createResult - unused
     */
    public postCreateMonitoredItem(
        monitoredItem: MonitoredItem,
        monitoredItemCreateRequest: MonitoredItemCreateRequest,
        _createResult: MonitoredItemCreateResult
    ): void {
        this._createMonitoredItemStep3(monitoredItem, monitoredItemCreateRequest);
    }
14,778✔
1100

137✔
1101
    /**
     * Create a monitored item in one call: runs `preCreateMonitoredItem` and,
     * when it produced a monitored item, `postCreateMonitoredItem`.
     *
     * @param addressSpace - address space used to resolve the node to monitor
     * @param timestampsToReturn - the timestamp to return
     * @param monitoredItemCreateRequest - the parameters describing the monitored item to create
     * @returns the create result (carrying the failure status code when no item was created)
     */
    public async createMonitoredItem(
        addressSpace: IAddressSpace,
        timestampsToReturn: TimestampsToReturn,
        monitoredItemCreateRequest: MonitoredItemCreateRequest
    ): Promise<MonitoredItemCreateResult> {
        const outcome = this.preCreateMonitoredItem(addressSpace, timestampsToReturn, monitoredItemCreateRequest);
        if (outcome.monitoredItem) {
            this.postCreateMonitoredItem(outcome.monitoredItem, monitoredItemCreateRequest, outcome.createResult);
        }
        return outcome.createResult;
    }
112✔
1117
    /**
     * Get a monitored item by id.
     * @param monitoredItemId - the id of the monitored item to get
     * @returns the monitored item matching `monitoredItemId`, or null when there is none
     */
    public getMonitoredItem(monitoredItemId: number): MonitoredItem | null {
        const monitoredItem = this.monitoredItems[monitoredItemId];
        return monitoredItem ? monitoredItem : null;
    }
21,912✔
1125

137✔
1126
    /**
     * Remove a monitored item from the subscription.
     *
     * The item is terminated, a "removeMonitoredItem" event is emitted with it,
     * then it is disposed and dropped from the subscription together with its
     * pending notifications; the global monitored item counter is decremented.
     *
     * @param monitoredItemId - the id of the monitored item to remove
     * @returns `Good`, or `BadMonitoredItemIdInvalid` when the id is unknown
     */
    public removeMonitoredItem(monitoredItemId: number): StatusCode {
        debugLog("Removing monitoredIem ", monitoredItemId);

        const isKnown = Object.hasOwn(this.monitoredItems, `${monitoredItemId}`);
        if (!isKnown) {
            return StatusCodes.BadMonitoredItemIdInvalid;
        }

        const removedItem = this.monitoredItems[monitoredItemId];
        removedItem.terminate();

        // notify that a monitored item has been removed from the subscription
        // (listeners receive the terminated, not yet disposed, MonitoredItem)
        this.emit("removeMonitoredItem", removedItem);

        removedItem.dispose();

        delete this.monitoredItems[monitoredItemId];
        this.globalCounter.totalMonitoredItemCount -= 1;

        // flush pending notifications of the removed item
        this._removePendingNotificationsFor(monitoredItemId);
        // assert(this._pending_notifications.size === 0);
        return StatusCodes.Good;
    }
14,810✔
1157

137✔
1158
    /**
     * True if at least one monitored item has uncollected notifications.
     *
     * A positive answer is cached in `_hasUncollectedMonitoredItemNotifications`
     * so that the monitored items are not scanned again while the flag is set.
     */
    public get hasUncollectedMonitoredItemNotifications(): boolean {
        if (this._hasUncollectedMonitoredItemNotifications) {
            return true;
        }
        // stops at the first monitored item that has something to report
        const found = Object.values(this.monitoredItems).some((monitoredItem) => monitoredItem.hasMonitoredItemNotifications);
        if (found) {
            this._hasUncollectedMonitoredItemNotifications = true;
        }
        return found;
    }
6,567✔
1177

137✔
1178
    /**
     * The subscription id (alias of `id`).
     */
    public get subscriptionId(): number {
        const { id } = this;
        return id;
    }
951✔
1181

137✔
1182
    /**
     * Look up a sent notification message by its sequence number.
     *
     * @param sequenceNumber - the sequence number to search for
     * @returns the first sent message with that sequence number, or null when there is none
     */
    public getMessageForSequenceNumber(sequenceNumber: number): NotificationMessage | null {
        for (const sentMessage of this._sent_notification_messages) {
            if (sentMessage.sequenceNumber === sequenceNumber) {
                return sentMessage;
            }
        }
        return null;
    }
30✔
1186

137✔
1187
    /**
     * Returns true if the notification has expired, i.e. when its `start_tick`
     * plus `maxKeepAliveCount` is below the current `publishIntervalCount`.
     *
     * @param notification - object carrying the `start_tick` at which the notification was recorded
     */
    public notificationHasExpired(notification: { start_tick: number }): boolean {
        assert(Object.hasOwn(notification, "start_tick"));

        const expirationTick = notification.start_tick + this.maxKeepAliveCount;
        assert(Number.isFinite(expirationTick));

        return expirationTick < this.publishIntervalCount;
    }
×
1196

137✔
1197
    /**
     * Returns in an array the sequence numbers of the notifications that have
     * been sent and that haven't been acknowledged yet.
     */
    public getAvailableSequenceNumbers(): number[] {
        return _getSequenceNumbers(this._sent_notification_messages);
    }
1,546✔
1205

137✔
1206
    /**
     * Acknowledges a notification identified by its sequence number.
     *
     * The matching message is removed from the sent-messages list and the
     * unacknowledged message counter of the diagnostics is decremented.
     *
     * @param sequenceNumber - sequence number of the message to acknowledge
     * @returns `Good`, or `BadSequenceNumberUnknown` when no sent message has this sequence number
     */
    public acknowledgeNotification(sequenceNumber: number): StatusCode {
        debugLog("acknowledgeNotification ", sequenceNumber);

        const sentMessages = this._sent_notification_messages;

        // scan backwards: should several messages match, the last one is the one acknowledged
        let position = sentMessages.length - 1;
        while (position >= 0 && sentMessages[position].sequenceNumber !== sequenceNumber) {
            position -= 1;
        }

        if (position < 0) {
            // c8 ignore next
            if (doDebug) {
                debugLog(chalk.red("acknowledging sequence FAILED !!! "), chalk.cyan(sequenceNumber.toString()));
            }
            return StatusCodes.BadSequenceNumberUnknown;
        }

        // c8 ignore next
        if (doDebug) {
            debugLog(chalk.yellow("acknowledging sequence "), chalk.cyan(sequenceNumber.toString()));
        }
        sentMessages.splice(position, 1);
        this.subscriptionDiagnostics.unacknowledgedMessageCount -= 1;
        return StatusCodes.Good;
    }
651✔
1234

137✔
1235
    /**
     * Implementation of the standard OPCUA GetMonitoredItems method (intended use defined in Part 4).
     *
     * From the spec: this method can be used to get the list of monitored items in a
     * subscription if CreateMonitoredItems failed due to a network interruption and the
     * client does not know if the creation succeeded in the server.
     *
     * @returns the client handles and server handles of all monitored items (same index in
     *          both arrays refers to the same item) with a `Good` status code
     */
    public getMonitoredItems(): GetMonitoredItemsResult {
        const monitoredItemIds = Object.keys(this.monitoredItems).map((key) => parseInt(key, 10));

        const result: GetMonitoredItemsResult = {
            clientHandles: new Uint32Array(monitoredItemIds.length),
            serverHandles: new Uint32Array(monitoredItemIds.length),
            statusCode: StatusCodes.Good
        };

        monitoredItemIds.forEach((serverHandle, index) => {
            const monitoredItem = this.getMonitoredItem(serverHandle);
            // c8 ignore next
            if (!monitoredItem) {
                throw new Error("monitoredItem is null");
            }
            result.clientHandles[index] = monitoredItem.clientHandle;
            // TODO:  serverHandle is defined anywhere in the OPCUA Specification 1.02
            //        I am not sure what shall be reported for serverHandle...
            //        using monitoredItem.monitoredItemId instead...
            //        May be a clarification in the OPCUA Spec is required.
            result.serverHandles[index] = serverHandle;
        });
        return result;
    }
8✔
1268

137✔
1269
    /**
     * Ask every monitored item to resend its initial value, then arrange for
     * the data to be published at the next opportunity.
     *
     * A failure on one monitored item is logged as a warning and does not
     * prevent the other items from being processed.
     * @private
     */
    public async resendInitialValues(): Promise<void> {
        this._keep_alive_counter = 0;

        try {
            const pendingResends = Object.values(this.monitoredItems).map(async (monitoredItem): Promise<void> => {
                try {
                    monitoredItem.resendInitialValue();
                } catch (err) {
                    warningLog(
                        "resendInitialValues:",
                        monitoredItem.node?.nodeId.toString(),
                        "error:",
                        (err as Error).message
                    );
                }
            });
            await Promise.all(pendingResends);
        } catch (err) {
            warningLog("resendInitialValues: error:", (err as Error).message);
        }

        // make sure data will be sent immediately
        this._keep_alive_counter = this.maxKeepAliveCount - 1;
        this.state = SubscriptionState.NORMAL;
        this._harvestMonitoredItems();
    }
26✔
1302

137✔
1303
    /**
     * Notify the old session that this subscription has been transferred.
     *
     * OPCUA UA Spec 1.0.3 : part 3 - page 82 - 5.13.7 TransferSubscriptions:
     * If the Server transfers the Subscription to the new Session, the Server shall issue
     * a StatusChangeNotification notificationMessage with the status code
     * Good_SubscriptionTransferred to the old Session.
     *
     * When the publish engine has a pending publish request the notification is
     * published right away; otherwise a `TransferredSubscription` holding the
     * notification is queued in the publish engine's closed subscriptions.
     * @private
     */
    public notifyTransfer(): void {
        debugLog(chalk.red(" Subscription => Notifying Transfer                                  "));

        const notificationData = new StatusChangeNotification({
            status: StatusCodes.GoodSubscriptionTransferred
        });

        const publishEngine = this.publishEngine;

        if (publishEngine?.pendingPublishRequestCount) {
            // the GoodSubscriptionTransferred can be processed immediately
            this._addNotificationMessage(notificationData);
            debugLog(chalk.red("pendingPublishRequestCount"), this.publishEngine?.pendingPublishRequestCount);
            this._publish_pending_notifications();
            return;
        }

        debugLog(chalk.red("Cannot  send GoodSubscriptionTransferred => lets create a TransferredSubscription "));
        // c8 ignore next
        if (!publishEngine) {
            warningLog("notifyTransfer: publishEngine is not available");
            return;
        }

        // park the notification in a placeholder subscription that reuses this subscription's
        // id and sequence number generator
        const transferredSubscription = new TransferredSubscription({
            generator: this._sequence_number_generator,
            id: this.id,
            publishEngine
        });
        transferredSubscription._pending_notification = notificationData;

        const engineInternals = publishEngine as unknown as { _closed_subscriptions: TransferredSubscription[] };
        engineInternals._closed_subscriptions.push(transferredSubscription);
    }
27✔
1339

137✔
1340
    /**
     * Reset both the lifetime counter and the keep-alive counter.
     *
     * The server invokes this method when it has sent a Publish Response,
     * so that the subscription can restart its lifetime count.
     *
     * @private
     */
    public resetLifeTimeAndKeepAliveCounters(): void {
        this.resetLifeTimeCounter();
        this.resetKeepAliveCounter();
    }
2,406✔
1352

137✔
1353
    /**
     * Update the notification counters of the subscription diagnostics from
     * the content of a notification message.
     *
     * Data change and event notifications add their number of entries to their
     * own counter and to the global `notificationsCount`; any other entry is
     * asserted to be a StatusChangeNotification and is not counted.
     *
     * @param notificationMessage - the message whose `notificationData` is inspected
     */
    private _updateCounters(notificationMessage: NotificationMessage) {
        const diagnostics = this.subscriptionDiagnostics;

        for (const notificationData of notificationMessage.notificationData || []) {
            if (notificationData instanceof DataChangeNotification) {
                const dataChangeCount = notificationData.monitoredItems?.length || 0;
                diagnostics.dataChangeNotificationsCount += dataChangeCount;
                diagnostics.notificationsCount += dataChangeCount;
                continue;
            }
            if (notificationData instanceof EventNotificationList) {
                const eventCount = notificationData.events?.length || 0;
                diagnostics.eventNotificationsCount += eventCount;
                diagnostics.notificationsCount += eventCount;
                continue;
            }
            assert(notificationData instanceof StatusChangeNotification);
            // TODO
            // note: :there is no way to count StatusChangeNotifications in opcua yet.
        }
    }
913✔
1371
    /**
     * Pop the next batch of pending notifications, wrap it in a
     * PublishResponse and hand it over to the publish engine.
     * A "notification" event is emitted with the popped message.
     *
     * Preconditions (asserted):
     *     - publishEngine.pendingPublishRequestCount > 0
     *     - the subscription has pending notifications
     *
     * @throws Error if the subscription is not attached to a publish engine
     * @private
     */
    public _publish_pending_notifications(): void {
        const publishEngine = this.publishEngine;
        // c8 ignore next
        if (!publishEngine) {
            throw new Error("publishEngine is null");
        }
        const subscriptionId = this.id;
        // preconditions
        assert(publishEngine.pendingPublishRequestCount > 0);
        assert(this.hasPendingNotifications);

        const notificationMessage = this._popNotificationToSend();
        if (notificationMessage.notificationData?.length === 0) {
            return; // nothing to do
        }
        // evaluated after the pop: tells the client whether another batch is already waiting
        const moreNotifications = this.hasPendingNotifications;

        this.emit("notification", notificationMessage);
        // Update counters ....
        this._updateCounters(notificationMessage);

        assert(Object.hasOwn(notificationMessage, "sequenceNumber"));
        assert(Object.hasOwn(notificationMessage, "notificationData"));
        // update diagnostics
        this.subscriptionDiagnostics.publishRequestCount += 1;

        // The popped message only carries a placeholder sequence number
        // (0xffffffff); the real one is allocated here and is the one that
        // must be reported in the response and in the debug trace.
        const sequenceNumber = this._get_next_sequence_number();

        const response = new PublishResponse({
            moreNotifications,
            notificationMessage: {
                notificationData: notificationMessage.notificationData,
                sequenceNumber
            },
            subscriptionId
        });

        // keep the message for a possible retransmission
        this._sent_notification_messages.push(response.notificationMessage);

        // get available sequence number;
        const availableSequenceNumbers = this.getAvailableSequenceNumbers();
        assert(
            !response.notificationMessage ||
            availableSequenceNumbers[availableSequenceNumbers.length - 1] === response.notificationMessage.sequenceNumber
        );
        response.availableSequenceNumbers = availableSequenceNumbers;

        publishEngine._send_response(this, response);

        this.messageSent = true;

        this.subscriptionDiagnostics.unacknowledgedMessageCount++;

        this.resetLifeTimeAndKeepAliveCounters();

        // c8 ignore next
        if (doDebug) {
            debugLog(
                "Subscription sending a notificationMessage subscriptionId=",
                subscriptionId,
                "sequenceNumber = ",
                sequenceNumber.toString(),
                notificationMessage.notificationData?.map((x) => x?.constructor.name).join(" ")
            );
            // debugLog(notificationMessage.toString());
        }

        if (this.state !== SubscriptionState.CLOSED) {
            assert((notificationMessage.notificationData?.length || 0) > 0, "We are not expecting a keep-alive message here");
            this.state = SubscriptionState.NORMAL;
            debugLog(`subscription ${this.id}${chalk.bgYellow(" set to NORMAL")}`);
        }
    }
913✔
1450

137✔
1451
    /**
     * Serve this subscription with an available publish request.
     *
     * - when publishing is disabled, only the keep-alive logic runs;
     * - otherwise notifications are harvested from the monitored items (if
     *   none is already queued) and one notification message is published;
     * - when there is nothing to publish, the keep-alive logic runs instead.
     *
     * Precondition (asserted): the publish engine holds at least one pending
     * publish request.
     */
    public process_subscription(): void {
        assert((this.publishEngine?.pendingPublishRequestCount || 0) > 0);

        if (!this.publishingEnabled) {
            // no publish to do, except keep alive
            debugLog("    -> no publish to do, except keep alive");
            this._process_keepAlive();
            return;
        }

        // collect notifications from monitored items, but only when the queue is empty
        const mustHarvest = !this.hasPendingNotifications && this.hasUncollectedMonitoredItemNotifications;
        if (mustHarvest) {
            this._harvestMonitoredItems();
        }

        if (!this.hasPendingNotifications) {
            this._process_keepAlive();
            return;
        }

        // let process them first
        this._publish_pending_notifications();

        const shouldRetrigger =
            this.state === SubscriptionState.NORMAL ||
            (this.state === SubscriptionState.LATE && this.hasPendingNotifications);
        if (!shouldRetrigger) {
            return;
        }

        // c8 ignore next
        if (doDebug) {
            debugLog("    -> pendingPublishRequestCount > 0 && normal state => re-trigger tick event immediately ");
        }

        // let process an new publish request
        setImmediate(this._tick.bind(this));
    }
1,461✔
1486

137✔
1487
    /**
     * Advance the keep-alive counter and, once it has expired, try to send a
     * keep-alive response.
     *
     * When the response is sent, the life-time and keep-alive counters are
     * reset; when it cannot be sent, the subscription is flagged LATE.
     */
    private _process_keepAlive() {
        this.increaseKeepAliveCounter();

        if (!this.keepAliveCounterHasExpired) {
            return;
        }
        debugLog("     ->  _process_keepAlive => keepAliveCounterHasExpired");

        const keepAliveSent = this._sendKeepAliveResponse();
        if (keepAliveSent) {
            this.resetLifeTimeAndKeepAliveCounters();
            return;
        }

        debugLog(
            "     -> subscription.state === LATE , because keepAlive Response cannot be send due to lack of PublishRequest"
        );
        if (this.messageSent || this.keepAliveCounterHasExpired) {
            this.state = SubscriptionState.LATE;
        }
    }
4,852✔
1505

137✔
1506
    /**
     * Stop the publishing timer, if one is running, and forget its handle.
     */
    private _stop_timer() {
        const runningTimer = this.timerId;
        if (!runningTimer) {
            return;
        }
        debugLog(chalk.bgWhite.blue("Subscription#_stop_timer subscriptionId="), this.id);
        clearInterval(runningTimer);
        this.timerId = null;
    }
578✔
1513

137✔
1514
    /**
     * Start the publishing timer that fires `_tick` every `publishingInterval`.
     *
     * Precondition (asserted): no timer is currently running.
     *
     * @param firstTime true when the timer is started for a freshly created
     *                  subscription; in that case the subscription must still be
     *                  in the CREATING state and must not have sent any message.
     */
    private _start_timer({ firstTime }: { firstTime: boolean }) {
        const banner = chalk.bgWhite.blue("Subscription#_start_timer  subscriptionId=");
        debugLog(banner, this.id, " publishingInterval = ", this.publishingInterval);

        assert(this.timerId === null);

        // From the spec:
        // When a Subscription is created, the first Message is sent at the end of the first publishing cycle to
        // inform the Client that the Subscription is operational. A NotificationMessage is sent if there are
        // Notifications ready to be reported. If there are none, a keep-alive Message is sent instead that
        // contains a sequence number of 1, indicating that the first NotificationMessage has not yet been sent.
        // This is the only time a keep-alive Message is sent without waiting for the maximum keep-alive count
        // to be reached, as specified in (f) above.
        //
        // Arming the counter one step below its maximum guarantees that a keep-alive Message is sent
        // at the end of the first publishing cycle when no Notification is ready.
        this._keep_alive_counter = this.maxKeepAliveCount - 1;

        if (firstTime) {
            assert(this.messageSent === false);
            assert(this.state === SubscriptionState.CREATING);
        }

        assert(this.publishingInterval >= Subscription.minimumPublishingInterval);

        const onTick = this._tick.bind(this);
        this.timerId = setInterval(onTick, this.publishingInterval);
    }
578✔
1543

137✔
1544
    /**
     * @returns the value of `future()` on the sequence number generator, or 0
     *          when the subscription has no generator.
     */
    private _get_future_sequence_number(): number {
        const generator = this._sequence_number_generator;
        if (!generator) {
            return 0;
        }
        return generator.future();
    }
1,674✔
1547
    /**
     * Public accessor delegating to `_get_future_sequence_number()`.
     */
    public get futureSequenceNumber(): number {
        const upcoming = this._get_future_sequence_number();
        return upcoming;
    }
1,000✔
1550
    // counter
137✔
1551
    /**
     * @returns the value of `next()` on the sequence number generator, or 0
     *          when the subscription has no generator.
     */
    private _get_next_sequence_number(): number {
        const generator = this._sequence_number_generator;
        if (!generator) {
            return 0;
        }
        return generator.next();
    }
916✔
1554
    /**
     * Public accessor delegating to `_get_next_sequence_number()`.
     *
     * NOTE(review): every read calls `next()` on the generator, which
     * presumably consumes a sequence number - confirm before using this
     * accessor for plain inspection.
     */
    public get nextSequenceNumber(): number {
        const allocated = this._get_next_sequence_number();
        return allocated;
    }
3✔
1557

137✔
1558
    /**
     * Publishing-cycle handler: scheduled by `_start_timer` through
     * `setInterval` and re-triggered by `process_subscription` through
     * `setImmediate`.
     *
     * In order, a tick:
     *   1. does nothing (besides a warning) when the subscription is CLOSED;
     *   2. trims the retransmission queue and lets the publish engine run its own tick;
     *   3. increments the life-time counter while the subscription is LATE and
     *      terminates the subscription (emitting "expired") once the life time is exceeded;
     *   4. flags the subscription LATE when there is something to publish but no publish request;
     *   5. otherwise publishes notifications or runs the keep-alive logic.
     *
     * @private
     */
    private _tick() {
        // c8 ignore next
        if (doDebug) {
            debugLog(`Subscription#_tick id ${this.id} aborted=${this.aborted} state=${SubscriptionState[this.state]}`);
        }
        if (this.state === SubscriptionState.CLOSED) {
            warningLog(`Warning: Subscription#_tick id ${this.id}  called while subscription is CLOSED`);
            return;
        }

        this.discardOldSentNotifications();

        // c8 ignore next
        if (doDebug) {
            debugLog(
                `${t(new Date())}  ${this._life_time_counter}/${this.lifeTimeCount}${chalk.cyan("   Subscription#_tick")}`,
                "  processing subscriptionId=",
                this.id,
                "hasUncollectedMonitoredItemNotifications = ",
                this.hasUncollectedMonitoredItemNotifications,
                " publishingIntervalCount =",
                this.publishIntervalCount
            );
        }

        // give a chance to the publish engine to cancel timed out publish requests
        this.publishEngine?._on_tick();

        this.publishIntervalCount += 1;

        // the life time only elapses while the subscription is LATE
        if (this.state === SubscriptionState.LATE) {
            this.increaseLifeTimeCounter();
        }

        if (this.lifeTimeHasExpired) {
            /* c8 ignore next */
            if (doDebug) {
                debugLog(chalk.red.bold(`Subscription ${this.id} has expired !!!!! => Terminating`));
            }

            /**
             * notify the subscription owner that the subscription has expired by exceeding its life time.
             * @event expired
             *
             */
            this.emit("expired");

            // notify new terminated status only when subscription has timeout.
            if (doDebug) {
                debugLog("adding StatusChangeNotification notification message for BadTimeout subscription = ", this.id);
            }
            this._addNotificationMessage(new StatusChangeNotification({ status: StatusCodes.BadTimeout }));

            // kill timer and delete monitored items and transfer pending notification messages
            this.terminate();

            return;
        }

        const publishEngine = this.publishEngine;
        if (!publishEngine) {
            throw new Error("publishEngine is null");
        }

        // c8 ignore next
        if (doDebug) {
            debugLog("Subscription#_tick  self._pending_notifications= ", this._pending_notifications.size);
        }

        const pendingRequestCount = publishEngine.pendingPublishRequestCount;
        const hasSomethingToPublish = this.hasPendingNotifications || this.hasUncollectedMonitoredItemNotifications;

        if (pendingRequestCount === 0 && hasSomethingToPublish) {
            // c8 ignore next
            if (doDebug) {
                debugLog(
                    "subscription set to LATE  hasPendingNotifications = ",
                    this.hasPendingNotifications,
                    " hasUncollectedMonitoredItemNotifications =",
                    this.hasUncollectedMonitoredItemNotifications
                );
            }

            this.state = SubscriptionState.LATE;
            return;
        }

        if (pendingRequestCount > 0) {
            if (hasSomethingToPublish) {
                // pop (or first harvest) the pending notifications and send them
                this.process_subscription();
            } else {
                this._process_keepAlive();
            }
            return;
        }

        // no publish request available and nothing to publish
        if (this.state === SubscriptionState.LATE) {
            this.resetKeepAliveCounter();
        } else {
            this._process_keepAlive();
        }
    }
6,958✔
1658

137✔
1659
    /**
     * Ask the publish engine to send a keep-alive response carrying the
     * future sequence number.
     *
     * On success the subscription records that a message has been sent, emits
     * a "keepalive" event and switches to the KEEPALIVE state.
     *
     * @returns true when the publish engine reported that the keep-alive
     *          response was sent, false otherwise (including when there is no
     *          publish engine)
     * @private
     */
    private _sendKeepAliveResponse(): boolean {
        const sequenceNumber = this._get_future_sequence_number();

        const accepted = this.publishEngine?.send_keep_alive_response(this.id, sequenceNumber);
        if (!accepted) {
            return false;
        }

        this.messageSent = true;
        // c8 ignore next
        if (doDebug) {
            debugLog(
                `    -> Subscription#_sendKeepAliveResponse subscriptionId ${this.id} future_sequence_number ${sequenceNumber}`
            );
        }
        /**
         * notify the subscription owner that a keepalive message has to be sent.
         * @event keepalive
         *
         */
        this.emit("keepalive", sequenceNumber);
        this.state = SubscriptionState.KEEPALIVE;

        return true;
    }
122✔
1684

137✔
1685
    /**
     * Reset the keep-alive counter to zero.
     * @private
     */
    private resetKeepAliveCounter(): void {
        this._keep_alive_counter = 0;

        // c8 ignore next
        if (doDebug) {
            debugLog(
                "     -> subscriptionId",
                this.id,
                " Resetting keepAliveCounter = ",
                this._keep_alive_counter,
                this.maxKeepAliveCount
            );
        }
    }
4,149✔
1703

137✔
1704
    /**
     * Increment the keep-alive counter by one.
     * @private
     */
    private increaseKeepAliveCounter() {
        this._keep_alive_counter += 1;

        // c8 ignore next
        if (doDebug) {
            debugLog(
                "     -> subscriptionId",
                this.id,
                " Increasing keepAliveCounter = ",
                this._keep_alive_counter,
                this.maxKeepAliveCount
            );
        }
    }
4,852✔
1720

137✔
1721
    /**
     * Append a notification to the queue of pending notifications, stamped
     * with the current date and the current publishing-interval count.
     *
     * @param notificationData the notification to queue
     * @param monitoredItemId  id of the monitored item that produced the
     *                         notification, when there is one
     * @private
     */
    private _addNotificationMessage(notificationData: QueueItem | StatusChangeNotification, monitoredItemId?: number) {
        // c8 ignore next
        if (doDebug) {
            debugLog(chalk.yellow("Subscription#_addNotificationMessage"), notificationData.toString());
        }

        const queuedAt = new Date();
        this._pending_notifications.push({
            monitoredItemId,
            notification: notificationData,
            publishTime: queuedAt,
            start_tick: this.publishIntervalCount
        });
    }
20,257✔
1735

137✔
1736
    /**
     * Drop from the pending queue every notification that was queued for the
     * given monitored item.
     *
     * @internal
     * @param monitoredItemId id of the monitored item whose notifications are removed
     */
    private _removePendingNotificationsFor(monitoredItemId: number) {
        const belongsToItem = (entry: { monitoredItemId?: number }) => entry.monitoredItemId === monitoredItemId;
        const removedCount = this._pending_notifications.filterOut(belongsToItem);
        if (doDebug) {
            debugLog(`Removed ${removedCount} notifications`);
        }
    }
14,809✔
1744
    /**
     * Extract from the pending queue the next batch of notifications that is
     * ready to be sent to the client.
     *
     * At most `maxNotificationsPerPublish` data-change/event notifications
     * are taken. A StatusChangeNotification is never mixed with other
     * notifications: it ends the current batch when others were already
     * collected, and is returned alone otherwise.
     *
     * Precondition (asserted): the pending queue is not empty.
     *
     * @returns a NotificationMessage holding the extracted notifications; its
     *          sequence number is the placeholder 0xffffffff
     */
    private _popNotificationToSend(): NotificationMessage {
        assert(this._pending_notifications.size > 0);

        const message = new NotificationMessage({
            sequenceNumber: 0xffffffff,
            notificationData: [],
            publishTime: new Date()
        });
        const dataChanges: DataChangeNotification = new DataChangeNotification({ monitoredItems: [] });
        const eventList: EventNotificationList = new EventNotificationList({ events: [] });
        let statusChange: StatusChangeNotification | undefined;

        const limit = this.maxNotificationsPerPublish;
        let hasCollected = false;

        for (let taken = 0; taken < limit && this._pending_notifications.size > 0; taken += 1) {
            if (hasCollected) {
                // peek: a status change must go out in a message of its own
                const upcoming = this._pending_notifications.first()?.notification;
                if (upcoming instanceof StatusChangeNotification) {
                    break;
                }
            }

            const current = this._pending_notifications.shift()?.notification;
            if (current instanceof MonitoredItemNotification) {
                assert(current.clientHandle !== 4294967295);
                dataChanges.monitoredItems?.push(current);
                hasCollected = true;
            } else if (current instanceof EventFieldList) {
                eventList.events?.push(current);
                hasCollected = true;
            } else if (current instanceof StatusChangeNotification) {
                statusChange = current;
                break;
            }
        }

        // only non-empty containers make it into the message
        if (dataChanges.monitoredItems?.length) {
            message.notificationData?.push(dataChanges);
        }
        if (eventList.events?.length) {
            message.notificationData?.push(eventList);
        }
        if (statusChange) {
            message.notificationData?.push(statusChange);
        }
        return message;
    }
913✔
1804

137✔
1805
    /**
     * Trim the retransmission queue of sent NotificationMessages so that it
     * never holds more than `maxNotificationMessagesInQueue` entries.
     *
     * Messages are appended to `_sent_notification_messages` in sending
     * order, so the entries at the front of the array are the oldest ones and
     * are the ones discarded on overflow.
     *
     * @private
     */
    private discardOldSentNotifications() {
        // Sessions maintain a retransmission queue of sent NotificationMessages. NotificationMessages
        // are retained in this queue until they are acknowledged. The Session shall maintain a
        // retransmission queue size of at least two times the number of Publish requests per Session the
        // Server supports.  Clients are required to acknowledge NotificationMessages as they are received. In the
        // case of a retransmission queue overflow, the oldest sent NotificationMessage gets deleted. If a
        // Subscription is transferred to another Session, the queued NotificationMessages for this
        // Subscription are moved from the old to the new Session.
        const overflow = this._sent_notification_messages.length - maxNotificationMessagesInQueue;
        if (overflow > 0) {
            doDebug && debugLog("discardOldSentNotifications = ", this._sent_notification_messages.length);
            // Remove the `overflow` oldest messages (front of the array).
            // A single-argument splice(start) would instead delete everything
            // from `start` to the end, i.e. the most recent messages.
            this._sent_notification_messages.splice(0, overflow);
        }
    }
6,956✔
1828

137✔
1829
    /**
     * Create the MonitoredItem described by a create request, register it in
     * this subscription and build the matching MonitoredItemCreateResult.
     *
     * Most parameter inconsistencies are expected to have been handled by the
     * caller; anything wrong at this stage trips an assertion.
     *
     * Note: the request's `requestedParameters` object is modified in place
     * (revised sampling interval, monitoredItemId, itemToMonitor) and reused
     * as the MonitoredItem options.
     *
     * @param timestampsToReturn         timestamps setting stored on the new monitored item
     * @param monitoredItemCreateRequest the request describing the item to create
     * @param node                       the node to monitor
     * @returns the creation result, with a Good status code
     * @private
     */
    private _createMonitoredItemStep2(
        timestampsToReturn: TimestampsToReturn,
        monitoredItemCreateRequest: MonitoredItemCreateRequest,
        node: BaseNode
    ): MonitoredItemCreateResult {
        assert(monitoredItemCreateRequest instanceof MonitoredItemCreateRequest);
        const { itemToMonitor, requestedParameters } = monitoredItemCreateRequest;

        // xx check if attribute Id invalid (we only support Value or EventNotifier )
        // xx assert(itemToMonitor.attributeId !== AttributeIds.INVALID);

        this.monitoredItemIdCounter += 1;
        const monitoredItemId = getNextMonitoredItemId();

        // revise the requested sampling interval for this node
        const revisedInterval = this.adjustSamplingInterval(requestedParameters.samplingInterval, node);
        requestedParameters.samplingInterval = revisedInterval;

        // reincorporate monitoredItemId and itemToMonitor into the requestedParameters
        const options = requestedParameters as unknown as MonitoredItemOptions;
        options.monitoredItemId = monitoredItemId;
        options.itemToMonitor = itemToMonitor;

        const monitoredItem = new MonitoredItem(options);
        monitoredItem.timestampsToReturn = timestampsToReturn;
        monitoredItem.$subscription = this;
        assert(monitoredItem.monitoredItemId === monitoredItemId);

        // register the new item
        this.monitoredItems[monitoredItemId] = monitoredItem;
        this.globalCounter.totalMonitoredItemCount += 1;

        assert(monitoredItem.clientHandle !== 4294967295);

        // this.emit("monitoredItem", monitoredItem, itemToMonitor);
        return new MonitoredItemCreateResult({
            filterResult: _process_filter(node, requestedParameters.filter),
            monitoredItemId,
            revisedQueueSize: monitoredItem.queueSize,
            revisedSamplingInterval: monitoredItem.samplingInterval,
            statusCode: StatusCodes.Good
        });
    }
14,778✔
1888

137✔
1889
    /**
     * Apply the monitoring mode requested at creation time to a freshly
     * created monitored item. Does nothing when no monitored item is given.
     *
     * Preconditions (asserted): the item is still in the Invalid monitoring
     * mode and already has a sampling function.
     *
     * @param monitoredItem              the item to activate, or null
     * @param monitoredItemCreateRequest the request carrying the monitoring mode
     * @private
     */
    public _createMonitoredItemStep3(
        monitoredItem: MonitoredItem | null,
        monitoredItemCreateRequest: MonitoredItemCreateRequest
    ): void {
        if (monitoredItem) {
            assert(monitoredItem.monitoringMode === MonitoringMode.Invalid);
            assert(typeof monitoredItem.samplingFunc === "function", " expecting a sampling function here");
            // Disabled, Sampling, Reporting
            const { monitoringMode } = monitoredItemCreateRequest;
            monitoredItem.setMonitoringMode(monitoringMode);
        }
    }
14,778✔
1907

137✔
1908
    /**
     * Move the notifications held by every monitored item into the
     * subscription's pending queue, then clear the
     * "uncollected notifications" flag.
     */
    public _harvestMonitoredItems() {
        const items = Object.values(this.monitoredItems);
        for (const item of items) {
            const itemId = item.monitoredItemId;
            for (const notification of item.extractMonitoredItemNotifications()) {
                this._addNotificationMessage(notification, itemId);
            }
        }
        this._hasUncollectedMonitoredItemNotifications = false;
    }
907✔
1917
}
137✔
1918

2✔
1919
// Load-time sanity check: the publishing interval is handed to setInterval,
// so its upper bound must stay below 2**31-1 milliseconds.
// NOTE(review): presumably because Node.js timers cannot honour larger delays - confirm.
assert(
    Subscription.maximumPublishingInterval < 2 ** 31 - 1,
    "maximumPublishingInterval cannot exceed (2**31-1) ms "
);
2!
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc