• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

RobotWebTools / rclnodejs / 23230092275

18 Mar 2026 05:08AM UTC coverage: 85.995% (+0.03%) from 85.961%
23230092275

push

github

minggangw
Add waitForMessage utility for one-shot message reception (#1444)

Adds `waitForMessage()`, the rclnodejs equivalent of rclpy's `wait_for_message(msg_type, node, topic, time_to_wait)`. This utility creates a temporary subscription, waits for the first message to arrive on a topic, and returns it as a Promise. The temporary subscription is always cleaned up — on success, timeout, or error. Supports an optional timeout and QoS profile. The node must be spinning before calling this function.

**New files:**

- `lib/wait_for_message.js` — Core implementation. Creates a temporary subscription with a one-shot callback that resolves the Promise on first message. Uses a `settled` guard to prevent double-resolution. Cleanup runs unconditionally via the `settle()` helper. Accepts optional `{ timeout, qos }` options.
- `test/test-wait-for-message.js` — 7 tests covering: successful message reception, timeout rejection, first-message-only semantics, different message types (String, Int32), indefinite wait without timeout, subscription cleanup after receiving, and subscription cleanup on timeout.

**Modified files:**

- `index.js` — Added `require('./lib/wait_for_message.js')` and re-exported as `waitForMessage` property on the module.
- `types/index.d.ts` — Added `WaitForMessageOptions` interface and generic `waitForMessage<T>(typeClass, node, topic, options?)` function declaration.

Fix: #1443

1477 of 1866 branches covered (79.15%)

Branch coverage included in aggregate %.

29 of 32 new or added lines in 1 file covered. (90.63%)

59 existing lines in 3 files now uncovered.

3036 of 3382 relevant lines covered (89.77%)

458.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.7
/lib/parameter_watcher.js
1
// Copyright (c) 2025 Mahmoud Alghalayini. All rights reserved.
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14

15
'use strict';
16

17
const EventEmitter = require('events');
26✔
18
const { TypeValidationError, OperationError } = require('./errors');
26✔
19
const { normalizeNodeName } = require('./utils');
26✔
20
const ParameterEventHandler = require('./parameter_event_handler.js');
26✔
21
const debug = require('debug')('rclnodejs:parameter_watcher');
26✔
22

23
/**
24
 * @class ParameterWatcher - Watches parameter changes on a remote node
25
 *
26
 * Subscribes to /parameter_events and emits 'change' events when
27
 * watched parameters on the target node are modified.
28
 *
29
 * @extends EventEmitter
30
 */
31
class ParameterWatcher extends EventEmitter {
  // Local node used to create/destroy the parameter client and event handler.
  #node;
  // Parameter client bound to the remote node; null once destroyed.
  #paramClient;
  // ParameterEventHandler created lazily by start(); null until then.
  #eventHandler;
  // Set of parameter names whose changes are reported.
  #watchedParams;
  // Cached so it stays readable after the parameter client is released.
  #remoteNodeName;
  // True once destroy() has been called.
  #destroyed;

  /**
   * Create a ParameterWatcher instance.
   * Note: Use node.createParameterWatcher() instead of calling this directly.
   *
   * @param {object} node - The local rclnodejs Node instance; must expose
   *   createParameterClient()
   * @param {string} remoteNodeName - Name of the remote node to watch
   * @param {string[]} parameterNames - Non-empty array of parameter names to watch
   * @param {object} [options] - Options forwarded to node.createParameterClient()
   * @param {number} [options.timeout=5000] - Default timeout for parameter operations
   * @throws {TypeValidationError} If node, remoteNodeName or parameterNames is invalid
   * @hideconstructor
   */
  constructor(node, remoteNodeName, parameterNames, options = {}) {
    super();

    if (!node || typeof node.createParameterClient !== 'function') {
      throw new TypeValidationError('node', node, 'Node instance', {
        entityType: 'parameter watcher',
      });
    }

    if (typeof remoteNodeName !== 'string' || remoteNodeName.trim() === '') {
      throw new TypeValidationError(
        'remoteNodeName',
        remoteNodeName,
        'non-empty string',
        {
          entityType: 'parameter watcher',
        }
      );
    }

    if (!Array.isArray(parameterNames) || parameterNames.length === 0) {
      throw new TypeValidationError(
        'parameterNames',
        parameterNames,
        'non-empty array',
        {
          entityType: 'parameter watcher',
        }
      );
    }

    this.#node = node;
    this.#watchedParams = new Set(parameterNames);
    this.#paramClient = node.createParameterClient(remoteNodeName, options);
    // Cache the remote node name for error messages (in case paramClient is destroyed)
    this.#remoteNodeName = this.#paramClient.remoteNodeName;
    this.#eventHandler = null;
    this.#destroyed = false;

    debug(
      'Created ParameterWatcher for node=%s, params=%o',
      remoteNodeName,
      parameterNames
    );
  }

  /**
   * Get the remote node name being watched.
   * @type {string}
   * @readonly
   */
  get remoteNodeName() {
    return this.#remoteNodeName;
  }

  /**
   * Get the list of watched parameter names.
   * A fresh array is returned on every access.
   * @type {string[]}
   * @readonly
   */
  get watchedParameters() {
    return Array.from(this.#watchedParams);
  }

  /**
   * Start watching for parameter changes.
   * Waits for the remote node's parameter services and, on success, registers
   * a callback with a ParameterEventHandler (created on first successful start).
   *
   * @param {number} [timeout=5000] - Timeout in milliseconds to wait for services
   * @returns {Promise<boolean>} Resolves to true when watching has started;
   *   false when the services are not available or when the watcher was
   *   destroyed while waiting for them
   * @throws {Error} If the watcher has already been destroyed when called
   */
  async start(timeout = 5000) {
    this.#checkNotDestroyed();

    debug('Starting ParameterWatcher for node=%s', this.remoteNodeName);

    const available = await this.#paramClient.waitForService(timeout);

    // destroy() may have run while the wait was pending. Creating an event
    // handler now would leave it attached to a dead watcher with nothing
    // left to destroy it, so bail out instead.
    if (this.#destroyed) {
      debug(
        'ParameterWatcher for node=%s was destroyed while starting',
        this.remoteNodeName
      );
      return false;
    }

    if (!available) {
      debug(
        'Parameter services not available for node=%s',
        this.remoteNodeName
      );
      return false;
    }

    if (!this.#eventHandler) {
      this.#eventHandler = new ParameterEventHandler(this.#node);
      this.#eventHandler.addParameterEventCallback((event) =>
        this.#handleParameterEvent(event)
      );

      debug('Subscribed to /parameter_events via ParameterEventHandler');
    }

    return true;
  }

  /**
   * Get current values of all watched parameters.
   *
   * @param {object} [options] - Options for the parameter client
   * @param {number} [options.timeout] - Timeout in milliseconds
   * @param {AbortSignal} [options.signal] - AbortSignal for cancellation
   * @returns {Promise<Parameter[]>} Array of Parameter objects
   * @throws {Error} If the watcher has been destroyed
   */
  async getCurrentValues(options) {
    this.#checkNotDestroyed();
    return await this.#paramClient.getParameters(
      Array.from(this.#watchedParams),
      options
    );
  }

  /**
   * Add a parameter name to the watch list.
   * Adding a name that is already watched is a no-op.
   *
   * @param {string} name - Parameter name to watch
   * @throws {TypeError} If name is not a non-empty string
   * @throws {Error} If the watcher has been destroyed
   */
  addParameter(name) {
    this.#checkNotDestroyed();

    if (typeof name !== 'string' || name.trim() === '') {
      throw new TypeValidationError('name', name, 'non-empty string', {
        entityType: 'parameter watcher',
        entityName: this.remoteNodeName,
      });
    }

    const wasAdded = !this.#watchedParams.has(name);
    this.#watchedParams.add(name);

    if (wasAdded) {
      debug('Added parameter to watch list: %s', name);
    }
  }

  /**
   * Remove a parameter name from the watch list.
   *
   * @param {string} name - Parameter name to stop watching
   * @returns {boolean} True if the parameter was in the watch list
   * @throws {Error} If the watcher has been destroyed
   */
  removeParameter(name) {
    this.#checkNotDestroyed();

    const wasRemoved = this.#watchedParams.delete(name);

    if (wasRemoved) {
      debug('Removed parameter from watch list: %s', name);
    }

    return wasRemoved;
  }

  /**
   * Check if the watcher has been destroyed.
   *
   * @returns {boolean} True if destroyed
   */
  isDestroyed() {
    return this.#destroyed;
  }

  /**
   * Destroy the watcher and clean up resources.
   * Destroys the event handler (if one was created), releases the parameter
   * client through the node, and removes all listeners. Errors raised by
   * either teardown step are logged via debug and otherwise ignored.
   * Calling destroy() more than once is a no-op.
   */
  destroy() {
    if (this.#destroyed) {
      return;
    }

    // Flip the flag first so a re-entrant destroy() triggered from inside the
    // teardown below, or a start() resuming afterwards, sees the final state.
    this.#destroyed = true;

    debug('Destroying ParameterWatcher for node=%s', this.remoteNodeName);

    if (this.#eventHandler) {
      try {
        this.#eventHandler.destroy();
      } catch (error) {
        debug('Error destroying event handler: %s', error.message);
      }
      this.#eventHandler = null;
    }

    if (this.#paramClient) {
      try {
        this.#node.destroyParameterClient(this.#paramClient);
      } catch (error) {
        debug('Error destroying parameter client: %s', error.message);
      }
      this.#paramClient = null;
    }

    this.removeAllListeners();
  }

  /**
   * Handle a parameter event delivered by the event handler.
   * Events from other nodes are ignored. For the watched node, entries from
   * new_parameters, changed_parameters and deleted_parameters (in that order)
   * whose name is in the watch list are collected and emitted as one
   * 'change' event.
   * @private
   */
  #handleParameterEvent(event) {
    if (normalizeNodeName(event.node) !== this.remoteNodeName) {
      return;
    }

    const relevantChanges = [];
    const candidateLists = [
      event.new_parameters,
      event.changed_parameters,
      event.deleted_parameters,
    ];

    for (const parameters of candidateLists) {
      // A missing list is skipped rather than treated as an error.
      if (parameters) {
        relevantChanges.push(
          ...parameters.filter((p) => this.#watchedParams.has(p.name))
        );
      }
    }

    if (relevantChanges.length > 0) {
      debug(
        'Parameter change detected: %o',
        relevantChanges.map((p) => p.name)
      );
      this.emit('change', relevantChanges);
    }
  }

  /**
   * Check if the watcher has been destroyed and throw if so.
   * @throws {OperationError} With code 'WATCHER_DESTROYED'
   * @private
   */
  #checkNotDestroyed() {
    if (this.#destroyed) {
      throw new OperationError('ParameterWatcher has been destroyed', {
        code: 'WATCHER_DESTROYED',
        entityType: 'parameter watcher',
        entityName: this.remoteNodeName,
      });
    }
  }
}
308

309
// Expose the ParameterWatcher class as this module's export.
module.exports = ParameterWatcher;
26✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc