• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #111

15 Dec 2023 04:45PM UTC coverage: 17.035% (+0.4%) from 16.596%
#111

push

lorenzo-gomez-windhover
-Add generation time to ParameterViewer

17830 of 104668 relevant lines covered (17.03%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.54
/core/commander-core/src/main/java/com/windhoverlabs/pv/yamcs/YamcsSubscriptionService.java
1
package com.windhoverlabs.pv.yamcs;
2

3
import java.time.Instant;
4
import java.util.ArrayList;
5
import java.util.Arrays;
6
import java.util.Collections;
7
import java.util.HashSet;
8
import java.util.LinkedHashMap;
9
import java.util.List;
10
import java.util.Map;
11
import java.util.Map.Entry;
12
import java.util.Objects;
13
import java.util.Set;
14
import java.util.concurrent.Executors;
15
import java.util.concurrent.ScheduledExecutorService;
16
import java.util.concurrent.TimeUnit;
17
import java.util.concurrent.atomic.AtomicBoolean;
18
import java.util.logging.Logger;
19
import java.util.stream.Collectors;
20
import org.epics.util.array.ArrayDouble;
21
import org.epics.vtype.Alarm;
22
import org.epics.vtype.AlarmSeverity;
23
import org.epics.vtype.AlarmStatus;
24
import org.epics.vtype.Display;
25
import org.epics.vtype.EnumDisplay;
26
import org.epics.vtype.Time;
27
import org.epics.vtype.VBoolean;
28
import org.epics.vtype.VDouble;
29
import org.epics.vtype.VDoubleArray;
30
import org.epics.vtype.VEnum;
31
import org.epics.vtype.VFloat;
32
import org.epics.vtype.VInt;
33
import org.epics.vtype.VLong;
34
import org.epics.vtype.VString;
35
import org.epics.vtype.VStringArray;
36
import org.epics.vtype.VTable;
37
import org.epics.vtype.VType;
38
import org.epics.vtype.VUInt;
39
import org.epics.vtype.VULong;
40
import org.phoebus.pv.PV;
41
import org.yamcs.client.ParameterSubscription;
42
import org.yamcs.protobuf.Pvalue.AcquisitionStatus;
43
import org.yamcs.protobuf.Pvalue.ParameterValue;
44
import org.yamcs.protobuf.SubscribeParametersRequest;
45
import org.yamcs.protobuf.SubscribeParametersRequest.Action;
46
import org.yamcs.protobuf.Yamcs.NamedObjectId;
47

48
/**
49
 * Keeps track of {@link IPV} registration state and takes care of establishing or re-establishing a
50
 * bundled parameter subscription against Yamcs.
51
 */
52
public class YamcsSubscriptionService implements YamcsAware, ParameterSubscription.Listener {
53

54
  private static final Logger log = Logger.getLogger(YamcsSubscriptionService.class.getName());
1✔
55

56
  private String instanceName;
57
  private String currentProcessor;
58

59
  private Map<NamedObjectId, Set<YamcsPV>> pvsById = new LinkedHashMap<>();
1✔
60

61
  private ParameterSubscription subscription;
62

63
  public ParameterSubscription getSubscription() {
64
    return subscription;
×
65
  }
66

67
  private AtomicBoolean subscriptionDirty = new AtomicBoolean(false);
1✔
68
  private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
1✔
69

70
  private Set<ParameterValueListener> parameterValueListeners = new HashSet<>();
1✔
71

72
  private ArrayList<NamedObjectId> ids = new ArrayList<NamedObjectId>();
1✔
73

74
  private String serverName = null;
1✔
75

76
  static final Alarm UDF = Alarm.of(AlarmSeverity.UNDEFINED, AlarmStatus.UNDEFINED, "UDF");
1✔
77

78
  public YamcsSubscriptionService(
79
      ParameterSubscription newSubscriprion,
80
      String newServerName,
81
      String newInstanceName,
82
      String processor) {
1✔
83
    serverName = newServerName;
1✔
84
    subscription = newSubscriprion;
1✔
85
    instanceName = newInstanceName;
1✔
86
    currentProcessor = processor;
1✔
87
    subscription.addListener(this);
1✔
88

89
    // Periodically check if the subscription needs a refresh
90
    // (PVs send individual events, so this bundles them)
91
    executor.scheduleWithFixedDelay(
1✔
92
        () -> {
93
          if (subscriptionDirty.getAndSet(false) && subscription != null) {
1✔
94
            Set<NamedObjectId> ids = getRequestedIdentifiers();
×
95
            // TODO:Make log level configurable
96
            //            log.info(String.format("Modifying subscription to %s", ids));
97
            subscription.sendMessage(
×
98
                SubscribeParametersRequest.newBuilder()
×
99
                    .setAction(Action.REPLACE)
×
100
                    .setSendFromCache(true)
×
101
                    .setAbortOnInvalid(false)
×
102
                    .setUpdateOnExpiration(true)
×
103
                    .addAllId(ids)
×
104
                    .setProcessor(currentProcessor)
×
105
                    .build());
×
106
          }
107
        },
1✔
108
        500,
109
        500,
110
        TimeUnit.MILLISECONDS);
111
  }
1✔
112

113
  private Set<NamedObjectId> getRequestedIdentifiers() {
114
    return pvsById.entrySet().stream()
×
115
        .filter(entry -> !entry.getValue().isEmpty())
×
116
        .map(Entry::getKey)
×
117
        .collect(Collectors.toSet());
×
118
  }
119

120
  public boolean isSubscriptionAvailable() {
121
    return subscription != null;
×
122
  }
123

124
  /**
125
   * Convert something like yamcs://cfs/CPD/ci/CI_HkTlm_t.usCmdCnt to
126
   * /cfs/CPD/ci/CI_HkTlm_t.usCmdCnt. Very useful for querying the Yamcs server.
127
   *
128
   * @param pvName
129
   * @return
130
   */
131
  private String getYamcsPvName(String pvName, String serverName) {
132
    String subStr = "/" + serverName + ":" + instanceName;
×
133
    return pvName.substring(subStr.length());
×
134
  }
135

136
  @Override
137
  public void changeProcessor(String instance, String processor) {
138
    currentProcessor = processor;
×
139
    executor.execute(
×
140
        () -> {
141
          // Ready to receive some data
142
          Set<NamedObjectId> ids = getRequestedIdentifiers();
×
143
          //          log.fine(String.format("Subscribing to %s [%s/%s]", ids, instance,
144
          // processor));
145
          subscription.sendMessage(
×
146
              SubscribeParametersRequest.newBuilder()
×
147
                  .setInstance(instance)
×
148
                  .setProcessor(processor)
×
149
                  .setSendFromCache(true)
×
150
                  .setAbortOnInvalid(false)
×
151
                  .setUpdateOnExpiration(false)
×
152
                  .addAllId(ids)
×
153
                  .build());
×
154
        });
×
155
  }
×
156

157
  /** Async adds a Yamcs PV for receiving updates. */
158
  public void register(YamcsPV pv) {
159
    NamedObjectId id =
×
160
        YamcsSubscriptionService.identityOf(getYamcsPvName(pv.getName(), serverName));
×
161
    executor.execute(
×
162
        () -> {
163
          Set<YamcsPV> pvs = pvsById.computeIfAbsent(id, x -> new HashSet<>());
×
164
          pvs.add(pv);
×
165
          subscriptionDirty.set(true);
×
166
        });
×
167

168
    ids.add(id);
×
169

170
    try {
171
      subscription.sendMessage(
×
172
          SubscribeParametersRequest.newBuilder()
×
173
              .setInstance(instanceName)
×
174
              .setProcessor(currentProcessor)
×
175
              .setSendFromCache(true)
×
176
              .setAbortOnInvalid(false)
×
177
              .setUpdateOnExpiration(false)
×
178
              .addId(id)
×
179
              .setAction(Action.ADD)
×
180
              .build());
×
181
    } catch (Exception e) {
×
182
      System.out.println("e:" + e);
×
183
    }
×
184
  }
×
185

186
  /** Async removes a Yamcs PV from receiving updates. */
187
  public void unregister(PV pv) {
188
    NamedObjectId id = identityOf(pv.getName());
×
189
    executor.execute(
×
190
        () -> {
191
          Set<YamcsPV> pvs = pvsById.get(id);
×
192
          if (pvs != null) {
×
193
            boolean removed = pvs.remove(pv);
×
194
            if (removed) {
×
195
              subscriptionDirty.set(true);
×
196
            }
197
          }
198
        });
×
199
  }
×
200

201
  public void destroy() {
202
    subscription.cancel(true);
1✔
203
    pvsById.clear();
1✔
204
    YamcsPVFactory.clearPVs();
1✔
205
    executor.shutdown();
1✔
206
  }
1✔
207

208
  public void addParameterValueListener(ParameterValueListener listener) {
209
    parameterValueListeners.add(listener);
×
210
  }
×
211

212
  @Override
213
  public void onInvalidIdentification(NamedObjectId id) {
214
    //        executor.execute(() -> {
215
    //            // We keep the id in pvsById, we want to again receive the invalid
216
    //            // identification when the subscription is updated.
217
    //            Set<PV> pvs = pvsById.get(id);
218
    //            if (pvs != null) {
219
    //                pvs.forEach(PV::setInvalid);
220
    //            }
221
    //        });
222
  }
×
223

224
  public static NamedObjectId identityOf(String pvName) {
225
    return NamedObjectId.newBuilder().setName(pvName).build();
×
226
    //    if (pvName.startsWith("yamcs://")) {
227
    //      return
228
    // NamedObjectId.newBuilder().setName(pvName.substring("yamcs://".length())).build();
229
    //    } else {
230
    //      System.out.println("identityOf2" + pvName);
231
    //    }
232
    //        } else if (pvName.startsWith("para://")) {
233
    //            return NamedObjectId.newBuilder()
234
    //                    .setName(pvName.substring("para://".length()))
235
    //                    .build();
236
    //        } else if (pvName.startsWith("raw://")) {
237
    //            return NamedObjectId.newBuilder()
238
    //                    .setName(pvName.substring("raw://".length()))
239
    //                    .build();
240
    //        } else {
241
    //            return NamedObjectId.newBuilder()
242
    //                    .setName(pvName)
243
    //                    .build();
244
    //        }
245
  }
246

247
  @FunctionalInterface
248
  public static interface ParameterValueListener {
249
    void onData(List<ParameterValue> values);
250
  }
251

252
  /**
253
   * create a VType from a yamcs ParameterValue object.
254
   *
255
   * @param parameter
256
   * @return
257
   */
258
  private VType getVType(ParameterValue parameter) {
259
    ArrayList<String> yamcsValues = new ArrayList<String>();
×
260

261
    Class<? extends VType> valueType = null;
×
262
    VType value = null;
×
263

264
    switch (parameter.getEngValue().getType()) {
×
265
      case AGGREGATE:
266
        // TODO Implement
267
        break;
×
268
      case ARRAY:
269
        // TODO Implement
270
        break;
×
271
      case BINARY:
272
        // TODO Implement
273
        break;
×
274
      case BOOLEAN:
275
        {
276
          yamcsValues.add(Boolean.toString(parameter.getEngValue().getBooleanValue()));
×
277
          valueType = VBoolean.class;
×
278
          break;
×
279
        }
280
      case DOUBLE:
281
        {
282
          yamcsValues.add(Double.toString(parameter.getEngValue().getDoubleValue()));
×
283
          valueType = VDouble.class;
×
284
          break;
×
285
        }
286
        //                case ENUMERATED:
287
        ////                        yamcsValues.add(Enum.toString(parameter.getEngValue().getS));
288
        //                        valueType = VFloat.class;
289
        //                        break;
290
      case FLOAT:
291
        {
292
          yamcsValues.add(Float.toString(parameter.getEngValue().getFloatValue()));
×
293
          valueType = VFloat.class;
×
294
          break;
×
295
        }
296
      case NONE:
297
        // TODO Implement
298
        break;
×
299
      case SINT32:
300
        {
301
          yamcsValues.add(Integer.toString(parameter.getEngValue().getSint32Value()));
×
302
          valueType = VInt.class;
×
303
          break;
×
304
        }
305
      case SINT64:
306
        {
307

308
          //            return
309
          // Value.newBuilder().setType(Type.STRING).setStringValue(String.valueOf(value)).build();
310
          yamcsValues.add(Long.toString(parameter.getEngValue().getSint64Value()));
×
311
          valueType = VLong.class;
×
312
          break;
×
313
        }
314
      case STRING:
315
      case ENUMERATED:
316
        {
317
          yamcsValues.add(parameter.getEngValue().getStringValue());
×
318
          valueType = VString.class;
×
319
          break;
×
320
        }
321
      case TIMESTAMP:
322
        break;
×
323
      case UINT32:
324
        {
325
          yamcsValues.add(Integer.toUnsignedString(parameter.getEngValue().getUint32Value()));
×
326
          valueType = VUInt.class;
×
327
          break;
×
328
        }
329
      case UINT64:
330
        {
331
          yamcsValues.add(Long.toUnsignedString(parameter.getEngValue().getUint64Value()));
×
332
          valueType = VULong.class;
×
333
          break;
×
334
        }
335
      default:
336
        break;
337
    }
338

339
    if (!yamcsValues.isEmpty()) {
×
340
      try {
341
        value =
×
342
            getInitialValue(
×
343
                yamcsValues,
344
                valueType,
345
                parameter.getAcquisitionStatus(),
×
346
                Instant.ofEpochSecond(
×
347
                    parameter.getGenerationTime().getSeconds(),
×
348
                    parameter.getGenerationTime().getNanos()));
×
349
      } catch (Exception e) {
×
350
        // TODO Auto-generated catch block
351
        e.printStackTrace();
×
352
      }
×
353
    }
354

355
    return value;
×
356
  }
357

358
  /**
359
   * @param items Items from <code>splitInitialItems</code>
360
   * @return All items as strings, surrounding quotes removed, un-escaping quotes
361
   */
362
  private static List<String> getInitialStrings(List<String> items) {
363
    if (items == null) return Arrays.asList("");
×
364
    final List<String> strings = new ArrayList<>(items.size());
×
365
    for (String item : items)
×
366
      if (item.startsWith("\""))
×
367
        strings.add(item.substring(1, item.length() - 1).replace("\\\"", "\""));
×
368
      else strings.add(item);
×
369
    return strings;
×
370
  }
371

372
  /**
373
   * TODO:Refactor so that not everything is "doubles"
374
   *
375
   * @param items Items from <code>splitInitialItems</code>
376
   * @return Numeric values for all items
377
   * @throws Exception on error
378
   */
379
  public static double[] getInitialDoubles(List<?> items) throws Exception {
380
    final double[] values = new double[items.size()];
×
381
    for (int i = 0; i < values.length; ++i) {
×
382
      try {
383
        final String text = Objects.toString(items.get(i));
×
384
        if (text.startsWith("0x")) values[i] = Integer.parseInt(text.substring(2), 16);
×
385
        else values[i] = Double.parseDouble(text);
×
386
      } catch (NumberFormatException ex) {
×
387
        throw new Exception("Cannot parse number from " + items.get(i));
×
388
      }
×
389
    }
390

391
    return values;
×
392
  }
393

394
  /**
395
   * TODO:We should make a distinction between unsigned and signed long.
396
   *
397
   * @param items Items from <code>splitInitialItems</code>
398
   * @return Numeric values for all items
399
   * @throws Exception on error
400
   */
401
  public static long[] getInitialLongs(List<?> items) throws Exception {
402
    final long[] values = new long[items.size()];
×
403
    for (int i = 0; i < values.length; ++i) {
×
404
      try {
405
        final String text = Objects.toString(items.get(i));
×
406
        if (text.startsWith("0x")) {
×
407
          values[i] = Integer.parseInt(text.substring(2), 16);
×
408
        } else {
409
          values[i] = Long.parseLong(text);
×
410
        }
411
      } catch (NumberFormatException ex) {
×
412
        throw new Exception("Cannot parse number from " + items.get(i));
×
413
      }
×
414
    }
415

416
    return values;
×
417
  }
418

419
  /**
420
   * @param items Items from <code>splitInitialItems</code>
421
   * @return Boolean list of all items
422
   */
423
  private static List<Boolean> getInitialBooleans(List<String> items) {
424
    if (items == null) return Arrays.asList(Boolean.FALSE);
×
425
    return items.stream()
×
426
        .map(
×
427
            item -> {
428
              return Boolean.parseBoolean(item);
×
429
            })
430
        .collect(Collectors.toList());
×
431
  }
432

433
  /**
434
   * @param items Items from <code>splitInitialItems</code>, i.e. strings are quoted
435
   * @param type Desired VType
436
   * @return VType for initial value
437
   * @throws Exception on error
438
   */
439
  public static VType getInitialValue(
440
      final List<String> items,
441
      Class<? extends VType> type,
442
      AcquisitionStatus status,
443
      Instant timeStamp)
444
      throws Exception {
445
    Alarm alarm = Alarm.none();
×
446
    switch (status) {
×
447
      case ACQUIRED:
448
        break;
×
449
      case EXPIRED:
450
        alarm = Alarm.noValue();
×
451
        break;
×
452
      case INVALID:
453
        break;
×
454
      case NOT_RECEIVED:
455
        break;
×
456
      default:
457
        break;
458
    }
459

460
    if (type == VDouble.class) {
×
461
      if (items == null) return VDouble.of(0.0, alarm, Time.of(timeStamp), Display.none());
×
462
      if (items.size() == 1)
×
463
        return VDouble.of(getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
464
      else throw new Exception("Expected one number, got " + items);
×
465
    }
466

467
    if (type == VFloat.class) {
×
468
      if (items == null) return VFloat.of(0.0, alarm, Time.of(timeStamp), Display.none());
×
469
      if (items.size() == 1)
×
470
        return VFloat.of(getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
471
      else throw new Exception("Expected one number, got " + items);
×
472
    }
473

474
    if (type == VLong.class) {
×
475
      if (items.size() == 1)
×
476
        return VLong.of(
×
477
            (long) getInitialLongs(items)[0], alarm, Time.of(timeStamp), Display.none());
×
478
      else throw new Exception("Expected one number, got " + items);
×
479
    }
480

481
    if (type == VULong.class) {
×
482
      if (items.size() == 1)
×
483
        return VLong.of(
×
484
            (long) getInitialLongs(items)[0], alarm, Time.of(timeStamp), Display.none());
×
485
      else throw new Exception("Expected one number, got " + items);
×
486
    }
487

488
    if (type == VInt.class) {
×
489
      if (items.size() == 1)
×
490
        return VInt.of(
×
491
            (long) getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
492
      else throw new Exception("Expected one number, got " + items);
×
493
    }
494

495
    if (type == VUInt.class) {
×
496
      if (items.size() == 1)
×
497
        return VInt.of(
×
498
            (long) getInitialDoubles(items)[0], alarm, Time.of(timeStamp), Display.none());
×
499
      else throw new Exception("Expected one number, got " + items);
×
500
    }
501

502
    if (type == VBoolean.class) {
×
503
      if (items == null || items.size() == 1)
×
504
        return VBoolean.of(getInitialBooleans(items).get(0), alarm, Time.of(timeStamp));
×
505
      else throw new Exception("Expected one boolean, got " + items);
×
506
    }
507

508
    if (type == VString.class) {
×
509
      if (items == null || items.size() == 1)
×
510
        return VString.of(getInitialStrings(items).get(0), alarm, Time.of(timeStamp));
×
511
      else throw new Exception("Expected one string, got " + items);
×
512
    }
513

514
    if (type == VDoubleArray.class)
×
515
      return VDoubleArray.of(
×
516
          ArrayDouble.of(getInitialDoubles(items)), alarm, Time.of(timeStamp), Display.none());
×
517

518
    //        if (type == VBooleanArray.class)
519
    //            return VBooleanArray.of(ArrayBoolean.of(getInitialBooleans(items)), Alarm.none(),
520
    // Time.of(timeStamp));
521

522
    if (type == VStringArray.class)
×
523
      return VStringArray.of(getInitialStrings(items), alarm, Time.of(timeStamp));
×
524

525
    if (type == VEnum.class) {
×
526
      if (items.size() < 2) throw new Exception("VEnum needs at least '(index, \"Label0\")'");
×
527
      final int initial;
528
      try {
529
        initial = Integer.parseInt(items.get(0));
×
530
      } catch (NumberFormatException ex) {
×
531
        throw new Exception("Cannot parse enum index", ex);
×
532
      }
×
533
      // Preserve original list
534
      final List<String> copy = new ArrayList<>(items.size() - 1);
×
535
      for (int i = 1; i < items.size(); ++i) copy.add(items.get(i));
×
536
      final List<String> labels = getInitialStrings(copy);
×
537
      return VEnum.of(initial, EnumDisplay.of(labels), alarm, Time.of(timeStamp));
×
538
    }
539

540
    if (type == VTable.class) {
×
541
      final List<String> headers = getInitialStrings(items);
×
542
      final List<Class<?>> types = new ArrayList<>();
×
543
      final List<Object> values = new ArrayList<>();
×
544
      while (headers.size() > values.size()) { // Assume each column is of type string, no values
×
545
        types.add(String.class);
×
546
        values.add(Collections.emptyList());
×
547
      }
548
      return VTable.of(types, headers, values);
×
549
    }
550
    throw new Exception("Cannot obtain type " + type.getSimpleName() + " from " + items);
×
551
  }
552

553
  @Override
554
  public void onData(List<ParameterValue> values) {
555
    // TODO
556
    for (ParameterValue p : values) {
×
557
      try {
558
        pvsById.get(p.getId()).iterator().next().updateValue(getVType(p));
×
559
      } catch (Exception e) {
×
560
        // TODO Auto-generated catch block
561
        e.printStackTrace();
×
562
      }
×
563
      ;
564
    }
×
565
  }
×
566
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc