• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #112

15 Dec 2023 06:10PM UTC coverage: 17.034% (-0.001%) from 17.035%
#112

push

lorenzo-gomez-windhover
-Add the rest of the VTypes to ParameterViewer

17830 of 104672 relevant lines covered (17.03%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.39
/core/commander-core/src/main/java/com/windhoverlabs/pv/yamcs/YamcsSubscriptionService.java
1
package com.windhoverlabs.pv.yamcs;
2

3
import java.time.Instant;
4
import java.util.ArrayList;
5
import java.util.Arrays;
6
import java.util.Collections;
7
import java.util.HashSet;
8
import java.util.LinkedHashMap;
9
import java.util.List;
10
import java.util.Map;
11
import java.util.Map.Entry;
12
import java.util.Objects;
13
import java.util.Set;
14
import java.util.concurrent.Executors;
15
import java.util.concurrent.ScheduledExecutorService;
16
import java.util.concurrent.TimeUnit;
17
import java.util.concurrent.atomic.AtomicBoolean;
18
import java.util.logging.Logger;
19
import java.util.stream.Collectors;
20
import org.epics.util.array.ArrayDouble;
21
import org.epics.vtype.Alarm;
22
import org.epics.vtype.AlarmSeverity;
23
import org.epics.vtype.AlarmStatus;
24
import org.epics.vtype.Display;
25
import org.epics.vtype.EnumDisplay;
26
import org.epics.vtype.Time;
27
import org.epics.vtype.VBoolean;
28
import org.epics.vtype.VDouble;
29
import org.epics.vtype.VDoubleArray;
30
import org.epics.vtype.VEnum;
31
import org.epics.vtype.VFloat;
32
import org.epics.vtype.VInt;
33
import org.epics.vtype.VLong;
34
import org.epics.vtype.VString;
35
import org.epics.vtype.VStringArray;
36
import org.epics.vtype.VTable;
37
import org.epics.vtype.VType;
38
import org.epics.vtype.VUInt;
39
import org.epics.vtype.VULong;
40
import org.phoebus.pv.PV;
41
import org.yamcs.client.ParameterSubscription;
42
import org.yamcs.protobuf.Pvalue.AcquisitionStatus;
43
import org.yamcs.protobuf.Pvalue.ParameterValue;
44
import org.yamcs.protobuf.SubscribeParametersRequest;
45
import org.yamcs.protobuf.SubscribeParametersRequest.Action;
46
import org.yamcs.protobuf.Yamcs.NamedObjectId;
47

48
/**
49
 * Keeps track of {@link IPV} registration state and takes care of establishing or re-establishing a
50
 * bundled parameter subscription against Yamcs.
51
 */
52
public class YamcsSubscriptionService implements YamcsAware, ParameterSubscription.Listener {
53

54
  private static final Logger log = Logger.getLogger(YamcsSubscriptionService.class.getName());
1✔
55

56
  private String instanceName;
57
  private String currentProcessor;
58

59
  private Map<NamedObjectId, Set<YamcsPV>> pvsById = new LinkedHashMap<>();
1✔
60

61
  private ParameterSubscription subscription;
62

63
  public ParameterSubscription getSubscription() {
64
    return subscription;
×
65
  }
66

67
  private AtomicBoolean subscriptionDirty = new AtomicBoolean(false);
1✔
68
  private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
1✔
69

70
  private Set<ParameterValueListener> parameterValueListeners = new HashSet<>();
1✔
71

72
  private ArrayList<NamedObjectId> ids = new ArrayList<NamedObjectId>();
1✔
73

74
  private String serverName = null;
1✔
75

76
  static final Alarm UDF = Alarm.of(AlarmSeverity.UNDEFINED, AlarmStatus.UNDEFINED, "UDF");
1✔
77

78
  public YamcsSubscriptionService(
79
      ParameterSubscription newSubscriprion,
80
      String newServerName,
81
      String newInstanceName,
82
      String processor) {
1✔
83
    serverName = newServerName;
1✔
84
    subscription = newSubscriprion;
1✔
85
    instanceName = newInstanceName;
1✔
86
    currentProcessor = processor;
1✔
87
    subscription.addListener(this);
1✔
88

89
    // Periodically check if the subscription needs a refresh
90
    // (PVs send individual events, so this bundles them)
91
    executor.scheduleWithFixedDelay(
1✔
92
        () -> {
93
          if (subscriptionDirty.getAndSet(false) && subscription != null) {
1✔
94
            Set<NamedObjectId> ids = getRequestedIdentifiers();
×
95
            // TODO:Make log level configurable
96
            //            log.info(String.format("Modifying subscription to %s", ids));
97
            subscription.sendMessage(
×
98
                SubscribeParametersRequest.newBuilder()
×
99
                    .setAction(Action.REPLACE)
×
100
                    .setSendFromCache(true)
×
101
                    .setAbortOnInvalid(false)
×
102
                    .setUpdateOnExpiration(true)
×
103
                    .addAllId(ids)
×
104
                    .setProcessor(currentProcessor)
×
105
                    .build());
×
106
          }
107
        },
1✔
108
        500,
109
        500,
110
        TimeUnit.MILLISECONDS);
111
  }
1✔
112

113
  private Set<NamedObjectId> getRequestedIdentifiers() {
114
    return pvsById.entrySet().stream()
×
115
        .filter(entry -> !entry.getValue().isEmpty())
×
116
        .map(Entry::getKey)
×
117
        .collect(Collectors.toSet());
×
118
  }
119

120
  public boolean isSubscriptionAvailable() {
121
    return subscription != null;
×
122
  }
123

124
  /**
125
   * Convert something like yamcs://cfs/CPD/ci/CI_HkTlm_t.usCmdCnt to
126
   * /cfs/CPD/ci/CI_HkTlm_t.usCmdCnt. Very useful for querying the Yamcs server.
127
   *
128
   * @param pvName
129
   * @return
130
   */
131
  private String getYamcsPvName(String pvName, String serverName) {
132
    String subStr = "/" + serverName + ":" + instanceName;
×
133
    return pvName.substring(subStr.length());
×
134
  }
135

136
  @Override
137
  public void changeProcessor(String instance, String processor) {
138
    currentProcessor = processor;
×
139
    executor.execute(
×
140
        () -> {
141
          // Ready to receive some data
142
          Set<NamedObjectId> ids = getRequestedIdentifiers();
×
143
          //          log.fine(String.format("Subscribing to %s [%s/%s]", ids, instance,
144
          // processor));
145
          subscription.sendMessage(
×
146
              SubscribeParametersRequest.newBuilder()
×
147
                  .setInstance(instance)
×
148
                  .setProcessor(processor)
×
149
                  .setSendFromCache(true)
×
150
                  .setAbortOnInvalid(false)
×
151
                  .setUpdateOnExpiration(false)
×
152
                  .addAllId(ids)
×
153
                  .build());
×
154
        });
×
155
  }
×
156

157
  /** Async adds a Yamcs PV for receiving updates. */
158
  public void register(YamcsPV pv) {
159
    NamedObjectId id =
×
160
        YamcsSubscriptionService.identityOf(getYamcsPvName(pv.getName(), serverName));
×
161
    executor.execute(
×
162
        () -> {
163
          Set<YamcsPV> pvs = pvsById.computeIfAbsent(id, x -> new HashSet<>());
×
164
          pvs.add(pv);
×
165
          subscriptionDirty.set(true);
×
166
        });
×
167

168
    ids.add(id);
×
169

170
    try {
171
      subscription.sendMessage(
×
172
          SubscribeParametersRequest.newBuilder()
×
173
              .setInstance(instanceName)
×
174
              .setProcessor(currentProcessor)
×
175
              .setSendFromCache(true)
×
176
              .setAbortOnInvalid(false)
×
177
              .setUpdateOnExpiration(false)
×
178
              .addId(id)
×
179
              .setAction(Action.ADD)
×
180
              .build());
×
181
    } catch (Exception e) {
×
182
      System.out.println("e:" + e);
×
183
    }
×
184
  }
×
185

186
  /** Async removes a Yamcs PV from receiving updates. */
187
  public void unregister(PV pv) {
188
    NamedObjectId id = identityOf(pv.getName());
×
189
    executor.execute(
×
190
        () -> {
191
          Set<YamcsPV> pvs = pvsById.get(id);
×
192
          if (pvs != null) {
×
193
            boolean removed = pvs.remove(pv);
×
194
            if (removed) {
×
195
              subscriptionDirty.set(true);
×
196
            }
197
          }
198
        });
×
199
  }
×
200

201
  public void destroy() {
202
    subscription.cancel(true);
1✔
203
    pvsById.clear();
1✔
204
    YamcsPVFactory.clearPVs();
1✔
205
    executor.shutdown();
1✔
206
  }
1✔
207

208
  public void addParameterValueListener(ParameterValueListener listener) {
209
    parameterValueListeners.add(listener);
×
210
  }
×
211

212
  @Override
213
  public void onInvalidIdentification(NamedObjectId id) {
214
    //        executor.execute(() -> {
215
    //            // We keep the id in pvsById, we want to again receive the invalid
216
    //            // identification when the subscription is updated.
217
    //            Set<PV> pvs = pvsById.get(id);
218
    //            if (pvs != null) {
219
    //                pvs.forEach(PV::setInvalid);
220
    //            }
221
    //        });
222
  }
×
223

224
  public static NamedObjectId identityOf(String pvName) {
225
    return NamedObjectId.newBuilder().setName(pvName).build();
×
226
    //    if (pvName.startsWith("yamcs://")) {
227
    //      return
228
    // NamedObjectId.newBuilder().setName(pvName.substring("yamcs://".length())).build();
229
    //    } else {
230
    //      System.out.println("identityOf2" + pvName);
231
    //    }
232
    //        } else if (pvName.startsWith("para://")) {
233
    //            return NamedObjectId.newBuilder()
234
    //                    .setName(pvName.substring("para://".length()))
235
    //                    .build();
236
    //        } else if (pvName.startsWith("raw://")) {
237
    //            return NamedObjectId.newBuilder()
238
    //                    .setName(pvName.substring("raw://".length()))
239
    //                    .build();
240
    //        } else {
241
    //            return NamedObjectId.newBuilder()
242
    //                    .setName(pvName)
243
    //                    .build();
244
    //        }
245
  }
246

247
  @FunctionalInterface
248
  public static interface ParameterValueListener {
249
    void onData(List<ParameterValue> values);
250
  }
251

252
  /**
253
   * create a VType from a yamcs ParameterValue object.
254
   *
255
   * @param parameter
256
   * @return
257
   */
258
  private VType getVType(ParameterValue parameter) {
259
    ArrayList<String> yamcsValues = new ArrayList<String>();
×
260

261
    Class<? extends VType> valueType = null;
×
262
    VType value = null;
×
263

264
    switch (parameter.getEngValue().getType()) {
×
265
      case AGGREGATE:
266
        // TODO Implement
267
        break;
×
268
      case ARRAY:
269
        // TODO Implement
270
        break;
×
271
      case BINARY:
272
        // TODO Implement
273
        break;
×
274
      case BOOLEAN:
275
        {
276
          yamcsValues.add(Boolean.toString(parameter.getEngValue().getBooleanValue()));
×
277
          valueType = VBoolean.class;
×
278
          break;
×
279
        }
280
      case DOUBLE:
281
        {
282
          yamcsValues.add(Double.toString(parameter.getEngValue().getDoubleValue()));
×
283
          valueType = VDouble.class;
×
284
          break;
×
285
        }
286
        //                case ENUMERATED:
287
        ////                        yamcsValues.add(Enum.toString(parameter.getEngValue().getS));
288
        //                        valueType = VFloat.class;
289
        //                        break;
290
      case FLOAT:
291
        {
292
          yamcsValues.add(Float.toString(parameter.getEngValue().getFloatValue()));
×
293
          valueType = VFloat.class;
×
294
          break;
×
295
        }
296
      case NONE:
297
        // TODO Implement
298
        break;
×
299
      case SINT32:
300
        {
301
          yamcsValues.add(Integer.toString(parameter.getEngValue().getSint32Value()));
×
302
          valueType = VInt.class;
×
303
          break;
×
304
        }
305
      case SINT64:
306
        {
307

308
          //            return
309
          // Value.newBuilder().setType(Type.STRING).setStringValue(String.valueOf(value)).build();
310
          yamcsValues.add(Long.toString(parameter.getEngValue().getSint64Value()));
×
311
          valueType = VLong.class;
×
312
          break;
×
313
        }
314

315
      case STRING:
316
        {
317
          yamcsValues.add(parameter.getEngValue().getStringValue());
×
318
          valueType = VString.class;
×
319
          break;
×
320
        }
321
      case ENUMERATED:
322
        {
323
          yamcsValues.add("0");
×
324
          yamcsValues.add(parameter.getEngValue().getStringValue());
×
325
          //          Enum values and labels might become relevant once Alarms are supported...
326
          //          for(var i:
327
          // parameter.getEngValue().getType().getDescriptorForType().getValues())
328
          //          {
329
          //              System.out.println( i.toString());
330
          //          }
331
          valueType = VEnum.class;
×
332
          break;
×
333
        }
334
      case TIMESTAMP:
335
        break;
×
336
      case UINT32:
337
        {
338
          yamcsValues.add(Integer.toUnsignedString(parameter.getEngValue().getUint32Value()));
×
339
          valueType = VUInt.class;
×
340
          break;
×
341
        }
342
      case UINT64:
343
        {
344
          yamcsValues.add(Long.toUnsignedString(parameter.getEngValue().getUint64Value()));
×
345
          valueType = VULong.class;
×
346
          break;
×
347
        }
348
      default:
349
        break;
350
    }
351

352
    if (!yamcsValues.isEmpty()) {
×
353
      try {
354
        value =
×
355
            getInitialValue(
×
356
                yamcsValues,
357
                valueType,
358
                parameter.getAcquisitionStatus(),
×
359
                Instant.ofEpochSecond(
×
360
                    parameter.getGenerationTime().getSeconds(),
×
361
                    parameter.getGenerationTime().getNanos()));
×
362
      } catch (Exception e) {
×
363
        // TODO Auto-generated catch block
364
        e.printStackTrace();
×
365
      }
×
366
    }
367

368
    return value;
×
369
  }
370

371
  /**
372
   * @param items Items from <code>splitInitialItems</code>
373
   * @return All items as strings, surrounding quotes removed, un-escaping quotes
374
   */
375
  private static List<String> getInitialStrings(List<String> items) {
376
    if (items == null) return Arrays.asList("");
×
377
    final List<String> strings = new ArrayList<>(items.size());
×
378
    for (String item : items)
×
379
      if (item.startsWith("\""))
×
380
        strings.add(item.substring(1, item.length() - 1).replace("\\\"", "\""));
×
381
      else strings.add(item);
×
382
    return strings;
×
383
  }
384

385
  /**
386
   * TODO:Refactor so that not everything is "doubles"
387
   *
388
   * @param items Items from <code>splitInitialItems</code>
389
   * @return Numeric values for all items
390
   * @throws Exception on error
391
   */
392
  public static double[] getInitialDoubles(List<?> items) throws Exception {
393
    final double[] values = new double[items.size()];
×
394
    for (int i = 0; i < values.length; ++i) {
×
395
      try {
396
        final String text = Objects.toString(items.get(i));
×
397
        if (text.startsWith("0x")) values[i] = Integer.parseInt(text.substring(2), 16);
×
398
        else values[i] = Double.parseDouble(text);
×
399
      } catch (NumberFormatException ex) {
×
400
        throw new Exception("Cannot parse number from " + items.get(i));
×
401
      }
×
402
    }
403

404
    return values;
×
405
  }
406

407
  /**
408
   * TODO:We should make a distinction between unsigned and signed long.
409
   *
410
   * @param items Items from <code>splitInitialItems</code>
411
   * @return Numeric values for all items
412
   * @throws Exception on error
413
   */
414
  public static long[] getInitialLongs(List<?> items) throws Exception {
415
    final long[] values = new long[items.size()];
×
416
    for (int i = 0; i < values.length; ++i) {
×
417
      try {
418
        final String text = Objects.toString(items.get(i));
×
419
        if (text.startsWith("0x")) {
×
420
          values[i] = Integer.parseInt(text.substring(2), 16);
×
421
        } else {
422
          values[i] = Long.parseLong(text);
×
423
        }
424
      } catch (NumberFormatException ex) {
×
425
        throw new Exception("Cannot parse number from " + items.get(i));
×
426
      }
×
427
    }
428

429
    return values;
×
430
  }
431

432
  /**
433
   * @param items Items from <code>splitInitialItems</code>
434
   * @return Boolean list of all items
435
   */
436
  private static List<Boolean> getInitialBooleans(List<String> items) {
437
    if (items == null) return Arrays.asList(Boolean.FALSE);
×
438
    return items.stream()
×
439
        .map(
×
440
            item -> {
441
              return Boolean.parseBoolean(item);
×
442
            })
443
        .collect(Collectors.toList());
×
444
  }
445

446
  /**
447
   * @param items Items from <code>splitInitialItems</code>, i.e. strings are quoted
448
   * @param type Desired VType
449
   * @return VType for initial value
450
   * @throws Exception on error
451
   */
452
  public static VType getInitialValue(
453
      final List<String> items,
454
      Class<? extends VType> type,
455
      AcquisitionStatus status,
456
      Instant timeStamp)
457
      throws Exception {
458
    Alarm alarm = Alarm.none();
×
459
    switch (status) {
×
460
      case ACQUIRED:
461
        break;
×
462
      case EXPIRED:
463
        alarm = Alarm.noValue();
×
464
        break;
×
465
      case INVALID:
466
        break;
×
467
      case NOT_RECEIVED:
468
        break;
×
469
      default:
470
        break;
471
    }
472

473
    if (type == VDouble.class) {
×
474
      if (items == null) return VDouble.of(0.0, alarm, Time.of(timeStamp), Display.none());
×
475
      if (items.size() == 1)
×
476
        return VDouble.of(getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
477
      else throw new Exception("Expected one number, got " + items);
×
478
    }
479

480
    if (type == VFloat.class) {
×
481
      if (items == null) return VFloat.of(0.0, alarm, Time.of(timeStamp), Display.none());
×
482
      if (items.size() == 1)
×
483
        return VFloat.of(getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
484
      else throw new Exception("Expected one number, got " + items);
×
485
    }
486

487
    if (type == VLong.class) {
×
488
      if (items.size() == 1)
×
489
        return VLong.of(
×
490
            (long) getInitialLongs(items)[0], alarm, Time.of(timeStamp), Display.none());
×
491
      else throw new Exception("Expected one number, got " + items);
×
492
    }
493

494
    if (type == VULong.class) {
×
495
      if (items.size() == 1)
×
496
        return VLong.of(
×
497
            (long) getInitialLongs(items)[0], alarm, Time.of(timeStamp), Display.none());
×
498
      else throw new Exception("Expected one number, got " + items);
×
499
    }
500

501
    if (type == VInt.class) {
×
502
      if (items.size() == 1)
×
503
        return VInt.of(
×
504
            (long) getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
505
      else throw new Exception("Expected one number, got " + items);
×
506
    }
507

508
    if (type == VUInt.class) {
×
509
      if (items.size() == 1)
×
510
        return VInt.of(
×
511
            (long) getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
512
      else throw new Exception("Expected one number, got " + items);
×
513
    }
514

515
    if (type == VBoolean.class) {
×
516
      if (items == null || items.size() == 1)
×
517
        return VBoolean.of(getInitialBooleans(items).get(0), alarm, Time.of(timeStamp));
×
518
      else throw new Exception("Expected one boolean, got " + items);
×
519
    }
520

521
    if (type == VString.class) {
×
522
      if (items == null || items.size() == 1)
×
523
        return VString.of(getInitialStrings(items).get(0), alarm, Time.of(timeStamp));
×
524
      else throw new Exception("Expected one string, got " + items);
×
525
    }
526

527
    if (type == VDoubleArray.class)
×
528
      return VDoubleArray.of(
×
529
          ArrayDouble.of(getInitialDoubles(items)), alarm, Time.of(timeStamp), Display.none());
×
530

531
    //        if (type == VBooleanArray.class)
532
    //            return VBooleanArray.of(ArrayBoolean.of(getInitialBooleans(items)), Alarm.none(),
533
    // Time.of(timeStamp));
534

535
    if (type == VStringArray.class)
×
536
      return VStringArray.of(getInitialStrings(items), alarm, Time.of(timeStamp));
×
537

538
    if (type == VEnum.class) {
×
539
      if (items.size() < 2) throw new Exception("VEnum needs at least '(index, \"Label0\")'");
×
540
      final int initial;
541
      try {
542
        initial = Integer.parseInt(items.get(0));
×
543
      } catch (NumberFormatException ex) {
×
544
        throw new Exception("Cannot parse enum index", ex);
×
545
      }
×
546
      // Preserve original list
547
      final List<String> copy = new ArrayList<>(items.size() - 1);
×
548
      for (int i = 1; i < items.size(); ++i) copy.add(items.get(i));
×
549
      final List<String> labels = getInitialStrings(copy);
×
550
      return VEnum.of(initial, EnumDisplay.of(labels), alarm, Time.of(timeStamp));
×
551
    }
552

553
    if (type == VTable.class) {
×
554
      final List<String> headers = getInitialStrings(items);
×
555
      final List<Class<?>> types = new ArrayList<>();
×
556
      final List<Object> values = new ArrayList<>();
×
557
      while (headers.size() > values.size()) { // Assume each column is of type string, no values
×
558
        types.add(String.class);
×
559
        values.add(Collections.emptyList());
×
560
      }
561
      return VTable.of(types, headers, values);
×
562
    }
563
    throw new Exception("Cannot obtain type " + type.getSimpleName() + " from " + items);
×
564
  }
565

566
  @Override
567
  public void onData(List<ParameterValue> values) {
568
    // TODO
569
    for (ParameterValue p : values) {
×
570
      try {
571
        pvsById.get(p.getId()).iterator().next().updateValue(getVType(p));
×
572
      } catch (Exception e) {
×
573
        // TODO Auto-generated catch block
574
        e.printStackTrace();
×
575
      }
×
576
      ;
577
    }
×
578
  }
×
579
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc