• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-opcua / #30

12 Jul 2024 11:08PM UTC coverage: 86.757% (-0.1%) from 86.874%
#30

push

lorenzo-gomez-windhover
-Cleanup

0 of 8 new or added lines in 1 file covered. (0.0%)

2 existing lines in 1 file now uncovered.

642 of 740 relevant lines covered (86.76%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.5
/src/main/java/com/windhoverlabs/yamcs/opcua/OPCUALink.java
1
/****************************************************************************
2
 *
3
 *   Copyright (c) 2024 Windhover Labs, L.L.C. All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 *
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in
13
 *    the documentation and/or other materials provided with the
14
 *    distribution.
15
 * 3. Neither the name Windhover Labs nor the names of its
16
 *    contributors may be used to endorse or promote products derived
17
 *    from this software without specific prior written permission.
18
 *
19
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23
 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
26
 * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
 * POSSIBILITY OF SUCH DAMAGE.
31
 *
32
 *****************************************************************************/
33

34
package com.windhoverlabs.yamcs.opcua;
35

36
import static com.google.common.collect.Lists.newArrayList;
37
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
38
import static org.yamcs.xtce.NameDescription.qualifiedName;
39

40
import com.google.gson.JsonObject;
41
import java.io.BufferedWriter;
42
import java.io.IOException;
43
import java.nio.file.Files;
44
import java.nio.file.Paths;
45
import java.nio.file.StandardOpenOption;
46
import java.time.Instant;
47
import java.time.temporal.ChronoUnit;
48
import java.util.ArrayList;
49
import java.util.Arrays;
50
import java.util.HashMap;
51
import java.util.HashSet;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Objects;
55
import java.util.Set;
56
import java.util.concurrent.CompletableFuture;
57
import java.util.concurrent.ConcurrentHashMap;
58
import java.util.concurrent.ExecutionException;
59
import java.util.concurrent.atomic.AtomicLong;
60
import java.util.function.Supplier;
61
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
62
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
63
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
64
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
65
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
66
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
67
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
68
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
69
import org.eclipse.milo.opcua.stack.core.AttributeId;
70
import org.eclipse.milo.opcua.stack.core.Identifiers;
71
import org.eclipse.milo.opcua.stack.core.UaException;
72
import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
73
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
74
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
75
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
76
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
77
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
78
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
79
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
80
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
81
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
82
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
83
import org.eclipse.milo.opcua.stack.core.types.enumerated.IdType;
84
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
85
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
86
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
87
import org.eclipse.milo.opcua.stack.core.types.structured.BrowsePath;
88
import org.eclipse.milo.opcua.stack.core.types.structured.BrowsePathResult;
89
import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
90
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
91
import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
92
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
93
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
94
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
95
import org.eclipse.milo.opcua.stack.core.types.structured.RelativePath;
96
import org.eclipse.milo.opcua.stack.core.types.structured.RelativePathElement;
97
import org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
98
import org.eclipse.milo.opcua.stack.core.types.structured.TranslateBrowsePathsToNodeIdsResponse;
99
import org.slf4j.Logger;
100
import org.slf4j.LoggerFactory;
101
import org.yamcs.ConfigurationException;
102
import org.yamcs.Spec;
103
import org.yamcs.Spec.OptionType;
104
import org.yamcs.StandardTupleDefinitions;
105
import org.yamcs.ValidationException;
106
import org.yamcs.YConfiguration;
107
import org.yamcs.YamcsServer;
108
import org.yamcs.http.NotFoundException;
109
import org.yamcs.mdb.XtceAssembler;
110
import org.yamcs.parameter.ParameterValue;
111
import org.yamcs.parameter.SystemParametersProducer;
112
import org.yamcs.parameter.SystemParametersService;
113
import org.yamcs.protobuf.Event.EventSeverity;
114
import org.yamcs.protobuf.Yamcs.NamedObjectId;
115
import org.yamcs.protobuf.Yamcs.Value.Type;
116
import org.yamcs.tctm.AbstractLink;
117
import org.yamcs.tctm.Link;
118
import org.yamcs.tctm.LinkAction;
119
import org.yamcs.utils.ValueUtility;
120
import org.yamcs.xtce.BooleanParameterType;
121
import org.yamcs.xtce.EnumeratedParameterType;
122
import org.yamcs.xtce.FloatParameterType;
123
import org.yamcs.xtce.IntegerParameterType;
124
import org.yamcs.xtce.NameDescription;
125
import org.yamcs.xtce.Parameter;
126
import org.yamcs.xtce.ParameterType;
127
import org.yamcs.xtce.SpaceSystem;
128
import org.yamcs.xtce.StringParameterType;
129
import org.yamcs.xtce.XtceDb;
130
import org.yamcs.yarch.DataType;
131
import org.yamcs.yarch.Stream;
132
import org.yamcs.yarch.Tuple;
133
import org.yamcs.yarch.TupleDefinition;
134
import org.yamcs.yarch.YarchDatabase;
135
import org.yamcs.yarch.YarchDatabaseInstance;
136
import org.yamcs.yarch.protobuf.Db.Event;
137

138
/**
139
 * Implementation of the OPCUA protocol as a YAMCS link. Maps configured nodes (see docs for details)
140
 * to yamcs PVs and subscribes to OPCUA variables for realtime updates.
141
 *
142
 * @author Lorenzo Gomez
143
 */
144
public class OPCUALink extends AbstractLink implements Runnable, SystemParametersProducer {
1✔
145

146
  class NodeIDAttrPair {
147
    NodeId nodeID;
148
    AttributeId attrID;
149

150
    public NodeIDAttrPair(NodeId newNodeID, AttributeId newAttrID) {
1✔
151
      this.nodeID = newNodeID;
1✔
152
      this.attrID = newAttrID;
1✔
153
    }
1✔
154

155
    public int hashCode() {
156
      return Objects.hash(this.nodeID, this.attrID);
1✔
157
    }
158

159
    public boolean equals(Object obj) {
160
      return (this.hashCode() == obj.hashCode());
1✔
161
    }
162
  }
163

164
  /**
   * One configured "nodePaths" entry: a path string plus the raw "rootNodeID" map as read from the
   * link configuration.
   */
  class NodePath {
    /** Value of the "path" option; stays null until assigned. */
    String path;

    /** Value of the "rootNodeID" option; starts out as an empty map. */
    HashMap<Object, Object> rootNodeID = new HashMap<>();
  }
168

169
  /** Initialization phases of the link, exposed so callers can track how far start-up has got. */
  public enum OPCUAStatus {
    /** Configuration phase. NOTE(review): not assigned anywhere in the visible code - confirm. */
    OPCUA_INIT_CONFIG,
    /** Set right before the OPCUA tree is browsed. */
    OPCUA_INIT_TREE,
    /** Set right before the XTCE export. */
    OPCUA_INIT_GENERATE_XTCE,
    /** Set right before subscribing to events. */
    OPCUA_INIT_EVENTS,
    /** Set right before the data subscriptions are created. */
    OPCUA_INIT_DATA_SUBSCRIPTION,
    /** Set right before the optional query of all node data at start-up. */
    OPCUA_INIT_ALL_DATA_QUERY,
    /** Set from the main loop once initialization is over. */
    OPCUA_OK
  }
179

180
  /* Configuration Defaults */
181
  static final String STREAM_NAME = "opcua_params";
182

183
  /* Internal member attributes. */
184
  protected Thread thread;
185
  private String opcuaStreamName;
186
  private String parametersNamespace;
187
  XtceDb mdb;
188
  Stream opcuaStream;
189
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();
1✔
190
  private ManagedSubscription opcuaSubscription;
191

192
  private static final Logger internalLogger = LoggerFactory.getLogger(OPCUALink.class.getName());
1✔
193

194
  /**
195
   * @note ALWAYS re-use params as org.yamcs.parameter.ParameterRequestManager.param2RequestMap uses
196
   *     the object inside a map that was added to the mdb for the very first time. If, when
197
   *     publishing the PV, we create a new VariableParam object clients will NOT receive real-time
198
   *     updates as the new object VariableParam inside the new PV won't match the one inside
199
   *     org.yamcs.parameter.ParameterRequestManager.param2RequestMap since the object hashes do not
200
   *     match (since VariableParam does not override its hash function).
201
   */
202
  private ConcurrentHashMap<NodeIDAttrPair, VariableParam> nodeIDToParamsMap =
1✔
203
      new ConcurrentHashMap<NodeIDAttrPair, VariableParam>();
204

205
  private OpcUaClient client;
206

207
  protected AtomicLong inCount = new AtomicLong(0);
1✔
208

209
  private Status linkStatus = Status.OK;
1✔
210

211
  /* Configuration Parameters */
212

213
  private String discoverURL;
214
  private String endpointURL;
215
  private boolean queryAllNodesAtStartup;
216
  private String outputFile;
217
  private int publishInterval; // milliseconds
218

219
  private ArrayList<NodePath> relativeNodePaths = new ArrayList<NodePath>();
1✔
220

221
  private final AtomicLong clientHandles = new AtomicLong(1L);
1✔
222

223
  /* System parameters*/
224

225
  private Parameter OPCUAStatusParam;
226
  private OPCUAStatus currentOPCUAStatus;
227
  private Parameter OPCUAActiveSubsParam;
228
  private AtomicLong OPCUAActiveSubs = new AtomicLong(0);
1✔
229

230
  LinkAction startAction =
1✔
231
      new LinkAction("query_all", "Query All OPCUA Server Data") {
1✔
232
        @Override
233
        public JsonObject execute(Link link, JsonObject jsonObject) {
234

235
          internalLogger.info("Executing query_all action");
1✔
236
          CompletableFuture.supplyAsync(
1✔
237
                  (Supplier<Integer>)
238
                      () -> {
239
                        queryAllOPCUAData();
1✔
240

241
                        return 0;
1✔
242
                      })
243
              .whenComplete(
1✔
244
                  (vaue, e) -> {
245
                    internalLogger.info("query_all action Complete");
1✔
246
                  });
1✔
247

248
          return jsonObject;
1✔
249
        }
250
      };
251

252
  public OPCUAStatus getCurrentOPCUAStatus() {
253
    return currentOPCUAStatus;
1✔
254
  }
255

256
  @Override
257
  public Spec getSpec() {
258
    Spec spec = new Spec();
1✔
259

260
    /* Define our configuration parameters. */
261
    spec.addOption("name", OptionType.STRING).withRequired(true);
1✔
262
    spec.addOption("class", OptionType.STRING).withRequired(true);
1✔
263
    spec.addOption("opcuaStream", OptionType.STRING).withRequired(true);
1✔
264
    spec.addOption("endpointUrl", OptionType.STRING).withRequired(true);
1✔
265
    spec.addOption("discoveryUrl", OptionType.STRING).withRequired(true);
1✔
266
    spec.addOption("xtceOutputFile", OptionType.STRING).withRequired(true);
1✔
267
    spec.addOption("parametersNamespace", OptionType.STRING).withRequired(true);
1✔
268
    spec.addOption("publishInterval", OptionType.INTEGER).withRequired(true);
1✔
269
    spec.addOption("queryAllNodesAtStartup", OptionType.BOOLEAN).withRequired(false);
1✔
270

271
    Spec rootNodeIDSpec = new Spec();
1✔
272

273
    rootNodeIDSpec.addOption("namespaceIndex", OptionType.INTEGER).withRequired(true);
1✔
274
    rootNodeIDSpec.addOption("identifier", OptionType.STRING).withRequired(true);
1✔
275
    rootNodeIDSpec.addOption("identifierType", OptionType.STRING).withRequired(true);
1✔
276

277
    Spec nodePathSpec = new Spec();
1✔
278
    nodePathSpec.addOption("path", OptionType.STRING);
1✔
279
    nodePathSpec
1✔
280
        .addOption("rootNodeID", OptionType.MAP)
1✔
281
        .withRequired(true)
1✔
282
        .withSpec(rootNodeIDSpec);
1✔
283

284
    spec.addOption("nodePaths", OptionType.LIST)
1✔
285
        .withElementType(OptionType.MAP)
1✔
286
        .withRequired(true)
1✔
287
        .withSpec(nodePathSpec);
1✔
288

289
    return spec;
1✔
290
  }
291

292
  @Override
293
  public void init(String yamcsInstance, String serviceName, YConfiguration config)
294
      throws ConfigurationException {
295
    super.init(yamcsInstance, serviceName, config);
1✔
296

297
    /* Local variables */
298
    this.config = config;
1✔
299
    /* Validate the configuration that the user passed us. */
300
    try {
301
      config = getSpec().validate(config);
1✔
302
    } catch (ValidationException e) {
×
303
      log.error("Failed configuration validation.", e);
×
304
      notifyFailed(e);
×
305
    }
1✔
306
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
1✔
307

308
    this.opcuaStreamName = config.getString("opcuaStream");
1✔
309
    this.opcuaStream = getStream(ydb, opcuaStreamName);
1✔
310
    this.parametersNamespace = config.getString("parametersNamespace");
1✔
311
    this.mdb = YamcsServer.getServer().getInstance(yamcsInstance).getXtceDb();
1✔
312

313
    readOPCUAConfig(config);
1✔
314
    readNodePathsConfig(config);
1✔
315

316
    outputFile = config.getString("xtceOutputFile");
1✔
317
  }
1✔
318

319
  private void readOPCUAConfig(YConfiguration config) {
320
    this.endpointURL = config.getString("endpointUrl");
1✔
321
    this.discoverURL = config.getString("discoveryUrl");
1✔
322
    this.publishInterval = config.getInt("publishInterval");
1✔
323
    this.queryAllNodesAtStartup = config.getBoolean("queryAllNodesAtStartup", false);
1✔
324
  }
1✔
325

326
  private void readNodePathsConfig(YConfiguration config) {
327
    List<Map<Object, Object>> nodePaths = config.getList("nodePaths");
1✔
328

329
    for (Map<Object, Object> path : nodePaths) {
1✔
330
      NodePath nodePath = new NodePath();
1✔
331
      nodePath.path = (String) path.get("path");
1✔
332
      nodePath.rootNodeID = (HashMap<Object, Object>) path.get("rootNodeID");
1✔
333
      relativeNodePaths.add(nodePath);
1✔
334
    }
1✔
335
  }
1✔
336

337
  private static SpaceSystem verifySpaceSystem(XtceDb mdb, String pathName) {
338
    String namespace;
339
    String name;
340
    int lastSlash = pathName.lastIndexOf('/');
1✔
341
    if ("/".equals(pathName)) {
1✔
342
      namespace = "";
×
343
      name = "";
×
344
    } else if (lastSlash == -1 || lastSlash == pathName.length() - 1) {
1✔
345
      namespace = "";
×
346
      name = pathName;
×
347
    } else {
348
      namespace = pathName.substring(0, lastSlash);
1✔
349
      name = pathName.substring(lastSlash + 1);
1✔
350
    }
351

352
    // First try with a prefixed slash (should be the common case)
353
    NamedObjectId id =
354
        NamedObjectId.newBuilder().setNamespace("/" + namespace).setName(name).build();
1✔
355
    SpaceSystem spaceSystem = mdb.getSpaceSystem(id);
1✔
356
    if (spaceSystem != null) {
1✔
357
      return spaceSystem;
×
358
    }
359

360
    // Maybe some non-xtce namespace like MDB:OPS Name
361
    id = NamedObjectId.newBuilder().setNamespace(namespace).setName(name).build();
1✔
362
    spaceSystem = mdb.getSpaceSystem(id);
1✔
363
    if (spaceSystem != null) {
1✔
364
      return spaceSystem;
1✔
365
    }
366

367
    throw new NotFoundException("No such space system");
×
368
  }
369

370
  /**
371
   * Initializes all PV mappings to OPCUA nodes and realtime subscriptions(managed data items in
372
   * OPCUA terms).
373
   */
374
  private void opcuaInit() {
375
    try {
376

377
      currentOPCUAStatus = OPCUAStatus.OPCUA_INIT_TREE;
1✔
378
      browseOPCUATree(client);
1✔
379
      currentOPCUAStatus = OPCUAStatus.OPCUA_INIT_GENERATE_XTCE;
1✔
380
      exportXTCE();
1✔
381
      currentOPCUAStatus = OPCUAStatus.OPCUA_INIT_EVENTS;
1✔
382
      subscribeToEvents(client);
1✔
383

384
    } catch (Exception e) {
×
385
      e.printStackTrace();
×
386
      return;
×
387
    }
1✔
388
    try {
389
      currentOPCUAStatus = OPCUAStatus.OPCUA_INIT_DATA_SUBSCRIPTION;
1✔
390
      createOPCUASubscriptions();
1✔
391
    } catch (Exception e) {
×
392
      e.printStackTrace();
×
393
    }
1✔
394
  }
1✔
395

396
  private void exportXTCE() throws IOException {
397
    var spaceSystem = verifySpaceSystem(mdb, parametersNamespace);
1✔
398
    var xtce = new XtceAssembler().toXtce(mdb, spaceSystem.getQualifiedName(), fqn -> true);
1✔
399
    BufferedWriter writer = null;
1✔
400

401
    if (outputFile != null) {
1✔
402
      writer =
1✔
403
          Files.newBufferedWriter(
1✔
404
              Paths.get(outputFile),
1✔
405
              StandardOpenOption.CREATE,
406
              StandardOpenOption.TRUNCATE_EXISTING);
407

408
      writer.write(xtce);
1✔
409

410
      writer.flush();
1✔
411
      writer.close();
1✔
412
    }
413
  }
1✔
414

415
  private void opcuaClientConnect() throws Exception {
416
    client = configureClient();
1✔
417
    connectToOPCUAServer(client);
1✔
418
  }
1✔
419

420
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
421
    Stream stream = ydb.getStream(streamName);
1✔
422
    if (stream == null) {
1✔
423
      try {
424
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
1✔
425
      } catch (Exception e) {
×
426
        throw new ConfigurationException(e);
×
427
      }
1✔
428

429
      stream = ydb.getStream(streamName);
1✔
430
    }
431
    return stream;
1✔
432
  }
433

434
  @Override
435
  public void doDisable() {
436
    /* If the thread is created, interrupt it. */
437
    if (thread != null) {
1✔
438
      thread.interrupt();
1✔
439
    }
440

441
    linkStatus = Status.DISABLED;
1✔
442
  }
1✔
443

444
  @Override
445
  public void doEnable() {
446
    linkStatus = Status.OK;
1✔
447
  }
1✔
448

449
  @Override
450
  public String getDetailedStatus() {
451
    if (isDisabled()) {
1✔
452
      return String.format("DISABLED");
1✔
453
    } else {
454
      return String.format("OK, received %d packets", inCount.get());
1✔
455
    }
456
  }
457

458
  @Override
459
  public Status connectionStatus() {
460
    return linkStatus;
1✔
461
  }
462

463
  @Override
464
  protected void doStart() {
465
    try {
466
      opcuaClientConnect();
1✔
467
    } catch (Exception e) {
×
468
      e.printStackTrace();
×
469
      linkStatus = Status.FAILED;
×
470
      notifyFailed(e);
×
471
      return;
×
472
    }
1✔
473
    if (!isDisabled()) {
1✔
474
      doEnable();
1✔
475
    }
476
    startAction.addChangeListener(
1✔
477
        () -> {
478
          /**
479
           * TODO:Might be useful if we want turn off any functionality when the action is disabled
480
           * for instance..
481
           */
482
        });
×
483

484
    /* Create and start the new thread. */
485
    thread = new Thread(this);
1✔
486
    thread.setName(this.getClass().getSimpleName() + "-" + linkName);
1✔
487
    thread.start();
1✔
488

489
    notifyStarted();
1✔
490
  }
1✔
491

492
  @Override
493
  protected void doStop() {
494
    try {
495
      client.disconnect().get();
1✔
496
    } catch (InterruptedException | ExecutionException e) {
×
497
      // TODO Auto-generated catch block
498
      e.printStackTrace();
×
499
    }
1✔
500
    if (thread != null) {
1✔
501
      thread.interrupt();
1✔
502
    }
503

504
    notifyStopped();
1✔
505
  }
1✔
506

507
  @Override
508
  public void run() {
509
    opcuaInit();
1✔
510
    if (queryAllNodesAtStartup) {
1✔
UNCOV
511
      currentOPCUAStatus = OPCUAStatus.OPCUA_INIT_ALL_DATA_QUERY;
×
512
      queryAllOPCUAData();
×
513
    }
514
    /* Enter our main loop */
515
    while (isRunningAndEnabled()) {
1✔
516
      currentOPCUAStatus = OPCUAStatus.OPCUA_OK;
1✔
517
    }
518
  }
1✔
519

520
  /**
521
   * Reads all attributes of all configured Value nodes and updates their corresponding PV. Useful
522
   * for querying data from the OPCUA server once, data such as browse names, NodeIds, etc.
523
   */
524
  private void queryAllOPCUAData() {
525
    TupleDefinition tdef = gftdef.copy();
1✔
526
    List<Object> cols = new ArrayList<>(4 + nodeIDToParamsMap.keySet().size());
1✔
527

528
    tdef = gftdef.copy();
1✔
529
    long gentime = timeService.getMissionTime();
1✔
530
    cols.add(gentime);
1✔
531
    cols.add(parametersNamespace);
1✔
532
    cols.add(0);
1✔
533
    cols.add(gentime);
1✔
534

535
    int columnCount = 0;
1✔
536

537
    Set<NodeId> nodeSet = new HashSet<NodeId>();
1✔
538
    /**
539
     * NOTE:This is super inefficient... The reason we collect these nodeIDs in a set is because
540
     * otherwise we will have redundant subscription(s) since there is more than 1 attribute per
541
     * nodeID given how nodeIDToParamsMap is designed
542
     */
543
    for (NodeIDAttrPair pair : nodeIDToParamsMap.keySet()) {
1✔
544
      nodeSet.add(pair.nodeID);
1✔
545
    }
1✔
546

547
    for (NodeId nId : nodeSet) {
1✔
548
      UaNode node;
549

550
      try {
551
        node = client.getAddressSpace().getNode(nId);
1✔
552

553
        DataValue nodeClass = node.readAttribute(AttributeId.NodeClass);
1✔
554

555
        switch (NodeClass.from((int) nodeClass.getValue().getValue())) {
1✔
556
          case Variable:
557
            for (AttributeId attr : AttributeId.VARIABLE_ATTRIBUTES) {
1✔
558
              VariableParam p = nodeIDToParamsMap.get(new NodeIDAttrPair(nId, attr));
1✔
559

560
              if (p.getParameterType() == null) {
1✔
561
                internalLogger.warn(
×
562
                    "{} ignored since it does not have a Parameter type",
563
                    p,
564
                    Character.toString(NameDescription.PATH_SEPARATOR));
×
565
                continue;
×
566
              }
567

568
              switch (p.getParameterType().getValueType()) {
1✔
569
                case BOOLEAN:
570
                  {
571
                    Boolean value = true;
1✔
572
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
573
                      //                      value = "NULL";
574
                    } else {
575
                      value = (Boolean) node.readAttribute(attr).getValue().getValue();
1✔
576
                    }
577

578
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
579
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value));
1✔
580
                  }
581
                  break;
1✔
582
                case DOUBLE:
583
                  {
584
                    Number value = 0;
1✔
585
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
586
                      internalLogger.warn("node {} has a Null variant.", node);
×
587
                    } else {
588
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
589
                    }
590

591
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
592
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.doubleValue()));
1✔
593
                  }
594
                  break;
1✔
595
                case FLOAT:
596
                  {
597
                    Number value = 0;
1✔
598
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
599
                      internalLogger.warn("node {} has a Null variant.", node);
×
600
                    } else {
601
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
602
                    }
603

604
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
605
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.floatValue()));
1✔
606
                  }
607
                  break;
1✔
608
                case SINT32:
609
                  {
610
                    Number value = 0;
1✔
611
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
612
                      internalLogger.warn("node {} has a Null variant.", node);
×
613
                    } else {
614
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
615
                    }
616

617
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
618
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.intValue()));
1✔
619
                  }
620
                  break;
1✔
621
                case SINT64:
622
                  {
623
                    Number value = 0;
1✔
624
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
625
                      internalLogger.warn("node {} has a Null variant.", node);
×
626
                    } else {
627
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
628
                    }
629

630
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
631
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.longValue()));
1✔
632
                  }
633
                  break;
1✔
634
                case STRING:
635
                  {
636
                    String value = "";
1✔
637
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
638
                      value = "NULL";
1✔
639
                    } else {
640
                      value = node.readAttribute(attr).getValue().getValue().toString();
1✔
641
                    }
642

643
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
644
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value));
1✔
645
                  }
646
                  break;
1✔
647
                case UINT32:
648
                  {
649
                    Number value = 0;
1✔
650
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
651
                      internalLogger.warn("node {} has a Null variant.", node);
×
652
                    } else {
653
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
654
                    }
655

656
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
657
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.longValue()));
1✔
658
                  }
659
                  break;
1✔
660
                case UINT64:
661
                  {
662
                    Number value = 0;
1✔
663
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
664
                      internalLogger.warn("node {} has a Null variant.", node);
×
665
                    } else {
666
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
667
                    }
668

669
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
670
                    cols.add(getPV(p, Instant.now().toEpochMilli(), value.longValue()));
1✔
671
                  }
672
                  break;
1✔
673
                default:
674
                  break;
675
              }
676

677
              log.debug("Pushing {} to stream", p.toString());
1✔
678

679
              columnCount++;
1✔
680
            }
1✔
681
            break;
1✔
682
          default:
683
            break;
684
        }
685

686
      } catch (UaException e) {
×
687
        // TODO Auto-generated catch block
688
        e.printStackTrace();
×
689
        continue;
×
690
      }
1✔
691
    }
1✔
692

693
    pushTuple(tdef, cols);
1✔
694
    inCount.getAndAdd(columnCount);
1✔
695
  }
1✔
696

697
  private synchronized void pushTuple(TupleDefinition tdef, List<Object> cols) {
698
    Tuple t;
699
    t = new Tuple(tdef, cols);
1✔
700
    opcuaStream.emitTuple(t);
1✔
701
  }
1✔
702

703
  private static ParameterType getOrCreateType(
704
      XtceDb mdb, String name, Supplier<ParameterType.Builder<?>> supplier) {
705

706
    String fqn = XtceDb.YAMCS_SPACESYSTEM_NAME + NameDescription.PATH_SEPARATOR + name;
1✔
707
    ParameterType ptype = mdb.getParameterType(fqn);
1✔
708
    if (ptype != null) {
1✔
709
      return ptype;
1✔
710
    }
711
    ParameterType.Builder<?> typeb = supplier.get().setName(name);
1✔
712

713
    ptype = typeb.build();
1✔
714
    ((NameDescription) ptype).setQualifiedName(fqn);
1✔
715

716
    return mdb.addSystemParameterType(ptype);
1✔
717
  }
718

719
  public static ParameterType getBasicType(XtceDb mdb, Type type) {
720
    ParameterType pType = null;
1✔
721
    switch (type) {
1✔
722
      case BOOLEAN:
723
        return getOrCreateType(mdb, "boolean", () -> new BooleanParameterType.Builder());
1✔
724
      case STRING:
725
        return getOrCreateType(mdb, "string", () -> new StringParameterType.Builder());
1✔
726

727
      case FLOAT:
728
        return getOrCreateType(
1✔
729
            mdb, "float32", () -> new FloatParameterType.Builder().setSizeInBits(32));
1✔
730
      case DOUBLE:
731
        return getOrCreateType(
1✔
732
            mdb, "float64", () -> new FloatParameterType.Builder().setSizeInBits(64));
1✔
733
      case SINT32:
734
        return getOrCreateType(
1✔
735
            mdb,
736
            "sint32",
737
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(true));
×
738
      case SINT64:
739
        return getOrCreateType(
1✔
740
            mdb,
741
            "sint64",
742
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(true));
1✔
743
      case UINT32:
744
        return getOrCreateType(
1✔
745
            mdb,
746
            "uint32",
747
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(false));
×
748
      case UINT64:
749
        return getOrCreateType(
1✔
750
            mdb,
751
            "uint64",
752
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(false));
×
753
      default:
754
        break;
755
    }
756

757
    return pType;
×
758
  }
759

760
  public static ParameterValue getNewPv(Parameter parameter, long time) {
761
    ParameterValue pv = new ParameterValue(parameter);
1✔
762
    pv.setAcquisitionTime(time);
1✔
763
    pv.setGenerationTime(time);
1✔
764
    return pv;
1✔
765
  }
766

767
  public static ParameterValue getPV(Parameter parameter, long time, String v) {
768
    ParameterValue pv = getNewPv(parameter, time);
1✔
769
    pv.setEngValue(ValueUtility.getStringValue(v));
1✔
770
    return pv;
1✔
771
  }
772

773
  public static ParameterValue getPV(Parameter parameter, long time, double v) {
774
    ParameterValue pv = getNewPv(parameter, time);
1✔
775
    pv.setEngValue(ValueUtility.getDoubleValue(v));
1✔
776
    return pv;
1✔
777
  }
778

779
  public static ParameterValue getPV(Parameter parameter, long time, float v) {
780
    ParameterValue pv = getNewPv(parameter, time);
1✔
781
    pv.setEngValue(ValueUtility.getFloatValue(v));
1✔
782
    return pv;
1✔
783
  }
784

785
  public static ParameterValue getPV(Parameter parameter, long time, boolean v) {
786
    ParameterValue pv = getNewPv(parameter, time);
1✔
787
    pv.setEngValue(ValueUtility.getBooleanValue(v));
1✔
788
    return pv;
1✔
789
  }
790

791
  public static ParameterValue getPV(Parameter parameter, long time, long v) {
792
    ParameterValue pv = getNewPv(parameter, time);
1✔
793
    pv.setEngValue(ValueUtility.getSint64Value(v));
1✔
794
    return pv;
1✔
795
  }
796

797
  @Override
798
  public Status getLinkStatus() {
799
    return linkStatus;
1✔
800
  }
801

802
  @Override
803
  public boolean isDisabled() {
804
    return linkStatus == Status.DISABLED;
1✔
805
  }
806

807
  @Override
808
  public long getDataInCount() {
809
    return inCount.get();
1✔
810
  }
811

812
  @Override
813
  public long getDataOutCount() {
814
    return 0;
1✔
815
  }
816

817
  @Override
818
  public void resetCounters() {
819
    inCount.set(0);
1✔
820
  }
1✔
821

822
  /**
823
   * Selects first non-secured endpoint from endpoints found at discover URL. At the moment secured
824
   * endpoints are not supported.
825
   *
826
   * @return
827
   * @throws Exception
828
   */
829
  private OpcUaClient configureClient() throws Exception {
830

831
    List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(discoverURL).get();
1✔
832

833
    // At the moment, we do not support certificates.
834
    EndpointDescription selectedEndpoint = null;
1✔
835
    for (var endpoint : endpoints) {
1✔
836
      switch (endpoint.getSecurityMode()) {
1✔
837
        case Invalid:
838
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
839
          break;
×
840
        case None:
841
          selectedEndpoint = endpoint;
1✔
842
          break;
1✔
843
        case Sign:
844
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
845
          break;
×
846
        case SignAndEncrypt:
847
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
848
          break;
849
      }
850

851
      if (selectedEndpoint != null) {
1✔
852
        break;
1✔
853
      }
854
    }
×
855

856
    if (selectedEndpoint == null) {
1✔
857
      throw new Exception("No viable endpoint found from list:" + endpoints);
×
858
    }
859

860
    OpcUaClientConfig builder = OpcUaClientConfig.builder().setEndpoint(selectedEndpoint).build();
1✔
861

862
    return OpcUaClient.create(builder);
1✔
863
  }
864

865
  /**
866
   * Adds new PV with the name of node.
867
   *
868
   * @param client
869
   * @param node
870
   */
871
  private void addOPCUAPV(OpcUaClient client, UaNode node) {
872

873
    if (node.getBrowseName()
1✔
874
        .getName()
1✔
875
        .contains(Character.toString(NameDescription.PATH_SEPARATOR))) {
1✔
876
      internalLogger.info(
×
877
          "{} ignored since it contains a {} character",
878
          node.getBrowseName().getName(),
×
879
          Character.toString(NameDescription.PATH_SEPARATOR));
×
880

881
    } else {
882

883
      /**
884
       * NOTE:For now we'll just flatten all the attributes instead of using an aggregate type for
885
       * attributes
886
       */
887
      for (AttributeId attr : AttributeId.values()) {
1✔
888

889
        ParameterType ptype = OPCUAAttrTypeToParamType(attr, node);
1✔
890

891
        String opcuaTranslatedQName = translateNodeToParamQName(client, node, attr);
1✔
892
        Parameter p = VariableParam.getForFullyQualifiedName(opcuaTranslatedQName);
1✔
893

894
        p.setParameterType(ptype);
1✔
895

896
        if (mdb.getParameter(p.getQualifiedName()) == null) {
1✔
897
          log.debug("Adding OPCUA object as parameter to mdb:{}", p.getQualifiedName());
1✔
898
          mdb.addParameter(p, true);
1✔
899

900
          nodeIDToParamsMap.put(new NodeIDAttrPair(node.getNodeId(), attr), (VariableParam) p);
1✔
901
        }
902
      }
903
    }
904
  }
1✔
905

906
  /**
907
   * Map nodeID name to a qualified name that can be used for a YAMCS PV.
908
   *
909
   * @param client
910
   * @param node
911
   * @param attr
912
   * @return
913
   */
914
  private String translateNodeToParamQName(OpcUaClient client, UaNode node, AttributeId attr) {
915
    LocalizedText localizedDisplayName = null;
1✔
916
    try {
917

918
      localizedDisplayName =
1✔
919
          (LocalizedText) (node.readAttribute(AttributeId.DisplayName).getValue().getValue());
1✔
920
    } catch (UaException e) {
×
921
      // TODO Auto-generated catch block
922
      e.printStackTrace();
×
923
    }
1✔
924
    String opcuaTranslatedQName =
1✔
925
        qualifiedName(
1✔
926
            parametersNamespace
927
                + NameDescription.PATH_SEPARATOR
928
                + node.getNodeId().toParseableString().replace(";", "-")
1✔
929
                + NameDescription.PATH_SEPARATOR
930
                + localizedDisplayName.getText(),
1✔
931
            attr.toString());
1✔
932

933
    return opcuaTranslatedQName;
1✔
934
  }
935

936
  /**
937
   * Browse node at nodePath relative to browseRoot.
938
   *
939
   * @param indent
940
   * @param client
941
   * @param browseRoot
942
   * @param nodePath in the format of "0:Root,0:Objects,2:HelloWorld,2:MyObject,2:Bar"
943
   * @throws Exception
944
   */
945
  private void browsePath(String indent, OpcUaClient client, NodeId startingNode, String nodePath)
946
      throws Exception {
947
    internalLogger.info("Browsing at " + startingNode);
1✔
948
    ArrayList<String> rPathTokens = new ArrayList<String>();
1✔
949
    ArrayList<RelativePathElement> relaitivePathElements = new ArrayList<RelativePathElement>();
1✔
950

951
    for (var pathToken : nodePath.split(",")) {
1✔
952
      rPathTokens.add(nodePath);
1✔
953

954
      int namespaceIndex = 0;
1✔
955

956
      String namespaceName = "";
1✔
957

958
      namespaceIndex = Integer.parseInt(pathToken.split(":")[0]);
1✔
959

960
      namespaceName = pathToken.split(":")[1];
1✔
961

962
      relaitivePathElements.add(
1✔
963
          new RelativePathElement(
964
              Identifiers.HierarchicalReferences,
965
              false,
1✔
966
              true,
1✔
967
              new QualifiedName(namespaceIndex, namespaceName)));
968
    }
969

970
    ArrayList<BrowsePath> list = new ArrayList<BrowsePath>();
1✔
971

972
    RelativePathElement[] elements = new RelativePathElement[relaitivePathElements.size()];
1✔
973

974
    relaitivePathElements.toArray(elements);
1✔
975

976
    list.add(new BrowsePath(startingNode, new RelativePath(elements)));
1✔
977

978
    TranslateBrowsePathsToNodeIdsResponse response = null;
1✔
979
    try {
980
      response = client.translateBrowsePaths(list).get();
1✔
981
    } catch (InterruptedException e) {
×
NEW
982
      internalLogger.warn(e.toString());
×
983
    } catch (ExecutionException e) {
×
NEW
984
      internalLogger.warn(e.toString());
×
985
    }
1✔
986

987
    BrowsePathResult result = Arrays.asList(response.getResults()).get(0);
1✔
988
    StatusCode statusCode = result.getStatusCode();
1✔
989

990
    if (statusCode.isBad()) {
1✔
991
      log.warn("Bad status code:" + statusCode);
×
992
      org.yamcs.yarch.protobuf.Db.Event ev =
UNCOV
993
          Event.newBuilder()
×
994
              .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
995
              .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
996
              .setSource(this.linkName)
×
997
              .setType(this.linkName)
×
998
              .setMessage("Failed to find node:" + nodePath + ". Error code info:" + statusCode)
×
999
              .setSeverity(EventSeverity.ERROR)
×
1000
              .build();
×
1001
      eventProducer.sendEvent(ev);
×
1002

1003
      throw new Exception("Bad status code:" + statusCode);
×
1004

1005
    } else if (statusCode.isUncertain()) {
1✔
1006
      log.warn("Uncertain status code:" + statusCode);
×
1007
      return;
×
1008
    }
1009

1010
    try {
1011
      UaNode node =
1✔
1012
          client
1013
              .getAddressSpace()
1✔
1014
              .getNode(
1✔
1015
                  result.getTargets()[0].getTargetId().toNodeId(client.getNamespaceTable()).get());
1✔
1016

1017
      addOPCUAPV(client, node);
1✔
1018
    } catch (UaException e) {
×
NEW
1019
      internalLogger.warn(e.toString());
×
1020
    }
1✔
1021
  }
1✔
1022

1023
  private void createOPCUASubscriptions() {
1024
    createDataChangeListener();
1✔
1025
    Set<NodeId> nodeSet = new HashSet<NodeId>();
1✔
1026
    /**
1027
     * FIXME:This is super inefficient... The reason we collect these nodeIDs in a set is because
1028
     * otherwise we will have redundant subscription(s) since there is more than 1 attribute per
1029
     * nodeID given how nodeIDToParamsMap is designed
1030
     */
1031
    for (NodeIDAttrPair pair : nodeIDToParamsMap.keySet()) {
1✔
1032
      nodeSet.add(pair.nodeID);
1✔
1033
    }
1✔
1034

1035
    ArrayList<NodeId> variableNodes = new ArrayList<NodeId>();
1✔
1036
    for (NodeId id : nodeSet) {
1✔
1037
      Variant nodeClass = null;
1✔
1038
      try {
1039
        UaNode node = client.getAddressSpace().getNode(id);
1✔
1040

1041
        nodeClass = node.readAttribute(AttributeId.NodeClass).getValue();
1✔
1042

1043
      } catch (UaException e) {
×
NEW
1044
        internalLogger.warn(e.toString());
×
1045
      }
1✔
1046
      if (nodeClass != null) {
1✔
1047
        //        try {
1048
        switch (NodeClass.from((int) nodeClass.getValue())) {
1✔
1049
            // As per the spec, the only thing we can subscribe to is Variables
1050
          case Variable:
1051
            variableNodes.add(id);
1✔
1052
            break;
1053
        }
1054
      }
1055
    }
1✔
1056

1057
    try {
1058
      List<ManagedDataItem> dataItems = opcuaSubscription.createDataItems(variableNodes);
1✔
1059
      for (var dataItem : dataItems) {
1✔
1060
        log.debug("Status code for dataItem:{}", dataItem.getStatusCode());
1✔
1061
        OPCUAActiveSubs.addAndGet(1);
1✔
1062
      }
1✔
1063
    } catch (UaException e) {
×
NEW
1064
      internalLogger.warn(e.toString());
×
1065
    }
1✔
1066
  }
1✔
1067

1068
  /**
1069
   * Connects to OPCUA server and activates query all action.
1070
   *
1071
   * @param client
1072
   * @throws Exception
1073
   */
1074
  public void connectToOPCUAServer(OpcUaClient client) throws Exception {
1075
    internalLogger.info("Connecting to OPCUA server...");
1✔
1076
    client.connect().get();
1✔
1077

1078
    addAction(startAction);
1✔
1079
    startAction.setEnabled(true);
1✔
1080
  }
1✔
1081

1082
  /**
1083
   * Browses the tree on the OPCUA server and maps them to YAMCS Parameters.
1084
   *
1085
   * @param client
1086
   * @throws Exception
1087
   */
1088
  private void browseOPCUATree(OpcUaClient client) throws Exception {
1089
    // start browsing at root folder
1090
    internalLogger.info("Browsing OPCUA...");
1✔
1091
    for (var p : relativeNodePaths) {
1✔
1092
      int namespaceIndex = (int) p.rootNodeID.get("namespaceIndex");
1✔
1093
      String identifier = (String) p.rootNodeID.get("identifier");
1✔
1094
      IdType identifierType = IdType.valueOf((String) p.rootNodeID.get("identifierType"));
1✔
1095

1096
      browsePath(
1✔
1097
          endpointURL, client, getNewNodeID(identifierType, namespaceIndex, identifier), p.path);
1✔
1098
    }
1✔
1099
  }
1✔
1100

1101
  /**
1102
   * Get new OPCUA-compliant NodeID object that is created from NamespaceIndex and Identifier. At
1103
   * the moment only String and Numeric node ids are supported.
1104
   *
1105
   * @param rootIdentifierType
1106
   * @param NamespaceIndex
1107
   * @param Identifier
1108
   * @return
1109
   */
1110
  private NodeId getNewNodeID(IdType rootIdentifierType, int NamespaceIndex, String Identifier) {
1111
    NodeId nodeID = null;
1✔
1112
    switch (rootIdentifierType) {
1✔
1113
      case Guid:
NEW
1114
        internalLogger.warn("Guid nodeID is not supported");
×
1115
        break;
×
1116
      case Numeric:
1117
        nodeID = new NodeId(NamespaceIndex, Integer.parseInt(Identifier));
1✔
1118
        break;
1✔
1119
      case Opaque:
NEW
1120
        internalLogger.warn("Guid Opaque is not supported");
×
1121
        break;
×
1122
      case String:
1123
        nodeID = new NodeId(NamespaceIndex, Identifier);
×
1124
        break;
1125
    }
1126
    return nodeID;
1✔
1127
  }
1128

1129
  /** Data listener for realtime OPCUA server updates. */
1130
  private void createDataChangeListener() {
1131
    try {
1132
      opcuaSubscription = ManagedSubscription.create(client, publishInterval);
1✔
1133
    } catch (UaException e) {
×
1134
      // TODO Auto-generated catch block
1135
      e.printStackTrace();
×
1136
    }
1✔
1137
    opcuaSubscription.addDataChangeListener(
1✔
1138
        (items, values) -> {
1139
          for (int i = 0; i < items.size(); i++) {
1✔
1140
            NodeIDAttrPair nodeAttrKey =
1✔
1141
                new NodeIDAttrPair(items.get(i).getNodeId(), AttributeId.Value);
1✔
1142
            log.debug(
1✔
1143
                "subscription value received: item={}, value={}",
1144
                items.get(i).getNodeId(),
1✔
1145
                values.get(i).getValue());
1✔
1146

1147
            log.debug(
1✔
1148
                "Pushing new PV for param name {} which is mapped to NodeID {}",
1149
                nodeIDToParamsMap.get(nodeAttrKey),
1✔
1150
                items.get(i).getNodeId());
1✔
1151

1152
            TupleDefinition tdef = gftdef.copy();
1✔
1153
            List<Object> cols = new ArrayList<>(4 + 1);
1✔
1154
            //            FIXME: Add leap seconds.... as config or get it from YAMCS API.
1155
            long gentime =
1✔
1156
                values
1157
                    .get(i)
1✔
1158
                    .getSourceTime()
1✔
1159
                    .getJavaInstant()
1✔
1160
                    .plus(37, ChronoUnit.SECONDS)
1✔
1161
                    .toEpochMilli();
1✔
1162
            cols.add(gentime);
1✔
1163
            cols.add(parametersNamespace);
1✔
1164
            cols.add(0);
1✔
1165
            cols.add(gentime);
1✔
1166

1167
            /**
1168
             * TODO:Not sure if this is the best way to do this since the aggregate values will be
1169
             * partially updated. Another potential approach might be to decouple the live OPCUA
1170
             * data(subscriptions) via namespaces. For example; have a "special" namespace called
1171
             * "subscriptions" that ONLY gets updated with items. And maybe another namespace for
1172
             * static data...maybe.
1173
             *
1174
             * <p>Another option is to flatten everything and have no aggregate types at all. That
1175
             * approach might even simplify the code quite a bit...
1176
             *
1177
             * <p>Another question worth answering before moving forward is to find out whether or
1178
             * not it is concrete in the OPCUA protocol what data can change in real time and which
1179
             * data is "static". Not sure if there is any "static" data given that clients have the
1180
             * ability of writing to values... might be worth a test.
1181
             */
1182
            log.debug(
1✔
1183
                "Data({}) chnage triggered for {}",
1184
                values.get(i).getValue(),
1✔
1185
                nodeIDToParamsMap.get(nodeAttrKey));
1✔
1186

1187
            if (nodeIDToParamsMap.get(nodeAttrKey) == null) {
1✔
1188
              log.debug("No parameter mapping found for {}", nodeAttrKey.nodeID);
×
1189
              continue;
×
1190
            } else {
1191
              log.debug(
1✔
1192
                  String.format(
1✔
1193
                      "parameter mapping found for {} and {}",
1194
                      nodeAttrKey.nodeID,
1195
                      nodeAttrKey.attrID));
1196
            }
1197

1198
            if (values.get(i).getValue() != null && values.get(i).getValue().getValue() != null) {
1✔
1199

1200
              switch (nodeIDToParamsMap.get(nodeAttrKey).getParameterType().getValueType()) {
1✔
1201
                case BOOLEAN:
1202
                  {
1203
                    boolean value = (boolean) values.get(i).getValue().getValue();
1✔
1204

1205
                    tdef.addColumn(
1✔
1206
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1207
                        DataType.PARAMETER_VALUE);
1208
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value));
1✔
1209
                  }
1210
                  break;
1✔
1211
                case DOUBLE:
1212
                  {
1213
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1214

1215
                    tdef.addColumn(
1✔
1216
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1217
                        DataType.PARAMETER_VALUE);
1218
                    cols.add(
1✔
1219
                        getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.doubleValue()));
1✔
1220
                  }
1221
                  break;
1✔
1222
                case FLOAT:
1223
                  {
1224
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1225

1226
                    tdef.addColumn(
1✔
1227
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1228
                        DataType.PARAMETER_VALUE);
1229
                    cols.add(
1✔
1230
                        getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.floatValue()));
1✔
1231
                  }
1232
                  break;
1✔
1233
                case SINT32:
1234
                  {
1235
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1236
                    tdef.addColumn(
1✔
1237
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1238
                        DataType.PARAMETER_VALUE);
1239
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.intValue()));
1✔
1240
                  }
1241
                  break;
1✔
1242
                case SINT64:
1243
                  {
1244
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1245

1246
                    tdef.addColumn(
1✔
1247
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1248
                        DataType.PARAMETER_VALUE);
1249
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1250
                  }
1251
                  break;
1✔
1252
                case STRING:
1253
                  {
1254
                    String value = (String) values.get(i).getValue().getValue().toString();
1✔
1255

1256
                    tdef.addColumn(
1✔
1257
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1258
                        DataType.PARAMETER_VALUE);
1259
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value));
1✔
1260
                  }
1261
                  break;
1✔
1262
                case UINT32:
1263
                  {
1264
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1265
                    tdef.addColumn(
1✔
1266
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1267
                        DataType.PARAMETER_VALUE);
1268
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1269
                  }
1270
                  break;
1✔
1271
                case UINT64:
1272
                  {
1273
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1274

1275
                    tdef.addColumn(
1✔
1276
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1277
                        DataType.PARAMETER_VALUE);
1278
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1279
                  }
1280
                  break;
1✔
1281
                default:
1282
                  break;
1283
              }
1284

1285
              pushTuple(tdef, cols);
1✔
1286

1287
              inCount.getAndAdd(1);
1✔
1288
            } else {
1289
              // TODO:Add some type emptyValue count for OPS.
1290
              log.warn(
×
1291
                  "Data chnage triggered for {}, but it empty. This should not happen.",
1292
                  nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName());
×
1293
            }
1294
          }
1295
        });
1✔
1296
  }
1✔
1297

1298
  /**
1299
   * Get new ParameterType for the specified attribute of the node. Particularly useful for Value
1300
   * attributes of nodes.
1301
   *
1302
   * @param attr
1303
   * @param node
1304
   * @return
1305
   */
1306
  private ParameterType OPCUAAttrTypeToParamType(AttributeId attr, UaNode node) {
1307
    ParameterType pType = null;
1✔
1308

1309
    switch (attr) {
1✔
1310
      case AccessLevel:
1311
        pType = getBasicType(mdb, Type.STRING);
1✔
1312
        break;
1✔
1313
      case ArrayDimensions:
1314
        pType = getBasicType(mdb, Type.STRING);
1✔
1315
        break;
1✔
1316
      case BrowseName:
1317
        pType = getBasicType(mdb, Type.STRING);
1✔
1318
        break;
1✔
1319
      case ContainsNoLoops:
1320
        pType = getBasicType(mdb, Type.STRING);
1✔
1321
        break;
1✔
1322
      case DataType:
1323
        pType = getBasicType(mdb, Type.STRING);
1✔
1324
        break;
1✔
1325
      case Description:
1326
        pType = getBasicType(mdb, Type.STRING);
1✔
1327
        break;
1✔
1328
      case DisplayName:
1329
        pType = getBasicType(mdb, Type.STRING);
1✔
1330
        break;
1✔
1331
      case EventNotifier:
1332
        pType = getBasicType(mdb, Type.STRING);
1✔
1333
        break;
1✔
1334
      case Executable:
1335
        pType = getBasicType(mdb, Type.STRING);
1✔
1336
        break;
1✔
1337
      case Historizing:
1338
        pType = getBasicType(mdb, Type.STRING);
1✔
1339
        break;
1✔
1340
      case InverseName:
1341
        pType = getBasicType(mdb, Type.STRING);
1✔
1342
        break;
1✔
1343
      case IsAbstract:
1344
        pType = getBasicType(mdb, Type.STRING);
1✔
1345
        break;
1✔
1346
      case MinimumSamplingInterval:
1347
        pType = getBasicType(mdb, Type.STRING);
1✔
1348
        break;
1✔
1349
      case NodeClass:
1350
        pType = getBasicType(mdb, Type.STRING);
1✔
1351
        break;
1✔
1352
      case NodeId:
1353
        pType = getBasicType(mdb, Type.STRING);
1✔
1354
        break;
1✔
1355
      case Symmetric:
1356
        pType = getBasicType(mdb, Type.STRING);
1✔
1357
        break;
1✔
1358
      case UserAccessLevel:
1359
        pType = getBasicType(mdb, Type.STRING);
1✔
1360
        break;
1✔
1361
      case UserExecutable:
1362
        pType = getBasicType(mdb, Type.STRING);
1✔
1363
        break;
1✔
1364
      case UserWriteMask:
1365
        pType = getBasicType(mdb, Type.STRING);
1✔
1366
        break;
1✔
1367
      case Value:
1368
        try {
1369

1370
          var value = node.readAttribute(attr).getValue();
1✔
1371

1372
          if (value.isNotNull()) {
1✔
1373
            NodeId valueObjectType =
1✔
1374
                value.getDataType().get().toNodeId(client.getNamespaceTable()).get();
1✔
1375

1376
            /** As per the spec:https://reference.opcfoundation.org/Core/Part6/v104/docs/5.1.2 */
1377
            if (valueObjectType.equals(Identifiers.SByte)) {
1✔
1378
              pType = getBasicType(mdb, Type.SINT32);
×
1379
            } else if (valueObjectType.equals(Identifiers.Byte)) {
1✔
1380
              pType = getBasicType(mdb, Type.SINT32);
×
1381
            } else if (valueObjectType.equals(Identifiers.Int16)) {
1✔
1382
              pType = getBasicType(mdb, Type.SINT32);
×
1383
            } else if (valueObjectType.equals(Identifiers.UInt16)) {
1✔
1384
              pType = getBasicType(mdb, Type.SINT32);
×
1385
            } else if (valueObjectType.equals(Identifiers.Int32)) {
1✔
1386
              pType = getBasicType(mdb, Type.SINT32);
1✔
1387
            } else if (valueObjectType.equals(Identifiers.UInt32)) {
1✔
1388
              pType = getBasicType(mdb, Type.UINT32);
1✔
1389
            } else if (valueObjectType.equals(Identifiers.Int64)) {
1✔
1390
              pType = getBasicType(mdb, Type.SINT64);
1✔
1391
            } else if (valueObjectType.equals(Identifiers.UInt64)) {
1✔
1392
              pType = getBasicType(mdb, Type.UINT64);
1✔
1393
            } else if (valueObjectType.equals(Identifiers.Float)) {
1✔
1394
              pType = getBasicType(mdb, Type.FLOAT);
1✔
1395
            } else if (valueObjectType.equals(Identifiers.Double)) {
1✔
1396
              pType = getBasicType(mdb, Type.DOUBLE);
1✔
1397
            } else if (valueObjectType.equals(Identifiers.String)) {
1✔
1398
              pType = getBasicType(mdb, Type.STRING);
1✔
1399
            } else if (valueObjectType.equals(Identifiers.Boolean)) {
1✔
1400
              pType = getBasicType(mdb, Type.BOOLEAN);
1✔
1401
            }
1402
          } else {
1✔
1403
            pType = getBasicType(mdb, Type.STRING);
×
1404
          }
1405

1406
        } catch (UaException e) {
×
NEW
1407
          internalLogger.warn(e.toString());
×
1408
        }
1✔
1409
        break;
×
1410
      case ValueRank:
1411
        pType = getBasicType(mdb, Type.STRING);
1✔
1412
        break;
1✔
1413
      case WriteMask:
1414
        pType = getBasicType(mdb, Type.STRING);
1✔
1415
        break;
1✔
1416
      default:
1417
        break;
1418
    }
1419

1420
    return pType;
1✔
1421
  }
1422

1423
  private void subscribeToEvents(OpcUaClient client)
1424
      throws InterruptedException, ExecutionException {
1425
    // create a subscription and a monitored item
1426
    UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
1✔
1427

1428
    ReadValueId readValueId =
1✔
1429
        new ReadValueId(
1430
            Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
1✔
1431

1432
    // client handle must be unique per item
1433
    UInteger clientHandle = uint(clientHandles.getAndIncrement());
1✔
1434

1435
    EventFilter eventFilter =
1✔
1436
        new EventFilter(
1437
            new SimpleAttributeOperand[] {
1438
              new SimpleAttributeOperand(
1439
                  Identifiers.BaseEventType,
1440
                  new QualifiedName[] {new QualifiedName(0, "EventId")},
1441
                  AttributeId.Value.uid(),
1✔
1442
                  null),
1443
              new SimpleAttributeOperand(
1444
                  Identifiers.BaseEventType,
1445
                  new QualifiedName[] {new QualifiedName(0, "EventType")},
1446
                  AttributeId.Value.uid(),
1✔
1447
                  null),
1448
              new SimpleAttributeOperand(
1449
                  Identifiers.BaseEventType,
1450
                  new QualifiedName[] {new QualifiedName(0, "Severity")},
1451
                  AttributeId.Value.uid(),
1✔
1452
                  null),
1453
              new SimpleAttributeOperand(
1454
                  Identifiers.BaseEventType,
1455
                  new QualifiedName[] {new QualifiedName(0, "Time")},
1456
                  AttributeId.Value.uid(),
1✔
1457
                  null),
1458
              new SimpleAttributeOperand(
1459
                  Identifiers.BaseEventType,
1460
                  new QualifiedName[] {new QualifiedName(0, "Message")},
1461
                  AttributeId.Value.uid(),
1✔
1462
                  null)
1463
            },
1464
            new ContentFilter(null));
1465

1466
    MonitoringParameters parameters =
1✔
1467
        new MonitoringParameters(
1468
            clientHandle,
1469
            0.0,
1✔
1470
            ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
1✔
1471
            uint(10),
1✔
1472
            true);
1✔
1473

1474
    MonitoredItemCreateRequest request =
1✔
1475
        new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
1476

1477
    List<UaMonitoredItem> items =
1✔
1478
        subscription.createMonitoredItems(TimestampsToReturn.Both, newArrayList(request)).get();
1✔
1479

1480
    // do something with the value updates
1481
    UaMonitoredItem monitoredItem = items.get(0);
1✔
1482

1483
    monitoredItem.setEventConsumer(
1✔
1484
        (item, vs) -> {
1485
          internalLogger.info("Event Received from {}", item.getReadValueId().getNodeId());
1✔
1486

1487
          StringBuilder eventText = new StringBuilder();
1✔
1488

1489
          ByteString eventId;
1490
          NodeId eventType;
1491
          UShort eventSeverity;
1492
          DateTime eventTime;
1493
          LocalizedText eventMessage;
1494

1495
          for (int i = 0; i < vs.length; i++) {
1✔
1496
            internalLogger.info("\tvariant[{}]: {}", i, vs[i].getValue());
1✔
1497
          }
1498

1499
          eventId = (ByteString) vs[0].getValue();
1✔
1500
          eventType = (NodeId) vs[1].getValue();
1✔
1501
          eventSeverity = (UShort) vs[2].getValue();
1✔
1502
          eventTime = (DateTime) vs[3].getValue();
1✔
1503
          eventMessage = (LocalizedText) vs[4].getValue();
1✔
1504

1505
          //          FIXME:Map these values to YAMCS API
1506
          eventText.append("eventId:" + eventId);
1✔
1507
          eventText.append("\n");
1✔
1508
          eventText.append("eventType:" + eventType);
1✔
1509
          eventText.append("\n");
1✔
1510
          eventText.append("eventSeverity:" + eventSeverity);
1✔
1511
          eventText.append("\n");
1✔
1512
          eventText.append("eventTime:" + eventTime);
1✔
1513
          eventText.append("\n");
1✔
1514
          eventText.append("eventMessage:" + eventMessage);
1✔
1515
          org.yamcs.yarch.protobuf.Db.Event ev =
1516
              Event.newBuilder()
1✔
1517
                  .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1518
                  .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1519
                  .setSource(this.linkName)
1✔
1520
                  .setType(this.linkName)
1✔
1521
                  .setMessage(eventText.toString())
1✔
1522
                  .setSeverity(EventSeverity.INFO)
1✔
1523
                  .build();
1✔
1524
          eventProducer.sendEvent(ev);
1✔
1525
        });
1✔
1526
  }
1✔
1527

1528
  @Override
1529
  public void setupSystemParameters(SystemParametersService sysParamService) {
1530
    super.setupSystemParameters(sysParamService);
1✔
1531
    OPCUAStatusParam =
1✔
1532
        sysParamService.createEnumeratedSystemParameter(
1✔
1533
            linkName + "/OPCUAStatusParam",
1534
            OPCUAStatus.class,
1535
            "The current status of OPCUA client");
1536
    EnumeratedParameterType spLinkStatusType =
1✔
1537
        (EnumeratedParameterType) OPCUAStatusParam.getParameterType();
1✔
1538
    spLinkStatusType
1✔
1539
        .enumValue(OPCUAStatus.OPCUA_INIT_CONFIG.name())
1✔
1540
        .setDescription(
1✔
1541
            "This link is in the configuration stage(Configuring OPCUA parameters such as certificates)");
1542
    spLinkStatusType
1✔
1543
        .enumValue(OPCUAStatus.OPCUA_INIT_TREE.name())
1✔
1544
        .setDescription(
1✔
1545
            "The link is parsing the OPCUA Tree and mapping them to PVs."
1546
                + " Depending on configuration, this can take a while.");
1547
    spLinkStatusType
1✔
1548
        .enumValue(OPCUAStatus.OPCUA_INIT_EVENTS.name())
1✔
1549
        .setDescription("The link is configuring and subscribing to OPCUA events");
1✔
1550
    spLinkStatusType
1✔
1551
        .enumValue(OPCUAStatus.OPCUA_INIT_DATA_SUBSCRIPTION.name())
1✔
1552
        .setDescription(
1✔
1553
            "The link is creating subscriptions for each node that was parsed from the tree"
1554
                + "that has a Value attribute.");
1555
    spLinkStatusType
1✔
1556
        .enumValue(OPCUAStatus.OPCUA_INIT_ALL_DATA_QUERY.name())
1✔
1557
        .setDescription(
1✔
1558
            "The link is querying all attributes of all parsed nodes."
1559
                + "This is can be configured to be done at startup.");
1560
    spLinkStatusType
1✔
1561
        .enumValue(OPCUAStatus.OPCUA_OK.name())
1✔
1562
        .setDescription(
1✔
1563
            "The link is done with all OPCUA initialization. It is in an usable state.");
1564

1565
    OPCUAActiveSubsParam =
1✔
1566
        sysParamService.createSystemParameter(
1✔
1567
            linkName + "/OPCUAActiveSubs",
1568
            Type.UINT64,
1569
            "The total number of active opcua subscriptions");
1570
  }
1✔
1571

1572
  @Override
1573
  public List<ParameterValue> getSystemParameters() {
1574
    long time = getCurrentTime();
1✔
1575
    ArrayList<ParameterValue> list = new ArrayList<>();
1✔
1576

1577
    list.add(
1✔
1578
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1579
            OPCUAStatusParam, time, currentOPCUAStatus));
1580
    list.add(
1✔
1581
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1582
            OPCUAActiveSubsParam, time, OPCUAActiveSubs.get()));
1✔
1583
    try {
1584
      super.collectSystemParameters(time, list);
1✔
1585
    } catch (Exception e) {
×
1586
      log.error("Exception caught when collecting link system parameters", e);
×
1587
    }
1✔
1588
    return list;
1✔
1589
  }
1590
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc