• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #77

03 Sep 2023 05:01PM UTC coverage: 16.544% (-0.02%) from 16.566%
#77

push

web-flow
Merge pull request #87 from WindhoverLabs/param_export

Param export

50 of 50 new or added lines in 3 files covered. (100.0%)

17740 of 107229 relevant lines covered (16.54%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

9.91
/core/commander-core/src/main/java/com/windhoverlabs/pv/yamcs/YamcsSubscriptionService.java
1
package com.windhoverlabs.pv.yamcs;
2

3
import java.util.ArrayList;
4
import java.util.Arrays;
5
import java.util.Collections;
6
import java.util.HashSet;
7
import java.util.LinkedHashMap;
8
import java.util.List;
9
import java.util.Map;
10
import java.util.Map.Entry;
11
import java.util.Objects;
12
import java.util.Set;
13
import java.util.concurrent.Executors;
14
import java.util.concurrent.ScheduledExecutorService;
15
import java.util.concurrent.TimeUnit;
16
import java.util.concurrent.atomic.AtomicBoolean;
17
import java.util.logging.Logger;
18
import java.util.stream.Collectors;
19
import org.epics.util.array.ArrayDouble;
20
import org.epics.vtype.Alarm;
21
import org.epics.vtype.AlarmSeverity;
22
import org.epics.vtype.AlarmStatus;
23
import org.epics.vtype.Display;
24
import org.epics.vtype.EnumDisplay;
25
import org.epics.vtype.Time;
26
import org.epics.vtype.VBoolean;
27
import org.epics.vtype.VDouble;
28
import org.epics.vtype.VDoubleArray;
29
import org.epics.vtype.VEnum;
30
import org.epics.vtype.VFloat;
31
import org.epics.vtype.VInt;
32
import org.epics.vtype.VLong;
33
import org.epics.vtype.VString;
34
import org.epics.vtype.VStringArray;
35
import org.epics.vtype.VTable;
36
import org.epics.vtype.VType;
37
import org.epics.vtype.VUInt;
38
import org.epics.vtype.VULong;
39
import org.phoebus.pv.PV;
40
import org.yamcs.client.ParameterSubscription;
41
import org.yamcs.protobuf.Pvalue.AcquisitionStatus;
42
import org.yamcs.protobuf.Pvalue.ParameterValue;
43
import org.yamcs.protobuf.SubscribeParametersRequest;
44
import org.yamcs.protobuf.SubscribeParametersRequest.Action;
45
import org.yamcs.protobuf.Yamcs.NamedObjectId;
46

47
/**
48
 * Keeps track of {@link IPV} registration state and takes care of establishing or re-establishing a
49
 * bundled parameter subscription against Yamcs.
50
 */
51
public class YamcsSubscriptionService implements YamcsAware, ParameterSubscription.Listener {
52

53
  private static final Logger log = Logger.getLogger(YamcsSubscriptionService.class.getName());
1✔
54

55
  private String instanceName;
56
  private String currentProcessor;
57

58
  private Map<NamedObjectId, Set<YamcsPV>> pvsById = new LinkedHashMap<>();
1✔
59

60
  private ParameterSubscription subscription;
61

62
  public ParameterSubscription getSubscription() {
63
    return subscription;
×
64
  }
65

66
  private AtomicBoolean subscriptionDirty = new AtomicBoolean(false);
1✔
67
  private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
1✔
68

69
  private Set<ParameterValueListener> parameterValueListeners = new HashSet<>();
1✔
70

71
  private ArrayList<NamedObjectId> ids = new ArrayList<NamedObjectId>();
1✔
72

73
  private String serverName = null;
1✔
74

75
  static final Alarm UDF = Alarm.of(AlarmSeverity.UNDEFINED, AlarmStatus.UNDEFINED, "UDF");
1✔
76

77
  public YamcsSubscriptionService(
78
      ParameterSubscription newSubscriprion,
79
      String newServerName,
80
      String newInstanceName,
81
      String processor) {
1✔
82
    serverName = newServerName;
1✔
83
    subscription = newSubscriprion;
1✔
84
    instanceName = newInstanceName;
1✔
85
    currentProcessor = processor;
1✔
86
    subscription.addListener(this);
1✔
87

88
    // Periodically check if the subscription needs a refresh
89
    // (PVs send individual events, so this bundles them)
90
    executor.scheduleWithFixedDelay(
1✔
91
        () -> {
92
          if (subscriptionDirty.getAndSet(false) && subscription != null) {
1✔
93
            Set<NamedObjectId> ids = getRequestedIdentifiers();
×
94
            // TODO:Make log level configurable
95
            //            log.info(String.format("Modifying subscription to %s", ids));
96
            subscription.sendMessage(
×
97
                SubscribeParametersRequest.newBuilder()
×
98
                    .setAction(Action.REPLACE)
×
99
                    .setSendFromCache(true)
×
100
                    .setAbortOnInvalid(false)
×
101
                    .setUpdateOnExpiration(true)
×
102
                    .addAllId(ids)
×
103
                    .setProcessor(currentProcessor)
×
104
                    .build());
×
105
          }
106
        },
1✔
107
        500,
108
        500,
109
        TimeUnit.MILLISECONDS);
110
  }
1✔
111

112
  private Set<NamedObjectId> getRequestedIdentifiers() {
113
    return pvsById.entrySet().stream()
×
114
        .filter(entry -> !entry.getValue().isEmpty())
×
115
        .map(Entry::getKey)
×
116
        .collect(Collectors.toSet());
×
117
  }
118

119
  public boolean isSubscriptionAvailable() {
120
    return subscription != null;
×
121
  }
122

123
  /**
124
   * Convert something like yamcs://cfs/CPD/ci/CI_HkTlm_t.usCmdCnt to
125
   * /cfs/CPD/ci/CI_HkTlm_t.usCmdCnt. Very useful for querying the Yamcs server.
126
   *
127
   * @param pvName
128
   * @return
129
   */
130
  private String getYamcsPvName(String pvName, String serverName) {
131
    String subStr = "/" + serverName + ":" + instanceName;
×
132
    return pvName.substring(subStr.length());
×
133
  }
134

135
  @Override
136
  public void changeProcessor(String instance, String processor) {
137
    currentProcessor = processor;
×
138
    executor.execute(
×
139
        () -> {
140
          // Ready to receive some data
141
          Set<NamedObjectId> ids = getRequestedIdentifiers();
×
142
          //          log.fine(String.format("Subscribing to %s [%s/%s]", ids, instance,
143
          // processor));
144
          subscription.sendMessage(
×
145
              SubscribeParametersRequest.newBuilder()
×
146
                  .setInstance(instance)
×
147
                  .setProcessor(processor)
×
148
                  .setSendFromCache(true)
×
149
                  .setAbortOnInvalid(false)
×
150
                  .setUpdateOnExpiration(false)
×
151
                  .addAllId(ids)
×
152
                  .build());
×
153
        });
×
154
  }
×
155

156
  /** Async adds a Yamcs PV for receiving updates. */
157
  public void register(YamcsPV pv) {
158
    NamedObjectId id =
×
159
        YamcsSubscriptionService.identityOf(getYamcsPvName(pv.getName(), serverName));
×
160
    executor.execute(
×
161
        () -> {
162
          Set<YamcsPV> pvs = pvsById.computeIfAbsent(id, x -> new HashSet<>());
×
163
          pvs.add(pv);
×
164
          subscriptionDirty.set(true);
×
165
        });
×
166

167
    ids.add(id);
×
168

169
    try {
170
      subscription.sendMessage(
×
171
          SubscribeParametersRequest.newBuilder()
×
172
              .setInstance(instanceName)
×
173
              .setProcessor(currentProcessor)
×
174
              .setSendFromCache(true)
×
175
              .setAbortOnInvalid(false)
×
176
              .setUpdateOnExpiration(false)
×
177
              .addId(id)
×
178
              .setAction(Action.ADD)
×
179
              .build());
×
180
    } catch (Exception e) {
×
181
      System.out.println("e:" + e);
×
182
    }
×
183
  }
×
184

185
  /** Async removes a Yamcs PV from receiving updates. */
186
  public void unregister(PV pv) {
187
    NamedObjectId id = identityOf(pv.getName());
×
188
    executor.execute(
×
189
        () -> {
190
          Set<YamcsPV> pvs = pvsById.get(id);
×
191
          if (pvs != null) {
×
192
            boolean removed = pvs.remove(pv);
×
193
            if (removed) {
×
194
              subscriptionDirty.set(true);
×
195
            }
196
          }
197
        });
×
198
  }
×
199

200
  public void destroy() {
201
    subscription.cancel(true);
1✔
202
    pvsById.clear();
1✔
203
    YamcsPVFactory.clearPVs();
1✔
204
    executor.shutdown();
1✔
205
  }
1✔
206

207
  public void addParameterValueListener(ParameterValueListener listener) {
208
    parameterValueListeners.add(listener);
×
209
  }
×
210

211
  @Override
212
  public void onInvalidIdentification(NamedObjectId id) {
213
    //        executor.execute(() -> {
214
    //            // We keep the id in pvsById, we want to again receive the invalid
215
    //            // identification when the subscription is updated.
216
    //            Set<PV> pvs = pvsById.get(id);
217
    //            if (pvs != null) {
218
    //                pvs.forEach(PV::setInvalid);
219
    //            }
220
    //        });
221
  }
×
222

223
  public static NamedObjectId identityOf(String pvName) {
224
    return NamedObjectId.newBuilder().setName(pvName).build();
×
225
    //    if (pvName.startsWith("yamcs://")) {
226
    //      return
227
    // NamedObjectId.newBuilder().setName(pvName.substring("yamcs://".length())).build();
228
    //    } else {
229
    //      System.out.println("identityOf2" + pvName);
230
    //    }
231
    //        } else if (pvName.startsWith("para://")) {
232
    //            return NamedObjectId.newBuilder()
233
    //                    .setName(pvName.substring("para://".length()))
234
    //                    .build();
235
    //        } else if (pvName.startsWith("raw://")) {
236
    //            return NamedObjectId.newBuilder()
237
    //                    .setName(pvName.substring("raw://".length()))
238
    //                    .build();
239
    //        } else {
240
    //            return NamedObjectId.newBuilder()
241
    //                    .setName(pvName)
242
    //                    .build();
243
    //        }
244
  }
245

246
  @FunctionalInterface
247
  public static interface ParameterValueListener {
248
    void onData(List<ParameterValue> values);
249
  }
250

251
  /**
252
   * create a VType from a yamcs ParameterValue object.
253
   *
254
   * @param parameter
255
   * @return
256
   */
257
  private VType getVType(ParameterValue parameter) {
258
    ArrayList<String> yamcsValues = new ArrayList<String>();
×
259

260
    Class<? extends VType> valueType = null;
×
261
    VType value = null;
×
262

263
    switch (parameter.getEngValue().getType()) {
×
264
      case AGGREGATE:
265
        // TODO Implement
266
        break;
×
267
      case ARRAY:
268
        // TODO Implement
269
        break;
×
270
      case BINARY:
271
        // TODO Implement
272
        break;
×
273
      case BOOLEAN:
274
        {
275
          yamcsValues.add(Boolean.toString(parameter.getEngValue().getBooleanValue()));
×
276
          valueType = VBoolean.class;
×
277
          break;
×
278
        }
279
      case DOUBLE:
280
        {
281
          yamcsValues.add(Double.toString(parameter.getEngValue().getDoubleValue()));
×
282
          valueType = VDouble.class;
×
283
          break;
×
284
        }
285
        //                case ENUMERATED:
286
        ////                        yamcsValues.add(Enum.toString(parameter.getEngValue().getS));
287
        //                        valueType = VFloat.class;
288
        //                        break;
289
      case FLOAT:
290
        {
291
          yamcsValues.add(Float.toString(parameter.getEngValue().getFloatValue()));
×
292
          valueType = VFloat.class;
×
293
          break;
×
294
        }
295
      case NONE:
296
        // TODO Implement
297
        break;
×
298
      case SINT32:
299
        {
300
          yamcsValues.add(Integer.toString(parameter.getEngValue().getSint32Value()));
×
301
          valueType = VInt.class;
×
302
          break;
×
303
        }
304
      case SINT64:
305
        {
306

307
          //            return
308
          // Value.newBuilder().setType(Type.STRING).setStringValue(String.valueOf(value)).build();
309
          yamcsValues.add(Long.toString(parameter.getEngValue().getSint64Value()));
×
310
          valueType = VLong.class;
×
311
          break;
×
312
        }
313
      case STRING:
314
      case ENUMERATED:
315
        {
316
          yamcsValues.add(parameter.getEngValue().getStringValue());
×
317
          valueType = VString.class;
×
318
          break;
×
319
        }
320
      case TIMESTAMP:
321
        break;
×
322
      case UINT32:
323
        {
324
          yamcsValues.add(Integer.toUnsignedString(parameter.getEngValue().getUint32Value()));
×
325
          valueType = VUInt.class;
×
326
          break;
×
327
        }
328
      case UINT64:
329
        {
330
          yamcsValues.add(Long.toUnsignedString(parameter.getEngValue().getUint64Value()));
×
331
          valueType = VULong.class;
×
332
          break;
×
333
        }
334
      default:
335
        break;
336
    }
337

338
    if (!yamcsValues.isEmpty()) {
×
339
      try {
340
        value = getInitialValue(yamcsValues, valueType, parameter.getAcquisitionStatus());
×
341
      } catch (Exception e) {
×
342
        // TODO Auto-generated catch block
343
        e.printStackTrace();
×
344
      }
×
345
    }
346

347
    return value;
×
348
  }
349

350
  /**
351
   * @param items Items from <code>splitInitialItems</code>
352
   * @return All items as strings, surrounding quotes removed, un-escaping quotes
353
   */
354
  private static List<String> getInitialStrings(List<String> items) {
355
    if (items == null) return Arrays.asList("");
×
356
    final List<String> strings = new ArrayList<>(items.size());
×
357
    for (String item : items)
×
358
      if (item.startsWith("\""))
×
359
        strings.add(item.substring(1, item.length() - 1).replace("\\\"", "\""));
×
360
      else strings.add(item);
×
361
    return strings;
×
362
  }
363

364
  /**
365
   * TODO:Refactor so that not everything is "doubles"
366
   *
367
   * @param items Items from <code>splitInitialItems</code>
368
   * @return Numeric values for all items
369
   * @throws Exception on error
370
   */
371
  public static double[] getInitialDoubles(List<?> items) throws Exception {
372
    final double[] values = new double[items.size()];
×
373
    for (int i = 0; i < values.length; ++i) {
×
374
      try {
375
        final String text = Objects.toString(items.get(i));
×
376
        if (text.startsWith("0x")) values[i] = Integer.parseInt(text.substring(2), 16);
×
377
        else values[i] = Double.parseDouble(text);
×
378
      } catch (NumberFormatException ex) {
×
379
        throw new Exception("Cannot parse number from " + items.get(i));
×
380
      }
×
381
    }
382

383
    return values;
×
384
  }
385

386
  /**
387
   * TODO:We should make a distinction between unsigned and signed long.
388
   *
389
   * @param items Items from <code>splitInitialItems</code>
390
   * @return Numeric values for all items
391
   * @throws Exception on error
392
   */
393
  public static long[] getInitialLongs(List<?> items) throws Exception {
394
    final long[] values = new long[items.size()];
×
395
    for (int i = 0; i < values.length; ++i) {
×
396
      try {
397
        final String text = Objects.toString(items.get(i));
×
398
        if (text.startsWith("0x")) {
×
399
          values[i] = Integer.parseInt(text.substring(2), 16);
×
400
        } else {
401
          values[i] = Long.parseLong(text);
×
402
        }
403
      } catch (NumberFormatException ex) {
×
404
        throw new Exception("Cannot parse number from " + items.get(i));
×
405
      }
×
406
    }
407

408
    return values;
×
409
  }
410

411
  /**
412
   * @param items Items from <code>splitInitialItems</code>
413
   * @return Boolean list of all items
414
   */
415
  private static List<Boolean> getInitialBooleans(List<String> items) {
416
    if (items == null) return Arrays.asList(Boolean.FALSE);
×
417
    return items.stream()
×
418
        .map(
×
419
            item -> {
420
              return Boolean.parseBoolean(item);
×
421
            })
422
        .collect(Collectors.toList());
×
423
  }
424

425
  /**
426
   * @param items Items from <code>splitInitialItems</code>, i.e. strings are quoted
427
   * @param type Desired VType
428
   * @return VType for initial value
429
   * @throws Exception on error
430
   */
431
  public static VType getInitialValue(
432
      final List<String> items, Class<? extends VType> type, AcquisitionStatus status)
433
      throws Exception {
434
    Alarm alarm = Alarm.none();
×
435
    switch (status) {
×
436
      case ACQUIRED:
437
        break;
×
438
      case EXPIRED:
439
        alarm = Alarm.noValue();
×
440
        break;
×
441
      case INVALID:
442
        break;
×
443
      case NOT_RECEIVED:
444
        break;
×
445
      default:
446
        break;
447
    }
448

449
    if (type == VDouble.class) {
×
450
      if (items == null) return VDouble.of(0.0, alarm, Time.now(), Display.none());
×
451
      if (items.size() == 1)
×
452
        return VDouble.of(getInitialDoubles(items)[0], alarm, Time.now(), Display.none());
×
453
      else throw new Exception("Expected one number, got " + items);
×
454
    }
455

456
    if (type == VFloat.class) {
×
457
      if (items == null) return VFloat.of(0.0, alarm, Time.now(), Display.none());
×
458
      if (items.size() == 1)
×
459
        return VFloat.of(getInitialDoubles(items)[0], alarm, Time.now(), Display.none());
×
460
      else throw new Exception("Expected one number, got " + items);
×
461
    }
462

463
    if (type == VLong.class) {
×
464
      if (items.size() == 1)
×
465
        return VLong.of((long) getInitialLongs(items)[0], alarm, Time.now(), Display.none());
×
466
      else throw new Exception("Expected one number, got " + items);
×
467
    }
468

469
    if (type == VULong.class) {
×
470
      if (items.size() == 1)
×
471
        return VLong.of((long) getInitialLongs(items)[0], alarm, Time.now(), Display.none());
×
472
      else throw new Exception("Expected one number, got " + items);
×
473
    }
474

475
    if (type == VInt.class) {
×
476
      if (items.size() == 1)
×
477
        return VInt.of((long) getInitialDoubles(items)[0], alarm, Time.now(), Display.none());
×
478
      else throw new Exception("Expected one number, got " + items);
×
479
    }
480

481
    if (type == VUInt.class) {
×
482
      if (items.size() == 1)
×
483
        return VInt.of((long) getInitialDoubles(items)[0], alarm, Time.now(), Display.none());
×
484
      else throw new Exception("Expected one number, got " + items);
×
485
    }
486

487
    if (type == VBoolean.class) {
×
488
      if (items == null || items.size() == 1)
×
489
        return VBoolean.of(getInitialBooleans(items).get(0), alarm, Time.now());
×
490
      else throw new Exception("Expected one boolean, got " + items);
×
491
    }
492

493
    if (type == VString.class) {
×
494
      if (items == null || items.size() == 1)
×
495
        return VString.of(getInitialStrings(items).get(0), alarm, Time.now());
×
496
      else throw new Exception("Expected one string, got " + items);
×
497
    }
498

499
    if (type == VDoubleArray.class)
×
500
      return VDoubleArray.of(
×
501
          ArrayDouble.of(getInitialDoubles(items)), alarm, Time.now(), Display.none());
×
502

503
    //        if (type == VBooleanArray.class)
504
    //            return VBooleanArray.of(ArrayBoolean.of(getInitialBooleans(items)), Alarm.none(),
505
    // Time.now());
506

507
    if (type == VStringArray.class)
×
508
      return VStringArray.of(getInitialStrings(items), alarm, Time.now());
×
509

510
    if (type == VEnum.class) {
×
511
      if (items.size() < 2) throw new Exception("VEnum needs at least '(index, \"Label0\")'");
×
512
      final int initial;
513
      try {
514
        initial = Integer.parseInt(items.get(0));
×
515
      } catch (NumberFormatException ex) {
×
516
        throw new Exception("Cannot parse enum index", ex);
×
517
      }
×
518
      // Preserve original list
519
      final List<String> copy = new ArrayList<>(items.size() - 1);
×
520
      for (int i = 1; i < items.size(); ++i) copy.add(items.get(i));
×
521
      final List<String> labels = getInitialStrings(copy);
×
522
      return VEnum.of(initial, EnumDisplay.of(labels), alarm, Time.now());
×
523
    }
524

525
    if (type == VTable.class) {
×
526
      final List<String> headers = getInitialStrings(items);
×
527
      final List<Class<?>> types = new ArrayList<>();
×
528
      final List<Object> values = new ArrayList<>();
×
529
      while (headers.size() > values.size()) { // Assume each column is of type string, no values
×
530
        types.add(String.class);
×
531
        values.add(Collections.emptyList());
×
532
      }
533
      return VTable.of(types, headers, values);
×
534
    }
535
    throw new Exception("Cannot obtain type " + type.getSimpleName() + " from " + items);
×
536
  }
537

538
  @Override
539
  public void onData(List<ParameterValue> values) {
540
    // TODO
541
    for (ParameterValue p : values) {
×
542
      try {
543
        pvsById.get(p.getId()).iterator().next().updateValue(getVType(p));
×
544
      } catch (Exception e) {
×
545
        // TODO Auto-generated catch block
546
        e.printStackTrace();
×
547
      }
×
548
      ;
549
    }
×
550
  }
×
551
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc