• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-opcua / #38

05 Aug 2024 11:22PM UTC coverage: 88.415% (-0.2%) from 88.606%
#38

push

lorenzo-gomez-windhover
-Fix action null check

1 of 1 new or added line in 1 file covered. (100.0%)

68 existing lines in 1 file now uncovered.

664 of 751 relevant lines covered (88.42%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.2
/src/main/java/com/windhoverlabs/yamcs/opcua/OPCUALink.java
1
/****************************************************************************
2
 *
3
 *   Copyright (c) 2024 Windhover Labs, L.L.C. All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 *
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in
13
 *    the documentation and/or other materials provided with the
14
 *    distribution.
15
 * 3. Neither the name Windhover Labs nor the names of its
16
 *    contributors may be used to endorse or promote products derived
17
 *    from this software without specific prior written permission.
18
 *
19
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23
 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
26
 * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
 * POSSIBILITY OF SUCH DAMAGE.
31
 *
32
 *****************************************************************************/
33

34
package com.windhoverlabs.yamcs.opcua;
35

36
import static com.google.common.collect.Lists.newArrayList;
37
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
38
import static org.yamcs.xtce.NameDescription.qualifiedName;
39

40
import com.google.gson.JsonObject;
41
import java.io.BufferedWriter;
42
import java.io.IOException;
43
import java.nio.file.Files;
44
import java.nio.file.Paths;
45
import java.nio.file.StandardOpenOption;
46
import java.time.Instant;
47
import java.time.temporal.ChronoUnit;
48
import java.util.ArrayList;
49
import java.util.Arrays;
50
import java.util.HashMap;
51
import java.util.HashSet;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Objects;
55
import java.util.Set;
56
import java.util.concurrent.CompletableFuture;
57
import java.util.concurrent.ConcurrentHashMap;
58
import java.util.concurrent.ExecutionException;
59
import java.util.concurrent.atomic.AtomicLong;
60
import java.util.function.Supplier;
61
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
62
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
63
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
64
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
65
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
66
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
67
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
68
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
69
import org.eclipse.milo.opcua.stack.core.AttributeId;
70
import org.eclipse.milo.opcua.stack.core.Identifiers;
71
import org.eclipse.milo.opcua.stack.core.UaException;
72
import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
73
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
74
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
75
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
76
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
77
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
78
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
79
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
80
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
81
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
82
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
83
import org.eclipse.milo.opcua.stack.core.types.enumerated.IdType;
84
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
85
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
86
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
87
import org.eclipse.milo.opcua.stack.core.types.structured.BrowsePath;
88
import org.eclipse.milo.opcua.stack.core.types.structured.BrowsePathResult;
89
import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
90
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
91
import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
92
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
93
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
94
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
95
import org.eclipse.milo.opcua.stack.core.types.structured.RelativePath;
96
import org.eclipse.milo.opcua.stack.core.types.structured.RelativePathElement;
97
import org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
98
import org.eclipse.milo.opcua.stack.core.types.structured.TranslateBrowsePathsToNodeIdsResponse;
99
import org.slf4j.Logger;
100
import org.slf4j.LoggerFactory;
101
import org.yamcs.ConfigurationException;
102
import org.yamcs.Spec;
103
import org.yamcs.Spec.OptionType;
104
import org.yamcs.StandardTupleDefinitions;
105
import org.yamcs.ValidationException;
106
import org.yamcs.YConfiguration;
107
import org.yamcs.YamcsServer;
108
import org.yamcs.http.NotFoundException;
109
import org.yamcs.mdb.XtceAssembler;
110
import org.yamcs.parameter.ParameterValue;
111
import org.yamcs.parameter.SystemParametersProducer;
112
import org.yamcs.parameter.SystemParametersService;
113
import org.yamcs.protobuf.Event.EventSeverity;
114
import org.yamcs.protobuf.Yamcs.NamedObjectId;
115
import org.yamcs.protobuf.Yamcs.Value.Type;
116
import org.yamcs.tctm.AbstractLink;
117
import org.yamcs.tctm.Link;
118
import org.yamcs.tctm.LinkAction;
119
import org.yamcs.utils.ValueUtility;
120
import org.yamcs.xtce.BooleanParameterType;
121
import org.yamcs.xtce.EnumeratedParameterType;
122
import org.yamcs.xtce.FloatParameterType;
123
import org.yamcs.xtce.IntegerParameterType;
124
import org.yamcs.xtce.NameDescription;
125
import org.yamcs.xtce.Parameter;
126
import org.yamcs.xtce.ParameterType;
127
import org.yamcs.xtce.SpaceSystem;
128
import org.yamcs.xtce.StringParameterType;
129
import org.yamcs.xtce.XtceDb;
130
import org.yamcs.yarch.DataType;
131
import org.yamcs.yarch.Stream;
132
import org.yamcs.yarch.Tuple;
133
import org.yamcs.yarch.TupleDefinition;
134
import org.yamcs.yarch.YarchDatabase;
135
import org.yamcs.yarch.YarchDatabaseInstance;
136
import org.yamcs.yarch.protobuf.Db.Event;
137

138
/**
139
 * Implementation of the OPCUA protocol as a YAMCS link. Maps configured nodes (see docs for details)
140
 * to yamcs PVs and subscribes to OPCUA variables for realtime updates.
141
 *
142
 * @author Lorenzo Gomez
143
 */
144
public class OPCUALink extends AbstractLink implements Runnable, SystemParametersProducer {
1✔
145

146
  class NodeIDAttrPair {
147
    NodeId nodeID;
148
    AttributeId attrID;
149

150
    public NodeIDAttrPair(NodeId newNodeID, AttributeId newAttrID) {
1✔
151
      this.nodeID = newNodeID;
1✔
152
      this.attrID = newAttrID;
1✔
153
    }
1✔
154

155
    public int hashCode() {
156
      return Objects.hash(this.nodeID, this.attrID);
1✔
157
    }
158

159
    public boolean equals(Object obj) {
160
      return (this.hashCode() == obj.hashCode());
1✔
161
    }
162
  }
163

164
  /**
   * One entry of the "nodePaths" configuration list: a relative path string plus the raw
   * "rootNodeID" map it is anchored at.
   */
  class NodePath {
    /** Raw "rootNodeID" configuration map; starts out empty. */
    HashMap<Object, Object> rootNodeID = new HashMap<>();

    /** Value of the "path" configuration option. */
    String path;
  }
168

169
  /**
   * Useful status for tracking initialization status of the link. The value is advanced step by
   * step by opcuaInit() and exposed through getCurrentOPCUAStatus().
   */
170
  public enum OPCUAINITStatus {
1✔
171
    // Not assigned anywhere in the visible part of this file; presumably the configuration phase.
    OPCUA_INIT_CONFIG,
1✔
172
    // Set by opcuaInit() right before browsing the OPCUA tree.
    OPCUA_INIT_TREE,
1✔
173
    // Set by opcuaInit() when browsing, XTCE export or event subscription throws.
    OPCUA_INIT_TREE_FAILED,
1✔
174
    // Set by opcuaInit() right before exporting the XTCE file.
    OPCUA_INIT_GENERATE_XTCE,
1✔
175
    // Set by opcuaInit() right before subscribing to events.
    OPCUA_INIT_EVENTS,
1✔
176
    // Set by opcuaInit() right before creating the data subscriptions.
    OPCUA_INIT_DATA_SUBSCRIPTION,
1✔
177
    // Set by opcuaInit() right before the optional query of all node data at startup.
    OPCUA_INIT_ALL_DATA_QUERY,
1✔
178
    // Set by opcuaInit() once every step has completed.
    OPCUA_INIT_OK
1✔
179
  }
180

181
  /* Configuration Defaults */
182
  static final String STREAM_NAME = "opcua_params";
183

184
  /* Internal member attributes. */
185
  protected Thread thread;
186
  private String opcuaStreamName;
187
  private String parametersNamespace;
188
  XtceDb mdb;
189
  Stream opcuaStream;
190
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();
1✔
191
  private ManagedSubscription opcuaSubscription;
192

193
  private static final Logger internalLogger = LoggerFactory.getLogger(OPCUALink.class.getName());
1✔
194

195
  /**
196
   * @note ALWAYS re-use params as org.yamcs.parameter.ParameterRequestManager.param2RequestMap uses
197
   *     the object inside a map that was added to the mdb for the very first time. If when
198
   *     publishing the PV, we create a new VariableParam object clients will NOT receive real-time
199
   *     updates as the new object VariableParam inside the new PV won't match the one inside
200
   *     org.yamcs.parameter.ParameterRequestManager.param2RequestMap since the object hashes do not
201
   *     match (since VariableParam does not override its hash function).
202
   */
203
  private ConcurrentHashMap<NodeIDAttrPair, VariableParam> nodeIDToParamsMap =
1✔
204
      new ConcurrentHashMap<NodeIDAttrPair, VariableParam>();
205

206
  private OpcUaClient client;
207

208
  protected AtomicLong inCount = new AtomicLong(0);
1✔
209

210
  private Status linkStatus = Status.OK;
1✔
211

212
  /* Configuration Parameters */
213

214
  private String discoverURL;
215
  private String endpointURL;
216
  private boolean queryAllNodesAtStartup;
217
  private String outputFile;
218
  private int publishInterval; // milliseconds
219

220
  private ArrayList<NodePath> relativeNodePaths = new ArrayList<NodePath>();
1✔
221

222
  private final AtomicLong clientHandles = new AtomicLong(1L);
1✔
223

224
  /* System parameters*/
225

226
  private Parameter OPCUAInitStatusParam;
227
  private OPCUAINITStatus currentOPCUAStatus;
228
  private Parameter OPCUAActiveSubsParam;
229
  private AtomicLong OPCUAActiveSubs = new AtomicLong(0);
1✔
230

231
  LinkAction startAction =
1✔
232
      new LinkAction("query_all", "Query All OPCUA Server Data") {
1✔
233
        @Override
234
        public JsonObject execute(Link link, JsonObject jsonObject) {
235

236
          internalLogger.info("Executing query_all action");
1✔
237
          CompletableFuture.supplyAsync(
1✔
238
                  (Supplier<Integer>)
239
                      () -> {
240
                        queryAllOPCUAData();
1✔
241

242
                        return 0;
1✔
243
                      })
244
              .whenComplete(
1✔
245
                  (vaue, e) -> {
246
                    internalLogger.info("query_all action Complete");
1✔
247
                  });
1✔
248

249
          return jsonObject;
1✔
250
        }
251
      };
252

253
  public OPCUAINITStatus getCurrentOPCUAStatus() {
254
    return currentOPCUAStatus;
1✔
255
  }
256

257
  @Override
258
  public Spec getSpec() {
259
    Spec spec = new Spec();
1✔
260

261
    /* Define our configuration parameters. */
262
    spec.addOption("name", OptionType.STRING).withRequired(true);
1✔
263
    spec.addOption("class", OptionType.STRING).withRequired(true);
1✔
264
    spec.addOption("opcuaStream", OptionType.STRING).withRequired(true);
1✔
265
    spec.addOption("endpointUrl", OptionType.STRING).withRequired(true);
1✔
266
    spec.addOption("discoveryUrl", OptionType.STRING).withRequired(true);
1✔
267
    spec.addOption("xtceOutputFile", OptionType.STRING).withRequired(true);
1✔
268
    spec.addOption("parametersNamespace", OptionType.STRING).withRequired(true);
1✔
269
    spec.addOption("publishInterval", OptionType.INTEGER).withRequired(true);
1✔
270
    spec.addOption("queryAllNodesAtStartup", OptionType.BOOLEAN).withRequired(false);
1✔
271

272
    Spec rootNodeIDSpec = new Spec();
1✔
273

274
    rootNodeIDSpec.addOption("namespaceIndex", OptionType.INTEGER).withRequired(true);
1✔
275
    rootNodeIDSpec.addOption("identifier", OptionType.STRING).withRequired(true);
1✔
276
    rootNodeIDSpec.addOption("identifierType", OptionType.STRING).withRequired(true);
1✔
277

278
    Spec nodePathSpec = new Spec();
1✔
279
    nodePathSpec.addOption("path", OptionType.STRING);
1✔
280
    nodePathSpec
1✔
281
        .addOption("rootNodeID", OptionType.MAP)
1✔
282
        .withRequired(true)
1✔
283
        .withSpec(rootNodeIDSpec);
1✔
284

285
    spec.addOption("nodePaths", OptionType.LIST)
1✔
286
        .withElementType(OptionType.MAP)
1✔
287
        .withRequired(true)
1✔
288
        .withSpec(nodePathSpec);
1✔
289

290
    return spec;
1✔
291
  }
292

293
  @Override
294
  public void init(String yamcsInstance, String serviceName, YConfiguration config)
295
      throws ConfigurationException {
296
    super.init(yamcsInstance, serviceName, config);
1✔
297

298
    /* Local variables */
299
    this.config = config;
1✔
300
    /* Validate the configuration that the user passed us. */
301
    try {
302
      config = getSpec().validate(config);
1✔
303
    } catch (ValidationException e) {
×
304
      log.error("Failed configuration validation.", e);
×
305
      notifyFailed(e);
×
306
    }
1✔
307
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
1✔
308

309
    this.opcuaStreamName = config.getString("opcuaStream");
1✔
310
    this.opcuaStream = getStream(ydb, opcuaStreamName);
1✔
311
    this.parametersNamespace = config.getString("parametersNamespace");
1✔
312
    this.mdb = YamcsServer.getServer().getInstance(yamcsInstance).getXtceDb();
1✔
313

314
    readOPCUAConfig(config);
1✔
315
    readNodePathsConfig(config);
1✔
316

317
    outputFile = config.getString("xtceOutputFile");
1✔
318
  }
1✔
319

320
  private void readOPCUAConfig(YConfiguration config) {
321
    this.endpointURL = config.getString("endpointUrl");
1✔
322
    this.discoverURL = config.getString("discoveryUrl");
1✔
323
    this.publishInterval = config.getInt("publishInterval");
1✔
324
    this.queryAllNodesAtStartup = config.getBoolean("queryAllNodesAtStartup", false);
1✔
325
  }
1✔
326

327
  private void readNodePathsConfig(YConfiguration config) {
328
    List<Map<Object, Object>> nodePaths = config.getList("nodePaths");
1✔
329

330
    for (Map<Object, Object> path : nodePaths) {
1✔
331
      NodePath nodePath = new NodePath();
1✔
332
      nodePath.path = (String) path.get("path");
1✔
333
      nodePath.rootNodeID = (HashMap<Object, Object>) path.get("rootNodeID");
1✔
334
      relativeNodePaths.add(nodePath);
1✔
335
    }
1✔
336
  }
1✔
337

338
  private static SpaceSystem verifySpaceSystem(XtceDb mdb, String pathName) {
339
    String namespace;
340
    String name;
341
    int lastSlash = pathName.lastIndexOf('/');
1✔
342
    if ("/".equals(pathName)) {
1✔
343
      namespace = "";
×
344
      name = "";
×
345
    } else if (lastSlash == -1 || lastSlash == pathName.length() - 1) {
1✔
346
      namespace = "";
×
347
      name = pathName;
×
348
    } else {
349
      namespace = pathName.substring(0, lastSlash);
1✔
350
      name = pathName.substring(lastSlash + 1);
1✔
351
    }
352

353
    // First try with a prefixed slash (should be the common case)
354
    NamedObjectId id =
355
        NamedObjectId.newBuilder().setNamespace("/" + namespace).setName(name).build();
1✔
356
    SpaceSystem spaceSystem = mdb.getSpaceSystem(id);
1✔
357
    if (spaceSystem != null) {
1✔
358
      return spaceSystem;
×
359
    }
360

361
    // Maybe some non-xtce namespace like MDB:OPS Name
362
    id = NamedObjectId.newBuilder().setNamespace(namespace).setName(name).build();
1✔
363
    spaceSystem = mdb.getSpaceSystem(id);
1✔
364
    if (spaceSystem != null) {
1✔
365
      return spaceSystem;
1✔
366
    }
367

368
    throw new NotFoundException("No such space system");
×
369
  }
370

371
  /**
372
   * Initializes all PV mappings to OPCUA nodes and realtime subscriptions(managed data items in
373
   * OPCUA terms).
374
   */
375
  private void opcuaInit() {
376
    try {
377

378
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_TREE;
1✔
379
      browseOPCUATree(client);
1✔
380
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_GENERATE_XTCE;
1✔
381
      exportXTCE();
1✔
382
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_EVENTS;
1✔
383
      subscribeToEvents(client);
1✔
384

385
    } catch (Exception e) {
1✔
386
      internalLogger.warn(e.toString());
1✔
387
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_TREE_FAILED;
1✔
388
      return;
1✔
389
    }
1✔
390
    try {
391
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_DATA_SUBSCRIPTION;
1✔
392
      createOPCUASubscriptions();
1✔
393
    } catch (Exception e) {
×
394
      internalLogger.warn(e.toString());
×
395
      return;
×
396
    }
1✔
397
    if (queryAllNodesAtStartup) {
1✔
398
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_ALL_DATA_QUERY;
×
399
      queryAllOPCUAData();
×
400
    }
401

402
    currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_OK;
1✔
403
  }
1✔
404

405
  private void exportXTCE() throws IOException {
406
    var spaceSystem = verifySpaceSystem(mdb, parametersNamespace);
1✔
407
    var xtce = new XtceAssembler().toXtce(mdb, spaceSystem.getQualifiedName(), fqn -> true);
1✔
408
    BufferedWriter writer = null;
1✔
409

410
    if (outputFile != null) {
1✔
411
      writer =
1✔
412
          Files.newBufferedWriter(
1✔
413
              Paths.get(outputFile),
1✔
414
              StandardOpenOption.CREATE,
415
              StandardOpenOption.TRUNCATE_EXISTING);
416

417
      writer.write(xtce);
1✔
418

419
      writer.flush();
1✔
420
      writer.close();
1✔
421
    }
422
  }
1✔
423

424
  private void opcuaClientConnect() throws Exception {
425
    client = configureClient();
1✔
426
    connectToOPCUAServer(client);
1✔
427
  }
1✔
428

429
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
430
    Stream stream = ydb.getStream(streamName);
1✔
431
    if (stream == null) {
1✔
432
      try {
433
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
1✔
434
      } catch (Exception e) {
×
435
        throw new ConfigurationException(e);
×
436
      }
1✔
437

438
      stream = ydb.getStream(streamName);
1✔
439
    }
440
    return stream;
1✔
441
  }
442

443
  @Override
444
  public void doDisable() {
445

446
    try {
447
      client.disconnect().get();
1✔
UNCOV
448
    } catch (InterruptedException | ExecutionException e) {
×
UNCOV
449
      internalLogger.warn(e.toString());
×
450
    }
1✔
451
    if (thread != null) {
1✔
452
      thread.interrupt();
1✔
453
    }
454

455
    linkStatus = Status.DISABLED;
1✔
456
  }
1✔
457

458
  @Override
459
  public void doEnable() {
460
    try {
461
      opcuaClientConnect();
1✔
UNCOV
462
    } catch (Exception e) {
×
UNCOV
463
      internalLogger.warn(e.toString());
×
UNCOV
464
      linkStatus = Status.FAILED;
×
UNCOV
465
      notifyFailed(e);
×
UNCOV
466
      return;
×
467
    }
1✔
468

469
    startAction.addChangeListener(
1✔
470
        () -> {
471
          /**
472
           * TODO:Might be useful if we want turn off any functionality when the action is disabled
473
           * for instance..
474
           */
UNCOV
475
        });
×
476

477
    /* Create and start the new thread. */
478
    thread = new Thread(this);
1✔
479
    thread.setName(this.getClass().getSimpleName() + "-" + linkName);
1✔
480
    thread.start();
1✔
481
    linkStatus = Status.OK;
1✔
482
  }
1✔
483

484
  @Override
485
  public String getDetailedStatus() {
486
    if (isDisabled()) {
1✔
487
      return String.format("DISABLED");
1✔
488
    } else {
489
      return String.format("OK, received %d packets", inCount.get());
1✔
490
    }
491
  }
492

493
  @Override
494
  public Status connectionStatus() {
495
    return linkStatus;
1✔
496
  }
497

498
  @Override
499
  protected void doStart() {
500
    if (!isDisabled()) {
1✔
501
      doEnable();
1✔
502
    }
503

504
    notifyStarted();
1✔
505
  }
1✔
506

507
  @Override
508
  protected void doStop() {
509
    try {
510
      client.disconnect().get();
1✔
UNCOV
511
    } catch (InterruptedException | ExecutionException e) {
×
UNCOV
512
      internalLogger.warn(e.toString());
×
513
    }
1✔
514
    if (thread != null) {
1✔
515
      thread.interrupt();
1✔
516
    }
517

518
    notifyStopped();
1✔
519
  }
1✔
520

521
  @Override
522
  public void run() {
523
    opcuaInit();
1✔
524
    /* Enter our main loop */
525
    while (isRunningAndEnabled()) {}
1✔
526
  }
1✔
527

528
  /**
529
   * Reads all attributes of all configured Value nodes and updates their corresponding PV. Useful
530
   * for querying data from the OPCUA server once, data such as browse names, NodeIds, etc.
531
   */
532
  private void queryAllOPCUAData() {
533
    TupleDefinition tdef = gftdef.copy();
1✔
534
    List<Object> cols = new ArrayList<>(4 + nodeIDToParamsMap.keySet().size());
1✔
535

536
    tdef = gftdef.copy();
1✔
537
    long gentime = timeService.getMissionTime();
1✔
538
    cols.add(gentime);
1✔
539
    cols.add(parametersNamespace);
1✔
540
    cols.add(0);
1✔
541
    cols.add(gentime);
1✔
542

543
    int columnCount = 0;
1✔
544

545
    Set<NodeId> nodeSet = new HashSet<NodeId>();
1✔
546
    /**
547
     * NOTE:This is super inefficient... The reason we collect these nodeIDs in a set is because
548
     * otherwise we will have redundant subscription(s) since there is more than 1 attribute per
549
     * nodeID given how nodeIDToParamsMap is designed
550
     */
551
    for (NodeIDAttrPair pair : nodeIDToParamsMap.keySet()) {
1✔
552
      nodeSet.add(pair.nodeID);
1✔
553
    }
1✔
554

555
    for (NodeId nId : nodeSet) {
1✔
556
      UaNode node;
557

558
      try {
559
        node = client.getAddressSpace().getNode(nId);
1✔
560

561
        DataValue nodeClass = node.readAttribute(AttributeId.NodeClass);
1✔
562

563
        switch (NodeClass.from((int) nodeClass.getValue().getValue())) {
1✔
564
          case Variable:
565
            for (AttributeId attr : AttributeId.VARIABLE_ATTRIBUTES) {
1✔
566
              VariableParam p = nodeIDToParamsMap.get(new NodeIDAttrPair(nId, attr));
1✔
567

568
              if (p.getParameterType() == null) {
1✔
UNCOV
569
                internalLogger.warn(
×
570
                    "{} ignored since it does not have a Parameter type",
571
                    p,
UNCOV
572
                    Character.toString(NameDescription.PATH_SEPARATOR));
×
UNCOV
573
                continue;
×
574
              }
575

576
              switch (p.getParameterType().getValueType()) {
1✔
577
                case BOOLEAN:
578
                  {
579
                    Boolean value = true;
1✔
580
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
581
                      //                      value = "NULL";
582
                    } else {
583
                      value = (Boolean) node.readAttribute(attr).getValue().getValue();
1✔
584
                    }
585

586
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
587
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value));
1✔
588
                  }
589
                  break;
1✔
590
                case DOUBLE:
591
                  {
592
                    Number value = 0;
1✔
593
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
594
                      internalLogger.warn("node {} has a Null variant.", node);
×
595
                    } else {
596
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
597
                    }
598

599
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
600
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.doubleValue()));
1✔
601
                  }
602
                  break;
1✔
603
                case FLOAT:
604
                  {
605
                    Number value = 0;
1✔
606
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
607
                      internalLogger.warn("node {} has a Null variant.", node);
×
608
                    } else {
609
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
610
                    }
611

612
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
613
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.floatValue()));
1✔
614
                  }
615
                  break;
1✔
616
                case SINT32:
617
                  {
618
                    Number value = 0;
1✔
619
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
620
                      internalLogger.warn("node {} has a Null variant.", node);
×
621
                    } else {
622
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
623
                    }
624

625
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
626
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.intValue()));
1✔
627
                  }
628
                  break;
1✔
629
                case SINT64:
630
                  {
631
                    Number value = 0;
1✔
632
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
633
                      internalLogger.warn("node {} has a Null variant.", node);
×
634
                    } else {
635
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
636
                    }
637

638
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
639
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.longValue()));
1✔
640
                  }
641
                  break;
1✔
642
                case STRING:
643
                  {
644
                    String value = "";
1✔
645
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
646
                      value = "NULL";
1✔
647
                    } else {
648
                      value = node.readAttribute(attr).getValue().getValue().toString();
1✔
649
                    }
650

651
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
652
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value));
1✔
653
                  }
654
                  break;
1✔
655
                case UINT32:
656
                  {
657
                    Number value = 0;
1✔
658
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
659
                      internalLogger.warn("node {} has a Null variant.", node);
×
660
                    } else {
661
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
662
                    }
663

664
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
665
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.longValue()));
1✔
666
                  }
667
                  break;
1✔
668
                case UINT64:
669
                  {
670
                    Number value = 0;
1✔
671
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
672
                      internalLogger.warn("node {} has a Null variant.", node);
×
673
                    } else {
674
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
675
                    }
676

677
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
678
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.longValue()));
1✔
679
                  }
680
                  break;
1✔
681
                default:
682
                  break;
683
              }
684

685
              log.debug("Pushing {} to stream", p.toString());
1✔
686

687
              columnCount++;
1✔
688
            }
1✔
689
            break;
1✔
690
          default:
691
            break;
692
        }
693

UNCOV
694
      } catch (UaException e) {
×
695
        // TODO Auto-generated catch block
UNCOV
696
        internalLogger.warn(e.toString());
×
UNCOV
697
        continue;
×
698
      }
1✔
699
    }
1✔
700

701
    pushTuple(tdef, cols);
1✔
702
    inCount.getAndAdd(columnCount);
1✔
703
  }
1✔
704

705
  private synchronized void pushTuple(TupleDefinition tdef, List<Object> cols) {
706
    Tuple t;
707
    t = new Tuple(tdef, cols);
1✔
708
    opcuaStream.emitTuple(t);
1✔
709
  }
1✔
710

711
  private static ParameterType getOrCreateType(
712
      XtceDb mdb, String name, Supplier<ParameterType.Builder<?>> supplier) {
713

714
    String fqn = XtceDb.YAMCS_SPACESYSTEM_NAME + NameDescription.PATH_SEPARATOR + name;
1✔
715
    ParameterType ptype = mdb.getParameterType(fqn);
1✔
716
    if (ptype != null) {
1✔
717
      return ptype;
1✔
718
    }
719
    ParameterType.Builder<?> typeb = supplier.get().setName(name);
1✔
720

721
    ptype = typeb.build();
1✔
722
    ((NameDescription) ptype).setQualifiedName(fqn);
1✔
723

724
    return mdb.addSystemParameterType(ptype);
1✔
725
  }
726

727
  public static ParameterType getBasicType(XtceDb mdb, Type type) {
728
    ParameterType pType = null;
1✔
729
    switch (type) {
1✔
730
      case BOOLEAN:
731
        return getOrCreateType(mdb, "boolean", () -> new BooleanParameterType.Builder());
1✔
732
      case STRING:
733
        return getOrCreateType(mdb, "string", () -> new StringParameterType.Builder());
1✔
734

735
      case FLOAT:
736
        return getOrCreateType(
1✔
737
            mdb, "float32", () -> new FloatParameterType.Builder().setSizeInBits(32));
1✔
738
      case DOUBLE:
739
        return getOrCreateType(
1✔
740
            mdb, "float64", () -> new FloatParameterType.Builder().setSizeInBits(64));
1✔
741
      case SINT32:
742
        return getOrCreateType(
1✔
743
            mdb,
744
            "sint32",
UNCOV
745
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(true));
×
746
      case SINT64:
747
        return getOrCreateType(
1✔
748
            mdb,
749
            "sint64",
750
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(true));
1✔
751
      case UINT32:
752
        return getOrCreateType(
1✔
753
            mdb,
754
            "uint32",
UNCOV
755
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(false));
×
756
      case UINT64:
757
        return getOrCreateType(
1✔
758
            mdb,
759
            "uint64",
UNCOV
760
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(false));
×
761
      default:
762
        break;
763
    }
764

UNCOV
765
    return pType;
×
766
  }
767

768
  public static ParameterValue getNewPv(Parameter parameter, long time) {
769
    ParameterValue pv = new ParameterValue(parameter);
1✔
770
    pv.setAcquisitionTime(time);
1✔
771
    pv.setGenerationTime(time);
1✔
772
    return pv;
1✔
773
  }
774

775
  public static ParameterValue getPV(Parameter parameter, long time, String v) {
776
    ParameterValue pv = getNewPv(parameter, time);
1✔
777
    pv.setEngValue(ValueUtility.getStringValue(v));
1✔
778
    return pv;
1✔
779
  }
780

781
  public static ParameterValue getPV(Parameter parameter, long time, double v) {
782
    ParameterValue pv = getNewPv(parameter, time);
1✔
783
    pv.setEngValue(ValueUtility.getDoubleValue(v));
1✔
784
    return pv;
1✔
785
  }
786

787
  public static ParameterValue getPV(Parameter parameter, long time, float v) {
788
    ParameterValue pv = getNewPv(parameter, time);
1✔
789
    pv.setEngValue(ValueUtility.getFloatValue(v));
1✔
790
    return pv;
1✔
791
  }
792

793
  public static ParameterValue getPV(Parameter parameter, long time, boolean v) {
794
    ParameterValue pv = getNewPv(parameter, time);
1✔
795
    pv.setEngValue(ValueUtility.getBooleanValue(v));
1✔
796
    return pv;
1✔
797
  }
798

799
  public static ParameterValue getPV(Parameter parameter, long time, long v) {
800
    ParameterValue pv = getNewPv(parameter, time);
1✔
801
    pv.setEngValue(ValueUtility.getSint64Value(v));
1✔
802
    return pv;
1✔
803
  }
804

805
  @Override
806
  public Status getLinkStatus() {
807
    return linkStatus;
1✔
808
  }
809

810
  @Override
811
  public boolean isDisabled() {
812
    return linkStatus == Status.DISABLED;
1✔
813
  }
814

815
  @Override
816
  public long getDataInCount() {
817
    return inCount.get();
1✔
818
  }
819

820
  @Override
821
  public long getDataOutCount() {
822
    return 0;
1✔
823
  }
824

825
  @Override
826
  public void resetCounters() {
827
    inCount.set(0);
1✔
828
  }
1✔
829

830
  /**
831
   * Selects first non-secured endpoint from endpoints found at discover URL. At the moment secured
832
   * endpoints are not supported.
833
   *
834
   * @return
835
   * @throws Exception
836
   */
837
  private OpcUaClient configureClient() throws Exception {
838

839
    List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(discoverURL).get();
1✔
840

841
    // At the moment, we do not support certificates.
842
    EndpointDescription selectedEndpoint = null;
1✔
843
    for (var endpoint : endpoints) {
1✔
844
      switch (endpoint.getSecurityMode()) {
1✔
845
        case Invalid:
846
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
847
          break;
×
848
        case None:
849
          selectedEndpoint = endpoint;
1✔
850
          break;
1✔
851
        case Sign:
UNCOV
852
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
UNCOV
853
          break;
×
854
        case SignAndEncrypt:
UNCOV
855
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
856
          break;
857
      }
858

859
      if (selectedEndpoint != null) {
1✔
860
        break;
1✔
861
      }
UNCOV
862
    }
×
863

864
    if (selectedEndpoint == null) {
1✔
UNCOV
865
      throw new Exception("No viable endpoint found from list:" + endpoints);
×
866
    }
867

868
    OpcUaClientConfig builder = OpcUaClientConfig.builder().setEndpoint(selectedEndpoint).build();
1✔
869

870
    return OpcUaClient.create(builder);
1✔
871
  }
872

873
  /**
874
   * Adds new PV with the name of node.
875
   *
876
   * @param client
877
   * @param node
878
   */
879
  private void addOPCUAPV(OpcUaClient client, UaNode node) {
880

881
    if (node.getBrowseName()
1✔
882
        .getName()
1✔
883
        .contains(Character.toString(NameDescription.PATH_SEPARATOR))) {
1✔
UNCOV
884
      internalLogger.info(
×
885
          "{} ignored since it contains a {} character",
UNCOV
886
          node.getBrowseName().getName(),
×
UNCOV
887
          Character.toString(NameDescription.PATH_SEPARATOR));
×
888

889
    } else {
890

891
      /**
892
       * NOTE:For now we'll just flatten all the attributes instead of using an aggregate type for
893
       * attributes
894
       */
895
      for (AttributeId attr : AttributeId.values()) {
1✔
896

897
        ParameterType ptype = OPCUAAttrTypeToParamType(attr, node);
1✔
898

899
        String opcuaTranslatedQName = translateNodeToParamQName(client, node, attr);
1✔
900
        Parameter p = VariableParam.getForFullyQualifiedName(opcuaTranslatedQName);
1✔
901

902
        p.setParameterType(ptype);
1✔
903

904
        if (mdb.getParameter(p.getQualifiedName()) == null) {
1✔
905
          log.debug("Adding OPCUA object as parameter to mdb:{}", p.getQualifiedName());
1✔
906
          mdb.addParameter(p, true);
1✔
907
        }
908
        nodeIDToParamsMap.put(new NodeIDAttrPair(node.getNodeId(), attr), (VariableParam) p);
1✔
909
      }
910
    }
911
  }
1✔
912

913
  /**
914
   * Map nodeID name to a qualified name that can be used for a YAMCS PV.
915
   *
916
   * @param client
917
   * @param node
918
   * @param attr
919
   * @return
920
   */
921
  private String translateNodeToParamQName(OpcUaClient client, UaNode node, AttributeId attr) {
922
    LocalizedText localizedDisplayName = null;
1✔
923
    try {
924

925
      localizedDisplayName =
1✔
926
          (LocalizedText) (node.readAttribute(AttributeId.DisplayName).getValue().getValue());
1✔
UNCOV
927
    } catch (UaException e) {
×
UNCOV
928
      internalLogger.warn(e.toString());
×
929
    }
1✔
930
    String opcuaTranslatedQName =
1✔
931
        qualifiedName(
1✔
932
            parametersNamespace
933
                + NameDescription.PATH_SEPARATOR
934
                + node.getNodeId().toParseableString().replace(";", "-")
1✔
935
                + NameDescription.PATH_SEPARATOR
936
                + localizedDisplayName.getText(),
1✔
937
            attr.toString());
1✔
938

939
    return opcuaTranslatedQName;
1✔
940
  }
941

942
  /**
943
   * Browse node at nodePath relative to browseRoot.
944
   *
945
   * @param indent
946
   * @param client
947
   * @param browseRoot
948
   * @param nodePath in the format of "0:Root,0:Objects,2:HelloWorld,2:MyObject,2:Bar"
949
   * @throws Exception
950
   */
951
  private void browsePath(String indent, OpcUaClient client, NodeId startingNode, String nodePath)
952
      throws Exception {
953
    internalLogger.info("Browsing at " + startingNode);
1✔
954
    ArrayList<String> rPathTokens = new ArrayList<String>();
1✔
955
    ArrayList<RelativePathElement> relaitivePathElements = new ArrayList<RelativePathElement>();
1✔
956

957
    for (var pathToken : nodePath.split(",")) {
1✔
958
      rPathTokens.add(nodePath);
1✔
959

960
      int namespaceIndex = 0;
1✔
961

962
      String namespaceName = "";
1✔
963

964
      namespaceIndex = Integer.parseInt(pathToken.split(":")[0]);
1✔
965

966
      namespaceName = pathToken.split(":")[1];
1✔
967

968
      relaitivePathElements.add(
1✔
969
          new RelativePathElement(
970
              Identifiers.HierarchicalReferences,
971
              false,
1✔
972
              true,
1✔
973
              new QualifiedName(namespaceIndex, namespaceName)));
974
    }
975

976
    ArrayList<BrowsePath> list = new ArrayList<BrowsePath>();
1✔
977

978
    RelativePathElement[] elements = new RelativePathElement[relaitivePathElements.size()];
1✔
979

980
    relaitivePathElements.toArray(elements);
1✔
981

982
    list.add(new BrowsePath(startingNode, new RelativePath(elements)));
1✔
983

984
    TranslateBrowsePathsToNodeIdsResponse response = null;
1✔
985
    try {
986
      response = client.translateBrowsePaths(list).get();
1✔
UNCOV
987
    } catch (InterruptedException e) {
×
UNCOV
988
      internalLogger.warn(e.toString());
×
UNCOV
989
    } catch (ExecutionException e) {
×
UNCOV
990
      internalLogger.warn(e.toString());
×
991
    }
1✔
992

993
    BrowsePathResult result = Arrays.asList(response.getResults()).get(0);
1✔
994
    StatusCode statusCode = result.getStatusCode();
1✔
995

996
    if (statusCode.isBad()) {
1✔
997
      log.warn("Bad status code:" + statusCode);
1✔
998
      org.yamcs.yarch.protobuf.Db.Event ev =
999
          Event.newBuilder()
1✔
1000
              .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1001
              .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1002
              .setSource(this.linkName)
1✔
1003
              .setType(this.linkName)
1✔
1004
              .setMessage("Failed to find node:" + nodePath + ". Error code info:" + statusCode)
1✔
1005
              .setSeverity(EventSeverity.ERROR)
1✔
1006
              .build();
1✔
1007
      eventProducer.sendEvent(ev);
1✔
1008

1009
      throw new Exception("Bad status code:" + statusCode);
1✔
1010

1011
    } else if (statusCode.isUncertain()) {
1✔
UNCOV
1012
      log.warn("Uncertain status code:" + statusCode);
×
UNCOV
1013
      return;
×
1014
    }
1015

1016
    try {
1017
      UaNode node =
1✔
1018
          client
1019
              .getAddressSpace()
1✔
1020
              .getNode(
1✔
1021
                  result.getTargets()[0].getTargetId().toNodeId(client.getNamespaceTable()).get());
1✔
1022

1023
      addOPCUAPV(client, node);
1✔
UNCOV
1024
    } catch (UaException e) {
×
UNCOV
1025
      internalLogger.warn(e.toString());
×
1026
    }
1✔
1027
  }
1✔
1028

1029
  private void createOPCUASubscriptions() {
1030
    createDataChangeListener();
1✔
1031
    Set<NodeId> nodeSet = new HashSet<NodeId>();
1✔
1032
    /**
1033
     * FIXME:This is super inefficient... The reason we collect these nodeIDs in a set is because
1034
     * otherwise we will have redundant subscription(s) since there is more than 1 attribute per
1035
     * nodeID given how nodeIDToParamsMap is designed
1036
     */
1037
    for (NodeIDAttrPair pair : nodeIDToParamsMap.keySet()) {
1✔
1038
      nodeSet.add(pair.nodeID);
1✔
1039
    }
1✔
1040

1041
    ArrayList<NodeId> variableNodes = new ArrayList<NodeId>();
1✔
1042
    for (NodeId id : nodeSet) {
1✔
1043
      Variant nodeClass = null;
1✔
1044
      try {
1045
        UaNode node = client.getAddressSpace().getNode(id);
1✔
1046

1047
        nodeClass = node.readAttribute(AttributeId.NodeClass).getValue();
1✔
1048

UNCOV
1049
      } catch (UaException e) {
×
UNCOV
1050
        internalLogger.warn(e.toString());
×
1051
      }
1✔
1052
      if (nodeClass != null) {
1✔
1053
        //        try {
1054
        switch (NodeClass.from((int) nodeClass.getValue())) {
1✔
1055
            // As per the spec, the only thing we can subscribe to is Variables
1056
          case Variable:
1057
            variableNodes.add(id);
1✔
1058
            break;
1059
        }
1060
      }
1061
    }
1✔
1062

1063
    try {
1064
      List<ManagedDataItem> dataItems = opcuaSubscription.createDataItems(variableNodes);
1✔
1065
      for (var dataItem : dataItems) {
1✔
1066
        log.debug("Status code for dataItem:{}", dataItem.getStatusCode());
1✔
1067
        OPCUAActiveSubs.addAndGet(1);
1✔
1068
      }
1✔
UNCOV
1069
    } catch (UaException e) {
×
UNCOV
1070
      internalLogger.warn(e.toString());
×
1071
    }
1✔
1072
  }
1✔
1073

1074
  /**
1075
   * Connects to OPCUA server and activates query all action.
1076
   *
1077
   * @param client
1078
   * @throws Exception
1079
   */
1080
  public void connectToOPCUAServer(OpcUaClient client) throws Exception {
1081
    internalLogger.info("Connecting to OPCUA server...");
1✔
1082
    client.connect().get();
1✔
1083

1084
    if (getAction(startAction.getId()) == null) {
1✔
1085
      addAction(startAction);
1✔
1086
    }
1087
    startAction.setEnabled(true);
1✔
1088
  }
1✔
1089

1090
  /**
1091
   * Browses the tree on the OPCUA server and maps them to YAMCS Parameters.
1092
   *
1093
   * @param client
1094
   * @throws Exception
1095
   */
1096
  private void browseOPCUATree(OpcUaClient client) throws Exception {
1097
    // start browsing at root folder
1098
    internalLogger.info("Browsing OPCUA...");
1✔
1099
    for (var p : relativeNodePaths) {
1✔
1100
      int namespaceIndex = (int) p.rootNodeID.get("namespaceIndex");
1✔
1101
      String identifier = (String) p.rootNodeID.get("identifier");
1✔
1102
      IdType identifierType = IdType.valueOf((String) p.rootNodeID.get("identifierType"));
1✔
1103

1104
      browsePath(
1✔
1105
          endpointURL, client, getNewNodeID(identifierType, namespaceIndex, identifier), p.path);
1✔
1106
    }
1✔
1107
  }
1✔
1108

1109
  /**
1110
   * Get new OPCUA-compliant NodeID object that is created from NamespaceIndex and Identifier. At
1111
   * the moment only String and Numeric node ids are supported.
1112
   *
1113
   * @param rootIdentifierType
1114
   * @param NamespaceIndex
1115
   * @param Identifier
1116
   * @return
1117
   */
1118
  private NodeId getNewNodeID(IdType rootIdentifierType, int NamespaceIndex, String Identifier) {
1119
    NodeId nodeID = null;
1✔
1120
    switch (rootIdentifierType) {
1✔
1121
      case Guid:
1122
        internalLogger.warn("Guid nodeID is not supported");
×
UNCOV
1123
        break;
×
1124
      case Numeric:
1125
        nodeID = new NodeId(NamespaceIndex, Integer.parseInt(Identifier));
1✔
1126
        break;
1✔
1127
      case Opaque:
UNCOV
1128
        internalLogger.warn("Guid Opaque is not supported");
×
UNCOV
1129
        break;
×
1130
      case String:
UNCOV
1131
        nodeID = new NodeId(NamespaceIndex, Identifier);
×
1132
        break;
1133
    }
1134
    return nodeID;
1✔
1135
  }
1136

1137
  /** Data listener for realtime OPCUA server updates. */
1138
  private void createDataChangeListener() {
1139
    try {
1140
      opcuaSubscription = ManagedSubscription.create(client, publishInterval);
1✔
UNCOV
1141
    } catch (UaException e) {
×
UNCOV
1142
      internalLogger.warn(e.toString());
×
1143
    }
1✔
1144
    opcuaSubscription.addDataChangeListener(
1✔
1145
        (items, values) -> {
1146
          for (int i = 0; i < items.size(); i++) {
1✔
1147
            NodeIDAttrPair nodeAttrKey =
1✔
1148
                new NodeIDAttrPair(items.get(i).getNodeId(), AttributeId.Value);
1✔
1149
            log.debug(
1✔
1150
                "subscription value received: item={}, value={}",
1151
                items.get(i).getNodeId(),
1✔
1152
                values.get(i).getValue());
1✔
1153

1154
            log.debug(
1✔
1155
                "Pushing new PV for param name {} which is mapped to NodeID {}",
1156
                nodeIDToParamsMap.get(nodeAttrKey),
1✔
1157
                items.get(i).getNodeId());
1✔
1158

1159
            TupleDefinition tdef = gftdef.copy();
1✔
1160
            List<Object> cols = new ArrayList<>(4 + 1);
1✔
1161
            //            FIXME: Add leap seconds.... as config or get it from YAMCS API.
1162
            long gentime =
1✔
1163
                values
1164
                    .get(i)
1✔
1165
                    .getSourceTime()
1✔
1166
                    .getJavaInstant()
1✔
1167
                    .plus(37, ChronoUnit.SECONDS)
1✔
1168
                    .toEpochMilli();
1✔
1169
            cols.add(gentime);
1✔
1170
            cols.add(parametersNamespace);
1✔
1171
            cols.add(0);
1✔
1172
            cols.add(gentime);
1✔
1173

1174
            /**
1175
             * TODO:Not sure if this is the best way to do this since the aggregate values will be
1176
             * partially updated. Another potential approach might be to decouple the live OPCUA
1177
             * data(subscriptions) via namespaces. For example; have a "special" namespace called
1178
             * "subscriptions" that ONLY gets updated with items. And maybe another namespace for
1179
             * static data...maybe.
1180
             *
1181
             * <p>Another option is to flatten everything and have no aggregate types at all. That
1182
             * approach might even simplify the code quite a bit...
1183
             *
1184
             * <p>Another question worth answering before moving forward is to find out whether or
1185
             * not it is concrete in the OPCUA protocol what data can change in real time and which
1186
             * data is "static". Not sure if there is any "static" data given that clients have the
1187
             * ability of writing to values... might be worth a test.
1188
             */
1189
            log.debug(
1✔
1190
                "Data({}) chnage triggered for {}",
1191
                values.get(i).getValue(),
1✔
1192
                nodeIDToParamsMap.get(nodeAttrKey));
1✔
1193

1194
            if (nodeIDToParamsMap.get(nodeAttrKey) == null) {
1✔
UNCOV
1195
              log.debug("No parameter mapping found for {}", nodeAttrKey.nodeID);
×
UNCOV
1196
              continue;
×
1197
            } else {
1198
              log.debug(
1✔
1199
                  String.format(
1✔
1200
                      "parameter mapping found for {} and {}",
1201
                      nodeAttrKey.nodeID,
1202
                      nodeAttrKey.attrID));
1203
            }
1204

1205
            if (values.get(i).getValue() != null && values.get(i).getValue().getValue() != null) {
1✔
1206

1207
              switch (nodeIDToParamsMap.get(nodeAttrKey).getParameterType().getValueType()) {
1✔
1208
                case BOOLEAN:
1209
                  {
1210
                    boolean value = (boolean) values.get(i).getValue().getValue();
1✔
1211

1212
                    tdef.addColumn(
1✔
1213
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1214
                        DataType.PARAMETER_VALUE);
1215
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value));
1✔
1216
                  }
1217
                  break;
1✔
1218
                case DOUBLE:
1219
                  {
1220
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1221

1222
                    tdef.addColumn(
1✔
1223
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1224
                        DataType.PARAMETER_VALUE);
1225
                    cols.add(
1✔
1226
                        getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.doubleValue()));
1✔
1227
                  }
1228
                  break;
1✔
1229
                case FLOAT:
1230
                  {
1231
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1232

1233
                    tdef.addColumn(
1✔
1234
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1235
                        DataType.PARAMETER_VALUE);
1236
                    cols.add(
1✔
1237
                        getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.floatValue()));
1✔
1238
                  }
1239
                  break;
1✔
1240
                case SINT32:
1241
                  {
1242
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1243
                    tdef.addColumn(
1✔
1244
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1245
                        DataType.PARAMETER_VALUE);
1246
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.intValue()));
1✔
1247
                  }
1248
                  break;
1✔
1249
                case SINT64:
1250
                  {
1251
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1252

1253
                    tdef.addColumn(
1✔
1254
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1255
                        DataType.PARAMETER_VALUE);
1256
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1257
                  }
1258
                  break;
1✔
1259
                case STRING:
1260
                  {
1261
                    String value = (String) values.get(i).getValue().getValue().toString();
1✔
1262

1263
                    tdef.addColumn(
1✔
1264
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1265
                        DataType.PARAMETER_VALUE);
1266
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value));
1✔
1267
                  }
1268
                  break;
1✔
1269
                case UINT32:
1270
                  {
1271
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1272
                    tdef.addColumn(
1✔
1273
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1274
                        DataType.PARAMETER_VALUE);
1275
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1276
                  }
1277
                  break;
1✔
1278
                case UINT64:
1279
                  {
1280
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1281

1282
                    tdef.addColumn(
1✔
1283
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1284
                        DataType.PARAMETER_VALUE);
1285
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1286
                  }
1287
                  break;
1✔
1288
                default:
1289
                  break;
1290
              }
1291

1292
              pushTuple(tdef, cols);
1✔
1293

1294
              inCount.getAndAdd(1);
1✔
1295
            } else {
1296
              // TODO:Add some type emptyValue count for OPS.
UNCOV
1297
              log.warn(
×
1298
                  "Data chnage triggered for {}, but it empty. This should not happen.",
UNCOV
1299
                  nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName());
×
1300
            }
1301
          }
1302
        });
1✔
1303
  }
1✔
1304

1305
  /**
1306
   * Get new ParameterType for the specified attribute of the node. Particularly useful for Value
1307
   * attributes of nodes.
1308
   *
1309
   * @param attr
1310
   * @param node
1311
   * @return
1312
   */
1313
  private ParameterType OPCUAAttrTypeToParamType(AttributeId attr, UaNode node) {
1314
    ParameterType pType = null;
1✔
1315

1316
    switch (attr) {
1✔
1317
      case AccessLevel:
1318
        pType = getBasicType(mdb, Type.STRING);
1✔
1319
        break;
1✔
1320
      case ArrayDimensions:
1321
        pType = getBasicType(mdb, Type.STRING);
1✔
1322
        break;
1✔
1323
      case BrowseName:
1324
        pType = getBasicType(mdb, Type.STRING);
1✔
1325
        break;
1✔
1326
      case ContainsNoLoops:
1327
        pType = getBasicType(mdb, Type.STRING);
1✔
1328
        break;
1✔
1329
      case DataType:
1330
        pType = getBasicType(mdb, Type.STRING);
1✔
1331
        break;
1✔
1332
      case Description:
1333
        pType = getBasicType(mdb, Type.STRING);
1✔
1334
        break;
1✔
1335
      case DisplayName:
1336
        pType = getBasicType(mdb, Type.STRING);
1✔
1337
        break;
1✔
1338
      case EventNotifier:
1339
        pType = getBasicType(mdb, Type.STRING);
1✔
1340
        break;
1✔
1341
      case Executable:
1342
        pType = getBasicType(mdb, Type.STRING);
1✔
1343
        break;
1✔
1344
      case Historizing:
1345
        pType = getBasicType(mdb, Type.STRING);
1✔
1346
        break;
1✔
1347
      case InverseName:
1348
        pType = getBasicType(mdb, Type.STRING);
1✔
1349
        break;
1✔
1350
      case IsAbstract:
1351
        pType = getBasicType(mdb, Type.STRING);
1✔
1352
        break;
1✔
1353
      case MinimumSamplingInterval:
1354
        pType = getBasicType(mdb, Type.STRING);
1✔
1355
        break;
1✔
1356
      case NodeClass:
1357
        pType = getBasicType(mdb, Type.STRING);
1✔
1358
        break;
1✔
1359
      case NodeId:
1360
        pType = getBasicType(mdb, Type.STRING);
1✔
1361
        break;
1✔
1362
      case Symmetric:
1363
        pType = getBasicType(mdb, Type.STRING);
1✔
1364
        break;
1✔
1365
      case UserAccessLevel:
1366
        pType = getBasicType(mdb, Type.STRING);
1✔
1367
        break;
1✔
1368
      case UserExecutable:
1369
        pType = getBasicType(mdb, Type.STRING);
1✔
1370
        break;
1✔
1371
      case UserWriteMask:
1372
        pType = getBasicType(mdb, Type.STRING);
1✔
1373
        break;
1✔
1374
      case Value:
1375
        try {
1376

1377
          var value = node.readAttribute(attr).getValue();
1✔
1378

1379
          if (value.isNotNull()) {
1✔
1380
            NodeId valueObjectType =
1✔
1381
                value.getDataType().get().toNodeId(client.getNamespaceTable()).get();
1✔
1382

1383
            /** As per the spec:https://reference.opcfoundation.org/Core/Part6/v104/docs/5.1.2 */
1384
            if (valueObjectType.equals(Identifiers.SByte)) {
1✔
UNCOV
1385
              pType = getBasicType(mdb, Type.SINT32);
×
1386
            } else if (valueObjectType.equals(Identifiers.Byte)) {
1✔
UNCOV
1387
              pType = getBasicType(mdb, Type.SINT32);
×
1388
            } else if (valueObjectType.equals(Identifiers.Int16)) {
1✔
UNCOV
1389
              pType = getBasicType(mdb, Type.SINT32);
×
1390
            } else if (valueObjectType.equals(Identifiers.UInt16)) {
1✔
UNCOV
1391
              pType = getBasicType(mdb, Type.SINT32);
×
1392
            } else if (valueObjectType.equals(Identifiers.Int32)) {
1✔
1393
              pType = getBasicType(mdb, Type.SINT32);
1✔
1394
            } else if (valueObjectType.equals(Identifiers.UInt32)) {
1✔
1395
              pType = getBasicType(mdb, Type.UINT32);
1✔
1396
            } else if (valueObjectType.equals(Identifiers.Int64)) {
1✔
1397
              pType = getBasicType(mdb, Type.SINT64);
1✔
1398
            } else if (valueObjectType.equals(Identifiers.UInt64)) {
1✔
1399
              pType = getBasicType(mdb, Type.UINT64);
1✔
1400
            } else if (valueObjectType.equals(Identifiers.Float)) {
1✔
1401
              pType = getBasicType(mdb, Type.FLOAT);
1✔
1402
            } else if (valueObjectType.equals(Identifiers.Double)) {
1✔
1403
              pType = getBasicType(mdb, Type.DOUBLE);
1✔
1404
            } else if (valueObjectType.equals(Identifiers.String)) {
1✔
1405
              pType = getBasicType(mdb, Type.STRING);
1✔
1406
            } else if (valueObjectType.equals(Identifiers.Boolean)) {
1✔
1407
              pType = getBasicType(mdb, Type.BOOLEAN);
1✔
1408
            }
1409
          } else {
1✔
UNCOV
1410
            pType = getBasicType(mdb, Type.STRING);
×
1411
          }
1412

UNCOV
1413
        } catch (UaException e) {
×
UNCOV
1414
          internalLogger.warn(e.toString());
×
1415
        }
1✔
UNCOV
1416
        break;
×
1417
      case ValueRank:
1418
        pType = getBasicType(mdb, Type.STRING);
1✔
1419
        break;
1✔
1420
      case WriteMask:
1421
        pType = getBasicType(mdb, Type.STRING);
1✔
1422
        break;
1✔
1423
      default:
1424
        break;
1425
    }
1426

1427
    return pType;
1✔
1428
  }
1429

1430
  /**
1431
   * Subscribe to OPCUA events as per the
1432
   * spec:https://reference.opcfoundation.org/Core/Part5/v104/docs/6.4.2
1433
   *
1434
   * @param client
1435
   * @throws InterruptedException
1436
   * @throws ExecutionException
1437
   */
1438
  private void subscribeToEvents(OpcUaClient client)
1439
      throws InterruptedException, ExecutionException {
1440
    // create a subscription and a monitored item
1441
    UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
1✔
1442

1443
    ReadValueId readValueId =
1✔
1444
        new ReadValueId(
1445
            Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
1✔
1446

1447
    // client handle must be unique per item
1448
    UInteger clientHandle = uint(clientHandles.getAndIncrement());
1✔
1449

1450
    EventFilter eventFilter =
1✔
1451
        new EventFilter(
1452
            new SimpleAttributeOperand[] {
1453
              new SimpleAttributeOperand(
1454
                  Identifiers.BaseEventType,
1455
                  new QualifiedName[] {new QualifiedName(0, "EventId")},
1456
                  AttributeId.Value.uid(),
1✔
1457
                  null),
1458
              new SimpleAttributeOperand(
1459
                  Identifiers.BaseEventType,
1460
                  new QualifiedName[] {new QualifiedName(0, "EventType")},
1461
                  AttributeId.Value.uid(),
1✔
1462
                  null),
1463
              new SimpleAttributeOperand(
1464
                  Identifiers.BaseEventType,
1465
                  new QualifiedName[] {new QualifiedName(0, "Severity")},
1466
                  AttributeId.Value.uid(),
1✔
1467
                  null),
1468
              new SimpleAttributeOperand(
1469
                  Identifiers.BaseEventType,
1470
                  new QualifiedName[] {new QualifiedName(0, "Time")},
1471
                  AttributeId.Value.uid(),
1✔
1472
                  null),
1473
              new SimpleAttributeOperand(
1474
                  Identifiers.BaseEventType,
1475
                  new QualifiedName[] {new QualifiedName(0, "Message")},
1476
                  AttributeId.Value.uid(),
1✔
1477
                  null)
1478
            },
1479
            new ContentFilter(null));
1480

1481
    MonitoringParameters parameters =
1✔
1482
        new MonitoringParameters(
1483
            clientHandle,
1484
            0.0,
1✔
1485
            ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
1✔
1486
            uint(10),
1✔
1487
            true);
1✔
1488

1489
    MonitoredItemCreateRequest request =
1✔
1490
        new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
1491

1492
    List<UaMonitoredItem> items =
1✔
1493
        subscription.createMonitoredItems(TimestampsToReturn.Both, newArrayList(request)).get();
1✔
1494

1495
    // do something with the value updates
1496
    UaMonitoredItem monitoredItem = items.get(0);
1✔
1497

1498
    monitoredItem.setEventConsumer(
1✔
1499
        (item, vs) -> {
1500
          internalLogger.info("Event Received from {}", item.getReadValueId().getNodeId());
1✔
1501

1502
          StringBuilder eventText = new StringBuilder();
1✔
1503

1504
          ByteString eventId;
1505
          NodeId eventType;
1506
          UShort eventSeverity;
1507
          DateTime eventTime;
1508
          LocalizedText eventMessage;
1509

1510
          for (int i = 0; i < vs.length; i++) {
1✔
1511
            internalLogger.info("\tvariant[{}]: {}", i, vs[i].getValue());
1✔
1512
          }
1513

1514
          eventId = (ByteString) vs[0].getValue();
1✔
1515
          eventType = (NodeId) vs[1].getValue();
1✔
1516
          eventSeverity = (UShort) vs[2].getValue();
1✔
1517
          eventTime = (DateTime) vs[3].getValue();
1✔
1518
          eventMessage = (LocalizedText) vs[4].getValue();
1✔
1519

1520
          //          FIXME:Map these values to YAMCS API
1521
          eventText.append("eventId:" + eventId);
1✔
1522
          eventText.append("\n");
1✔
1523
          eventText.append("eventType:" + eventType);
1✔
1524
          eventText.append("\n");
1✔
1525
          eventText.append("eventSeverity:" + eventSeverity);
1✔
1526
          eventText.append("\n");
1✔
1527
          eventText.append("eventTime:" + eventTime);
1✔
1528
          eventText.append("\n");
1✔
1529
          eventText.append("eventMessage:" + eventMessage);
1✔
1530
          org.yamcs.yarch.protobuf.Db.Event ev =
1531
              Event.newBuilder()
1✔
1532
                  .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1533
                  .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1534
                  .setSource(this.linkName)
1✔
1535
                  .setType(this.linkName)
1✔
1536
                  .setMessage(eventText.toString())
1✔
1537
                  .setSeverity(EventSeverity.INFO)
1✔
1538
                  .build();
1✔
1539
          eventProducer.sendEvent(ev);
1✔
1540
        });
1✔
1541
  }
1✔
1542

1543
  @Override
1544
  public void setupSystemParameters(SystemParametersService sysParamService) {
1545
    super.setupSystemParameters(sysParamService);
1✔
1546
    OPCUAInitStatusParam =
1✔
1547
        sysParamService.createEnumeratedSystemParameter(
1✔
1548
            linkName + "/OPCUAInitStatusParam",
1549
            OPCUAINITStatus.class,
1550
            "The current initialization status of OPCUA client");
1551
    EnumeratedParameterType spLinkStatusType =
1✔
1552
        (EnumeratedParameterType) OPCUAInitStatusParam.getParameterType();
1✔
1553
    spLinkStatusType
1✔
1554
        .enumValue(OPCUAINITStatus.OPCUA_INIT_CONFIG.name())
1✔
1555
        .setDescription(
1✔
1556
            "This link is in the configuration stage(Configuring OPCUA parameters such as certificates)");
1557
    spLinkStatusType
1✔
1558
        .enumValue(OPCUAINITStatus.OPCUA_INIT_TREE.name())
1✔
1559
        .setDescription(
1✔
1560
            "The link is parsing the OPCUA Tree and mapping them to PVs."
1561
                + " Depending on configuration, this can take a while.");
1562

1563
    spLinkStatusType
1✔
1564
        .enumValue(OPCUAINITStatus.OPCUA_INIT_TREE_FAILED.name())
1✔
1565
        .setDescription("The initial parsing of configured nodes failed.");
1✔
1566
    spLinkStatusType
1✔
1567
        .enumValue(OPCUAINITStatus.OPCUA_INIT_EVENTS.name())
1✔
1568
        .setDescription("The link is configuring and subscribing to OPCUA events");
1✔
1569
    spLinkStatusType
1✔
1570
        .enumValue(OPCUAINITStatus.OPCUA_INIT_DATA_SUBSCRIPTION.name())
1✔
1571
        .setDescription(
1✔
1572
            "The link is creating subscriptions for each node that was parsed from the tree"
1573
                + "that has a Value attribute.");
1574
    spLinkStatusType
1✔
1575
        .enumValue(OPCUAINITStatus.OPCUA_INIT_ALL_DATA_QUERY.name())
1✔
1576
        .setDescription(
1✔
1577
            "The link is querying all attributes of all parsed nodes."
1578
                + "This is can be configured to be done at startup.");
1579
    spLinkStatusType
1✔
1580
        .enumValue(OPCUAINITStatus.OPCUA_INIT_OK.name())
1✔
1581
        .setDescription(
1✔
1582
            "The link is done with all OPCUA initialization. It is in an usable state.");
1583

1584
    OPCUAActiveSubsParam =
1✔
1585
        sysParamService.createSystemParameter(
1✔
1586
            linkName + "/OPCUAActiveSubs",
1587
            Type.UINT64,
1588
            "The total number of active opcua subscriptions");
1589
  }
1✔
1590

1591
  @Override
1592
  public List<ParameterValue> getSystemParameters() {
1593
    long time = getCurrentTime();
1✔
1594
    ArrayList<ParameterValue> list = new ArrayList<>();
1✔
1595

1596
    list.add(
1✔
1597
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1598
            OPCUAInitStatusParam, time, currentOPCUAStatus));
1599
    list.add(
1✔
1600
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1601
            OPCUAActiveSubsParam, time, OPCUAActiveSubs.get()));
1✔
1602
    try {
1603
      super.collectSystemParameters(time, list);
1✔
UNCOV
1604
    } catch (Exception e) {
×
UNCOV
1605
      log.error("Exception caught when collecting link system parameters", e);
×
1606
    }
1✔
1607
    return list;
1✔
1608
  }
1609
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc