• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #85

21 Sep 2023 08:08AM UTC coverage: 16.58% (+0.01%) from 16.568%
#85

push

lorenzo-gomez-windhover
-Minimally functional packets viewer

17807 of 107403 relevant lines covered (16.58%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.64
/core/commander-core/src/main/java/com/windhoverlabs/yamcs/core/CMDR_YamcsInstance.java
1
package com.windhoverlabs.yamcs.core;
2

3
import com.windhoverlabs.pv.yamcs.YamcsPV;
4
import com.windhoverlabs.pv.yamcs.YamcsSubscriptionService;
5
import com.windhoverlabs.yamcs.core.YamcsWebSocketClient.TmStatistics;
6
import java.time.Duration;
7
import java.time.Instant;
8
import java.util.ArrayList;
9
import java.util.HashMap;
10
import java.util.List;
11
import java.util.concurrent.ExecutionException;
12
import java.util.concurrent.ScheduledThreadPoolExecutor;
13
import java.util.concurrent.TimeUnit;
14
import java.util.function.Consumer;
15
import java.util.logging.Logger;
16
import javafx.collections.FXCollections;
17
import javafx.collections.ObservableList;
18
import org.yamcs.TmPacket;
19
import org.yamcs.YamcsServer;
20
import org.yamcs.client.EventSubscription;
21
import org.yamcs.client.LinkSubscription;
22
import org.yamcs.client.MessageListener;
23
import org.yamcs.client.PacketSubscription;
24
import org.yamcs.client.Page;
25
import org.yamcs.client.YamcsClient;
26
import org.yamcs.client.archive.ArchiveClient;
27
import org.yamcs.client.mdb.MissionDatabaseClient.ListOptions;
28
import org.yamcs.client.processor.ProcessorClient;
29
import org.yamcs.mdb.ProcessingStatistics;
30
import org.yamcs.protobuf.CreateEventRequest;
31
import org.yamcs.protobuf.GetServerInfoResponse;
32
import org.yamcs.protobuf.GetServerInfoResponse.CommandOptionInfo;
33
import org.yamcs.protobuf.Mdb.ParameterInfo;
34
import org.yamcs.protobuf.Pvalue.ParameterValue;
35
import org.yamcs.protobuf.SubscribeEventsRequest;
36
import org.yamcs.protobuf.TmPacketData;
37
// import org.yamcs.protobuf.TmStatistics;
38
import org.yamcs.protobuf.links.LinkInfo;
39
import org.yamcs.protobuf.links.SubscribeLinksRequest;
40
import org.yamcs.utils.TimeEncoding;
41

42
// import org.yamcs.protobuf.Event;
43

44
public class CMDR_YamcsInstance extends YamcsObject<YamcsObject<?>> {
45
  public static final Logger logger = Logger.getLogger(CMDR_YamcsInstance.class.getPackageName());
1✔
46
  public static String OBJECT_TYPE = "instance";
1✔
47
  private ProcessorClient yamcsProcessor = null;
1✔
48
  private YamcsSubscriptionService paramSubscriptionService;
49
  private EventSubscription eventSubscription;
50
  private LinkSubscription linkSubscription;
51
  private MissionDatabase missionDatabase;
52
  //  private EventSubscription eventSubscription;
53
  private ArchiveClient yamcsArchiveClient;
54
  private CMDR_YamcsInstanceState instanceState;
55

56
  // TODO:Not sure if we want to have this on every instance and their server...just want it to work
57
  // for now.
58
  // Useful for "special" command link arguments such as cop1Bypass
59
  private HashMap<String, CommandOptionInfo> extraCommandArgs =
1✔
60
      new HashMap<String, CommandOptionInfo>();
61

62
  private ObservableList<CommandOption> optionsList = FXCollections.observableArrayList();
1✔
63
  private ObservableList<CMDR_Event> events = FXCollections.observableArrayList();
1✔
64
  private ObservableList<LinkInfo> links = FXCollections.observableArrayList();
1✔
65
  private ObservableList<TmStatistics> packets = FXCollections.observableArrayList();
1✔
66

67
  public ObservableList<TmStatistics> getPackets() {
68
    return packets;
×
69
  }
70

71
  private HashMap<String, LinkInfo> linksMap = new HashMap<String, LinkInfo>();
1✔
72

73
  public HashMap<String, LinkInfo> getLinksMap() {
74
    return linksMap;
×
75
  }
76

77
  private HashMap<String, Boolean> activeInLinks = new HashMap<String, Boolean>();
1✔
78

79
  private HashMap<String, Instant> LastUpdateLinks = new HashMap<String, Instant>();
1✔
80

81
  public HashMap<String, Boolean> getActiveInLinks() {
82
    return activeInLinks;
×
83
  }
84

85
  private HashMap<String, Boolean> activeOutLinks = new HashMap<String, Boolean>();
1✔
86
  private ScheduledThreadPoolExecutor timer;
87

88
  public ObservableList<LinkInfo> getLinks() {
89
    return links;
×
90
  }
91

92
  public CMDR_YamcsInstanceState getInstanceState() {
93
    return instanceState;
×
94
  }
95

96
  // Make this class generic?
97
  public class CommandOption {
98
    private String id;
99
    private String value;
100

101
    public CommandOption(String newId, String value) {
×
102
      this.id = newId;
×
103
      this.value = value;
×
104
    }
×
105

106
    public String getValue() {
107
      return this.value;
×
108
    }
109

110
    public String getId() {
111
      return this.id;
×
112
    }
113

114
    public void setValue(String newValue) {
115
      this.value = newValue;
×
116
    }
×
117
  }
118

119
  public ObservableList<CommandOption> getOptionsList() {
120
    return optionsList;
×
121
  }
122

123
  public HashMap<String, CommandOptionInfo> getExtraCommandArgs() {
124
    return extraCommandArgs;
×
125
  }
126

127
  public ArchiveClient getYamcsArchiveClient() {
128
    return yamcsArchiveClient;
1✔
129
  }
130

131
  public ObservableList<CMDR_Event> getEvents() {
132
    return events;
1✔
133
  }
134

135
  public ProcessorClient getYamcsProcessor() {
136
    return yamcsProcessor;
1✔
137
  }
138

139
  public CMDR_YamcsInstance(String name) {
140
    super(name);
1✔
141
  }
1✔
142

143
  @Override
144
  public ObservableList<YamcsObject<?>> getItems() {
145
    return FXCollections.emptyObservableList();
1✔
146
  }
147

148
  @Override
149
  public void createAndAddChild(String name) {
150
    throw new IllegalStateException("CMDR_YamcsInstance does not allow child items");
1✔
151
  }
152

153
  @Override
154
  public String getObjectType() {
155
    return OBJECT_TYPE;
1✔
156
  }
157

158
  protected void initProcessorClient(YamcsClient yamcsClient) {
159
    yamcsProcessor = yamcsClient.createProcessorClient(getName(), "realtime");
1✔
160
  }
1✔
161

162
  protected void initYamcsSubscriptionService(
163
      YamcsClient yamcsClient, String serverName, String procesor) {
164
    paramSubscriptionService =
1✔
165
        new YamcsSubscriptionService(
166
            yamcsClient.createParameterSubscription(), serverName, this.getName(), procesor);
1✔
167
  }
1✔
168

169
  protected void initLinkSubscription(YamcsClient yamcsClient, String serverName) {
170
    linkSubscription = yamcsClient.createLinkSubscription();
1✔
171
    linkSubscription.addMessageListener(
1✔
172
        linkEvent -> {
173
          switch (linkEvent.getType()) {
1✔
174
            case REGISTERED:
175
            case UPDATED:
176
              {
177
                var link = linkEvent.getLinkInfo();
1✔
178
                LinkInfo linkFromList = null;
1✔
179

180
                LastUpdateLinks.put(link.getName(), Instant.now());
1✔
181

182
                linksMap.put(link.getName(), link);
1✔
183

184
                boolean linkExistsInlList = false;
1✔
185

186
                for (var l : links) {
1✔
187
                  if (l != null) {
1✔
188
                    if (l.getName().equals(link.getName())) {
1✔
189
                      linkFromList = l;
×
190
                      linkExistsInlList = true;
×
191
                    }
192
                  }
193
                }
1✔
194

195
                if (linkExistsInlList) {
1✔
196
                  links.remove(linkFromList);
×
197
                }
198
                links.add(linksMap.get(link.getName()));
1✔
199
              }
200

201
              break;
1✔
202
            case UNREGISTERED:
203
              //               TODO but not currently sent by Yamcs
204
          }
205
        });
1✔
206

207
    linkSubscription.sendMessage(SubscribeLinksRequest.newBuilder().setInstance(getName()).build());
1✔
208
  }
1✔
209

210
  protected void initEventSubscription(YamcsClient yamcsClient, String serverName) {
211
    eventSubscription = yamcsClient.createEventSubscription();
1✔
212
    eventSubscription.addMessageListener(
1✔
213
        event -> {
214
          events.add(
×
215
              new CMDR_Event(
216
                  event.getMessage(),
×
217
                  Instant.ofEpochSecond(
×
218
                      event.getGenerationTime().getSeconds(), event.getGenerationTime().getNanos()),
×
219
                  event.getSeverity(),
×
220
                  event.getType(),
×
221
                  Instant.ofEpochSecond(
×
222
                      event.getReceptionTime().getSeconds(), event.getReceptionTime().getNanos()),
×
223
                  event.getSource(),
×
224
                  this.getName()));
×
225
        });
×
226

227
    yamcsArchiveClient = yamcsClient.createArchiveClient(getName());
1✔
228

229
    eventSubscription.sendMessage(
1✔
230
        SubscribeEventsRequest.newBuilder().setInstance(getName()).build());
1✔
231
  }
1✔
232

233
  private MissionDatabase loadMissionDatabase(YamcsClient client) {
234
    var missionDatabase = new MissionDatabase();
1✔
235

236
    var mdbClient = client.createMissionDatabaseClient(getName());
1✔
237

238
    try {
239
      var page = mdbClient.listParameters(ListOptions.limit(500)).get();
1✔
240
      page.iterator().forEachRemaining(missionDatabase::addParameter);
1✔
241
      while (page.hasNextPage()) {
1✔
242
        page = page.getNextPage().get();
×
243
        page.iterator().forEachRemaining(missionDatabase::addParameter);
×
244
      }
245

246
      var commandPage = mdbClient.listCommands(ListOptions.limit(200)).get();
1✔
247
      commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
248
      while (commandPage.hasNextPage()) {
1✔
249
        commandPage = commandPage.getNextPage().get();
1✔
250
        commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
251
      }
252
    } catch (Exception e) {
×
253
      e.printStackTrace();
×
254
      //          throw new Exception("Failed to load mission database", e);
255
    }
1✔
256
    return missionDatabase;
1✔
257
  }
258

259
  protected void initMDBParameterRDequest(YamcsClient yamcsClient, String serverName) {
260
    var mdb = yamcsClient.createMissionDatabaseClient(getName()).listParameters();
1✔
261
    Page<ParameterInfo> paramsPage = null;
1✔
262
    try {
263
      paramsPage = mdb.get();
1✔
264
    } catch (InterruptedException | ExecutionException e) {
×
265
      // TODO Auto-generated catch block
266
      e.printStackTrace();
×
267
    }
1✔
268
    var it = paramsPage.iterator();
1✔
269
    it.forEachRemaining(
1✔
270
        p -> {
271
          //          System.out.println("p-->" + p.getQualifiedName());
272

273
          for (var m : p.getType().getMemberList()) {
1✔
274
            //            System.out.println("p member-->" + m.getName());
275
          }
1✔
276
        });
1✔
277
    while (paramsPage.hasNextPage()) {
1✔
278
      //      var it = paramsPage.iterator();
279
      try {
280
        paramsPage = paramsPage.getNextPage().get();
×
281
      } catch (InterruptedException | ExecutionException e) {
×
282
        // TODO Auto-generated catch block
283
        e.printStackTrace();
×
284
      }
×
285
      it = paramsPage.iterator();
×
286
      it.forEachRemaining(
×
287
          p -> {
288
            //            System.out.println("p-->" + p.getQualifiedName());
289

290
            for (var m : p.getType().getMemberList()) {
×
291
              //              System.out.println("p member-->" + m.getName());
292
            }
×
293
          });
×
294
    }
295
  }
1✔
296

297
  /**
298
   * Initializes all of the subscriptions to the servers such as event and parameter subscriptions.
299
   * Always call this AFTER the websocket connection to YAMCS has been established. Ideally inside
300
   * the connected() method of a org.yamcs.client.ConnectionListener. Otherwise, one might cause a
301
   * race between the time we "connect" via the websocket and the time we create these
302
   * subscriptions.
303
   *
304
   * @param yamcsClient
305
   * @param serverName
306
   */
307
  // TODO:This shoud return whether or not the instance activated successfully.
308
  public void activate(YamcsClient yamcsClient, String serverName) {
309
    initProcessorClient(yamcsClient);
1✔
310
    initYamcsSubscriptionService(yamcsClient, serverName, "realtime");
1✔
311
    initEventSubscription(yamcsClient, serverName);
1✔
312
    initLinkSubscription(yamcsClient, serverName);
1✔
313
    initMDBParameterRDequest(yamcsClient, serverName);
1✔
314
    initTMStats(yamcsClient);
1✔
315

316
    missionDatabase = loadMissionDatabase(yamcsClient);
1✔
317

318
    try {
319
      initCommandOptions(yamcsClient);
1✔
320
    } catch (InterruptedException e) {
×
321
      // TODO Auto-generated catch block
322
      e.printStackTrace();
×
323
      return;
×
324
    } catch (ExecutionException e) {
×
325
      // TODO Auto-generated catch block
326
      e.printStackTrace();
×
327
      return;
×
328
    }
1✔
329
    instanceState = CMDR_YamcsInstanceState.ACTIVATED;
1✔
330
  }
1✔
331

332
  public void subscribeTMStats(YamcsClient yamcsClient, Consumer<ProcessingStatistics> consumer) {
333
    //    TODO:Don't use the YAMCS thread pool. Use the Java one.
334
    timer = YamcsServer.getServer().getThreadPoolExecutor();
×
335

336
    //    Make "realtime configurable"
337

338
    timer.scheduleAtFixedRate(
×
339
        () -> {
340
          System.out.println("scheduleAtFixedRate1");
×
341
          YamcsServer.getServer();
×
342
          System.out.println("scheduleAtFixedRate2:" + YamcsServer.getServer());
×
343
          System.out.println("Instance:" + getName());
×
344
          var instance = YamcsServer.getServer().getInstance(getName());
×
345

346
          System.out.println("scheduleAtFixedRate3:" + instance);
×
347
          YamcsServer.getServer().getInstance(getName()).getProcessor("realtime");
×
348

349
          System.out.println("scheduleAtFixedRate4");
×
350
          ProcessingStatistics ps =
351
              YamcsServer.getServer()
×
352
                  .getInstance(getName())
×
353
                  .getProcessor("realtime")
×
354
                  .getTmProcessor()
×
355
                  .getStatistics();
×
356
          //           ps =
357
          //              YamcsServer.getServer()
358
          //                  .getInstance(getName())
359
          //                  .getProcessor("realtime")
360
          //                  .getTmProcessor()
361
          //                  .getStatistics();
362
          System.out.println("scheduleAtFixedRate5:" + ps);
×
363
          consumer.accept(ps);
×
364
        },
×
365
        1,
366
        1,
367
        TimeUnit.SECONDS);
368
  }
×
369

370
  public void initTMStats(YamcsClient yamcsClient) {
371
    System.out.println("initTMStats**************88");
1✔
372
    try {
373
      new YamcsWebSocketClient(
1✔
374
          stats -> {
375
            packets.clear();
1✔
376
            for (TmStatistics s : stats) {
×
377
              packets.add(s);
×
378
            }
×
379
          },
×
380
          yamcsClient.getHost(),
1✔
381
          yamcsClient.getPort(),
1✔
382
          getName(),
1✔
383
          yamcsClient.listProcessors(getName()).get().get(0).getName());
1✔
384
    } catch (InterruptedException e) {
×
385
      // TODO Auto-generated catch block
386
      e.printStackTrace();
×
387
    } catch (ExecutionException e) {
×
388
      // TODO Auto-generated catch block
389
      e.printStackTrace();
×
390
    }
1✔
391
    //
392
    //    ManagementListener listener = new ManagementListener() {
393
    //        @Override
394
    //        public void statisticsUpdated(Processor statsProcessor, Statistics stats) {
395
    //            if (statsProcessor.getName().equals()) {
396
    //                observer.next(stats);
397
    //            }
398
    //        }
399
    //    };
400
    //    observer.setCancelHandler(() ->
401
    // ManagementService.getInstance().removeManagementListener(listener));
402
    //    ManagementService.getInstance().addManagementListener(listener);
403

404
    //
405
    //    subscribeTMStats(
406
    //        yamcsClient,
407
    //        stats -> {
408
    //          packets.clear();
409
    //          System.out.println("stats...");
410
    //          for (var s : stats.snapshot()) {
411
    //            packets.add(s);
412
    //          }
413
    //        });
414
  }
1✔
415

416
  private void initPacketSubscription(YamcsClient yamcsClient) {
417
    PacketSubscription subscription = yamcsClient.createPacketSubscription();
×
418
    //    yamcsClient.createProcessorClient(OBJECT_TYPE, OBJECT_TYPE)
419
    subscription.addMessageListener(
×
420
        new MessageListener<TmPacketData>() {
×
421

422
          @Override
423
          public void onMessage(TmPacketData message) {
424
            TmPacket pwt =
×
425
                new TmPacket(
426
                    TimeEncoding.fromProtobufTimestamp(message.getReceptionTime()),
×
427
                    TimeEncoding.fromProtobufTimestamp(message.getGenerationTime()),
×
428
                    message.getSequenceNumber(),
×
429
                    message.getPacket().toByteArray());
×
430
            //            packetsTable.packetReceived(pwt);
431
          }
×
432

433
          @Override
434
          public void onError(Throwable t) {
435
            //            showError("Error subscribing: " + t.getMessage());
436
          }
×
437
        });
438

439
    //    subscription.sendMessage(SubscribeTMStatisticsRequest.newBuilder()
440
    //            .setInstance(getName())
441
    ////            .setStream(connectData.streamName)
442
    //            .build());
443
  }
×
444

445
  public MissionDatabase getMissionDatabase() {
446
    return missionDatabase;
×
447
  }
448

449
  private void initCommandOptions(YamcsClient yamcsClient)
450
      throws InterruptedException, ExecutionException {
451
    GetServerInfoResponse info = yamcsClient.getServerInfo().get();
1✔
452
    System.out.println("initCommandOptions-->1");
1✔
453
    for (CommandOptionInfo o : info.getCommandOptionsList()) {
1✔
454
      extraCommandArgs.put(o.getId(), o);
×
455

456
      // Eventually check the type and create Commandoption accordingly
457
      optionsList.add(new CommandOption(o.getId(), ""));
×
458
      System.out.println("initCommandOptions-->2" + optionsList);
×
459
    }
×
460
    System.out.println("initCommandOptions-->3");
1✔
461
  }
1✔
462

463
  public void deActivate(YamcsClient yamcsClient, String serverName) {
464
    // TODO:unInit resources...
465
    instanceState = CMDR_YamcsInstanceState.DEACTIVATED;
1✔
466
    if (eventSubscription != null) {
1✔
467
      eventSubscription.cancel(true);
1✔
468
      paramSubscriptionService.destroy();
1✔
469
    }
470
  }
1✔
471

472
  public EventSubscription getEventSubscription() {
473
    return eventSubscription;
1✔
474
  }
475

476
  public void subscribePV(YamcsPV pv) {
477
    // TODO:Have to let the caller know whether were able to successfully subscribe
478
    // to this pv or not.
479
    paramSubscriptionService.register(pv);
×
480
  }
×
481

482
  /** Creates and publishes an event to YAMCS instance. */
483
  public void publishEvent(String message, YamcsClient yamcsClient) {
484
    yamcsClient.createEvent(
×
485
        CreateEventRequest.newBuilder()
×
486
            .setInstance(getName())
×
487
            .setMessage(message)
×
488
            .setSource("Commander")
×
489
            .build());
×
490
  }
×
491

492
  public ArrayList<String> getProcessors(YamcsClient yamcsClient) {
493

494
    ArrayList<String> processors = new ArrayList<String>();
×
495
    try {
496
      yamcsClient
×
497
          .listProcessors(getName())
×
498
          .get()
×
499
          .forEach(
×
500
              p -> {
501
                processors.add(p.getName());
×
502
              });
×
503
    } catch (InterruptedException | ExecutionException e) {
×
504
      // TODO Auto-generated catch block
505
      e.printStackTrace();
×
506
    }
×
507

508
    return processors;
×
509
  }
510

511
  public void switchProcessor(YamcsClient yamcsClient, String serverName, String processorName) {
512
    //          This seems redundant....
513
    paramSubscriptionService.destroy();
×
514
    initYamcsSubscriptionService(yamcsClient, serverName, processorName);
×
515
  }
×
516

517
  public void getParameters(
518
      YamcsClient yamcsClient,
519
      List<String> parameters,
520
      Instant start,
521
      Instant end,
522
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
523

524
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
525
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
526
    for (var p : parameters) {
×
527
      try {
528
        pages.add(this.getYamcsArchiveClient().listValues(p, start, end).get());
×
529
      } catch (InterruptedException | ExecutionException e) {
×
530
        // TODO Auto-generated catch block
531
        e.printStackTrace();
×
532
      }
×
533
    }
×
534

535
    consumer.accept(pages);
×
536
  }
×
537

538
  public void getParameter(
539
      YamcsClient yamcsClient,
540
      String parameter,
541
      Instant start,
542
      Instant end,
543
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
544

545
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
546
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
547
    try {
548
      pages.add(this.getYamcsArchiveClient().listValues(parameter, start, end).get());
×
549
    } catch (InterruptedException | ExecutionException e) {
×
550
      // TODO Auto-generated catch block
551
      e.printStackTrace();
×
552
    }
×
553

554
    consumer.accept(pages);
×
555
  }
×
556

557
  public boolean isLinkActive(String linkName) {
558
    return Duration.between(Instant.now(), LastUpdateLinks.get(linkName)).toMillis() < 1000;
×
559
  }
560
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc