• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-opcua / #44

12 Aug 2024 04:53PM UTC coverage: 81.341% (+0.02%) from 81.32%
#44

push

lorenzo-gomez-windhover
Merge branch '1_minimally_functional_plugin' of github.com:WindhoverLabs/yamcs-opcua into 1_minimally_functional_plugin

55 of 125 new or added lines in 1 file covered. (44.0%)

85 existing lines in 1 file now uncovered.

728 of 895 relevant lines covered (81.34%)

0.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.04
/src/main/java/com/windhoverlabs/yamcs/opcua/OPCUALink.java
1
/****************************************************************************
2
 *
3
 *   Copyright (c) 2024 Windhover Labs, L.L.C. All rights reserved.
4
 *
5
 * Redistribution and use in source and binary forms, with or without
6
 * modification, are permitted provided that the following conditions
7
 * are met:
8
 *
9
 * 1. Redistributions of source code must retain the above copyright
10
 *    notice, this list of conditions and the following disclaimer.
11
 * 2. Redistributions in binary form must reproduce the above copyright
12
 *    notice, this list of conditions and the following disclaimer in
13
 *    the documentation and/or other materials provided with the
14
 *    distribution.
15
 * 3. Neither the name Windhover Labs nor the names of its
16
 *    contributors may be used to endorse or promote products derived
17
 *    from this software without specific prior written permission.
18
 *
19
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
22
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
23
 * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
24
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
25
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
26
 * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
27
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
28
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
29
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30
 * POSSIBILITY OF SUCH DAMAGE.
31
 *
32
 *****************************************************************************/
33

34
package com.windhoverlabs.yamcs.opcua;
35

36
import static com.google.common.collect.Lists.newArrayList;
37
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
38
import static org.yamcs.xtce.NameDescription.qualifiedName;
39

40
import com.google.gson.JsonObject;
41
import java.io.BufferedWriter;
42
import java.io.IOException;
43
import java.nio.file.Files;
44
import java.nio.file.Paths;
45
import java.nio.file.StandardOpenOption;
46
import java.time.temporal.ChronoUnit;
47
import java.util.ArrayList;
48
import java.util.Arrays;
49
import java.util.HashMap;
50
import java.util.HashSet;
51
import java.util.List;
52
import java.util.Map;
53
import java.util.Objects;
54
import java.util.Set;
55
import java.util.concurrent.CompletableFuture;
56
import java.util.concurrent.ConcurrentHashMap;
57
import java.util.concurrent.ExecutionException;
58
import java.util.concurrent.Executors;
59
import java.util.concurrent.ScheduledExecutorService;
60
import java.util.concurrent.TimeUnit;
61
import java.util.concurrent.atomic.AtomicLong;
62
import java.util.function.Supplier;
63
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
64
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
65
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaMonitoredItem;
66
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
67
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
68
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedDataItem;
69
import org.eclipse.milo.opcua.sdk.client.subscriptions.ManagedSubscription;
70
import org.eclipse.milo.opcua.stack.client.DiscoveryClient;
71
import org.eclipse.milo.opcua.stack.core.AttributeId;
72
import org.eclipse.milo.opcua.stack.core.Identifiers;
73
import org.eclipse.milo.opcua.stack.core.UaException;
74
import org.eclipse.milo.opcua.stack.core.types.builtin.ByteString;
75
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
76
import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
77
import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
78
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
79
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
80
import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
81
import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
82
import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
83
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
84
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
85
import org.eclipse.milo.opcua.stack.core.types.enumerated.IdType;
86
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
87
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
88
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
89
import org.eclipse.milo.opcua.stack.core.types.structured.BrowsePath;
90
import org.eclipse.milo.opcua.stack.core.types.structured.BrowsePathResult;
91
import org.eclipse.milo.opcua.stack.core.types.structured.ContentFilter;
92
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
93
import org.eclipse.milo.opcua.stack.core.types.structured.EventFilter;
94
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
95
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
96
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
97
import org.eclipse.milo.opcua.stack.core.types.structured.RelativePath;
98
import org.eclipse.milo.opcua.stack.core.types.structured.RelativePathElement;
99
import org.eclipse.milo.opcua.stack.core.types.structured.SimpleAttributeOperand;
100
import org.eclipse.milo.opcua.stack.core.types.structured.TranslateBrowsePathsToNodeIdsResponse;
101
import org.slf4j.Logger;
102
import org.slf4j.LoggerFactory;
103
import org.yamcs.ConfigurationException;
104
import org.yamcs.Spec;
105
import org.yamcs.Spec.OptionType;
106
import org.yamcs.StandardTupleDefinitions;
107
import org.yamcs.ValidationException;
108
import org.yamcs.YConfiguration;
109
import org.yamcs.YamcsServer;
110
import org.yamcs.http.NotFoundException;
111
import org.yamcs.mdb.XtceAssembler;
112
import org.yamcs.parameter.ParameterValue;
113
import org.yamcs.parameter.SystemParametersProducer;
114
import org.yamcs.parameter.SystemParametersService;
115
import org.yamcs.protobuf.Event.EventSeverity;
116
import org.yamcs.protobuf.Yamcs.NamedObjectId;
117
import org.yamcs.protobuf.Yamcs.Value.Type;
118
import org.yamcs.tctm.AbstractLink;
119
import org.yamcs.tctm.Link;
120
import org.yamcs.tctm.LinkAction;
121
import org.yamcs.utils.ValueUtility;
122
import org.yamcs.xtce.BooleanParameterType;
123
import org.yamcs.xtce.EnumeratedParameterType;
124
import org.yamcs.xtce.FloatParameterType;
125
import org.yamcs.xtce.IntegerParameterType;
126
import org.yamcs.xtce.NameDescription;
127
import org.yamcs.xtce.Parameter;
128
import org.yamcs.xtce.ParameterType;
129
import org.yamcs.xtce.SpaceSystem;
130
import org.yamcs.xtce.StringParameterType;
131
import org.yamcs.xtce.XtceDb;
132
import org.yamcs.yarch.DataType;
133
import org.yamcs.yarch.Stream;
134
import org.yamcs.yarch.Tuple;
135
import org.yamcs.yarch.TupleDefinition;
136
import org.yamcs.yarch.YarchDatabase;
137
import org.yamcs.yarch.YarchDatabaseInstance;
138
import org.yamcs.yarch.protobuf.Db.Event;
139

140
/**
141
 * Implementation of the OPCUA protocol as a YAMCS link. Maps configured nodes (see docs for details)
142
 * to yamcs PVs and subscribes to OPCUA variables for realtime updates.
143
 *
144
 * @author Lorenzo Gomez
145
 */
146
public class OPCUALink extends AbstractLink implements Runnable, SystemParametersProducer {
1✔
147

148
  class NodeIDAttrPair {
149
    NodeId nodeID;
150
    AttributeId attrID;
151

152
    public NodeIDAttrPair(NodeId newNodeID, AttributeId newAttrID) {
1✔
153
      this.nodeID = newNodeID;
1✔
154
      this.attrID = newAttrID;
1✔
155
    }
1✔
156

157
    public int hashCode() {
158
      return Objects.hash(this.nodeID, this.attrID);
1✔
159
    }
160

161
    public boolean equals(Object obj) {
162
      return (this.hashCode() == obj.hashCode());
1✔
163
    }
164
  }
165

166
  /** One configured "nodePaths" entry: a path string plus the map describing its root node id. */
  class NodePath {
    /** Value of the entry's "path" option. */
    String path;

    /** Value of the entry's "rootNodeID" option; starts out as an empty map. */
    HashMap<Object, Object> rootNodeID = new HashMap<>();
  }
170

171
  /**
   * Stages of the link's OPCUA initialization, exposed through {@link #getCurrentOPCUAStatus()}.
   */
  public enum OPCUAINITStatus {
    /** NOTE(review): not assigned anywhere in the visible code; presumably the pre-init stage. */
    OPCUA_INIT_CONFIG,
    /** Set right before the OPCUA tree is browsed. */
    OPCUA_INIT_TREE,
    /** Set when browsing the tree or exporting the XTCE threw an exception. */
    OPCUA_INIT_TREE_FAILED,
    /** Set right before the XTCE file is exported. */
    OPCUA_INIT_GENERATE_XTCE,
    /** Set right before subscribing to OPCUA events. */
    OPCUA_INIT_EVENTS,
    /** Set right before the realtime data subscriptions are created. */
    OPCUA_INIT_DATA_SUBSCRIPTION,
    /** Set right before the one-off query of all node data at startup. */
    OPCUA_INIT_ALL_DATA_QUERY,
    /** Set once initialization ran to the end. */
    OPCUA_INIT_OK,
  }
182

183
  /* Configuration Defaults */
184
  static final String STREAM_NAME = "opcua_params";
185

186
  /* Internal member attributes. */
187
  protected Thread thread;
188
  private String opcuaStreamName;
189
  private String parametersNamespace;
190
  XtceDb mdb;
191
  Stream opcuaStream;
192
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();
1✔
193
  private ManagedSubscription opcuaSubscription;
194

195
  private static final Logger internalLogger = LoggerFactory.getLogger(OPCUALink.class.getName());
1✔
196

197
  /**
198
   * @note ALWAYS re-use params as org.yamcs.parameter.ParameterRequestManager.param2RequestMap uses
199
   *     the object inside a map that was added to the mdb for the very first time. If, when
200
   *     publishing the PV, we create a new VariableParam object clients will NOT receive real-time
201
   *     updates as the new object VariableParam inside the new PV won't match the one inside
202
   *     org.yamcs.parameter.ParameterRequestManager.param2RequestMap since the object hashes do not
203
   *     match (since VariableParam does not override its hash function).
204
   */
205
  private ConcurrentHashMap<NodeIDAttrPair, VariableParam> nodeIDToParamsMap =
1✔
206
      new ConcurrentHashMap<NodeIDAttrPair, VariableParam>();
207

208
  private OpcUaClient client;
209

210
  protected AtomicLong inCount = new AtomicLong(0);
1✔
211

212
  // realtimeCount is the same as inCount, except that it cannot be reset by users.
213
  //  Used specifically for deciding subStrikeCount
214
  protected AtomicLong realtimeCount = new AtomicLong(0);
1✔
215

216
  protected AtomicLong subStrikeCount = new AtomicLong(0);
1✔
217

218
  protected AtomicLong lastRealtimeCount = new AtomicLong(0);
1✔
219

220
  protected int subStrikeCountThreshold;
221

222
  private long subStrikeCountCheckTimeoutSecs;
223

224
  private Status linkStatus = Status.OK;
1✔
225

226
  /* Configuration Parameters */
227

228
  private String discoverURL;
229
  private String endpointURL;
230
  private boolean queryAllNodesAtStartup;
231
  private String outputFile;
232
  private int publishInterval; // milliseconds
233

234
  private ArrayList<NodePath> relativeNodePaths = new ArrayList<NodePath>();
1✔
235

236
  private final AtomicLong clientHandles = new AtomicLong(1L);
1✔
237

238
  /* System parameters*/
239

240
  private Parameter OPCUAInitStatusParam;
241
  private OPCUAINITStatus currentOPCUAStatus;
242
  private Parameter OPCUAActiveSubsParam;
243
  private Parameter realtimeCountParam;
244
  private Parameter lastRealtimeCountParam;
245

246
  private Parameter subStrikeCountCheckTimeoutSecsParam;
247

248
  private Parameter subStrikeCountParam;
249
  private Parameter subStrikeCountThresholdParam;
250
  private AtomicLong OPCUAActiveSubs = new AtomicLong(0);
1✔
251

252
  private int reconnectCount = 0;
1✔
253
  private Parameter reconnectCountParam;
254

255
  LinkAction startAction =
1✔
256
      new LinkAction("query_all", "Query All OPCUA Server Data") {
1✔
257
        @Override
258
        public JsonObject execute(Link link, JsonObject jsonObject) {
259

260
          internalLogger.info("Executing query_all action");
1✔
261
          CompletableFuture.supplyAsync(
1✔
262
                  (Supplier<Integer>)
263
                      () -> {
264
                        queryAllOPCUAData();
1✔
265

266
                        return 0;
1✔
267
                      })
268
              .whenComplete(
1✔
269
                  (vaue, e) -> {
270
                    internalLogger.info("query_all action Complete");
1✔
271
                  });
1✔
272

273
          return jsonObject;
1✔
274
        }
275
      };
276

277
  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
1✔
278

279
  public OPCUAINITStatus getCurrentOPCUAStatus() {
280
    return currentOPCUAStatus;
1✔
281
  }
282

283
  @Override
284
  public Spec getSpec() {
285
    Spec spec = new Spec();
1✔
286

287
    /* Define our configuration parameters. */
288
    spec.addOption("name", OptionType.STRING).withRequired(true);
1✔
289
    spec.addOption("class", OptionType.STRING).withRequired(true);
1✔
290
    spec.addOption("opcuaStream", OptionType.STRING).withRequired(true);
1✔
291
    spec.addOption("endpointUrl", OptionType.STRING).withRequired(true);
1✔
292
    spec.addOption("discoveryUrl", OptionType.STRING).withRequired(true);
1✔
293
    spec.addOption("xtceOutputFile", OptionType.STRING).withRequired(true);
1✔
294
    spec.addOption("parametersNamespace", OptionType.STRING).withRequired(true);
1✔
295
    spec.addOption("publishInterval", OptionType.INTEGER).withRequired(true);
1✔
296
    spec.addOption("subStrikeCountThreshold", OptionType.INTEGER)
1✔
297
        .withDefault(3)
1✔
298
        .withRequired(false);
1✔
299

300
    spec.addOption("subStrikeCountCheckTimeoutSecs", OptionType.INTEGER)
1✔
301
        .withDefault(15)
1✔
302
        .withRequired(false);
1✔
303

304
    spec.addOption("queryAllNodesAtStartup", OptionType.BOOLEAN).withRequired(false);
1✔
305

306
    Spec rootNodeIDSpec = new Spec();
1✔
307

308
    rootNodeIDSpec.addOption("namespaceIndex", OptionType.INTEGER).withRequired(true);
1✔
309
    rootNodeIDSpec.addOption("identifier", OptionType.STRING).withRequired(true);
1✔
310
    rootNodeIDSpec.addOption("identifierType", OptionType.STRING).withRequired(true);
1✔
311

312
    Spec nodePathSpec = new Spec();
1✔
313
    nodePathSpec.addOption("path", OptionType.STRING);
1✔
314
    nodePathSpec
1✔
315
        .addOption("rootNodeID", OptionType.MAP)
1✔
316
        .withRequired(true)
1✔
317
        .withSpec(rootNodeIDSpec);
1✔
318

319
    spec.addOption("nodePaths", OptionType.LIST)
1✔
320
        .withElementType(OptionType.MAP)
1✔
321
        .withRequired(true)
1✔
322
        .withSpec(nodePathSpec);
1✔
323

324
    return spec;
1✔
325
  }
326

327
  @Override
328
  public void init(String yamcsInstance, String serviceName, YConfiguration config)
329
      throws ConfigurationException {
330
    super.init(yamcsInstance, serviceName, config);
1✔
331

332
    /* Local variables */
333
    this.config = config;
1✔
334
    /* Validate the configuration that the user passed us. */
335
    try {
336
      config = getSpec().validate(config);
1✔
UNCOV
337
    } catch (ValidationException e) {
×
UNCOV
338
      log.error("Failed configuration validation.", e);
×
UNCOV
339
      notifyFailed(e);
×
340
    }
1✔
341
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
1✔
342

343
    this.opcuaStreamName = config.getString("opcuaStream");
1✔
344
    this.opcuaStream = getStream(ydb, opcuaStreamName);
1✔
345
    this.parametersNamespace = config.getString("parametersNamespace");
1✔
346
    this.mdb = YamcsServer.getServer().getInstance(yamcsInstance).getXtceDb();
1✔
347

348
    readOPCUAConfig(config);
1✔
349
    readNodePathsConfig(config);
1✔
350

351
    outputFile = config.getString("xtceOutputFile");
1✔
352

353
    subStrikeCountThreshold = config.getInt("subStrikeCountThreshold");
1✔
354

355
    subStrikeCountCheckTimeoutSecs = config.getInt("subStrikeCountCheckTimeoutSecs");
1✔
356
  }
1✔
357

358
  private void readOPCUAConfig(YConfiguration config) {
359
    this.endpointURL = config.getString("endpointUrl");
1✔
360
    this.discoverURL = config.getString("discoveryUrl");
1✔
361
    this.publishInterval = config.getInt("publishInterval");
1✔
362
    this.queryAllNodesAtStartup = config.getBoolean("queryAllNodesAtStartup", false);
1✔
363
  }
1✔
364

365
  private void readNodePathsConfig(YConfiguration config) {
366
    List<Map<Object, Object>> nodePaths = config.getList("nodePaths");
1✔
367

368
    for (Map<Object, Object> path : nodePaths) {
1✔
369
      NodePath nodePath = new NodePath();
1✔
370
      nodePath.path = (String) path.get("path");
1✔
371
      nodePath.rootNodeID = (HashMap<Object, Object>) path.get("rootNodeID");
1✔
372
      relativeNodePaths.add(nodePath);
1✔
373
    }
1✔
374
  }
1✔
375

376
  private static SpaceSystem verifySpaceSystem(XtceDb mdb, String pathName) {
377
    String namespace;
378
    String name;
379
    int lastSlash = pathName.lastIndexOf('/');
1✔
380
    if ("/".equals(pathName)) {
1✔
UNCOV
381
      namespace = "";
×
UNCOV
382
      name = "";
×
383
    } else if (lastSlash == -1 || lastSlash == pathName.length() - 1) {
1✔
UNCOV
384
      namespace = "";
×
UNCOV
385
      name = pathName;
×
386
    } else {
387
      namespace = pathName.substring(0, lastSlash);
1✔
388
      name = pathName.substring(lastSlash + 1);
1✔
389
    }
390

391
    // First try with a prefixed slash (should be the common case)
392
    NamedObjectId id =
393
        NamedObjectId.newBuilder().setNamespace("/" + namespace).setName(name).build();
1✔
394
    SpaceSystem spaceSystem = mdb.getSpaceSystem(id);
1✔
395
    if (spaceSystem != null) {
1✔
UNCOV
396
      return spaceSystem;
×
397
    }
398

399
    // Maybe some non-xtce namespace like MDB:OPS Name
400
    id = NamedObjectId.newBuilder().setNamespace(namespace).setName(name).build();
1✔
401
    spaceSystem = mdb.getSpaceSystem(id);
1✔
402
    if (spaceSystem != null) {
1✔
403
      return spaceSystem;
1✔
404
    }
405

UNCOV
406
    throw new NotFoundException("No such space system");
×
407
  }
408

409
  /**
410
   * Initializes all PV mappings to OPCUA nodes and realtime subscriptions(managed data items in
411
   * OPCUA terms).
412
   */
413
  private void opcuaInit() {
414
    try {
415

416
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_TREE;
1✔
417
      browseOPCUATree(client);
1✔
418
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_GENERATE_XTCE;
1✔
419
      exportXTCE();
1✔
420

421
    } catch (Exception e) {
1✔
422
      internalLogger.warn(e.toString());
1✔
423
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_TREE_FAILED;
1✔
424
      return;
1✔
425
    }
1✔
426
    try {
427
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_EVENTS;
1✔
428
      subscribeToEvents(client);
1✔
429
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_DATA_SUBSCRIPTION;
1✔
430
      createOPCUASubscriptions();
1✔
UNCOV
431
    } catch (Exception e) {
×
UNCOV
432
      internalLogger.warn(e.toString());
×
UNCOV
433
      return;
×
434
    }
1✔
435
    if (queryAllNodesAtStartup) {
1✔
UNCOV
436
      currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_ALL_DATA_QUERY;
×
UNCOV
437
      queryAllOPCUAData();
×
438
    }
439

440
    currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_OK;
1✔
441
  }
1✔
442

443
  private void exportXTCE() throws IOException {
444
    var spaceSystem = verifySpaceSystem(mdb, parametersNamespace);
1✔
445
    var xtce = new XtceAssembler().toXtce(mdb, spaceSystem.getQualifiedName(), fqn -> true);
1✔
446
    BufferedWriter writer = null;
1✔
447

448
    if (outputFile != null) {
1✔
449
      writer =
1✔
450
          Files.newBufferedWriter(
1✔
451
              Paths.get(outputFile),
1✔
452
              StandardOpenOption.CREATE,
453
              StandardOpenOption.TRUNCATE_EXISTING);
454

455
      writer.write(xtce);
1✔
456

457
      writer.flush();
1✔
458
      writer.close();
1✔
459
    }
460
  }
1✔
461

462
  private void opcuaClientConnect() throws Exception {
463
    client = configureClient();
1✔
464
    connectToOPCUAServer(client);
1✔
465
  }
1✔
466

467
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
468
    Stream stream = ydb.getStream(streamName);
1✔
469
    if (stream == null) {
1✔
470
      try {
471
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
1✔
472
      } catch (Exception e) {
×
UNCOV
473
        throw new ConfigurationException(e);
×
474
      }
1✔
475

476
      stream = ydb.getStream(streamName);
1✔
477
    }
478
    return stream;
1✔
479
  }
480

481
  @Override
482
  public void doDisable() {
483

484
    try {
485
      client.disconnect().get();
1✔
UNCOV
486
    } catch (InterruptedException | ExecutionException e) {
×
UNCOV
487
      internalLogger.warn(e.toString());
×
488
    }
1✔
489
    if (thread != null) {
1✔
490
      thread.interrupt();
1✔
491
    }
492

493
    linkStatus = Status.DISABLED;
1✔
494
  }
1✔
495

496
  @Override
497
  public void doEnable() {
498
    try {
499
      opcuaClientConnect();
1✔
UNCOV
500
    } catch (Exception e) {
×
UNCOV
501
      internalLogger.warn(e.toString());
×
UNCOV
502
      linkStatus = Status.FAILED;
×
UNCOV
503
      notifyFailed(e);
×
UNCOV
504
      return;
×
505
    }
1✔
506

507
    startAction.addChangeListener(
1✔
508
        () -> {
509
          /**
510
           * TODO:Might be useful if we want turn off any functionality when the action is disabled
511
           * for instance..
512
           */
UNCOV
513
        });
×
514

515
    /* Create and start the new thread. */
516
    thread = new Thread(this);
1✔
517
    thread.setName(this.getClass().getSimpleName() + "-" + linkName);
1✔
518
    thread.start();
1✔
519
    linkStatus = Status.OK;
1✔
520
  }
1✔
521

522
  @Override
523
  public String getDetailedStatus() {
524
    if (isDisabled()) {
1✔
525
      return String.format("DISABLED");
1✔
526
    } else {
527
      return String.format("OK, received %d packets", inCount.get());
1✔
528
    }
529
  }
530

531
  @Override
532
  public Status connectionStatus() {
533
    return linkStatus;
1✔
534
  }
535

536
  @Override
537
  protected void doStart() {
538
    if (!isDisabled()) {
1✔
539
      doEnable();
1✔
540
    }
541

542
    notifyStarted();
1✔
543
  }
1✔
544

545
  @Override
546
  protected void doStop() {
547
    try {
548
      client.disconnect().get();
1✔
UNCOV
549
    } catch (InterruptedException | ExecutionException e) {
×
UNCOV
550
      internalLogger.warn(e.toString());
×
551
    }
1✔
552
    if (thread != null) {
1✔
553
      thread.interrupt();
1✔
554
    }
555

556
    notifyStopped();
1✔
557
  }
1✔
558

559
  @Override
560
  public void run() {
561
    opcuaInit();
1✔
562
    /* Enter our main loop */
563

564
    scheduleReconnectThread();
1✔
565

566
    while (isRunningAndEnabled()) {}
1✔
567
  }
1✔
568

569
  private void scheduleReconnectThread() {
570
    scheduler.scheduleAtFixedRate(
1✔
571
        () -> {
572
          if (realtimeCount.get() > lastRealtimeCount.get()) {
1✔
573
            subStrikeCount.set(0);
1✔
574
          } else {
575
            subStrikeCount.getAndAdd(1);
1✔
576
          }
577

578
          if (subStrikeCount.intValue() > subStrikeCountThreshold) {
1✔
579
            org.yamcs.yarch.protobuf.Db.Event ev =
NEW
580
                Event.newBuilder()
×
NEW
581
                    .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
582
                    .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
583
                    .setSource(this.linkName)
×
NEW
584
                    .setType(this.linkName)
×
NEW
585
                    .setMessage(
×
NEW
586
                        String.format(
×
587
                            "Subscription strike count(%d) exceeded currently configured threshold(%d)",
NEW
588
                            subStrikeCount.intValue(), subStrikeCountThreshold))
×
NEW
589
                    .setSeverity(EventSeverity.ERROR)
×
NEW
590
                    .build();
×
NEW
591
            eventProducer.sendEvent(ev);
×
592

NEW
593
            if (currentOPCUAStatus == OPCUAINITStatus.OPCUA_INIT_DATA_SUBSCRIPTION) {
×
594
              //                    We could be in the middle of a reconnect...
NEW
595
              return;
×
596
            }
597

NEW
598
            linkStatus = Status.UNAVAIL;
×
599

600
            //            Reconnect to realtime data
601
            try {
602
              ev =
NEW
603
                  Event.newBuilder()
×
NEW
604
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
605
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
606
                      .setSource(this.linkName)
×
NEW
607
                      .setType(this.linkName)
×
NEW
608
                      .setMessage(String.format("Disconnecting"))
×
NEW
609
                      .setSeverity(EventSeverity.ERROR)
×
NEW
610
                      .build();
×
NEW
611
              eventProducer.sendEvent(ev);
×
NEW
612
              client.disconnect().get();
×
NEW
613
            } catch (InterruptedException | ExecutionException e) {
×
NEW
614
              internalLogger.warn(e.toString());
×
NEW
615
            }
×
616

617
            try {
NEW
618
              opcuaClientConnect();
×
NEW
619
            } catch (Exception e) {
×
NEW
620
              internalLogger.warn(e.toString());
×
NEW
621
              linkStatus = Status.FAILED;
×
622

623
              ev =
NEW
624
                  Event.newBuilder()
×
NEW
625
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
626
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
627
                      .setSource(this.linkName)
×
NEW
628
                      .setType(this.linkName)
×
NEW
629
                      .setMessage(String.format("Reconnect failed."))
×
NEW
630
                      .setSeverity(EventSeverity.ERROR)
×
NEW
631
                      .build();
×
NEW
632
              eventProducer.sendEvent(ev);
×
NEW
633
              notifyFailed(e);
×
NEW
634
              return;
×
NEW
635
            }
×
636

637
            try {
638
              ev =
NEW
639
                  Event.newBuilder()
×
NEW
640
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
641
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
642
                      .setSource(this.linkName)
×
NEW
643
                      .setType(this.linkName)
×
NEW
644
                      .setMessage(String.format("Resubscribing to events and realtime values."))
×
NEW
645
                      .setSeverity(EventSeverity.INFO)
×
NEW
646
                      .build();
×
NEW
647
              eventProducer.sendEvent(ev);
×
NEW
648
              subscribeToEvents(client);
×
NEW
649
              createOPCUASubscriptions();
×
NEW
650
              linkStatus = Status.OK;
×
NEW
651
            } catch (Exception e) {
×
652

653
              ev =
NEW
654
                  Event.newBuilder()
×
NEW
655
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
656
                      .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
657
                      .setSource(this.linkName)
×
NEW
658
                      .setType(this.linkName)
×
NEW
659
                      .setMessage(
×
NEW
660
                          String.format("Resubscribing failed. Exception info" + e.toString()))
×
NEW
661
                      .setSeverity(EventSeverity.ERROR)
×
NEW
662
                      .build();
×
NEW
663
              eventProducer.sendEvent(ev);
×
664

NEW
665
              internalLogger.warn(e.toString());
×
NEW
666
              return;
×
NEW
667
            }
×
668

NEW
669
            reconnectCount++;
×
670
          }
671

672
          lastRealtimeCount.set(realtimeCount.get());
1✔
673
        },
1✔
674
        1,
675
        subStrikeCountCheckTimeoutSecs,
676
        TimeUnit.SECONDS);
677
  }
1✔
678

679
  /**
680
   * Reads all attributes of all configured Value nodes and updates their corresponding PV. Useful
681
   * for querying data from the OPCUA server once, data such as browse names, NodeIds, etc.
682
   */
683
  private void queryAllOPCUAData() {
684
    Set<NodeId> nodeSet = new HashSet<NodeId>();
1✔
685
    /**
686
     * NOTE:This is super inefficient... The reason we collect these nodeIDs in a set is because
687
     * otherwise we will have redundant subscription(s) since there is more than 1 attribute per
688
     * nodeID given how nodeIDToParamsMap is designed
689
     */
690
    for (NodeIDAttrPair pair : nodeIDToParamsMap.keySet()) {
1✔
691
      nodeSet.add(pair.nodeID);
1✔
692
    }
1✔
693

694
    for (NodeId nId : nodeSet) {
1✔
695
      UaNode node;
696

697
      try {
698
        node = client.getAddressSpace().getNode(nId);
1✔
699

700
        DataValue nodeClass = node.readAttribute(AttributeId.NodeClass);
1✔
701

702
        switch (NodeClass.from((int) nodeClass.getValue().getValue())) {
1✔
703
          case Variable:
704
            for (AttributeId attr : AttributeId.VARIABLE_ATTRIBUTES) {
1✔
705
              VariableParam p = nodeIDToParamsMap.get(new NodeIDAttrPair(nId, attr));
1✔
706

707
              if (p.getParameterType() == null) {
1✔
UNCOV
708
                internalLogger.warn(
×
709
                    "{} ignored since it does not have a Parameter type",
710
                    p,
UNCOV
711
                    Character.toString(NameDescription.PATH_SEPARATOR));
×
UNCOV
712
                continue;
×
713
              }
714

715
              TupleDefinition tdef = gftdef.copy();
1✔
716
              List<Object> cols = new ArrayList<>(4 + 1);
1✔
717
              //            FIXME: Add leap seconds.... as config or get it from YAMCS API.
718

719
              if (node.readAttribute(attr).getValue().isNull()) {
1✔
720
                internalLogger.warn("{} since the data value is null", p);
1✔
721
                continue;
1✔
722
              }
723

724
              long gentime =
1✔
725
                  node.readAttribute(attr)
1✔
726
                      .getSourceTime()
1✔
727
                      .getJavaInstant()
1✔
728
                      .plus(37, ChronoUnit.SECONDS)
1✔
729
                      .toEpochMilli();
1✔
730
              cols.add(gentime);
1✔
731
              cols.add(parametersNamespace);
1✔
732
              cols.add(0);
1✔
733
              long rectime = timeService.getMissionTime();
1✔
734
              cols.add(rectime);
1✔
735

736
              switch (p.getParameterType().getValueType()) {
1✔
737
                case BOOLEAN:
738
                  {
739
                    Boolean value = true;
1✔
740
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
741
                      internalLogger.warn(
×
742
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
743
                          node);
UNCOV
744
                      continue;
×
745
                    } else {
746
                      value = (Boolean) node.readAttribute(attr).getValue().getValue();
1✔
747
                    }
748

749
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
750
                    cols.add(getPV(p, gentime, value));
1✔
751
                  }
752
                  break;
1✔
753
                case DOUBLE:
754
                  {
755
                    Number value = 0;
1✔
756
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
757
                      internalLogger.warn(
×
758
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
759
                          node);
760
                      continue;
×
761
                    } else {
762
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
763
                    }
764

765
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
766
                    cols.add(getPV(p, gentime, value.doubleValue()));
1✔
767
                  }
768
                  break;
1✔
769
                case FLOAT:
770
                  {
771
                    Number value = 0;
1✔
772
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
773
                      internalLogger.warn(
×
774
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
775
                          node);
776
                      continue;
×
777
                    } else {
778
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
779
                    }
780

781
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
782
                    cols.add(getPV(p, gentime, value.floatValue()));
1✔
783
                  }
784
                  break;
1✔
785
                case SINT32:
786
                  {
787
                    Number value = 0;
1✔
788
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
789
                      internalLogger.warn(
×
790
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
791
                          node);
792
                      continue;
×
793
                    } else {
794
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
795
                    }
796

797
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
798
                    cols.add(getPV(p, gentime, value.intValue()));
1✔
799
                  }
800
                  break;
1✔
801
                case SINT64:
802
                  {
803
                    Number value = 0;
1✔
804
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
805
                      internalLogger.warn(
×
806
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
807
                          node);
808
                      continue;
×
809
                    } else {
810
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
811
                    }
812

813
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
814
                    cols.add(getPV(p, gentime, value.longValue()));
1✔
815
                  }
816
                  break;
1✔
817
                case STRING:
818
                  {
819
                    String value = "";
1✔
820
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
821
                      internalLogger.warn(
×
822
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
823
                          node);
UNCOV
824
                      continue;
×
825
                    } else {
826
                      value = node.readAttribute(attr).getValue().getValue().toString();
1✔
827
                    }
828

829
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
830
                    cols.add(getPV(p, gentime, value));
1✔
831
                  }
832
                  break;
1✔
833
                case UINT32:
834
                  {
835
                    Number value = 0;
1✔
836
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
837
                      internalLogger.warn(
×
838
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
839
                          node);
UNCOV
840
                      continue;
×
841
                    } else {
842
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
843
                    }
844

845
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
846
                    cols.add(getPV(p, gentime, value.longValue()));
1✔
847
                  }
848
                  break;
1✔
849
                case UINT64:
850
                  {
851
                    Number value = 0;
1✔
852
                    if (node.readAttribute(attr).getValue().isNull()) {
1✔
UNCOV
853
                      internalLogger.warn(
×
854
                          "node {} has a Null variant. Ignoring and will not be pushed to stream.",
855
                          node);
UNCOV
856
                      continue;
×
857
                    } else {
858
                      value = (Number) node.readAttribute(attr).getValue().getValue();
1✔
859
                    }
860

861
                    tdef.addColumn(p.getQualifiedName(), DataType.PARAMETER_VALUE);
1✔
862
                    cols.add(getPV(p, gentime, value.longValue()));
1✔
863
                  }
864
                  break;
1✔
865
                default:
866
                  break;
867
              }
868

869
              log.debug("Pushing {} to stream", p.toString());
1✔
870

871
              pushTuple(tdef, cols);
1✔
872
              inCount.getAndAdd(1);
1✔
873
              realtimeCount.getAndAdd(1);
1✔
874
            }
1✔
875
            break;
1✔
876
          default:
877
            break;
878
        }
879

UNCOV
880
      } catch (UaException e) {
×
881
        // TODO Auto-generated catch block
UNCOV
882
        internalLogger.warn(e.toString());
×
UNCOV
883
        continue;
×
884
      }
1✔
885
    }
1✔
886
  }
1✔
887

888
  private synchronized void pushTuple(TupleDefinition tdef, List<Object> cols) {
889
    Tuple t;
890
    t = new Tuple(tdef, cols);
1✔
891
    opcuaStream.emitTuple(t);
1✔
892
  }
1✔
893

894
  private static ParameterType getOrCreateType(
895
      XtceDb mdb, String name, Supplier<ParameterType.Builder<?>> supplier) {
896

897
    String fqn = XtceDb.YAMCS_SPACESYSTEM_NAME + NameDescription.PATH_SEPARATOR + name;
1✔
898
    ParameterType ptype = mdb.getParameterType(fqn);
1✔
899
    if (ptype != null) {
1✔
900
      return ptype;
1✔
901
    }
902
    ParameterType.Builder<?> typeb = supplier.get().setName(name);
1✔
903

904
    ptype = typeb.build();
1✔
905
    ((NameDescription) ptype).setQualifiedName(fqn);
1✔
906

907
    return mdb.addSystemParameterType(ptype);
1✔
908
  }
909

910
  public static ParameterType getBasicType(XtceDb mdb, Type type) {
911
    ParameterType pType = null;
1✔
912
    switch (type) {
1✔
913
      case BOOLEAN:
914
        return getOrCreateType(mdb, "boolean", () -> new BooleanParameterType.Builder());
1✔
915
      case STRING:
916
        return getOrCreateType(mdb, "string", () -> new StringParameterType.Builder());
1✔
917

918
      case FLOAT:
919
        return getOrCreateType(
1✔
920
            mdb, "float32", () -> new FloatParameterType.Builder().setSizeInBits(32));
1✔
921
      case DOUBLE:
922
        return getOrCreateType(
1✔
923
            mdb, "float64", () -> new FloatParameterType.Builder().setSizeInBits(64));
1✔
924
      case SINT32:
925
        return getOrCreateType(
1✔
926
            mdb,
927
            "sint32",
UNCOV
928
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(true));
×
929
      case SINT64:
930
        return getOrCreateType(
1✔
931
            mdb,
932
            "sint64",
933
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(true));
1✔
934
      case UINT32:
935
        return getOrCreateType(
1✔
936
            mdb,
937
            "uint32",
UNCOV
938
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(false));
×
939
      case UINT64:
940
        return getOrCreateType(
1✔
941
            mdb,
942
            "uint64",
UNCOV
943
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(false));
×
944
      default:
945
        break;
946
    }
947

UNCOV
948
    return pType;
×
949
  }
950

951
  public ParameterValue getNewPv(Parameter parameter, long time) {
952
    ParameterValue pv = new ParameterValue(parameter);
1✔
953
    pv.setAcquisitionTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime());
1✔
954
    pv.setGenerationTime(time);
1✔
955
    return pv;
1✔
956
  }
957

958
  public ParameterValue getPV(Parameter parameter, long time, String v) {
959
    ParameterValue pv = getNewPv(parameter, time);
1✔
960
    pv.setEngValue(ValueUtility.getStringValue(v));
1✔
961
    pv.setRawValue(ValueUtility.getStringValue(v));
1✔
962
    return pv;
1✔
963
  }
964

965
  public ParameterValue getPV(Parameter parameter, long time, double v) {
966
    ParameterValue pv = getNewPv(parameter, time);
1✔
967
    pv.setEngValue(ValueUtility.getDoubleValue(v));
1✔
968
    pv.setRawValue(ValueUtility.getDoubleValue(v));
1✔
969
    return pv;
1✔
970
  }
971

972
  public ParameterValue getPV(Parameter parameter, long time, float v) {
973
    ParameterValue pv = getNewPv(parameter, time);
1✔
974
    pv.setEngValue(ValueUtility.getFloatValue(v));
1✔
975
    pv.setRawValue(ValueUtility.getFloatValue(v));
1✔
976
    return pv;
1✔
977
  }
978

979
  public ParameterValue getPV(Parameter parameter, long time, boolean v) {
980
    ParameterValue pv = getNewPv(parameter, time);
1✔
981
    pv.setEngValue(ValueUtility.getBooleanValue(v));
1✔
982
    pv.setRawValue(ValueUtility.getBooleanValue(v));
1✔
983
    return pv;
1✔
984
  }
985

986
  public ParameterValue getPV(Parameter parameter, long time, long v) {
987
    ParameterValue pv = getNewPv(parameter, time);
1✔
988
    pv.setEngValue(ValueUtility.getSint64Value(v));
1✔
989
    pv.setRawValue(ValueUtility.getSint64Value(v));
1✔
990
    return pv;
1✔
991
  }
992

993
  @Override
994
  public Status getLinkStatus() {
995
    return linkStatus;
1✔
996
  }
997

998
  @Override
999
  public boolean isDisabled() {
1000
    return linkStatus == Status.DISABLED;
1✔
1001
  }
1002

1003
  @Override
1004
  public long getDataInCount() {
1005
    return inCount.get();
1✔
1006
  }
1007

1008
  @Override
1009
  public long getDataOutCount() {
1010
    return 0;
1✔
1011
  }
1012

1013
  @Override
1014
  public void resetCounters() {
1015
    inCount.set(0);
1✔
1016
  }
1✔
1017

1018
  /**
1019
   * Selects first non-secured endpoint from endpoints found at discover URL. At the moment secured
1020
   * endpoints are not supported.
1021
   *
1022
   * @return
1023
   * @throws Exception
1024
   */
1025
  private OpcUaClient configureClient() throws Exception {
1026

1027
    List<EndpointDescription> endpoints = DiscoveryClient.getEndpoints(discoverURL).get();
1✔
1028

1029
    // At the moment, we do not support certificates.
1030
    EndpointDescription selectedEndpoint = null;
1✔
1031
    for (var endpoint : endpoints) {
1✔
1032
      switch (endpoint.getSecurityMode()) {
1✔
1033
        case Invalid:
UNCOV
1034
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
1035
          break;
×
1036
        case None:
1037
          selectedEndpoint = endpoint;
1✔
1038
          break;
1✔
1039
        case Sign:
UNCOV
1040
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
UNCOV
1041
          break;
×
1042
        case SignAndEncrypt:
UNCOV
1043
          internalLogger.warn("Endpoint mode {} is not supported.", endpoint.getSecurityMode());
×
1044
          break;
1045
      }
1046

1047
      if (selectedEndpoint != null) {
1✔
1048
        break;
1✔
1049
      }
UNCOV
1050
    }
×
1051

1052
    if (selectedEndpoint == null) {
1✔
UNCOV
1053
      throw new Exception("No viable endpoint found from list:" + endpoints);
×
1054
    }
1055

1056
    OpcUaClientConfig builder = OpcUaClientConfig.builder().setEndpoint(selectedEndpoint).build();
1✔
1057

1058
    return OpcUaClient.create(builder);
1✔
1059
  }
1060

1061
  /**
1062
   * Adds new PV with the name of node.
1063
   *
1064
   * @param client
1065
   * @param node
1066
   */
1067
  private void addOPCUAPV(OpcUaClient client, UaNode node) {
1068

1069
    if (node.getBrowseName()
1✔
1070
        .getName()
1✔
1071
        .contains(Character.toString(NameDescription.PATH_SEPARATOR))) {
1✔
UNCOV
1072
      internalLogger.info(
×
1073
          "{} ignored since it contains a {} character",
UNCOV
1074
          node.getBrowseName().getName(),
×
UNCOV
1075
          Character.toString(NameDescription.PATH_SEPARATOR));
×
1076

1077
    } else {
1078

1079
      /**
1080
       * NOTE:For now we'll just flatten all the attributes instead of using an aggregate type for
1081
       * attributes
1082
       */
1083
      for (AttributeId attr : AttributeId.values()) {
1✔
1084

1085
        ParameterType ptype = OPCUAAttrTypeToParamType(attr, node);
1✔
1086

1087
        String opcuaTranslatedQName = translateNodeToParamQName(client, node, attr);
1✔
1088
        Parameter p = VariableParam.getForFullyQualifiedName(opcuaTranslatedQName);
1✔
1089

1090
        p.setParameterType(ptype);
1✔
1091

1092
        if (mdb.getParameter(p.getQualifiedName()) == null) {
1✔
1093
          log.debug("Adding OPCUA object as parameter to mdb:{}", p.getQualifiedName());
1✔
1094
          mdb.addParameter(p, true);
1✔
1095
        }
1096
        nodeIDToParamsMap.put(new NodeIDAttrPair(node.getNodeId(), attr), (VariableParam) p);
1✔
1097
      }
1098
    }
1099
  }
1✔
1100

1101
  /**
1102
   * Map nodeID name to a qualified name that can be used for a YAMCS PV.
1103
   *
1104
   * @param client
1105
   * @param node
1106
   * @param attr
1107
   * @return
1108
   */
1109
  private String translateNodeToParamQName(OpcUaClient client, UaNode node, AttributeId attr) {
1110
    LocalizedText localizedDisplayName = null;
1✔
1111
    try {
1112

1113
      localizedDisplayName =
1✔
1114
          (LocalizedText) (node.readAttribute(AttributeId.DisplayName).getValue().getValue());
1✔
UNCOV
1115
    } catch (UaException e) {
×
UNCOV
1116
      internalLogger.warn(e.toString());
×
1117
    }
1✔
1118
    String opcuaTranslatedQName =
1✔
1119
        qualifiedName(
1✔
1120
            parametersNamespace
1121
                + NameDescription.PATH_SEPARATOR
1122
                + node.getNodeId().toParseableString().replace(";", "-")
1✔
1123
                + NameDescription.PATH_SEPARATOR
1124
                + localizedDisplayName.getText(),
1✔
1125
            attr.toString());
1✔
1126

1127
    return opcuaTranslatedQName;
1✔
1128
  }
1129

1130
  /**
1131
   * Browse node at nodePath relative to browseRoot.
1132
   *
1133
   * @param indent
1134
   * @param client
1135
   * @param browseRoot
1136
   * @param nodePath in the format of "0:Root,0:Objects,2:HelloWorld,2:MyObject,2:Bar"
1137
   * @throws Exception
1138
   */
1139
  private void browsePath(String indent, OpcUaClient client, NodeId startingNode, String nodePath)
1140
      throws Exception {
1141
    internalLogger.info("Browsing at " + startingNode);
1✔
1142
    ArrayList<String> rPathTokens = new ArrayList<String>();
1✔
1143
    ArrayList<RelativePathElement> relaitivePathElements = new ArrayList<RelativePathElement>();
1✔
1144

1145
    for (var pathToken : nodePath.split(",")) {
1✔
1146
      rPathTokens.add(nodePath);
1✔
1147

1148
      int namespaceIndex = 0;
1✔
1149

1150
      String namespaceName = "";
1✔
1151

1152
      namespaceIndex = Integer.parseInt(pathToken.split(":")[0]);
1✔
1153

1154
      namespaceName = pathToken.split(":")[1];
1✔
1155

1156
      relaitivePathElements.add(
1✔
1157
          new RelativePathElement(
1158
              Identifiers.HierarchicalReferences,
1159
              false,
1✔
1160
              true,
1✔
1161
              new QualifiedName(namespaceIndex, namespaceName)));
1162
    }
1163

1164
    ArrayList<BrowsePath> list = new ArrayList<BrowsePath>();
1✔
1165

1166
    RelativePathElement[] elements = new RelativePathElement[relaitivePathElements.size()];
1✔
1167

1168
    relaitivePathElements.toArray(elements);
1✔
1169

1170
    list.add(new BrowsePath(startingNode, new RelativePath(elements)));
1✔
1171

1172
    TranslateBrowsePathsToNodeIdsResponse response = null;
1✔
1173
    try {
1174
      response = client.translateBrowsePaths(list).get();
1✔
UNCOV
1175
    } catch (InterruptedException e) {
×
UNCOV
1176
      internalLogger.warn(e.toString());
×
UNCOV
1177
    } catch (ExecutionException e) {
×
UNCOV
1178
      internalLogger.warn(e.toString());
×
1179
    }
1✔
1180

1181
    BrowsePathResult result = Arrays.asList(response.getResults()).get(0);
1✔
1182
    StatusCode statusCode = result.getStatusCode();
1✔
1183

1184
    if (statusCode.isBad()) {
1✔
1185
      log.warn("Bad status code:" + statusCode);
1✔
1186
      org.yamcs.yarch.protobuf.Db.Event ev =
1187
          Event.newBuilder()
1✔
1188
              .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1189
              .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1190
              .setSource(this.linkName)
1✔
1191
              .setType(this.linkName)
1✔
1192
              .setMessage("Failed to find node:" + nodePath + ". Error code info:" + statusCode)
1✔
1193
              .setSeverity(EventSeverity.ERROR)
1✔
1194
              .build();
1✔
1195
      eventProducer.sendEvent(ev);
1✔
1196

1197
      throw new Exception("Bad status code:" + statusCode);
1✔
1198

1199
    } else if (statusCode.isUncertain()) {
1✔
UNCOV
1200
      log.warn("Uncertain status code:" + statusCode);
×
UNCOV
1201
      return;
×
1202
    }
1203

1204
    try {
1205
      UaNode node =
1✔
1206
          client
1207
              .getAddressSpace()
1✔
1208
              .getNode(
1✔
1209
                  result.getTargets()[0].getTargetId().toNodeId(client.getNamespaceTable()).get());
1✔
1210

1211
      addOPCUAPV(client, node);
1✔
UNCOV
1212
    } catch (UaException e) {
×
UNCOV
1213
      internalLogger.warn(e.toString());
×
1214
    }
1✔
1215
  }
1✔
1216

1217
  private void createOPCUASubscriptions() {
1218
    currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_DATA_SUBSCRIPTION;
1✔
1219
    createDataChangeListener();
1✔
1220
    Set<NodeId> nodeSet = new HashSet<NodeId>();
1✔
1221
    /**
1222
     * FIXME:This is super inefficient... The reason we collect these nodeIDs in a set is because
1223
     * otherwise we will have redundant subscription(s) since there is more than 1 attribute per
1224
     * nodeID given how nodeIDToParamsMap is designed
1225
     */
1226
    for (NodeIDAttrPair pair : nodeIDToParamsMap.keySet()) {
1✔
1227
      nodeSet.add(pair.nodeID);
1✔
1228
    }
1✔
1229

1230
    ArrayList<NodeId> variableNodes = new ArrayList<NodeId>();
1✔
1231
    for (NodeId id : nodeSet) {
1✔
1232
      Variant nodeClass = null;
1✔
1233
      try {
1234
        UaNode node = client.getAddressSpace().getNode(id);
1✔
1235

1236
        nodeClass = node.readAttribute(AttributeId.NodeClass).getValue();
1✔
1237

UNCOV
1238
      } catch (UaException e) {
×
UNCOV
1239
        internalLogger.warn(e.toString());
×
1240
      }
1✔
1241
      if (nodeClass != null) {
1✔
1242
        //        try {
1243
        switch (NodeClass.from((int) nodeClass.getValue())) {
1✔
1244
            // As per the spec, the only thing we can subscribe to is Variables
1245
          case Variable:
1246
            variableNodes.add(id);
1✔
1247
            break;
1248
        }
1249
      }
1250
    }
1✔
1251

1252
    try {
1253
      List<ManagedDataItem> dataItems = opcuaSubscription.createDataItems(variableNodes);
1✔
1254
      for (var dataItem : dataItems) {
1✔
1255
        log.debug("Status code for dataItem:{}", dataItem.getStatusCode());
1✔
1256
        OPCUAActiveSubs.addAndGet(1);
1✔
1257
      }
1✔
UNCOV
1258
    } catch (UaException e) {
×
UNCOV
1259
      internalLogger.warn(e.toString());
×
1260
    }
1✔
1261

1262
    currentOPCUAStatus = OPCUAINITStatus.OPCUA_INIT_OK;
1✔
1263
  }
1✔
1264

1265
  /**
1266
   * Connects to OPCUA server and activates query all action.
1267
   *
1268
   * @param client
1269
   * @throws Exception
1270
   */
1271
  public void connectToOPCUAServer(OpcUaClient client) throws Exception {
1272
    internalLogger.info("Connecting to OPCUA server...");
1✔
1273
    client.connect().get();
1✔
1274

1275
    if (getAction(startAction.getId()) == null) {
1✔
1276
      addAction(startAction);
1✔
1277
    }
1278
    startAction.setEnabled(true);
1✔
1279
  }
1✔
1280

1281
  /**
1282
   * Browses the tree on the OPCUA server and maps them to YAMCS Parameters.
1283
   *
1284
   * @param client
1285
   * @throws Exception
1286
   */
1287
  private void browseOPCUATree(OpcUaClient client) throws Exception {
1288
    // start browsing at root folder
1289
    internalLogger.info("Browsing OPCUA...");
1✔
1290
    for (var p : relativeNodePaths) {
1✔
1291
      int namespaceIndex = (int) p.rootNodeID.get("namespaceIndex");
1✔
1292
      String identifier = (String) p.rootNodeID.get("identifier");
1✔
1293
      IdType identifierType = IdType.valueOf((String) p.rootNodeID.get("identifierType"));
1✔
1294

1295
      browsePath(
1✔
1296
          endpointURL, client, getNewNodeID(identifierType, namespaceIndex, identifier), p.path);
1✔
1297
    }
1✔
1298
  }
1✔
1299

1300
  /**
1301
   * Get new OPCUA-compliant NodeID object that is created from NamespaceIndex and Identifier. At
1302
   * the moment only String and Numeric node ids are supported.
1303
   *
1304
   * @param rootIdentifierType
1305
   * @param NamespaceIndex
1306
   * @param Identifier
1307
   * @return
1308
   */
1309
  private NodeId getNewNodeID(IdType rootIdentifierType, int NamespaceIndex, String Identifier) {
1310
    NodeId nodeID = null;
1✔
1311
    switch (rootIdentifierType) {
1✔
1312
      case Guid:
UNCOV
1313
        internalLogger.warn("Guid nodeID is not supported");
×
UNCOV
1314
        break;
×
1315
      case Numeric:
1316
        nodeID = new NodeId(NamespaceIndex, Integer.parseInt(Identifier));
1✔
1317
        break;
1✔
1318
      case Opaque:
UNCOV
1319
        internalLogger.warn("Guid Opaque is not supported");
×
UNCOV
1320
        break;
×
1321
      case String:
UNCOV
1322
        nodeID = new NodeId(NamespaceIndex, Identifier);
×
1323
        break;
1324
    }
1325
    return nodeID;
1✔
1326
  }
1327

1328
  /** Data listener for realtime OPCUA server updates. */
1329
  private void createDataChangeListener() {
1330
    try {
1331
      opcuaSubscription = ManagedSubscription.create(client, publishInterval);
1✔
1332
    } catch (UaException e) {
×
1333
      internalLogger.warn(e.toString());
×
1334
    }
1✔
1335
    opcuaSubscription.addDataChangeListener(
1✔
1336
        (items, values) -> {
1337
          for (int i = 0; i < items.size(); i++) {
1✔
1338
            NodeIDAttrPair nodeAttrKey =
1✔
1339
                new NodeIDAttrPair(items.get(i).getNodeId(), AttributeId.Value);
1✔
1340
            log.debug(
1✔
1341
                "subscription value received: item={}, value={}",
1342
                items.get(i).getNodeId(),
1✔
1343
                values.get(i).getValue());
1✔
1344

1345
            log.debug(
1✔
1346
                "Pushing new PV for param name {} which is mapped to NodeID {}",
1347
                nodeIDToParamsMap.get(nodeAttrKey),
1✔
1348
                items.get(i).getNodeId());
1✔
1349

1350
            TupleDefinition tdef = gftdef.copy();
1✔
1351
            List<Object> cols = new ArrayList<>(4 + 1);
1✔
1352
            //            FIXME: Add leap seconds.... as config or get it from YAMCS API.
1353
            long gentime =
1✔
1354
                values
1355
                    .get(i)
1✔
1356
                    .getSourceTime()
1✔
1357
                    .getJavaInstant()
1✔
1358
                    .plus(37, ChronoUnit.SECONDS)
1✔
1359
                    .toEpochMilli();
1✔
1360
            cols.add(gentime);
1✔
1361
            cols.add(parametersNamespace);
1✔
1362
            cols.add(0);
1✔
1363
            long rectime = timeService.getMissionTime();
1✔
1364
            cols.add(rectime);
1✔
1365

1366
            /**
1367
             * TODO:Not sure if this is the best way to do this since the aggregate values will be
1368
             * partially updated. Another potential approach might be to decouple the live OPCUA
1369
             * data(subscriptions) via namespaces. For example; have a "special" namespace called
1370
             * "subscriptions" that ONLY gets updated with items. And maybe another namespace for
1371
             * static data...maybe.
1372
             *
1373
             * <p>Another option is to flatten everything and have no aggregate types at all. That
1374
             * approach might even simplify the code quite a bit...
1375
             *
1376
             * <p>Another question worth answering before moving forward is to find out whether or
1377
             * not it is concrete in the OPCUA protocol what data can change in real time and which
1378
             * data is "static". Not sure if there is any "static" data given that clients have the
1379
             * ability of writing to values... might be worth a test.
1380
             */
1381
            log.debug(
1✔
1382
                "Data({}) chnage triggered for {}",
1383
                values.get(i).getValue(),
1✔
1384
                nodeIDToParamsMap.get(nodeAttrKey));
1✔
1385

1386
            if (nodeIDToParamsMap.get(nodeAttrKey) == null) {
1✔
UNCOV
1387
              log.debug("No parameter mapping found for {}", nodeAttrKey.nodeID);
×
UNCOV
1388
              continue;
×
1389
            } else {
1390
              log.debug(
1✔
1391
                  String.format(
1✔
1392
                      "parameter mapping found for {} and {}",
1393
                      nodeAttrKey.nodeID,
1394
                      nodeAttrKey.attrID));
1395
            }
1396

1397
            if (values.get(i).getValue() != null && values.get(i).getValue().getValue() != null) {
1✔
1398

1399
              switch (nodeIDToParamsMap.get(nodeAttrKey).getParameterType().getValueType()) {
1✔
1400
                case BOOLEAN:
1401
                  {
1402
                    boolean value = (boolean) values.get(i).getValue().getValue();
1✔
1403

1404
                    tdef.addColumn(
1✔
1405
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1406
                        DataType.PARAMETER_VALUE);
1407
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value));
1✔
1408
                  }
1409
                  break;
1✔
1410
                case DOUBLE:
1411
                  {
1412
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1413

1414
                    tdef.addColumn(
1✔
1415
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1416
                        DataType.PARAMETER_VALUE);
1417
                    cols.add(
1✔
1418
                        getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.doubleValue()));
1✔
1419
                  }
1420
                  break;
1✔
1421
                case FLOAT:
1422
                  {
1423
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1424

1425
                    tdef.addColumn(
1✔
1426
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1427
                        DataType.PARAMETER_VALUE);
1428
                    cols.add(
1✔
1429
                        getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.floatValue()));
1✔
1430
                  }
1431
                  break;
1✔
1432
                case SINT32:
1433
                  {
1434
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1435
                    tdef.addColumn(
1✔
1436
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1437
                        DataType.PARAMETER_VALUE);
1438
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.intValue()));
1✔
1439
                  }
1440
                  break;
1✔
1441
                case SINT64:
1442
                  {
1443
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1444

1445
                    tdef.addColumn(
1✔
1446
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1447
                        DataType.PARAMETER_VALUE);
1448
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1449
                  }
1450
                  break;
1✔
1451
                case STRING:
1452
                  {
1453
                    String value = (String) values.get(i).getValue().getValue().toString();
1✔
1454

1455
                    tdef.addColumn(
1✔
1456
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1457
                        DataType.PARAMETER_VALUE);
1458
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value));
1✔
1459
                  }
1460
                  break;
1✔
1461
                case UINT32:
1462
                  {
1463
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1464
                    tdef.addColumn(
1✔
1465
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1466
                        DataType.PARAMETER_VALUE);
1467
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1468
                  }
1469
                  break;
1✔
1470
                case UINT64:
1471
                  {
1472
                    Number value = (Number) values.get(i).getValue().getValue();
1✔
1473

1474
                    tdef.addColumn(
1✔
1475
                        nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName(),
1✔
1476
                        DataType.PARAMETER_VALUE);
1477
                    cols.add(getPV(nodeIDToParamsMap.get(nodeAttrKey), gentime, value.longValue()));
1✔
1478
                  }
1479
                  break;
1✔
1480
                default:
1481
                  break;
1482
              }
1483

1484
              pushTuple(tdef, cols);
1✔
1485

1486
              inCount.getAndAdd(1);
1✔
1487
              realtimeCount.getAndAdd(1);
1✔
1488
            } else {
1489
              // TODO:Add some type emptyValue count for OPS.
UNCOV
1490
              log.warn(
×
1491
                  "Data chnage triggered for {}, but it empty. This should not happen.",
UNCOV
1492
                  nodeIDToParamsMap.get(nodeAttrKey).getQualifiedName());
×
1493
            }
1494
          }
1495
        });
1✔
1496
  }
1✔
1497

1498
  /**
1499
   * Get new ParameterType for the specified attribute of the node. Particularly useful for Value
1500
   * attributes of nodes.
1501
   *
1502
   * @param attr
1503
   * @param node
1504
   * @return
1505
   */
1506
  private ParameterType OPCUAAttrTypeToParamType(AttributeId attr, UaNode node) {
1507
    ParameterType pType = null;
1✔
1508

1509
    switch (attr) {
1✔
1510
      case AccessLevel:
1511
        pType = getBasicType(mdb, Type.STRING);
1✔
1512
        break;
1✔
1513
      case ArrayDimensions:
1514
        pType = getBasicType(mdb, Type.STRING);
1✔
1515
        break;
1✔
1516
      case BrowseName:
1517
        pType = getBasicType(mdb, Type.STRING);
1✔
1518
        break;
1✔
1519
      case ContainsNoLoops:
1520
        pType = getBasicType(mdb, Type.STRING);
1✔
1521
        break;
1✔
1522
      case DataType:
1523
        pType = getBasicType(mdb, Type.STRING);
1✔
1524
        break;
1✔
1525
      case Description:
1526
        pType = getBasicType(mdb, Type.STRING);
1✔
1527
        break;
1✔
1528
      case DisplayName:
1529
        pType = getBasicType(mdb, Type.STRING);
1✔
1530
        break;
1✔
1531
      case EventNotifier:
1532
        pType = getBasicType(mdb, Type.STRING);
1✔
1533
        break;
1✔
1534
      case Executable:
1535
        pType = getBasicType(mdb, Type.STRING);
1✔
1536
        break;
1✔
1537
      case Historizing:
1538
        pType = getBasicType(mdb, Type.STRING);
1✔
1539
        break;
1✔
1540
      case InverseName:
1541
        pType = getBasicType(mdb, Type.STRING);
1✔
1542
        break;
1✔
1543
      case IsAbstract:
1544
        pType = getBasicType(mdb, Type.STRING);
1✔
1545
        break;
1✔
1546
      case MinimumSamplingInterval:
1547
        pType = getBasicType(mdb, Type.STRING);
1✔
1548
        break;
1✔
1549
      case NodeClass:
1550
        pType = getBasicType(mdb, Type.STRING);
1✔
1551
        break;
1✔
1552
      case NodeId:
1553
        pType = getBasicType(mdb, Type.STRING);
1✔
1554
        break;
1✔
1555
      case Symmetric:
1556
        pType = getBasicType(mdb, Type.STRING);
1✔
1557
        break;
1✔
1558
      case UserAccessLevel:
1559
        pType = getBasicType(mdb, Type.STRING);
1✔
1560
        break;
1✔
1561
      case UserExecutable:
1562
        pType = getBasicType(mdb, Type.STRING);
1✔
1563
        break;
1✔
1564
      case UserWriteMask:
1565
        pType = getBasicType(mdb, Type.STRING);
1✔
1566
        break;
1✔
1567
      case Value:
1568
        try {
1569

1570
          var value = node.readAttribute(attr).getValue();
1✔
1571

1572
          if (value.isNotNull()) {
1✔
1573
            NodeId valueObjectType =
1✔
1574
                value.getDataType().get().toNodeId(client.getNamespaceTable()).get();
1✔
1575

1576
            /** As per the spec:https://reference.opcfoundation.org/Core/Part6/v104/docs/5.1.2 */
1577
            if (valueObjectType.equals(Identifiers.SByte)) {
1✔
UNCOV
1578
              pType = getBasicType(mdb, Type.SINT32);
×
1579
            } else if (valueObjectType.equals(Identifiers.Byte)) {
1✔
UNCOV
1580
              pType = getBasicType(mdb, Type.SINT32);
×
1581
            } else if (valueObjectType.equals(Identifiers.Int16)) {
1✔
UNCOV
1582
              pType = getBasicType(mdb, Type.SINT32);
×
1583
            } else if (valueObjectType.equals(Identifiers.UInt16)) {
1✔
UNCOV
1584
              pType = getBasicType(mdb, Type.SINT32);
×
1585
            } else if (valueObjectType.equals(Identifiers.Int32)) {
1✔
1586
              pType = getBasicType(mdb, Type.SINT32);
1✔
1587
            } else if (valueObjectType.equals(Identifiers.UInt32)) {
1✔
1588
              pType = getBasicType(mdb, Type.UINT32);
1✔
1589
            } else if (valueObjectType.equals(Identifiers.Int64)) {
1✔
1590
              pType = getBasicType(mdb, Type.SINT64);
1✔
1591
            } else if (valueObjectType.equals(Identifiers.UInt64)) {
1✔
1592
              pType = getBasicType(mdb, Type.UINT64);
1✔
1593
            } else if (valueObjectType.equals(Identifiers.Float)) {
1✔
1594
              pType = getBasicType(mdb, Type.FLOAT);
1✔
1595
            } else if (valueObjectType.equals(Identifiers.Double)) {
1✔
1596
              pType = getBasicType(mdb, Type.DOUBLE);
1✔
1597
            } else if (valueObjectType.equals(Identifiers.String)) {
1✔
1598
              pType = getBasicType(mdb, Type.STRING);
1✔
1599
            } else if (valueObjectType.equals(Identifiers.Boolean)) {
1✔
1600
              pType = getBasicType(mdb, Type.BOOLEAN);
1✔
1601
            }
1602
          } else {
1✔
UNCOV
1603
            pType = getBasicType(mdb, Type.STRING);
×
1604
          }
1605

UNCOV
1606
        } catch (UaException e) {
×
UNCOV
1607
          internalLogger.warn(e.toString());
×
1608
        }
1✔
UNCOV
1609
        break;
×
1610
      case ValueRank:
1611
        pType = getBasicType(mdb, Type.STRING);
1✔
1612
        break;
1✔
1613
      case WriteMask:
1614
        pType = getBasicType(mdb, Type.STRING);
1✔
1615
        break;
1✔
1616
      default:
1617
        break;
1618
    }
1619

1620
    return pType;
1✔
1621
  }
1622

1623
  /**
1624
   * Subscribe to OPCUA events as per the
1625
   * spec:https://reference.opcfoundation.org/Core/Part5/v104/docs/6.4.2
1626
   *
1627
   * @param client
1628
   * @throws InterruptedException
1629
   * @throws ExecutionException
1630
   */
1631
  private void subscribeToEvents(OpcUaClient client)
1632
      throws InterruptedException, ExecutionException {
1633
    // create a subscription and a monitored item
1634
    UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
1✔
1635

1636
    ReadValueId readValueId =
1✔
1637
        new ReadValueId(
1638
            Identifiers.Server, AttributeId.EventNotifier.uid(), null, QualifiedName.NULL_VALUE);
1✔
1639

1640
    // client handle must be unique per item
1641
    UInteger clientHandle = uint(clientHandles.getAndIncrement());
1✔
1642

1643
    EventFilter eventFilter =
1✔
1644
        new EventFilter(
1645
            new SimpleAttributeOperand[] {
1646
              new SimpleAttributeOperand(
1647
                  Identifiers.BaseEventType,
1648
                  new QualifiedName[] {new QualifiedName(0, "EventId")},
1649
                  AttributeId.Value.uid(),
1✔
1650
                  null),
1651
              new SimpleAttributeOperand(
1652
                  Identifiers.BaseEventType,
1653
                  new QualifiedName[] {new QualifiedName(0, "EventType")},
1654
                  AttributeId.Value.uid(),
1✔
1655
                  null),
1656
              new SimpleAttributeOperand(
1657
                  Identifiers.BaseEventType,
1658
                  new QualifiedName[] {new QualifiedName(0, "Severity")},
1659
                  AttributeId.Value.uid(),
1✔
1660
                  null),
1661
              new SimpleAttributeOperand(
1662
                  Identifiers.BaseEventType,
1663
                  new QualifiedName[] {new QualifiedName(0, "Time")},
1664
                  AttributeId.Value.uid(),
1✔
1665
                  null),
1666
              new SimpleAttributeOperand(
1667
                  Identifiers.BaseEventType,
1668
                  new QualifiedName[] {new QualifiedName(0, "Message")},
1669
                  AttributeId.Value.uid(),
1✔
1670
                  null)
1671
            },
1672
            new ContentFilter(null));
1673

1674
    MonitoringParameters parameters =
1✔
1675
        new MonitoringParameters(
1676
            clientHandle,
1677
            0.0,
1✔
1678
            ExtensionObject.encode(client.getStaticSerializationContext(), eventFilter),
1✔
1679
            uint(10),
1✔
1680
            true);
1✔
1681

1682
    MonitoredItemCreateRequest request =
1✔
1683
        new MonitoredItemCreateRequest(readValueId, MonitoringMode.Reporting, parameters);
1684

1685
    List<UaMonitoredItem> items =
1✔
1686
        subscription.createMonitoredItems(TimestampsToReturn.Both, newArrayList(request)).get();
1✔
1687

1688
    // do something with the value updates
1689
    UaMonitoredItem monitoredItem = items.get(0);
1✔
1690

1691
    monitoredItem.setEventConsumer(
1✔
1692
        (item, vs) -> {
1693
          internalLogger.info("Event Received from {}", item.getReadValueId().getNodeId());
1✔
1694

1695
          StringBuilder eventText = new StringBuilder();
1✔
1696

1697
          ByteString eventId;
1698
          NodeId eventType;
1699
          UShort eventSeverity;
1700
          DateTime eventTime;
1701
          LocalizedText eventMessage;
1702

1703
          for (int i = 0; i < vs.length; i++) {
1✔
1704
            internalLogger.info("\tvariant[{}]: {}", i, vs[i].getValue());
1✔
1705
          }
1706

1707
          eventId = (ByteString) vs[0].getValue();
1✔
1708
          eventType = (NodeId) vs[1].getValue();
1✔
1709
          eventSeverity = (UShort) vs[2].getValue();
1✔
1710
          eventTime = (DateTime) vs[3].getValue();
1✔
1711
          eventMessage = (LocalizedText) vs[4].getValue();
1✔
1712

1713
          //          FIXME:Map these values to YAMCS API
1714
          eventText.append("eventId:" + eventId);
1✔
1715
          eventText.append("\n");
1✔
1716
          eventText.append("eventType:" + eventType);
1✔
1717
          eventText.append("\n");
1✔
1718
          eventText.append("eventSeverity:" + eventSeverity);
1✔
1719
          eventText.append("\n");
1✔
1720
          eventText.append("eventTime:" + eventTime);
1✔
1721
          eventText.append("\n");
1✔
1722
          eventText.append("eventMessage:" + eventMessage);
1✔
1723
          org.yamcs.yarch.protobuf.Db.Event ev =
1724
              Event.newBuilder()
1✔
1725
                  .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1726
                  .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
1✔
1727
                  .setSource(this.linkName)
1✔
1728
                  .setType(this.linkName)
1✔
1729
                  .setMessage(eventText.toString())
1✔
1730
                  .setSeverity(EventSeverity.INFO)
1✔
1731
                  .build();
1✔
1732
          eventProducer.sendEvent(ev);
1✔
1733
        });
1✔
1734
  }
1✔
1735

1736
  @Override
1737
  public void setupSystemParameters(SystemParametersService sysParamService) {
1738
    super.setupSystemParameters(sysParamService);
1✔
1739
    OPCUAInitStatusParam =
1✔
1740
        sysParamService.createEnumeratedSystemParameter(
1✔
1741
            linkName + "/OPCUAInitStatusParam",
1742
            OPCUAINITStatus.class,
1743
            "The current initialization status of OPCUA client");
1744
    EnumeratedParameterType spLinkStatusType =
1✔
1745
        (EnumeratedParameterType) OPCUAInitStatusParam.getParameterType();
1✔
1746
    spLinkStatusType
1✔
1747
        .enumValue(OPCUAINITStatus.OPCUA_INIT_CONFIG.name())
1✔
1748
        .setDescription(
1✔
1749
            "This link is in the configuration stage(Configuring OPCUA parameters such as certificates)");
1750
    spLinkStatusType
1✔
1751
        .enumValue(OPCUAINITStatus.OPCUA_INIT_TREE.name())
1✔
1752
        .setDescription(
1✔
1753
            "The link is parsing the OPCUA Tree and mapping them to PVs."
1754
                + " Depending on configuration, this can take a while.");
1755

1756
    spLinkStatusType
1✔
1757
        .enumValue(OPCUAINITStatus.OPCUA_INIT_TREE_FAILED.name())
1✔
1758
        .setDescription("The initial parsing of configured nodes failed.");
1✔
1759
    spLinkStatusType
1✔
1760
        .enumValue(OPCUAINITStatus.OPCUA_INIT_EVENTS.name())
1✔
1761
        .setDescription("The link is configuring and subscribing to OPCUA events");
1✔
1762
    spLinkStatusType
1✔
1763
        .enumValue(OPCUAINITStatus.OPCUA_INIT_DATA_SUBSCRIPTION.name())
1✔
1764
        .setDescription(
1✔
1765
            "The link is creating subscriptions for each node that was parsed from the tree"
1766
                + "that has a Value attribute.");
1767
    spLinkStatusType
1✔
1768
        .enumValue(OPCUAINITStatus.OPCUA_INIT_ALL_DATA_QUERY.name())
1✔
1769
        .setDescription(
1✔
1770
            "The link is querying all attributes of all parsed nodes."
1771
                + "This is can be configured to be done at startup.");
1772
    spLinkStatusType
1✔
1773
        .enumValue(OPCUAINITStatus.OPCUA_INIT_OK.name())
1✔
1774
        .setDescription(
1✔
1775
            "The link is done with all OPCUA initialization. It is in an usable state.");
1776

1777
    OPCUAActiveSubsParam =
1✔
1778
        sysParamService.createSystemParameter(
1✔
1779
            linkName + "/OPCUAActiveSubs",
1780
            Type.UINT64,
1781
            "The total number of active opcua subscriptions");
1782

1783
    realtimeCountParam =
1✔
1784
        sysParamService.createSystemParameter(
1✔
1785
            linkName + "/RealtimeCount",
1786
            Type.UINT64,
1787
            "The total number of realtime count(used for sub strike counts)");
1788

1789
    lastRealtimeCountParam =
1✔
1790
        sysParamService.createSystemParameter(
1✔
1791
            linkName + "/LastRealtimeCount",
1792
            Type.UINT64,
1793
            "The total number of realtime counts last captured(used for sub strike counts)");
1794

1795
    subStrikeCountThresholdParam =
1✔
1796
        sysParamService.createSystemParameter(
1✔
1797
            linkName + "/SubStrikeCountThreshold",
1798
            Type.UINT64,
1799
            "Configured strike count threshold. If current strike count exceeds this value, users will be notified via events"
1800
                + " and a reconnect will be attempted.");
1801

1802
    subStrikeCountParam =
1✔
1803
        sysParamService.createSystemParameter(
1✔
1804
            linkName + "/SubStrikeCount", Type.UINT64, "Current subscription strike count.");
1805

1806
    subStrikeCountCheckTimeoutSecsParam =
1✔
1807
        sysParamService.createSystemParameter(
1✔
1808
            linkName + "/SubStrikeCountCheckTimeoutSecs",
1809
            Type.UINT64,
1810
            "Timeout(in seconds) between strike count checks.");
1811

1812
    reconnectCountParam =
1✔
1813
        sysParamService.createSystemParameter(
1✔
1814
            linkName + "/ReconnectCount",
1815
            Type.UINT64,
1816
            "Successful reconnect count, after sub strike count failures.");
1817
  }
1✔
1818

1819
  @Override
1820
  public List<ParameterValue> getSystemParameters() {
1821
    long time = getCurrentTime();
1✔
1822
    ArrayList<ParameterValue> list = new ArrayList<>();
1✔
1823

1824
    list.add(
1✔
1825
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1826
            OPCUAInitStatusParam, time, currentOPCUAStatus));
1827
    list.add(
1✔
1828
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1829
            OPCUAActiveSubsParam, time, OPCUAActiveSubs.get()));
1✔
1830

1831
    list.add(
1✔
1832
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1833
            realtimeCountParam, time, realtimeCount.get()));
1✔
1834
    list.add(
1✔
1835
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1836
            lastRealtimeCountParam, time, lastRealtimeCount.get()));
1✔
1837

1838
    list.add(
1✔
1839
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1840
            subStrikeCountThresholdParam, time, subStrikeCountThreshold));
1841

1842
    list.add(
1✔
1843
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1844
            subStrikeCountParam, time, subStrikeCount.get()));
1✔
1845

1846
    list.add(
1✔
1847
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1848
            subStrikeCountCheckTimeoutSecsParam, time, subStrikeCountCheckTimeoutSecs));
1849

1850
    list.add(
1✔
1851
        org.yamcs.parameter.SystemParametersService.getPV(
1✔
1852
            reconnectCountParam, time, reconnectCount));
1853
    try {
1854
      super.collectSystemParameters(time, list);
1✔
UNCOV
1855
    } catch (Exception e) {
×
UNCOV
1856
      log.error("Exception caught when collecting link system parameters", e);
×
1857
    }
1✔
1858
    return list;
1✔
1859
  }
1860
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc