• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #91

22 Sep 2023 09:12PM UTC coverage: 16.587% (+0.002%) from 16.585%
#91

push

lorenzo-gomez-windhover
Merge branch '88_out_of_memory_issues' of github.com:WindhoverLabs/phoebus into 88_out_of_memory_issues

8 of 8 new or added lines in 1 file covered. (100.0%)

17815 of 107401 relevant lines covered (16.59%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

45.81
/core/commander-core/src/main/java/com/windhoverlabs/yamcs/core/CMDR_YamcsInstance.java
1
package com.windhoverlabs.yamcs.core;
2

3
import com.windhoverlabs.pv.yamcs.YamcsPV;
4
import com.windhoverlabs.pv.yamcs.YamcsSubscriptionService;
5
import com.windhoverlabs.yamcs.core.YamcsWebSocketClient.TmStatistics;
6
import java.time.Duration;
7
import java.time.Instant;
8
import java.util.ArrayList;
9
import java.util.HashMap;
10
import java.util.List;
11
import java.util.concurrent.ExecutionException;
12
import java.util.concurrent.ScheduledThreadPoolExecutor;
13
import java.util.function.Consumer;
14
import java.util.logging.Logger;
15
import javafx.collections.FXCollections;
16
import javafx.collections.ObservableList;
17
import org.yamcs.TmPacket;
18
import org.yamcs.client.EventSubscription;
19
import org.yamcs.client.LinkSubscription;
20
import org.yamcs.client.MessageListener;
21
import org.yamcs.client.PacketSubscription;
22
import org.yamcs.client.Page;
23
import org.yamcs.client.YamcsClient;
24
import org.yamcs.client.archive.ArchiveClient;
25
import org.yamcs.client.mdb.MissionDatabaseClient.ListOptions;
26
import org.yamcs.client.processor.ProcessorClient;
27
import org.yamcs.mdb.ProcessingStatistics;
28
import org.yamcs.protobuf.CreateEventRequest;
29
import org.yamcs.protobuf.GetServerInfoResponse;
30
import org.yamcs.protobuf.GetServerInfoResponse.CommandOptionInfo;
31
import org.yamcs.protobuf.Mdb.ParameterInfo;
32
import org.yamcs.protobuf.Pvalue.ParameterValue;
33
import org.yamcs.protobuf.SubscribeEventsRequest;
34
import org.yamcs.protobuf.TmPacketData;
35
// import org.yamcs.protobuf.TmStatistics;
36
import org.yamcs.protobuf.links.LinkInfo;
37
import org.yamcs.protobuf.links.SubscribeLinksRequest;
38
import org.yamcs.utils.TimeEncoding;
39

40
// import org.yamcs.protobuf.Event;
41

42
public class CMDR_YamcsInstance extends YamcsObject<YamcsObject<?>> {
43
  public static final Logger logger = Logger.getLogger(CMDR_YamcsInstance.class.getPackageName());
1✔
44
  public static String OBJECT_TYPE = "instance";
1✔
45
  private ProcessorClient yamcsProcessor = null;
1✔
46
  private YamcsSubscriptionService paramSubscriptionService;
47
  private EventSubscription eventSubscription;
48
  private LinkSubscription linkSubscription;
49
  private MissionDatabase missionDatabase;
50
  //  private EventSubscription eventSubscription;
51
  private ArchiveClient yamcsArchiveClient;
52
  private CMDR_YamcsInstanceState instanceState;
53

54
  // TODO:Not sure if we want to have this on every instance and their server...just want it to work
55
  // for now.
56
  // Useful for "special" command link arguments such as cop1Bypass
57
  private HashMap<String, CommandOptionInfo> extraCommandArgs =
1✔
58
      new HashMap<String, CommandOptionInfo>();
59

60
  private ObservableList<CommandOption> optionsList = FXCollections.observableArrayList();
1✔
61
  private ObservableList<CMDR_Event> events = FXCollections.observableArrayList();
1✔
62
  private ObservableList<LinkInfo> links = FXCollections.observableArrayList();
1✔
63
  private ObservableList<TmStatistics> packets = FXCollections.observableArrayList();
1✔
64

65
  public ObservableList<TmStatistics> getPackets() {
66
    return packets;
×
67
  }
68

69
  private HashMap<String, LinkInfo> linksMap = new HashMap<String, LinkInfo>();
1✔
70

71
  public HashMap<String, LinkInfo> getLinksMap() {
72
    return linksMap;
×
73
  }
74

75
  private HashMap<String, Boolean> activeInLinks = new HashMap<String, Boolean>();
1✔
76

77
  private HashMap<String, Instant> LastUpdateLinks = new HashMap<String, Instant>();
1✔
78

79
  public HashMap<String, Boolean> getActiveInLinks() {
80
    return activeInLinks;
×
81
  }
82

83
  private HashMap<String, Boolean> activeOutLinks = new HashMap<String, Boolean>();
1✔
84
  private ScheduledThreadPoolExecutor timer;
85
  private YamcsWebSocketClient statsWS;
86

87
  public ObservableList<LinkInfo> getLinks() {
88
    return links;
×
89
  }
90

91
  public CMDR_YamcsInstanceState getInstanceState() {
92
    return instanceState;
×
93
  }
94

95
  // Make this class generic?
96
  public class CommandOption {
97
    private String id;
98
    private String value;
99

100
    public CommandOption(String newId, String value) {
×
101
      this.id = newId;
×
102
      this.value = value;
×
103
    }
×
104

105
    public String getValue() {
106
      return this.value;
×
107
    }
108

109
    public String getId() {
110
      return this.id;
×
111
    }
112

113
    public void setValue(String newValue) {
114
      this.value = newValue;
×
115
    }
×
116
  }
117

118
  public ObservableList<CommandOption> getOptionsList() {
119
    return optionsList;
×
120
  }
121

122
  public HashMap<String, CommandOptionInfo> getExtraCommandArgs() {
123
    return extraCommandArgs;
×
124
  }
125

126
  public ArchiveClient getYamcsArchiveClient() {
127
    return yamcsArchiveClient;
1✔
128
  }
129

130
  public ObservableList<CMDR_Event> getEvents() {
131
    return events;
1✔
132
  }
133

134
  public ProcessorClient getYamcsProcessor() {
135
    return yamcsProcessor;
1✔
136
  }
137

138
  public CMDR_YamcsInstance(String name) {
139
    super(name);
1✔
140
  }
1✔
141

142
  @Override
143
  public ObservableList<YamcsObject<?>> getItems() {
144
    return FXCollections.emptyObservableList();
1✔
145
  }
146

147
  @Override
148
  public void createAndAddChild(String name) {
149
    throw new IllegalStateException("CMDR_YamcsInstance does not allow child items");
1✔
150
  }
151

152
  @Override
153
  public String getObjectType() {
154
    return OBJECT_TYPE;
1✔
155
  }
156

157
  protected void initProcessorClient(YamcsClient yamcsClient) {
158
    yamcsProcessor = yamcsClient.createProcessorClient(getName(), "realtime");
1✔
159
  }
1✔
160

161
  protected void initYamcsSubscriptionService(
162
      YamcsClient yamcsClient, String serverName, String procesor) {
163
    paramSubscriptionService =
1✔
164
        new YamcsSubscriptionService(
165
            yamcsClient.createParameterSubscription(), serverName, this.getName(), procesor);
1✔
166
  }
1✔
167

168
  protected void initLinkSubscription(YamcsClient yamcsClient, String serverName) {
169
    linkSubscription = yamcsClient.createLinkSubscription();
1✔
170
    linkSubscription.addMessageListener(
1✔
171
        linkEvent -> {
172
          switch (linkEvent.getType()) {
1✔
173
            case REGISTERED:
174
            case UPDATED:
175
              {
176
                var link = linkEvent.getLinkInfo();
1✔
177
                LinkInfo linkFromList = null;
1✔
178

179
                LastUpdateLinks.put(link.getName(), Instant.now());
1✔
180

181
                linksMap.put(link.getName(), link);
1✔
182

183
                boolean linkExistsInlList = false;
1✔
184

185
                for (var l : links) {
1✔
186
                  if (l != null) {
1✔
187
                    if (l.getName().equals(link.getName())) {
1✔
188
                      linkFromList = l;
×
189
                      linkExistsInlList = true;
×
190
                    }
191
                  }
192
                }
1✔
193

194
                if (linkExistsInlList) {
1✔
195
                  links.remove(linkFromList);
×
196
                }
197
                links.add(linksMap.get(link.getName()));
1✔
198
              }
199

200
              break;
1✔
201
            case UNREGISTERED:
202
              //               TODO but not currently sent by Yamcs
203
          }
204
        });
1✔
205

206
    linkSubscription.sendMessage(SubscribeLinksRequest.newBuilder().setInstance(getName()).build());
1✔
207
  }
1✔
208

209
  protected void initEventSubscription(YamcsClient yamcsClient, String serverName) {
210
    eventSubscription = yamcsClient.createEventSubscription();
1✔
211
    eventSubscription.addMessageListener(
1✔
212
        event -> {
213
          events.add(
×
214
              new CMDR_Event(
215
                  event.getMessage(),
×
216
                  Instant.ofEpochSecond(
×
217
                      event.getGenerationTime().getSeconds(), event.getGenerationTime().getNanos()),
×
218
                  event.getSeverity(),
×
219
                  event.getType(),
×
220
                  Instant.ofEpochSecond(
×
221
                      event.getReceptionTime().getSeconds(), event.getReceptionTime().getNanos()),
×
222
                  event.getSource(),
×
223
                  this.getName()));
×
224
        });
×
225

226
    yamcsArchiveClient = yamcsClient.createArchiveClient(getName());
1✔
227

228
    eventSubscription.sendMessage(
1✔
229
        SubscribeEventsRequest.newBuilder().setInstance(getName()).build());
1✔
230
  }
1✔
231

232
  private MissionDatabase loadMissionDatabase(YamcsClient client) {
233
    var missionDatabase = new MissionDatabase();
1✔
234

235
    var mdbClient = client.createMissionDatabaseClient(getName());
1✔
236

237
    try {
238
      var page = mdbClient.listParameters(ListOptions.limit(500)).get();
1✔
239
      page.iterator().forEachRemaining(missionDatabase::addParameter);
1✔
240
      while (page.hasNextPage()) {
1✔
241
        page = page.getNextPage().get();
×
242
        page.iterator().forEachRemaining(missionDatabase::addParameter);
×
243
      }
244

245
      var commandPage = mdbClient.listCommands(ListOptions.limit(200)).get();
1✔
246
      commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
247
      while (commandPage.hasNextPage()) {
1✔
248
        commandPage = commandPage.getNextPage().get();
1✔
249
        commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
250
      }
251
    } catch (Exception e) {
×
252
      e.printStackTrace();
×
253
      //          throw new Exception("Failed to load mission database", e);
254
    }
1✔
255
    return missionDatabase;
1✔
256
  }
257

258
  protected void initMDBParameterRDequest(YamcsClient yamcsClient, String serverName) {
259
    var mdb = yamcsClient.createMissionDatabaseClient(getName()).listParameters();
1✔
260
    Page<ParameterInfo> paramsPage = null;
1✔
261
    try {
262
      paramsPage = mdb.get();
1✔
263
    } catch (InterruptedException | ExecutionException e) {
×
264
      // TODO Auto-generated catch block
265
      e.printStackTrace();
×
266
    }
1✔
267
    var it = paramsPage.iterator();
1✔
268
    it.forEachRemaining(
1✔
269
        p -> {
270
          //          System.out.println("p-->" + p.getQualifiedName());
271

272
          for (var m : p.getType().getMemberList()) {
1✔
273
            //            System.out.println("p member-->" + m.getName());
274
          }
1✔
275
        });
1✔
276
    while (paramsPage.hasNextPage()) {
1✔
277
      //      var it = paramsPage.iterator();
278
      try {
279
        paramsPage = paramsPage.getNextPage().get();
×
280
      } catch (InterruptedException | ExecutionException e) {
×
281
        // TODO Auto-generated catch block
282
        e.printStackTrace();
×
283
      }
×
284
      it = paramsPage.iterator();
×
285
      it.forEachRemaining(
×
286
          p -> {
287
            //            System.out.println("p-->" + p.getQualifiedName());
288

289
            for (var m : p.getType().getMemberList()) {
×
290
              //              System.out.println("p member-->" + m.getName());
291
            }
×
292
          });
×
293
    }
294
  }
1✔
295

296
  /**
297
   * Initializes all of the subscriptions to the servers such as event and parameter subscriptions.
298
   * Always call this AFTER the websocket connection to YAMCS has been established. Ideally inside
299
   * the connected() method of a org.yamcs.client.ConnectionListener. Otherwise, one might cause a
300
   * race between the time we "connect" via the websocket and the time we create these
301
   * subscriptions.
302
   *
303
   * @param yamcsClient
304
   * @param serverName
305
   */
306
  // TODO:This shoud return whether or not the instance activated successfully.
307
  public void activate(YamcsClient yamcsClient, String serverName) {
308
    initProcessorClient(yamcsClient);
1✔
309
    initYamcsSubscriptionService(yamcsClient, serverName, "realtime");
1✔
310
    initEventSubscription(yamcsClient, serverName);
1✔
311
    initLinkSubscription(yamcsClient, serverName);
1✔
312
    initMDBParameterRDequest(yamcsClient, serverName);
1✔
313
    initTMStats(yamcsClient);
1✔
314

315
    missionDatabase = loadMissionDatabase(yamcsClient);
1✔
316

317
    try {
318
      initCommandOptions(yamcsClient);
1✔
319
    } catch (InterruptedException e) {
×
320
      // TODO Auto-generated catch block
321
      e.printStackTrace();
×
322
      return;
×
323
    } catch (ExecutionException e) {
×
324
      // TODO Auto-generated catch block
325
      e.printStackTrace();
×
326
      return;
×
327
    }
1✔
328
    instanceState = CMDR_YamcsInstanceState.ACTIVATED;
1✔
329
  }
1✔
330

331
  public void subscribeTMStats(YamcsClient yamcsClient, Consumer<ProcessingStatistics> consumer) {
332
    //    TODO:Don't use the YAMCS thread pool. Use the Java one.
333
    //    timer = YamcsServer.getServer().getThreadPoolExecutor();
334
    //
335
    //    //    Make "realtime configurable"
336
    //
337
    //    timer.scheduleAtFixedRate(
338
    //        () -> {
339
    //          System.out.println("scheduleAtFixedRate1");
340
    //          YamcsServer.getServer();
341
    //          System.out.println("scheduleAtFixedRate2:" + YamcsServer.getServer());
342
    //          System.out.println("Instance:" + getName());
343
    //          var instance = YamcsServer.getServer().getInstance(getName());
344
    //
345
    //          System.out.println("scheduleAtFixedRate3:" + instance);
346
    //          YamcsServer.getServer().getInstance(getName()).getProcessor("realtime");
347
    //
348
    //          System.out.println("scheduleAtFixedRate4");
349
    //          ProcessingStatistics ps =
350
    //              YamcsServer.getServer()
351
    //                  .getInstance(getName())
352
    //                  .getProcessor("realtime")
353
    //                  .getTmProcessor()
354
    //                  .getStatistics();
355
    //          //           ps =
356
    //          //              YamcsServer.getServer()
357
    //          //                  .getInstance(getName())
358
    //          //                  .getProcessor("realtime")
359
    //          //                  .getTmProcessor()
360
    //          //                  .getStatistics();
361
    //          System.out.println("scheduleAtFixedRate5:" + ps);
362
    //          consumer.accept(ps);
363
    //        },
364
    //        1,
365
    //        1,
366
    //        TimeUnit.SECONDS);
367
    //          TODO:Add the API call to YMACS server side
368
  }
×
369

370
  public void initTMStats(YamcsClient yamcsClient) {
371
    try {
372
      statsWS =
1✔
373
          new YamcsWebSocketClient(
374
              stats -> {
375
                if (stats != null) {
1✔
376
                  packets.clear();
×
377
                  for (TmStatistics s : stats) {
×
378
                    packets.add(s);
×
379
                  }
×
380
                }
381
              },
1✔
382
              yamcsClient.getHost(),
1✔
383
              yamcsClient.getPort(),
1✔
384
              getName(),
1✔
385
              yamcsClient.listProcessors(getName()).get().get(0).getName());
1✔
386
    } catch (InterruptedException e) {
×
387
      // TODO Auto-generated catch block
388
      e.printStackTrace();
×
389
    } catch (ExecutionException e) {
×
390
      // TODO Auto-generated catch block
391
      e.printStackTrace();
×
392
    }
1✔
393

394
    //
395
    //    subscribeTMStats(
396
    //        yamcsClient,
397
    //        stats -> {
398
    //          packets.clear();
399
    //          System.out.println("stats...");
400
    //          for (var s : stats.snapshot()) {
401
    //            packets.add(s);
402
    //          }
403
    //        });
404
  }
1✔
405

406
  private void initPacketSubscription(YamcsClient yamcsClient) {
407
    PacketSubscription subscription = yamcsClient.createPacketSubscription();
×
408
    //    yamcsClient.createProcessorClient(OBJECT_TYPE, OBJECT_TYPE)
409
    subscription.addMessageListener(
×
410
        new MessageListener<TmPacketData>() {
×
411

412
          @Override
413
          public void onMessage(TmPacketData message) {
414
            TmPacket pwt =
×
415
                new TmPacket(
416
                    TimeEncoding.fromProtobufTimestamp(message.getReceptionTime()),
×
417
                    TimeEncoding.fromProtobufTimestamp(message.getGenerationTime()),
×
418
                    message.getSequenceNumber(),
×
419
                    message.getPacket().toByteArray());
×
420
            //            packetsTable.packetReceived(pwt);
421
          }
×
422

423
          @Override
424
          public void onError(Throwable t) {
425
            //            showError("Error subscribing: " + t.getMessage());
426
          }
×
427
        });
428

429
    //    subscription.sendMessage(SubscribeTMStatisticsRequest.newBuilder()
430
    //            .setInstance(getName())
431
    ////            .setStream(connectData.streamName)
432
    //            .build());
433
  }
×
434

435
  public MissionDatabase getMissionDatabase() {
436
    return missionDatabase;
×
437
  }
438

439
  private void initCommandOptions(YamcsClient yamcsClient)
440
      throws InterruptedException, ExecutionException {
441
    GetServerInfoResponse info = yamcsClient.getServerInfo().get();
1✔
442
    System.out.println("initCommandOptions-->1");
1✔
443
    for (CommandOptionInfo o : info.getCommandOptionsList()) {
1✔
444
      extraCommandArgs.put(o.getId(), o);
×
445

446
      // Eventually check the type and create Commandoption accordingly
447
      optionsList.add(new CommandOption(o.getId(), ""));
×
448
      System.out.println("initCommandOptions-->2" + optionsList);
×
449
    }
×
450
    System.out.println("initCommandOptions-->3");
1✔
451
  }
1✔
452

453
  public void deActivate(YamcsClient yamcsClient, String serverName) {
454
    // TODO:unInit resources...
455
    instanceState = CMDR_YamcsInstanceState.DEACTIVATED;
1✔
456
    if (eventSubscription != null) {
1✔
457
      eventSubscription.cancel(true);
1✔
458
      paramSubscriptionService.destroy();
1✔
459
    }
460

461
    statsWS.close();
1✔
462
  }
1✔
463

464
  public EventSubscription getEventSubscription() {
465
    return eventSubscription;
1✔
466
  }
467

468
  public void subscribePV(YamcsPV pv) {
469
    // TODO:Have to let the caller know whether were able to successfully subscribe
470
    // to this pv or not.
471
    paramSubscriptionService.register(pv);
×
472
  }
×
473

474
  /** Creates and publishes an event to YAMCS instance. */
475
  public void publishEvent(String message, YamcsClient yamcsClient) {
476
    yamcsClient.createEvent(
×
477
        CreateEventRequest.newBuilder()
×
478
            .setInstance(getName())
×
479
            .setMessage(message)
×
480
            .setSource("Commander")
×
481
            .build());
×
482
  }
×
483

484
  /** Creates and publishes an event to YAMCS instance. */
485
  public void publishEvent(CMDR_Event e, YamcsClient yamcsClient) {
486
    yamcsClient.createEvent(
×
487
        CreateEventRequest.newBuilder()
×
488
            .setInstance(getName())
×
489
            .setMessage(e.getMessage())
×
490
            .setSource(e.getSource())
×
491
            .setSeverity(e.getSeverity().toString())
×
492
            .build());
×
493
  }
×
494

495
  public ArrayList<String> getProcessors(YamcsClient yamcsClient) {
496

497
    ArrayList<String> processors = new ArrayList<String>();
×
498
    try {
499
      yamcsClient
×
500
          .listProcessors(getName())
×
501
          .get()
×
502
          .forEach(
×
503
              p -> {
504
                processors.add(p.getName());
×
505
              });
×
506
    } catch (InterruptedException | ExecutionException e) {
×
507
      // TODO Auto-generated catch block
508
      e.printStackTrace();
×
509
    }
×
510

511
    return processors;
×
512
  }
513

514
  public void switchProcessor(YamcsClient yamcsClient, String serverName, String processorName) {
515
    //          This seems redundant....
516
    paramSubscriptionService.destroy();
×
517
    initYamcsSubscriptionService(yamcsClient, serverName, processorName);
×
518
  }
×
519

520
  public void getParameters(
521
      YamcsClient yamcsClient,
522
      List<String> parameters,
523
      Instant start,
524
      Instant end,
525
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
526

527
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
528
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
529
    for (var p : parameters) {
×
530
      try {
531
        pages.add(this.getYamcsArchiveClient().listValues(p, start, end).get());
×
532
      } catch (InterruptedException | ExecutionException e) {
×
533
        // TODO Auto-generated catch block
534
        e.printStackTrace();
×
535
      }
×
536
    }
×
537

538
    consumer.accept(pages);
×
539
  }
×
540

541
  public void getParameter(
542
      YamcsClient yamcsClient,
543
      String parameter,
544
      Instant start,
545
      Instant end,
546
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
547

548
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
549
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
550
    try {
551
      pages.add(this.getYamcsArchiveClient().listValues(parameter, start, end).get());
×
552
    } catch (InterruptedException | ExecutionException e) {
×
553
      // TODO Auto-generated catch block
554
      e.printStackTrace();
×
555
    }
×
556

557
    consumer.accept(pages);
×
558
  }
×
559

560
  public boolean isLinkActive(String linkName) {
561
    return Duration.between(Instant.now(), LastUpdateLinks.get(linkName)).toMillis() < 1000;
×
562
  }
563
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc