• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #84

21 Sep 2023 02:26AM UTC coverage: 16.568% (+0.009%) from 16.559%
#84

push

lorenzo-gomez-windhover
-Add packets viewer. WIP.

129 of 129 new or added lines in 3 files covered. (100.0%)

17798 of 107426 relevant lines covered (16.57%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.86
/core/commander-core/src/main/java/com/windhoverlabs/yamcs/core/CMDR_YamcsInstance.java
1
package com.windhoverlabs.yamcs.core;
2

3
import com.windhoverlabs.pv.yamcs.YamcsPV;
4
import com.windhoverlabs.pv.yamcs.YamcsSubscriptionService;
5
import java.time.Duration;
6
import java.time.Instant;
7
import java.util.ArrayList;
8
import java.util.HashMap;
9
import java.util.List;
10
import java.util.concurrent.ExecutionException;
11
import java.util.concurrent.ScheduledThreadPoolExecutor;
12
import java.util.concurrent.TimeUnit;
13
import java.util.function.Consumer;
14
import java.util.logging.Logger;
15
import javafx.collections.FXCollections;
16
import javafx.collections.ObservableList;
17
import org.yamcs.TmPacket;
18
import org.yamcs.YamcsServer;
19
import org.yamcs.client.EventSubscription;
20
import org.yamcs.client.LinkSubscription;
21
import org.yamcs.client.MessageListener;
22
import org.yamcs.client.PacketSubscription;
23
import org.yamcs.client.Page;
24
import org.yamcs.client.YamcsClient;
25
import org.yamcs.client.archive.ArchiveClient;
26
import org.yamcs.client.mdb.MissionDatabaseClient.ListOptions;
27
import org.yamcs.client.processor.ProcessorClient;
28
import org.yamcs.mdb.ProcessingStatistics;
29
import org.yamcs.protobuf.CreateEventRequest;
30
import org.yamcs.protobuf.GetServerInfoResponse;
31
import org.yamcs.protobuf.GetServerInfoResponse.CommandOptionInfo;
32
import org.yamcs.protobuf.Mdb.ParameterInfo;
33
import org.yamcs.protobuf.Pvalue.ParameterValue;
34
import org.yamcs.protobuf.SubscribeEventsRequest;
35
import org.yamcs.protobuf.TmPacketData;
36
import org.yamcs.protobuf.TmStatistics;
37
import org.yamcs.protobuf.links.LinkInfo;
38
import org.yamcs.protobuf.links.SubscribeLinksRequest;
39
import org.yamcs.utils.TimeEncoding;
40

41
// import org.yamcs.protobuf.Event;
42

43
public class CMDR_YamcsInstance extends YamcsObject<YamcsObject<?>> {
44
  public static final Logger logger = Logger.getLogger(CMDR_YamcsInstance.class.getPackageName());
1✔
45
  public static String OBJECT_TYPE = "instance";
1✔
46
  private ProcessorClient yamcsProcessor = null;
1✔
47
  private YamcsSubscriptionService paramSubscriptionService;
48
  private EventSubscription eventSubscription;
49
  private LinkSubscription linkSubscription;
50
  private MissionDatabase missionDatabase;
51
  //  private EventSubscription eventSubscription;
52
  private ArchiveClient yamcsArchiveClient;
53
  private CMDR_YamcsInstanceState instanceState;
54

55
  // TODO:Not sure if we want to have this on every instance and their server...just want it to work
56
  // for now.
57
  // Useful for "special" command link arguments such as cop1Bypass
58
  private HashMap<String, CommandOptionInfo> extraCommandArgs =
1✔
59
      new HashMap<String, CommandOptionInfo>();
60

61
  private ObservableList<CommandOption> optionsList = FXCollections.observableArrayList();
1✔
62
  private ObservableList<CMDR_Event> events = FXCollections.observableArrayList();
1✔
63
  private ObservableList<LinkInfo> links = FXCollections.observableArrayList();
1✔
64
  private ObservableList<TmStatistics> packets = FXCollections.observableArrayList();
1✔
65

66
  public ObservableList<TmStatistics> getPackets() {
67
    return packets;
×
68
  }
69

70
  private HashMap<String, LinkInfo> linksMap = new HashMap<String, LinkInfo>();
1✔
71

72
  public HashMap<String, LinkInfo> getLinksMap() {
73
    return linksMap;
×
74
  }
75

76
  private HashMap<String, Boolean> activeInLinks = new HashMap<String, Boolean>();
1✔
77

78
  private HashMap<String, Instant> LastUpdateLinks = new HashMap<String, Instant>();
1✔
79

80
  public HashMap<String, Boolean> getActiveInLinks() {
81
    return activeInLinks;
×
82
  }
83

84
  private HashMap<String, Boolean> activeOutLinks = new HashMap<String, Boolean>();
1✔
85
  private ScheduledThreadPoolExecutor timer;
86

87
  public ObservableList<LinkInfo> getLinks() {
88
    return links;
×
89
  }
90

91
  public CMDR_YamcsInstanceState getInstanceState() {
92
    return instanceState;
×
93
  }
94

95
  // Make this class generic?
96
  /**
   * A single command option: an identifier that is fixed at construction time paired with a
   * value that may be replaced later.
   */
  public class CommandOption {
    private final String id;
    private String value;

    /**
     * @param newId identifier of the option
     * @param value initial value of the option
     */
    public CommandOption(String newId, String value) {
      this.value = value;
      this.id = newId;
    }

    /** @return the identifier given at construction time */
    public String getId() {
      return id;
    }

    /** @return the current value */
    public String getValue() {
      return value;
    }

    /** @param newValue value that replaces the current one */
    public void setValue(String newValue) {
      value = newValue;
    }
  }
117

118
  public ObservableList<CommandOption> getOptionsList() {
119
    return optionsList;
×
120
  }
121

122
  public HashMap<String, CommandOptionInfo> getExtraCommandArgs() {
123
    return extraCommandArgs;
×
124
  }
125

126
  public ArchiveClient getYamcsArchiveClient() {
127
    return yamcsArchiveClient;
1✔
128
  }
129

130
  public ObservableList<CMDR_Event> getEvents() {
131
    return events;
1✔
132
  }
133

134
  public ProcessorClient getYamcsProcessor() {
135
    return yamcsProcessor;
1✔
136
  }
137

138
  /**
   * Creates an instance object with the given name. No clients or subscriptions are created here.
   *
   * @param name name handed to the {@code YamcsObject} super constructor
   */
  public CMDR_YamcsInstance(String name) {
    super(name);
  }
1✔
141

142
  @Override
143
  public ObservableList<YamcsObject<?>> getItems() {
144
    return FXCollections.emptyObservableList();
1✔
145
  }
146

147
  @Override
148
  public void createAndAddChild(String name) {
149
    throw new IllegalStateException("CMDR_YamcsInstance does not allow child items");
1✔
150
  }
151

152
  @Override
153
  public String getObjectType() {
154
    return OBJECT_TYPE;
1✔
155
  }
156

157
  protected void initProcessorClient(YamcsClient yamcsClient) {
158
    yamcsProcessor = yamcsClient.createProcessorClient(getName(), "realtime");
1✔
159
  }
1✔
160

161
  protected void initYamcsSubscriptionService(
162
      YamcsClient yamcsClient, String serverName, String procesor) {
163
    paramSubscriptionService =
1✔
164
        new YamcsSubscriptionService(
165
            yamcsClient.createParameterSubscription(), serverName, this.getName(), procesor);
1✔
166
  }
1✔
167

168
  protected void initLinkSubscription(YamcsClient yamcsClient, String serverName) {
169
    linkSubscription = yamcsClient.createLinkSubscription();
1✔
170
    linkSubscription.addMessageListener(
1✔
171
        linkEvent -> {
172
          switch (linkEvent.getType()) {
1✔
173
            case REGISTERED:
174
            case UPDATED:
175
              {
176
                var link = linkEvent.getLinkInfo();
1✔
177
                LinkInfo linkFromList = null;
1✔
178

179
                LastUpdateLinks.put(link.getName(), Instant.now());
1✔
180

181
                linksMap.put(link.getName(), link);
1✔
182

183
                boolean linkExistsInlList = false;
1✔
184

185
                for (var l : links) {
1✔
186
                  if (l != null) {
1✔
187
                    if (l.getName().equals(link.getName())) {
1✔
188
                      linkFromList = l;
×
189
                      linkExistsInlList = true;
×
190
                    }
191
                  }
192
                }
1✔
193

194
                if (linkExistsInlList) {
1✔
195
                  links.remove(linkFromList);
×
196
                }
197
                links.add(linksMap.get(link.getName()));
1✔
198
              }
199

200
              break;
1✔
201
            case UNREGISTERED:
202
              //               TODO but not currently sent by Yamcs
203
          }
204
        });
1✔
205

206
    linkSubscription.sendMessage(SubscribeLinksRequest.newBuilder().setInstance(getName()).build());
1✔
207
  }
1✔
208

209
  protected void initEventSubscription(YamcsClient yamcsClient, String serverName) {
210
    eventSubscription = yamcsClient.createEventSubscription();
1✔
211
    eventSubscription.addMessageListener(
1✔
212
        event -> {
213
          events.add(
×
214
              new CMDR_Event(
215
                  event.getMessage(),
×
216
                  Instant.ofEpochSecond(
×
217
                      event.getGenerationTime().getSeconds(), event.getGenerationTime().getNanos()),
×
218
                  event.getSeverity(),
×
219
                  event.getType(),
×
220
                  Instant.ofEpochSecond(
×
221
                      event.getReceptionTime().getSeconds(), event.getReceptionTime().getNanos()),
×
222
                  event.getSource(),
×
223
                  this.getName()));
×
224
        });
×
225

226
    yamcsArchiveClient = yamcsClient.createArchiveClient(getName());
1✔
227

228
    eventSubscription.sendMessage(
1✔
229
        SubscribeEventsRequest.newBuilder().setInstance(getName()).build());
1✔
230
  }
1✔
231

232
  private MissionDatabase loadMissionDatabase(YamcsClient client) {
233
    var missionDatabase = new MissionDatabase();
1✔
234

235
    var mdbClient = client.createMissionDatabaseClient(getName());
1✔
236

237
    try {
238
      var page = mdbClient.listParameters(ListOptions.limit(500)).get();
1✔
239
      page.iterator().forEachRemaining(missionDatabase::addParameter);
1✔
240
      while (page.hasNextPage()) {
1✔
241
        page = page.getNextPage().get();
×
242
        page.iterator().forEachRemaining(missionDatabase::addParameter);
×
243
      }
244

245
      var commandPage = mdbClient.listCommands(ListOptions.limit(200)).get();
1✔
246
      commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
247
      while (commandPage.hasNextPage()) {
1✔
248
        commandPage = commandPage.getNextPage().get();
1✔
249
        commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
250
      }
251
    } catch (Exception e) {
×
252
      e.printStackTrace();
×
253
      //          throw new Exception("Failed to load mission database", e);
254
    }
1✔
255
    return missionDatabase;
1✔
256
  }
257

258
  protected void initMDBParameterRDequest(YamcsClient yamcsClient, String serverName) {
259
    var mdb = yamcsClient.createMissionDatabaseClient(getName()).listParameters();
1✔
260
    Page<ParameterInfo> paramsPage = null;
1✔
261
    try {
262
      paramsPage = mdb.get();
1✔
263
    } catch (InterruptedException | ExecutionException e) {
×
264
      // TODO Auto-generated catch block
265
      e.printStackTrace();
×
266
    }
1✔
267
    var it = paramsPage.iterator();
1✔
268
    it.forEachRemaining(
1✔
269
        p -> {
270
          //          System.out.println("p-->" + p.getQualifiedName());
271

272
          for (var m : p.getType().getMemberList()) {
1✔
273
            //            System.out.println("p member-->" + m.getName());
274
          }
1✔
275
        });
1✔
276
    while (paramsPage.hasNextPage()) {
1✔
277
      //      var it = paramsPage.iterator();
278
      try {
279
        paramsPage = paramsPage.getNextPage().get();
×
280
      } catch (InterruptedException | ExecutionException e) {
×
281
        // TODO Auto-generated catch block
282
        e.printStackTrace();
×
283
      }
×
284
      it = paramsPage.iterator();
×
285
      it.forEachRemaining(
×
286
          p -> {
287
            //            System.out.println("p-->" + p.getQualifiedName());
288

289
            for (var m : p.getType().getMemberList()) {
×
290
              //              System.out.println("p member-->" + m.getName());
291
            }
×
292
          });
×
293
    }
294
  }
1✔
295

296
  /**
297
   * Initializes all of the subscriptions to the servers such as event and parameter subscriptions.
298
   * Always call this AFTER the websocket connection to YAMCS has been established. Ideally inside
299
   * the connected() method of a org.yamcs.client.ConnectionListener. Otherwise, one might cause a
300
   * race between the time we "connect" via the websocket and the time we create these
301
   * subscriptions.
302
   *
303
   * @param yamcsClient
304
   * @param serverName
305
   */
306
  // TODO:This shoud return whether or not the instance activated successfully.
307
  public void activate(YamcsClient yamcsClient, String serverName) {
308
    initProcessorClient(yamcsClient);
1✔
309
    initYamcsSubscriptionService(yamcsClient, serverName, "realtime");
1✔
310
    initEventSubscription(yamcsClient, serverName);
1✔
311
    initLinkSubscription(yamcsClient, serverName);
1✔
312
    initMDBParameterRDequest(yamcsClient, serverName);
1✔
313
    initTMStats(yamcsClient);
1✔
314

315
    missionDatabase = loadMissionDatabase(yamcsClient);
1✔
316

317
    try {
318
      initCommandOptions(yamcsClient);
1✔
319
    } catch (InterruptedException e) {
×
320
      // TODO Auto-generated catch block
321
      e.printStackTrace();
×
322
      return;
×
323
    } catch (ExecutionException e) {
×
324
      // TODO Auto-generated catch block
325
      e.printStackTrace();
×
326
      return;
×
327
    }
1✔
328
    instanceState = CMDR_YamcsInstanceState.ACTIVATED;
1✔
329
  }
1✔
330

331
  public void subscribeTMStats(YamcsClient yamcsClient, Consumer<ProcessingStatistics> consumer) {
332
    //    TODO:Don't use the YAMCS thread pool. Use the Java one.
333
    timer = YamcsServer.getServer().getThreadPoolExecutor();
1✔
334

335
    //    Make "realtime configurable"
336

337
    timer.scheduleAtFixedRate(
1✔
338
        () -> {
339
          System.out.println("scheduleAtFixedRate1");
×
340
          YamcsServer.getServer();
×
341
          System.out.println("scheduleAtFixedRate2:" + YamcsServer.getServer());
×
342
          System.out.println("Instance:" + getName());
×
343
          var instance = YamcsServer.getServer().getInstance(getName());
×
344

345
          System.out.println("scheduleAtFixedRate3:" + instance);
×
346
          YamcsServer.getServer().getInstance(getName()).getProcessor("realtime");
×
347

348
          System.out.println("scheduleAtFixedRate4");
×
349
          ProcessingStatistics ps =
350
              YamcsServer.getServer()
×
351
                  .getInstance(getName())
×
352
                  .getProcessor("realtime")
×
353
                  .getTmProcessor()
×
354
                  .getStatistics();
×
355
          //           ps =
356
          //              YamcsServer.getServer()
357
          //                  .getInstance(getName())
358
          //                  .getProcessor("realtime")
359
          //                  .getTmProcessor()
360
          //                  .getStatistics();
361
          System.out.println("scheduleAtFixedRate5:" + ps);
×
362
          consumer.accept(ps);
×
363
        },
×
364
        1,
365
        1,
366
        TimeUnit.SECONDS);
367
  }
1✔
368

369
  public void initTMStats(YamcsClient yamcsClient) {
370
    System.out.println("initTMStats**************88");
1✔
371
    //
372
    //    ManagementListener listener = new ManagementListener() {
373
    //        @Override
374
    //        public void statisticsUpdated(Processor statsProcessor, Statistics stats) {
375
    //            if (statsProcessor.getName().equals()) {
376
    //                observer.next(stats);
377
    //            }
378
    //        }
379
    //    };
380
    //    observer.setCancelHandler(() ->
381
    // ManagementService.getInstance().removeManagementListener(listener));
382
    //    ManagementService.getInstance().addManagementListener(listener);
383

384
    //
385
    subscribeTMStats(
1✔
386
        yamcsClient,
387
        stats -> {
388
          packets.clear();
×
389
          System.out.println("stats...");
×
390
          for (var s : stats.snapshot()) {
×
391
            packets.add(s);
×
392
          }
×
393
        });
×
394
  }
1✔
395

396
  private void initPacketSubscription(YamcsClient yamcsClient) {
397
    PacketSubscription subscription = yamcsClient.createPacketSubscription();
×
398
    //    yamcsClient.createProcessorClient(OBJECT_TYPE, OBJECT_TYPE)
399
    subscription.addMessageListener(
×
400
        new MessageListener<TmPacketData>() {
×
401

402
          @Override
403
          public void onMessage(TmPacketData message) {
404
            TmPacket pwt =
×
405
                new TmPacket(
406
                    TimeEncoding.fromProtobufTimestamp(message.getReceptionTime()),
×
407
                    TimeEncoding.fromProtobufTimestamp(message.getGenerationTime()),
×
408
                    message.getSequenceNumber(),
×
409
                    message.getPacket().toByteArray());
×
410
            //            packetsTable.packetReceived(pwt);
411
          }
×
412

413
          @Override
414
          public void onError(Throwable t) {
415
            //            showError("Error subscribing: " + t.getMessage());
416
          }
×
417
        });
418

419
    //    subscription.sendMessage(SubscribeTMStatisticsRequest.newBuilder()
420
    //            .setInstance(getName())
421
    ////            .setStream(connectData.streamName)
422
    //            .build());
423
  }
×
424

425
  public MissionDatabase getMissionDatabase() {
426
    return missionDatabase;
×
427
  }
428

429
  private void initCommandOptions(YamcsClient yamcsClient)
430
      throws InterruptedException, ExecutionException {
431
    GetServerInfoResponse info = yamcsClient.getServerInfo().get();
1✔
432
    System.out.println("initCommandOptions-->1");
1✔
433
    for (CommandOptionInfo o : info.getCommandOptionsList()) {
1✔
434
      extraCommandArgs.put(o.getId(), o);
×
435

436
      // Eventually check the type and create Commandoption accordingly
437
      optionsList.add(new CommandOption(o.getId(), ""));
×
438
      System.out.println("initCommandOptions-->2" + optionsList);
×
439
    }
×
440
    System.out.println("initCommandOptions-->3");
1✔
441
  }
1✔
442

443
  public void deActivate(YamcsClient yamcsClient, String serverName) {
444
    // TODO:unInit resources...
445
    instanceState = CMDR_YamcsInstanceState.DEACTIVATED;
1✔
446
    if (eventSubscription != null) {
1✔
447
      eventSubscription.cancel(true);
1✔
448
      paramSubscriptionService.destroy();
1✔
449
    }
450
  }
1✔
451

452
  public EventSubscription getEventSubscription() {
453
    return eventSubscription;
1✔
454
  }
455

456
  public void subscribePV(YamcsPV pv) {
457
    // TODO:Have to let the caller know whether were able to successfully subscribe
458
    // to this pv or not.
459
    paramSubscriptionService.register(pv);
×
460
  }
×
461

462
  /** Creates and publishes an event to YAMCS instance. */
463
  public void publishEvent(String message, YamcsClient yamcsClient) {
464
    yamcsClient.createEvent(
×
465
        CreateEventRequest.newBuilder()
×
466
            .setInstance(getName())
×
467
            .setMessage(message)
×
468
            .setSource("Commander")
×
469
            .build());
×
470
  }
×
471

472
  public ArrayList<String> getProcessors(YamcsClient yamcsClient) {
473

474
    ArrayList<String> processors = new ArrayList<String>();
×
475
    try {
476
      yamcsClient
×
477
          .listProcessors(getName())
×
478
          .get()
×
479
          .forEach(
×
480
              p -> {
481
                processors.add(p.getName());
×
482
              });
×
483
    } catch (InterruptedException | ExecutionException e) {
×
484
      // TODO Auto-generated catch block
485
      e.printStackTrace();
×
486
    }
×
487

488
    return processors;
×
489
  }
490

491
  public void switchProcessor(YamcsClient yamcsClient, String serverName, String processorName) {
492
    //          This seems redundant....
493
    paramSubscriptionService.destroy();
×
494
    initYamcsSubscriptionService(yamcsClient, serverName, processorName);
×
495
  }
×
496

497
  public void getParameters(
498
      YamcsClient yamcsClient,
499
      List<String> parameters,
500
      Instant start,
501
      Instant end,
502
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
503

504
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
505
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
506
    for (var p : parameters) {
×
507
      try {
508
        pages.add(this.getYamcsArchiveClient().listValues(p, start, end).get());
×
509
      } catch (InterruptedException | ExecutionException e) {
×
510
        // TODO Auto-generated catch block
511
        e.printStackTrace();
×
512
      }
×
513
    }
×
514

515
    consumer.accept(pages);
×
516
  }
×
517

518
  public void getParameter(
519
      YamcsClient yamcsClient,
520
      String parameter,
521
      Instant start,
522
      Instant end,
523
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
524

525
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
526
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
527
    try {
528
      pages.add(this.getYamcsArchiveClient().listValues(parameter, start, end).get());
×
529
    } catch (InterruptedException | ExecutionException e) {
×
530
      // TODO Auto-generated catch block
531
      e.printStackTrace();
×
532
    }
×
533

534
    consumer.accept(pages);
×
535
  }
×
536

537
  public boolean isLinkActive(String linkName) {
538
    return Duration.between(Instant.now(), LastUpdateLinks.get(linkName)).toMillis() < 1000;
×
539
  }
540
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc