• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #90

22 Sep 2023 06:10AM UTC coverage: 16.585% (-0.001%) from 16.586%
#90

push

lorenzo-gomez-windhover
-Minimally functional user events.

8 of 8 new or added lines in 1 file covered. (100.0%)

17812 of 107400 relevant lines covered (16.58%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.58
/core/commander-core/src/main/java/com/windhoverlabs/yamcs/core/CMDR_YamcsInstance.java
1
package com.windhoverlabs.yamcs.core;
2

3
import com.windhoverlabs.pv.yamcs.YamcsPV;
4
import com.windhoverlabs.pv.yamcs.YamcsSubscriptionService;
5
import com.windhoverlabs.yamcs.core.YamcsWebSocketClient.TmStatistics;
6
import java.time.Duration;
7
import java.time.Instant;
8
import java.util.ArrayList;
9
import java.util.HashMap;
10
import java.util.List;
11
import java.util.concurrent.ExecutionException;
12
import java.util.concurrent.ScheduledThreadPoolExecutor;
13
import java.util.function.Consumer;
14
import java.util.logging.Logger;
15
import javafx.collections.FXCollections;
16
import javafx.collections.ObservableList;
17
import org.yamcs.TmPacket;
18
import org.yamcs.client.EventSubscription;
19
import org.yamcs.client.LinkSubscription;
20
import org.yamcs.client.MessageListener;
21
import org.yamcs.client.PacketSubscription;
22
import org.yamcs.client.Page;
23
import org.yamcs.client.YamcsClient;
24
import org.yamcs.client.archive.ArchiveClient;
25
import org.yamcs.client.mdb.MissionDatabaseClient.ListOptions;
26
import org.yamcs.client.processor.ProcessorClient;
27
import org.yamcs.mdb.ProcessingStatistics;
28
import org.yamcs.protobuf.CreateEventRequest;
29
import org.yamcs.protobuf.GetServerInfoResponse;
30
import org.yamcs.protobuf.GetServerInfoResponse.CommandOptionInfo;
31
import org.yamcs.protobuf.Mdb.ParameterInfo;
32
import org.yamcs.protobuf.Pvalue.ParameterValue;
33
import org.yamcs.protobuf.SubscribeEventsRequest;
34
import org.yamcs.protobuf.TmPacketData;
35
// import org.yamcs.protobuf.TmStatistics;
36
import org.yamcs.protobuf.links.LinkInfo;
37
import org.yamcs.protobuf.links.SubscribeLinksRequest;
38
import org.yamcs.utils.TimeEncoding;
39

40
// import org.yamcs.protobuf.Event;
41

42
public class CMDR_YamcsInstance extends YamcsObject<YamcsObject<?>> {
43
  public static final Logger logger = Logger.getLogger(CMDR_YamcsInstance.class.getPackageName());
1✔
44
  public static String OBJECT_TYPE = "instance";
1✔
45
  private ProcessorClient yamcsProcessor = null;
1✔
46
  private YamcsSubscriptionService paramSubscriptionService;
47
  private EventSubscription eventSubscription;
48
  private LinkSubscription linkSubscription;
49
  private MissionDatabase missionDatabase;
50
  //  private EventSubscription eventSubscription;
51
  private ArchiveClient yamcsArchiveClient;
52
  private CMDR_YamcsInstanceState instanceState;
53

54
  // TODO:Not sure if we want to have this on every instance and their server...just want it to work
55
  // for now.
56
  // Useful for "special" command link arguments such as cop1Bypass
57
  private HashMap<String, CommandOptionInfo> extraCommandArgs =
1✔
58
      new HashMap<String, CommandOptionInfo>();
59

60
  private ObservableList<CommandOption> optionsList = FXCollections.observableArrayList();
1✔
61
  private ObservableList<CMDR_Event> events = FXCollections.observableArrayList();
1✔
62
  private ObservableList<LinkInfo> links = FXCollections.observableArrayList();
1✔
63
  private ObservableList<TmStatistics> packets = FXCollections.observableArrayList();
1✔
64

65
  public ObservableList<TmStatistics> getPackets() {
66
    return packets;
×
67
  }
68

69
  private HashMap<String, LinkInfo> linksMap = new HashMap<String, LinkInfo>();
1✔
70

71
  public HashMap<String, LinkInfo> getLinksMap() {
72
    return linksMap;
×
73
  }
74

75
  private HashMap<String, Boolean> activeInLinks = new HashMap<String, Boolean>();
1✔
76

77
  private HashMap<String, Instant> LastUpdateLinks = new HashMap<String, Instant>();
1✔
78

79
  public HashMap<String, Boolean> getActiveInLinks() {
80
    return activeInLinks;
×
81
  }
82

83
  private HashMap<String, Boolean> activeOutLinks = new HashMap<String, Boolean>();
1✔
84
  private ScheduledThreadPoolExecutor timer;
85

86
  public ObservableList<LinkInfo> getLinks() {
87
    return links;
×
88
  }
89

90
  public CMDR_YamcsInstanceState getInstanceState() {
91
    return instanceState;
×
92
  }
93

94
  // Make this class generic?
95
  /** A command option: an identifier paired with an editable string value. */
  public class CommandOption {
    // The id is only assigned in the constructor; the value can be replaced via setValue.
    private final String id;
    private String value;

    /**
     * Creates an option.
     *
     * @param newId identifier of the option
     * @param value initial value of the option
     */
    public CommandOption(String newId, String value) {
      this.id = newId;
      this.value = value;
    }

    /** @return the identifier given at construction */
    public String getId() {
      return id;
    }

    /** @return the current value */
    public String getValue() {
      return value;
    }

    /**
     * Replaces the current value.
     *
     * @param newValue value to store
     */
    public void setValue(String newValue) {
      value = newValue;
    }
  }
116

117
  public ObservableList<CommandOption> getOptionsList() {
118
    return optionsList;
×
119
  }
120

121
  public HashMap<String, CommandOptionInfo> getExtraCommandArgs() {
122
    return extraCommandArgs;
×
123
  }
124

125
  public ArchiveClient getYamcsArchiveClient() {
126
    return yamcsArchiveClient;
1✔
127
  }
128

129
  public ObservableList<CMDR_Event> getEvents() {
130
    return events;
1✔
131
  }
132

133
  public ProcessorClient getYamcsProcessor() {
134
    return yamcsProcessor;
1✔
135
  }
136

137
  /**
   * Creates an instance object with the given name. No clients or subscriptions are created here;
   * that happens in {@code activate}.
   *
   * @param name name passed to the {@code YamcsObject} constructor
   */
  public CMDR_YamcsInstance(String name) {
    super(name);
  }
1✔
140

141
  @Override
142
  public ObservableList<YamcsObject<?>> getItems() {
143
    return FXCollections.emptyObservableList();
1✔
144
  }
145

146
  @Override
147
  public void createAndAddChild(String name) {
148
    throw new IllegalStateException("CMDR_YamcsInstance does not allow child items");
1✔
149
  }
150

151
  @Override
152
  public String getObjectType() {
153
    return OBJECT_TYPE;
1✔
154
  }
155

156
  protected void initProcessorClient(YamcsClient yamcsClient) {
157
    yamcsProcessor = yamcsClient.createProcessorClient(getName(), "realtime");
1✔
158
  }
1✔
159

160
  protected void initYamcsSubscriptionService(
161
      YamcsClient yamcsClient, String serverName, String procesor) {
162
    paramSubscriptionService =
1✔
163
        new YamcsSubscriptionService(
164
            yamcsClient.createParameterSubscription(), serverName, this.getName(), procesor);
1✔
165
  }
1✔
166

167
  protected void initLinkSubscription(YamcsClient yamcsClient, String serverName) {
168
    linkSubscription = yamcsClient.createLinkSubscription();
1✔
169
    linkSubscription.addMessageListener(
1✔
170
        linkEvent -> {
171
          switch (linkEvent.getType()) {
1✔
172
            case REGISTERED:
173
            case UPDATED:
174
              {
175
                var link = linkEvent.getLinkInfo();
1✔
176
                LinkInfo linkFromList = null;
1✔
177

178
                LastUpdateLinks.put(link.getName(), Instant.now());
1✔
179

180
                linksMap.put(link.getName(), link);
1✔
181

182
                boolean linkExistsInlList = false;
1✔
183

184
                for (var l : links) {
1✔
185
                  if (l != null) {
1✔
186
                    if (l.getName().equals(link.getName())) {
1✔
187
                      linkFromList = l;
×
188
                      linkExistsInlList = true;
×
189
                    }
190
                  }
191
                }
1✔
192

193
                if (linkExistsInlList) {
1✔
194
                  links.remove(linkFromList);
×
195
                }
196
                links.add(linksMap.get(link.getName()));
1✔
197
              }
198

199
              break;
1✔
200
            case UNREGISTERED:
201
              //               TODO but not currently sent by Yamcs
202
          }
203
        });
1✔
204

205
    linkSubscription.sendMessage(SubscribeLinksRequest.newBuilder().setInstance(getName()).build());
1✔
206
  }
1✔
207

208
  protected void initEventSubscription(YamcsClient yamcsClient, String serverName) {
209
    eventSubscription = yamcsClient.createEventSubscription();
1✔
210
    eventSubscription.addMessageListener(
1✔
211
        event -> {
212
          events.add(
×
213
              new CMDR_Event(
214
                  event.getMessage(),
×
215
                  Instant.ofEpochSecond(
×
216
                      event.getGenerationTime().getSeconds(), event.getGenerationTime().getNanos()),
×
217
                  event.getSeverity(),
×
218
                  event.getType(),
×
219
                  Instant.ofEpochSecond(
×
220
                      event.getReceptionTime().getSeconds(), event.getReceptionTime().getNanos()),
×
221
                  event.getSource(),
×
222
                  this.getName()));
×
223
        });
×
224

225
    yamcsArchiveClient = yamcsClient.createArchiveClient(getName());
1✔
226

227
    eventSubscription.sendMessage(
1✔
228
        SubscribeEventsRequest.newBuilder().setInstance(getName()).build());
1✔
229
  }
1✔
230

231
  private MissionDatabase loadMissionDatabase(YamcsClient client) {
232
    var missionDatabase = new MissionDatabase();
1✔
233

234
    var mdbClient = client.createMissionDatabaseClient(getName());
1✔
235

236
    try {
237
      var page = mdbClient.listParameters(ListOptions.limit(500)).get();
1✔
238
      page.iterator().forEachRemaining(missionDatabase::addParameter);
1✔
239
      while (page.hasNextPage()) {
1✔
240
        page = page.getNextPage().get();
×
241
        page.iterator().forEachRemaining(missionDatabase::addParameter);
×
242
      }
243

244
      var commandPage = mdbClient.listCommands(ListOptions.limit(200)).get();
1✔
245
      commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
246
      while (commandPage.hasNextPage()) {
1✔
247
        commandPage = commandPage.getNextPage().get();
1✔
248
        commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
249
      }
250
    } catch (Exception e) {
×
251
      e.printStackTrace();
×
252
      //          throw new Exception("Failed to load mission database", e);
253
    }
1✔
254
    return missionDatabase;
1✔
255
  }
256

257
  protected void initMDBParameterRDequest(YamcsClient yamcsClient, String serverName) {
258
    var mdb = yamcsClient.createMissionDatabaseClient(getName()).listParameters();
1✔
259
    Page<ParameterInfo> paramsPage = null;
1✔
260
    try {
261
      paramsPage = mdb.get();
1✔
262
    } catch (InterruptedException | ExecutionException e) {
×
263
      // TODO Auto-generated catch block
264
      e.printStackTrace();
×
265
    }
1✔
266
    var it = paramsPage.iterator();
1✔
267
    it.forEachRemaining(
1✔
268
        p -> {
269
          //          System.out.println("p-->" + p.getQualifiedName());
270

271
          for (var m : p.getType().getMemberList()) {
1✔
272
            //            System.out.println("p member-->" + m.getName());
273
          }
1✔
274
        });
1✔
275
    while (paramsPage.hasNextPage()) {
1✔
276
      //      var it = paramsPage.iterator();
277
      try {
278
        paramsPage = paramsPage.getNextPage().get();
×
279
      } catch (InterruptedException | ExecutionException e) {
×
280
        // TODO Auto-generated catch block
281
        e.printStackTrace();
×
282
      }
×
283
      it = paramsPage.iterator();
×
284
      it.forEachRemaining(
×
285
          p -> {
286
            //            System.out.println("p-->" + p.getQualifiedName());
287

288
            for (var m : p.getType().getMemberList()) {
×
289
              //              System.out.println("p member-->" + m.getName());
290
            }
×
291
          });
×
292
    }
293
  }
1✔
294

295
  /**
296
   * Initializes all of the subscriptions to the servers such as event and parameter subscriptions.
297
   * Always call this AFTER the websocket connection to YAMCS has been established. Ideally inside
298
   * the connected() method of a org.yamcs.client.ConnectionListener. Otherwise, one might cause a
299
   * race between the time we "connect" via the websocket and the time we create these
300
   * subscriptions.
301
   *
302
   * @param yamcsClient
303
   * @param serverName
304
   */
305
  // TODO:This shoud return whether or not the instance activated successfully.
306
  public void activate(YamcsClient yamcsClient, String serverName) {
307
    initProcessorClient(yamcsClient);
1✔
308
    initYamcsSubscriptionService(yamcsClient, serverName, "realtime");
1✔
309
    initEventSubscription(yamcsClient, serverName);
1✔
310
    initLinkSubscription(yamcsClient, serverName);
1✔
311
    initMDBParameterRDequest(yamcsClient, serverName);
1✔
312
    initTMStats(yamcsClient);
1✔
313

314
    missionDatabase = loadMissionDatabase(yamcsClient);
1✔
315

316
    try {
317
      initCommandOptions(yamcsClient);
1✔
318
    } catch (InterruptedException e) {
×
319
      // TODO Auto-generated catch block
320
      e.printStackTrace();
×
321
      return;
×
322
    } catch (ExecutionException e) {
×
323
      // TODO Auto-generated catch block
324
      e.printStackTrace();
×
325
      return;
×
326
    }
1✔
327
    instanceState = CMDR_YamcsInstanceState.ACTIVATED;
1✔
328
  }
1✔
329

330
  public void subscribeTMStats(YamcsClient yamcsClient, Consumer<ProcessingStatistics> consumer) {
331
    //    TODO:Don't use the YAMCS thread pool. Use the Java one.
332
    //    timer = YamcsServer.getServer().getThreadPoolExecutor();
333
    //
334
    //    //    Make "realtime configurable"
335
    //
336
    //    timer.scheduleAtFixedRate(
337
    //        () -> {
338
    //          System.out.println("scheduleAtFixedRate1");
339
    //          YamcsServer.getServer();
340
    //          System.out.println("scheduleAtFixedRate2:" + YamcsServer.getServer());
341
    //          System.out.println("Instance:" + getName());
342
    //          var instance = YamcsServer.getServer().getInstance(getName());
343
    //
344
    //          System.out.println("scheduleAtFixedRate3:" + instance);
345
    //          YamcsServer.getServer().getInstance(getName()).getProcessor("realtime");
346
    //
347
    //          System.out.println("scheduleAtFixedRate4");
348
    //          ProcessingStatistics ps =
349
    //              YamcsServer.getServer()
350
    //                  .getInstance(getName())
351
    //                  .getProcessor("realtime")
352
    //                  .getTmProcessor()
353
    //                  .getStatistics();
354
    //          //           ps =
355
    //          //              YamcsServer.getServer()
356
    //          //                  .getInstance(getName())
357
    //          //                  .getProcessor("realtime")
358
    //          //                  .getTmProcessor()
359
    //          //                  .getStatistics();
360
    //          System.out.println("scheduleAtFixedRate5:" + ps);
361
    //          consumer.accept(ps);
362
    //        },
363
    //        1,
364
    //        1,
365
    //        TimeUnit.SECONDS);
366
    //          TODO:Add the API call to YMACS server side
367
  }
×
368

369
  public void initTMStats(YamcsClient yamcsClient) {
370
    try {
371
      new YamcsWebSocketClient(
1✔
372
          stats -> {
373
            if (stats != null) {
1✔
374
              packets.clear();
×
375
              for (TmStatistics s : stats) {
×
376
                packets.add(s);
×
377
              }
×
378
            }
379
          },
1✔
380
          yamcsClient.getHost(),
1✔
381
          yamcsClient.getPort(),
1✔
382
          getName(),
1✔
383
          yamcsClient.listProcessors(getName()).get().get(0).getName());
1✔
384
    } catch (InterruptedException e) {
×
385
      // TODO Auto-generated catch block
386
      e.printStackTrace();
×
387
    } catch (ExecutionException e) {
×
388
      // TODO Auto-generated catch block
389
      e.printStackTrace();
×
390
    }
1✔
391

392
    //
393
    //    subscribeTMStats(
394
    //        yamcsClient,
395
    //        stats -> {
396
    //          packets.clear();
397
    //          System.out.println("stats...");
398
    //          for (var s : stats.snapshot()) {
399
    //            packets.add(s);
400
    //          }
401
    //        });
402
  }
1✔
403

404
  private void initPacketSubscription(YamcsClient yamcsClient) {
405
    PacketSubscription subscription = yamcsClient.createPacketSubscription();
×
406
    //    yamcsClient.createProcessorClient(OBJECT_TYPE, OBJECT_TYPE)
407
    subscription.addMessageListener(
×
408
        new MessageListener<TmPacketData>() {
×
409

410
          @Override
411
          public void onMessage(TmPacketData message) {
412
            TmPacket pwt =
×
413
                new TmPacket(
414
                    TimeEncoding.fromProtobufTimestamp(message.getReceptionTime()),
×
415
                    TimeEncoding.fromProtobufTimestamp(message.getGenerationTime()),
×
416
                    message.getSequenceNumber(),
×
417
                    message.getPacket().toByteArray());
×
418
            //            packetsTable.packetReceived(pwt);
419
          }
×
420

421
          @Override
422
          public void onError(Throwable t) {
423
            //            showError("Error subscribing: " + t.getMessage());
424
          }
×
425
        });
426

427
    //    subscription.sendMessage(SubscribeTMStatisticsRequest.newBuilder()
428
    //            .setInstance(getName())
429
    ////            .setStream(connectData.streamName)
430
    //            .build());
431
  }
×
432

433
  public MissionDatabase getMissionDatabase() {
434
    return missionDatabase;
×
435
  }
436

437
  private void initCommandOptions(YamcsClient yamcsClient)
438
      throws InterruptedException, ExecutionException {
439
    GetServerInfoResponse info = yamcsClient.getServerInfo().get();
1✔
440
    System.out.println("initCommandOptions-->1");
1✔
441
    for (CommandOptionInfo o : info.getCommandOptionsList()) {
1✔
442
      extraCommandArgs.put(o.getId(), o);
×
443

444
      // Eventually check the type and create Commandoption accordingly
445
      optionsList.add(new CommandOption(o.getId(), ""));
×
446
      System.out.println("initCommandOptions-->2" + optionsList);
×
447
    }
×
448
    System.out.println("initCommandOptions-->3");
1✔
449
  }
1✔
450

451
  public void deActivate(YamcsClient yamcsClient, String serverName) {
452
    // TODO:unInit resources...
453
    instanceState = CMDR_YamcsInstanceState.DEACTIVATED;
1✔
454
    if (eventSubscription != null) {
1✔
455
      eventSubscription.cancel(true);
1✔
456
      paramSubscriptionService.destroy();
1✔
457
    }
458
  }
1✔
459

460
  public EventSubscription getEventSubscription() {
461
    return eventSubscription;
1✔
462
  }
463

464
  public void subscribePV(YamcsPV pv) {
465
    // TODO:Have to let the caller know whether were able to successfully subscribe
466
    // to this pv or not.
467
    paramSubscriptionService.register(pv);
×
468
  }
×
469

470
  /** Creates and publishes an event to YAMCS instance. */
471
  public void publishEvent(String message, YamcsClient yamcsClient) {
472
    yamcsClient.createEvent(
×
473
        CreateEventRequest.newBuilder()
×
474
            .setInstance(getName())
×
475
            .setMessage(message)
×
476
            .setSource("Commander")
×
477
            .build());
×
478
  }
×
479

480
  /** Creates and publishes an event to YAMCS instance. */
481
  public void publishEvent(CMDR_Event e, YamcsClient yamcsClient) {
482
    yamcsClient.createEvent(
×
483
        CreateEventRequest.newBuilder()
×
484
            .setInstance(getName())
×
485
            .setMessage(e.getMessage())
×
486
            .setSource(e.getSource())
×
487
            .setSeverity(e.getSeverity().toString())
×
488
            .build());
×
489
  }
×
490

491
  public ArrayList<String> getProcessors(YamcsClient yamcsClient) {
492

493
    ArrayList<String> processors = new ArrayList<String>();
×
494
    try {
495
      yamcsClient
×
496
          .listProcessors(getName())
×
497
          .get()
×
498
          .forEach(
×
499
              p -> {
500
                processors.add(p.getName());
×
501
              });
×
502
    } catch (InterruptedException | ExecutionException e) {
×
503
      // TODO Auto-generated catch block
504
      e.printStackTrace();
×
505
    }
×
506

507
    return processors;
×
508
  }
509

510
  public void switchProcessor(YamcsClient yamcsClient, String serverName, String processorName) {
511
    //          This seems redundant....
512
    paramSubscriptionService.destroy();
×
513
    initYamcsSubscriptionService(yamcsClient, serverName, processorName);
×
514
  }
×
515

516
  public void getParameters(
517
      YamcsClient yamcsClient,
518
      List<String> parameters,
519
      Instant start,
520
      Instant end,
521
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
522

523
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
524
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
525
    for (var p : parameters) {
×
526
      try {
527
        pages.add(this.getYamcsArchiveClient().listValues(p, start, end).get());
×
528
      } catch (InterruptedException | ExecutionException e) {
×
529
        // TODO Auto-generated catch block
530
        e.printStackTrace();
×
531
      }
×
532
    }
×
533

534
    consumer.accept(pages);
×
535
  }
×
536

537
  public void getParameter(
538
      YamcsClient yamcsClient,
539
      String parameter,
540
      Instant start,
541
      Instant end,
542
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
543

544
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
545
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
546
    try {
547
      pages.add(this.getYamcsArchiveClient().listValues(parameter, start, end).get());
×
548
    } catch (InterruptedException | ExecutionException e) {
×
549
      // TODO Auto-generated catch block
550
      e.printStackTrace();
×
551
    }
×
552

553
    consumer.accept(pages);
×
554
  }
×
555

556
  public boolean isLinkActive(String linkName) {
557
    return Duration.between(Instant.now(), LastUpdateLinks.get(linkName)).toMillis() < 1000;
×
558
  }
559
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc