• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

node-opcua / node-opcua / 17921016002

22 Sep 2025 03:55PM UTC coverage: 90.756% (+0.01%) from 90.742%
17921016002

Pull #1458

github

web-flow
Merge 618d884c6 into 0a94e8bf3
Pull Request #1458: Fast restart leak

11092 of 14094 branches covered (78.7%)

44 of 53 new or added lines in 3 files covered. (83.02%)

4 existing lines in 3 files now uncovered.

30169 of 33242 relevant lines covered (90.76%)

367020.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.22
/packages/node-opcua-client/source/private/client_publish_engine.ts
1
/**
2
 * @module node-opcua-client-private
3
 */
4
import chalk from "chalk";
6✔
5
import { getMinOPCUADate } from "node-opcua-date-time";
6✔
6
import { assert } from "node-opcua-assert";
6✔
7
import { checkDebugFlag, make_debugLog, make_warningLog } from "node-opcua-debug";
6✔
8
import { PublishRequest, PublishResponse } from "node-opcua-service-subscription";
6✔
9

10
import { ClientSession, SubscriptionId } from "../client_session";
11
import { ClientSubscription } from "../client_subscription";
12
import { ClientSessionImpl } from "../private/client_session_impl";
13
import { ClientSubscriptionImpl } from "./client_subscription_impl";
14

15
const debugLog = make_debugLog(__filename);
6✔
16
const doDebug = checkDebugFlag(__filename);
6✔
17
const warningLog = make_warningLog(__filename);
6✔
18
/**
19
 * A client side implementation to deal with publish service.
20
 *
21
 * @class ClientSidePublishEngine
22
 * The ClientSidePublishEngine encapsulates the mechanism to
23
 * deal with a OPCUA Server and constantly sending PublishRequest
24
 * The ClientSidePublishEngine also performs  notification acknowledgements.
25
 * Finally, ClientSidePublishEngine dispatch PublishResponse to the correct
26
 * Subscription id callback
27
 *
28
 * @private
29
 */
30
export class ClientSidePublishEngine {
6✔
31
    public static publishRequestCountInPipeline = 5;
6✔
32
    public timeoutHint: number;
33
    public activeSubscriptionCount: number;
34
    public nbPendingPublishRequests: number;
35
    public nbMaxPublishRequestsAcceptedByServer: number;
36
    public isSuspended: boolean;
37
    public session: ClientSession | null;
38
    private subscriptionAcknowledgements: any[];
39

40
    /**
41
     * @internal
42
     * @private
43
     */
44
    readonly subscriptionMap: { [key: number]: ClientSubscriptionImpl };
45

46
    public lastRequestSentTime: Date = getMinOPCUADate();
401✔
47

48
    constructor(session: ClientSession) {
49
        this.session = session;
401✔
50
        this.subscriptionAcknowledgements = [];
401✔
51
        this.subscriptionMap = {};
401✔
52

53
        this.timeoutHint = 10000; // 10 s by default
401✔
54

55
        this.activeSubscriptionCount = 0;
401✔
56

57
        // number of pending Publish request sent to the server and awaited for being processed by the server
58
        this.nbPendingPublishRequests = 0;
401✔
59

60
        // the maximum number of publish requests we think that the server can queue.
61
        // we will adjust this value .
62
        this.nbMaxPublishRequestsAcceptedByServer = 1000;
401✔
63

64
        this.isSuspended = false;
401✔
65

66
        assert(this.session, "Session must exist");
401✔
67
    }
68

69
    /**
70
     * the number of active subscriptions managed by this publish engine.
71
     * @property subscriptionCount
72
     * @type {Number}
73
     */
74
    get subscriptionCount(): number {
75
        return Object.keys(this.subscriptionMap).length;
1,657✔
76
    }
77

78
    public suspend(suspendedState: boolean): void {
79
        if (this.isSuspended === suspendedState) {
69✔
80
            // nothing to do ...
81
            return;
9✔
82
        }
83
        this.isSuspended = suspendedState;
60✔
84
        if (!this.isSuspended) {
60✔
85
            this.replenish_publish_request_queue();
23✔
86
        }
87
    }
88

89
    public acknowledge_notification(subscriptionId: SubscriptionId, sequenceNumber: number): void {
90
        this.subscriptionAcknowledgements.push({ subscriptionId, sequenceNumber });
716✔
91
    }
92

93
    public cleanup_acknowledgment_for_subscription(subscriptionId: SubscriptionId): void {
94
        this.subscriptionAcknowledgements = this.subscriptionAcknowledgements.filter((a) => a.subscriptionId !== subscriptionId);
×
95
    }
96

97
    /**
98
     * @private
99
     */
100
    public send_publish_request(): void {
101
        if (this.isSuspended) {
5,571✔
102
            return;
143✔
103
        }
104

105
        if (this.nbPendingPublishRequests >= this.nbMaxPublishRequestsAcceptedByServer) {
5,428✔
106
            return;
10✔
107
        }
108
        const session = this.session as ClientSessionImpl;
5,418✔
109
        if (session && !session.isChannelValid()) {
5,418✔
110
            // wait for channel  to be valid
111
            setTimeout(() => {
1,653✔
112
                if (this.subscriptionCount) {
1,653!
113
                    this.send_publish_request();
1,653✔
114
                }
115
            }, 100);
116
        } else {
117
            setImmediate(() => {
3,765✔
118
                if (!this.session || this.isSuspended) {
3,765✔
119
                    // session has been terminated or suspended
120
                    return;
6✔
121
                }
122
                this.internalSendPublishRequest();
3,759✔
123
            });
124
        }
125
    }
126

127
    /**
128
     * @private
129
     */
130
    public terminate(): void {
131
        debugLog("Terminated ClientPublishEngine ");
395✔
132
        this.session = null;
395✔
133
    }
134

135
    /**
136
     * @private
137
     */
138
    public registerSubscription(subscription: ClientSubscription): void {
139
        debugLog("ClientSidePublishEngine#registerSubscription ", subscription.subscriptionId);
423✔
140

141
        const _subscription = subscription as ClientSubscriptionImpl;
423✔
142
        assert(arguments.length === 1);
423✔
143
        assert(isFinite(subscription.subscriptionId));
423✔
144
        assert(!Object.prototype.hasOwnProperty.call(this.subscriptionMap, subscription.subscriptionId)); // already registered ?
423✔
145
        assert(typeof _subscription.onNotificationMessage === "function");
423✔
146
        assert(isFinite(subscription.timeoutHint));
423✔
147

148
        this.activeSubscriptionCount += 1;
423✔
149
        this.subscriptionMap[subscription.subscriptionId] = _subscription;
423✔
150

151
        this.timeoutHint = Math.min(Math.max(this.timeoutHint, subscription.timeoutHint), 0x7ffffff);
423✔
152

153
        debugLog("                       setting timeoutHint = ", this.timeoutHint, subscription.timeoutHint);
423✔
154

155
        this.replenish_publish_request_queue();
423✔
156
    }
157

158
    /**
159
     * @private
160
     */
161
    public replenish_publish_request_queue(): void {
162
        // Spec 1.03 part 4 5.13.5 Publish
163
        // [..] in high latency networks, the Client may wish to pipeline Publish requests
164
        // to ensure cyclic reporting from the Server. Pipe-lining involves sending more than one Publish
165
        // request for each Subscription before receiving a response. For example, if the network introduces a
166
        // delay between the Client and the Server of 5 seconds and the publishing interval for a Subscription
167
        // is one second, then the Client will have to issue Publish requests every second instead of waiting for
168
        // a response to be received before sending the next request.
169
        this.send_publish_request();
446✔
170
        // send more than one publish request to server to cope with latency
171
        for (let i = 0; i < ClientSidePublishEngine.publishRequestCountInPipeline - 1; i++) {
446✔
172
            this.send_publish_request();
1,780✔
173
        }
174
    }
175

176
    /**
177
     *
178
     * @param subscriptionId
179
     * @private
180
     */
181
    public unregisterSubscription(subscriptionId: SubscriptionId): void {
182
        debugLog("ClientSidePublishEngine#unregisterSubscription ", subscriptionId);
314✔
183

184
        assert(isFinite(subscriptionId) && subscriptionId > 0);
314✔
185
        this.activeSubscriptionCount -= 1;
314✔
186
        // note : it is possible that we get here while the server has already requested
187
        //        a session shutdown ... in this case it is possible that subscriptionId is already
188
        //        removed
189
        if (Object.prototype.hasOwnProperty.call(this.subscriptionMap, subscriptionId)) {
314!
190
            delete this.subscriptionMap[subscriptionId];
314✔
191
        } else {
UNCOV
192
            debugLog("ClientSidePublishEngine#unregisterSubscription cannot find subscription  ", subscriptionId);
×
193
        }
194
    }
195

196
    public getSubscriptionIds(): SubscriptionId[] {
197
        return Object.keys(this.subscriptionMap).map((a) => parseInt(a, 10));
23✔
198
    }
199

200
    /***
201
     * get the client subscription from Id
202
     */
203
    public getSubscription(subscriptionId: SubscriptionId): ClientSubscription {
204
        assert(isFinite(subscriptionId) && subscriptionId > 0);
28✔
205
        assert(Object.prototype.hasOwnProperty.call(this.subscriptionMap, subscriptionId));
28✔
206
        return this.subscriptionMap[subscriptionId];
28✔
207
    }
208

209
    public hasSubscription(subscriptionId: SubscriptionId): boolean {
210
        assert(isFinite(subscriptionId) && subscriptionId > 0);
14✔
211
        return Object.prototype.hasOwnProperty.call(this.subscriptionMap, subscriptionId);
14✔
212
    }
213

214
    public internalSendPublishRequest(): void {
215
        assert(this.session, "ClientSidePublishEngine terminated ?");
3,843✔
216

217
        this.nbPendingPublishRequests += 1;
3,843✔
218

219
        debugLog(chalk.yellow("sending publish request "), this.nbPendingPublishRequests);
3,843✔
220

221
        const subscriptionAcknowledgements = this.subscriptionAcknowledgements;
3,843✔
222
        this.subscriptionAcknowledgements = [];
3,843✔
223

224
        // as started in the spec (Spec 1.02 part 4 page 81 5.13.2.2 Function DequeuePublishReq())
225
        // the server will dequeue the PublishRequest  in first-in first-out order
226
        // and will validate if the publish request is still valid by checking the timeoutHint in the RequestHeader.
227
        // If the request timed out, the server will send a BadTimeout service result for the request and de-queue
228
        // another publish request.
229
        //
230
        // in Part 4. page 144 Request Header the timeoutHint is described this way.
231
        // timeoutHint UInt32 This timeout in milliseconds is used in the Client side Communication Stack to
232
        //                    set the timeout on a per-call base.
233
        //                    For a Server this timeout is only a hint and can be used to cancel long running
234
        //                    operations to free resources. If the Server detects a timeout, he can cancel the
235
        //                    operation by sending the Service result BadTimeout. The Server should wait
236
        //                    at minimum the timeout after he received the request before cancelling the operation.
237
        //                    The value of 0 indicates no timeout.
238
        // In issue#40 (MonitoredItem on changed not fired), we have found that some server might wrongly interpret
239
        // the timeoutHint of the request header ( and will bang a BadTimeout regardless if client send timeoutHint=0)
240
        // as a work around here , we force the timeoutHint to be set to a suitable value.
241
        //
242
        // see https://github.com/node-opcua/node-opcua/issues/141
243
        // This suitable value shall be at least the time between two keep alive signal that the server will send.
244
        // (i.e revisedLifetimeCount * revisedPublishingInterval)
245

246
        // also ( part 3 - Release 1.03 page 140)
247
        // The Server shall check the timeoutHint parameter of a PublishRequest before processing a PublishResponse.
248
        // If the request timed out, a BadTimeout Service result is sent and another PublishRequest is used.
249
        // The value of 0 indicates no timeout
250

251
        // in our case:
252

253
        assert(this.nbPendingPublishRequests > 0);
3,843✔
254
        const calculatedTimeout = Math.min(0x7fffffff, this.nbPendingPublishRequests * this.timeoutHint);
3,843✔
255

256
        const publishRequest = new PublishRequest({
3,843✔
257
            requestHeader: { timeoutHint: calculatedTimeout }, // see note
258
            subscriptionAcknowledgements
259
        });
260

261
        let active = true;
3,843✔
262

263
        const session = this.session! as ClientSessionImpl;
3,843✔
264
        session.publish(publishRequest, (err: Error | null, response?: PublishResponse) => {
3,843✔
265
            this.nbPendingPublishRequests -= 1;
3,803✔
266

267
            this.lastRequestSentTime = new Date();
3,803✔
268

269
            if (err) {
3,803✔
270
                debugLog(
2,070✔
271
                    chalk.cyan("ClientSidePublishEngine.prototype.internalSendPublishRequest callback : "),
272
                    chalk.yellow(err.message)
273
                );
274
                debugLog("'" + err.message + "'");
2,070✔
275

276
                if (err.message.match("not connected")) {
2,070!
277
                    debugLog(chalk.bgWhite.red(" WARNING :  CLIENT IS NOT CONNECTED :" + " MAY BE RECONNECTION IS IN PROGRESS"));
×
278
                    debugLog("this.activeSubscriptionCount =", this.activeSubscriptionCount);
×
279
                    // the previous publish request has ended up with an error because
280
                    // the connection has failed ...
281
                    // There is no need to send more publish request for the time being until reconnection is completed
282
                    active = false;
×
283
                }
284
                // istanbul ignore next
285
                if (err.message.match(/BadNoSubscription/) && this.activeSubscriptionCount >= 1) {
286
                    // there is something wrong happening here.
287
                    // the server tells us that there is no subscription for this session
288
                    // but the client have some active subscription left.
289
                    // This could happen if the client has missed or not received the StatusChange Notification
290
                    debugLog(chalk.bgWhite.red(" WARNING: server tells that there is no Subscription, but client disagree"));
291
                    debugLog("this.activeSubscriptionCount =", this.activeSubscriptionCount);
292
                    active = false;
293
                }
294

295
                if (err.message.match(/BadSessionClosed|BadSessionIdInvalid/)) {
2,070✔
296
                    //
297
                    // server has closed the session ....
298
                    // may be the session timeout is shorted than the subscription life time
299
                    // and the client does not send intermediate keepAlive request to keep the connection working.
300
                    //
301
                    debugLog(chalk.bgWhite.red(" WARNING : Server tells that the session has closed ..."));
584✔
302
                    debugLog(
584✔
303
                        "   the ClientSidePublishEngine shall now be disabled," + " as server will reject any further request"
304
                    );
305
                    // close all active subscription....
306
                    active = false;
584✔
307
                }
308
                if (err.message.match(/BadTooManyPublishRequests/)) {
2,070✔
309
                    // preventing queue overflow
310
                    // -------------------------
311
                    //   if the client send too many publish requests that the server can queue, the server returns
312
                    //   a Service result of BadTooManyPublishRequests.
313
                    //
314
                    //   let adjust the nbMaxPublishRequestsAcceptedByServer value so we never overflow the server
315
                    //   with extraneous publish requests in the future.
316
                    //
317
                    this.nbMaxPublishRequestsAcceptedByServer = Math.min(
5✔
318
                        this.nbPendingPublishRequests,
319
                        this.nbMaxPublishRequestsAcceptedByServer
320
                    );
321
                    active = false;
5✔
322
                    if (this.nbPendingPublishRequests < 10) {
5!
323
                        warningLog(chalk.bgWhite.red(" warning : server tells that too many publish request has been send ..."));
×
324
                        warningLog(" On our side nbPendingPublishRequests = ", this.nbPendingPublishRequests);
×
325
                        warningLog(" => nbMaxPublishRequestsAcceptedByServer =", this.nbMaxPublishRequestsAcceptedByServer);
×
326
                    }
327
                }
328
            } else {
329
                // istanbul ignore next
330
                if (doDebug) {
331
                    debugLog(chalk.cyan("ClientSidePublishEngine.prototype.internalSendPublishRequest callback "));
332
                }
333
                this._receive_publish_response(response!);
1,733✔
334
            }
335

336
            // feed the server with a new publish Request to the server
337
            if (!this.isSuspended && active && this.activeSubscriptionCount > 0) {
3,803✔
338
                if (err && err.message.match(/Connection Break/)) {
1,732✔
339
                    // do not renew when connection is broken
340
                } else {
341
                    this.send_publish_request();
1,692✔
342
                }
343
            }
344
        });
345
    }
346

347
    private _receive_publish_response(response: PublishResponse) {
348
        debugLog(chalk.yellow("receive publish response"));
1,733✔
349

350
        // the id of the subscription sending the notification message
351
        const subscriptionId = response.subscriptionId;
1,733✔
352

353
        // the sequence numbers available in this subscription
354
        // for retransmission and not acknowledged by the client
355
        // -- var available_seq = response.availableSequenceNumbers;
356

357
        // has the server more notification for us ?
358
        // -- var moreNotifications = response.moreNotifications;
359

360
        const notificationMessage = response.notificationMessage;
1,733✔
361
        //  notificationMessage.sequenceNumber
362
        //  notificationMessage.publishTime
363
        //  notificationMessage.notificationData[]
364

365
        notificationMessage.notificationData = notificationMessage.notificationData || [];
1,733!
366

367
        if (notificationMessage.notificationData.length !== 0) {
1,733✔
368
            this.acknowledge_notification(subscriptionId, notificationMessage.sequenceNumber);
719✔
369
        }
370
        // else {
371
        // this is a keep-alive notification
372
        // in this case , we shall not acknowledge notificationMessage.sequenceNumber
373
        // which is only an information of what will be the future sequenceNumber.
374
        // }
375

376
        const subscription = this.subscriptionMap[subscriptionId];
1,733✔
377

378
        if (subscription && this.session !== null) {
1,733✔
379
            try {
1,723✔
380
                // delegate notificationData to the subscription callback
381
                subscription.onNotificationMessage(notificationMessage);
1,723✔
382
            } catch (err) {
383
                // istanbul ignore next
384
                if (doDebug) {
385
                    debugLog(err);
386
                    debugLog("Exception in onNotificationMessage");
387
                }
388
            }
389
        } else {
390
            debugLog(" ignoring notificationMessage", notificationMessage, " for subscription", subscriptionId);
10✔
391
            debugLog(" because there is no subscription.");
10✔
392
            debugLog(" or because there is no session for the subscription (session terminated ?).");
10✔
393
        }
394
    }
395
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc