• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / phoebus / #81

08 Sep 2023 03:40AM UTC coverage: 16.559% (-0.003%) from 16.562%
#81

push

lorenzo-gomez-windhover
-Format files

3 of 3 new or added lines in 1 file covered. (100.0%)

17767 of 107298 relevant lines covered (16.56%)

0.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.44
/core/commander-core/src/main/java/com/windhoverlabs/yamcs/core/CMDR_YamcsInstance.java
1
package com.windhoverlabs.yamcs.core;
2

3
import com.windhoverlabs.pv.yamcs.YamcsPV;
4
import com.windhoverlabs.pv.yamcs.YamcsSubscriptionService;
5
import java.time.Duration;
6
import java.time.Instant;
7
import java.util.ArrayList;
8
import java.util.HashMap;
9
import java.util.List;
10
import java.util.concurrent.ExecutionException;
11
import java.util.concurrent.ScheduledThreadPoolExecutor;
12
import java.util.concurrent.TimeUnit;
13
import java.util.function.Consumer;
14
import java.util.logging.Logger;
15
import javafx.collections.FXCollections;
16
import javafx.collections.ObservableList;
17
import org.yamcs.TmPacket;
18
import org.yamcs.YamcsServer;
19
import org.yamcs.client.EventSubscription;
20
import org.yamcs.client.LinkSubscription;
21
import org.yamcs.client.MessageListener;
22
import org.yamcs.client.PacketSubscription;
23
import org.yamcs.client.Page;
24
import org.yamcs.client.YamcsClient;
25
import org.yamcs.client.archive.ArchiveClient;
26
import org.yamcs.client.mdb.MissionDatabaseClient.ListOptions;
27
import org.yamcs.client.processor.ProcessorClient;
28
import org.yamcs.mdb.ProcessingStatistics;
29
import org.yamcs.protobuf.CreateEventRequest;
30
import org.yamcs.protobuf.GetServerInfoResponse;
31
import org.yamcs.protobuf.GetServerInfoResponse.CommandOptionInfo;
32
import org.yamcs.protobuf.Mdb.ParameterInfo;
33
import org.yamcs.protobuf.Pvalue.ParameterValue;
34
import org.yamcs.protobuf.SubscribeEventsRequest;
35
import org.yamcs.protobuf.TmPacketData;
36
import org.yamcs.protobuf.links.LinkInfo;
37
import org.yamcs.protobuf.links.SubscribeLinksRequest;
38
import org.yamcs.utils.TimeEncoding;
39

40
// import org.yamcs.protobuf.Event;
41

42
public class CMDR_YamcsInstance extends YamcsObject<YamcsObject<?>> {
43
  public static final Logger logger = Logger.getLogger(CMDR_YamcsInstance.class.getPackageName());
1✔
44
  public static String OBJECT_TYPE = "instance";
1✔
45
  private ProcessorClient yamcsProcessor = null;
1✔
46
  private YamcsSubscriptionService paramSubscriptionService;
47
  private EventSubscription eventSubscription;
48
  private LinkSubscription linkSubscription;
49
  private MissionDatabase missionDatabase;
50
  //  private EventSubscription eventSubscription;
51
  private ArchiveClient yamcsArchiveClient;
52
  private CMDR_YamcsInstanceState instanceState;
53

54
  // TODO:Not sure if we want to have this on every instance and their server...just want it to work
55
  // for now.
56
  // Useful for "special" command link arguments such as cop1Bypass
57
  private HashMap<String, CommandOptionInfo> extraCommandArgs =
1✔
58
      new HashMap<String, CommandOptionInfo>();
59

60
  private ObservableList<CommandOption> optionsList = FXCollections.observableArrayList();
1✔
61
  private ObservableList<CMDR_Event> events = FXCollections.observableArrayList();
1✔
62
  private ObservableList<LinkInfo> links = FXCollections.observableArrayList();
1✔
63

64
  private HashMap<String, LinkInfo> linksMap = new HashMap<String, LinkInfo>();
1✔
65

66
  public HashMap<String, LinkInfo> getLinksMap() {
67
    return linksMap;
×
68
  }
69

70
  private HashMap<String, Boolean> activeInLinks = new HashMap<String, Boolean>();
1✔
71

72
  private HashMap<String, Instant> LastUpdateLinks = new HashMap<String, Instant>();
1✔
73

74
  public HashMap<String, Boolean> getActiveInLinks() {
75
    return activeInLinks;
×
76
  }
77

78
  private HashMap<String, Boolean> activeOutLinks = new HashMap<String, Boolean>();
1✔
79
  private ScheduledThreadPoolExecutor timer;
80

81
  public ObservableList<LinkInfo> getLinks() {
82
    return links;
×
83
  }
84

85
  public CMDR_YamcsInstanceState getInstanceState() {
86
    return instanceState;
×
87
  }
88

89
  // Make this class generic?
90
  public class CommandOption {
91
    private String id;
92
    private String value;
93

94
    public CommandOption(String newId, String value) {
×
95
      this.id = newId;
×
96
      this.value = value;
×
97
    }
×
98

99
    public String getValue() {
100
      return this.value;
×
101
    }
102

103
    public String getId() {
104
      return this.id;
×
105
    }
106

107
    public void setValue(String newValue) {
108
      this.value = newValue;
×
109
    }
×
110
  }
111

112
  public ObservableList<CommandOption> getOptionsList() {
113
    return optionsList;
×
114
  }
115

116
  public HashMap<String, CommandOptionInfo> getExtraCommandArgs() {
117
    return extraCommandArgs;
×
118
  }
119

120
  public ArchiveClient getYamcsArchiveClient() {
121
    return yamcsArchiveClient;
1✔
122
  }
123

124
  public ObservableList<CMDR_Event> getEvents() {
125
    return events;
1✔
126
  }
127

128
  public ProcessorClient getYamcsProcessor() {
129
    return yamcsProcessor;
1✔
130
  }
131

132
  public CMDR_YamcsInstance(String name) {
133
    super(name);
1✔
134
  }
1✔
135

136
  @Override
137
  public ObservableList<YamcsObject<?>> getItems() {
138
    return FXCollections.emptyObservableList();
1✔
139
  }
140

141
  @Override
142
  public void createAndAddChild(String name) {
143
    throw new IllegalStateException("CMDR_YamcsInstance does not allow child items");
1✔
144
  }
145

146
  @Override
147
  public String getObjectType() {
148
    return OBJECT_TYPE;
1✔
149
  }
150

151
  protected void initProcessorClient(YamcsClient yamcsClient) {
152
    yamcsProcessor = yamcsClient.createProcessorClient(getName(), "realtime");
1✔
153
  }
1✔
154

155
  protected void initYamcsSubscriptionService(
156
      YamcsClient yamcsClient, String serverName, String procesor) {
157
    paramSubscriptionService =
1✔
158
        new YamcsSubscriptionService(
159
            yamcsClient.createParameterSubscription(), serverName, this.getName(), procesor);
1✔
160
  }
1✔
161

162
  protected void initLinkSubscription(YamcsClient yamcsClient, String serverName) {
163
    linkSubscription = yamcsClient.createLinkSubscription();
1✔
164
    linkSubscription.addMessageListener(
1✔
165
        linkEvent -> {
166
          switch (linkEvent.getType()) {
1✔
167
            case REGISTERED:
168
            case UPDATED:
169
              {
170
                var link = linkEvent.getLinkInfo();
1✔
171
                LinkInfo linkFromList = null;
1✔
172

173
                LastUpdateLinks.put(link.getName(), Instant.now());
1✔
174

175
                linksMap.put(link.getName(), link);
1✔
176

177
                boolean linkExistsInlList = false;
1✔
178

179
                for (var l : links) {
1✔
180
                  if (l != null) {
1✔
181
                    if (l.getName().equals(link.getName())) {
1✔
182
                      linkFromList = l;
×
183
                      linkExistsInlList = true;
×
184
                    }
185
                  }
186
                }
1✔
187

188
                if (linkExistsInlList) {
1✔
189
                  links.remove(linkFromList);
×
190
                }
191
                links.add(linksMap.get(link.getName()));
1✔
192
              }
193

194
              break;
1✔
195
            case UNREGISTERED:
196
              //               TODO but not currently sent by Yamcs
197
          }
198
        });
1✔
199

200
    linkSubscription.sendMessage(SubscribeLinksRequest.newBuilder().setInstance(getName()).build());
1✔
201
  }
1✔
202

203
  protected void initEventSubscription(YamcsClient yamcsClient, String serverName) {
204
    eventSubscription = yamcsClient.createEventSubscription();
1✔
205
    eventSubscription.addMessageListener(
1✔
206
        event -> {
207
          events.add(
×
208
              new CMDR_Event(
209
                  event.getMessage(),
×
210
                  Instant.ofEpochSecond(
×
211
                      event.getGenerationTime().getSeconds(), event.getGenerationTime().getNanos()),
×
212
                  event.getSeverity(),
×
213
                  event.getType(),
×
214
                  Instant.ofEpochSecond(
×
215
                      event.getReceptionTime().getSeconds(), event.getReceptionTime().getNanos()),
×
216
                  event.getSource(),
×
217
                  this.getName()));
×
218
        });
×
219

220
    yamcsArchiveClient = yamcsClient.createArchiveClient(getName());
1✔
221

222
    eventSubscription.sendMessage(
1✔
223
        SubscribeEventsRequest.newBuilder().setInstance(getName()).build());
1✔
224
  }
1✔
225

226
  private MissionDatabase loadMissionDatabase(YamcsClient client) {
227
    var missionDatabase = new MissionDatabase();
1✔
228

229
    var mdbClient = client.createMissionDatabaseClient(getName());
1✔
230

231
    try {
232
      var page = mdbClient.listParameters(ListOptions.limit(500)).get();
1✔
233
      page.iterator().forEachRemaining(missionDatabase::addParameter);
1✔
234
      while (page.hasNextPage()) {
1✔
235
        page = page.getNextPage().get();
×
236
        page.iterator().forEachRemaining(missionDatabase::addParameter);
×
237
      }
238

239
      var commandPage = mdbClient.listCommands(ListOptions.limit(200)).get();
1✔
240
      commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
241
      while (commandPage.hasNextPage()) {
1✔
242
        commandPage = commandPage.getNextPage().get();
1✔
243
        commandPage.iterator().forEachRemaining(missionDatabase::addCommand);
1✔
244
      }
245
    } catch (Exception e) {
×
246
      e.printStackTrace();
×
247
      //          throw new Exception("Failed to load mission database", e);
248
    }
1✔
249
    return missionDatabase;
1✔
250
  }
251

252
  protected void initMDBParameterRDequest(YamcsClient yamcsClient, String serverName) {
253
    var mdb = yamcsClient.createMissionDatabaseClient(getName()).listParameters();
1✔
254
    Page<ParameterInfo> paramsPage = null;
1✔
255
    try {
256
      paramsPage = mdb.get();
1✔
257
    } catch (InterruptedException | ExecutionException e) {
×
258
      // TODO Auto-generated catch block
259
      e.printStackTrace();
×
260
    }
1✔
261
    var it = paramsPage.iterator();
1✔
262
    it.forEachRemaining(
1✔
263
        p -> {
264
          //          System.out.println("p-->" + p.getQualifiedName());
265

266
          for (var m : p.getType().getMemberList()) {
1✔
267
            //            System.out.println("p member-->" + m.getName());
268
          }
1✔
269
        });
1✔
270
    while (paramsPage.hasNextPage()) {
1✔
271
      //      var it = paramsPage.iterator();
272
      try {
273
        paramsPage = paramsPage.getNextPage().get();
×
274
      } catch (InterruptedException | ExecutionException e) {
×
275
        // TODO Auto-generated catch block
276
        e.printStackTrace();
×
277
      }
×
278
      it = paramsPage.iterator();
×
279
      it.forEachRemaining(
×
280
          p -> {
281
            //            System.out.println("p-->" + p.getQualifiedName());
282

283
            for (var m : p.getType().getMemberList()) {
×
284
              //              System.out.println("p member-->" + m.getName());
285
            }
×
286
          });
×
287
    }
288
  }
1✔
289

290
  /**
291
   * Initializes all of the subscriptions to the servers such as event and parameter subscriptions.
292
   * Always call this AFTER the websocket connection to YAMCS has been established. Ideally inside
293
   * the connected() method of a org.yamcs.client.ConnectionListener. Otherwise, one might cause a
294
   * race between the time we "connect" via the websocket and the time we create these
295
   * subscriptions.
296
   *
297
   * @param yamcsClient
298
   * @param serverName
299
   */
300
  // TODO:This shoud return whether or not the instance activated successfully.
301
  public void activate(YamcsClient yamcsClient, String serverName) {
302
    initProcessorClient(yamcsClient);
1✔
303
    initYamcsSubscriptionService(yamcsClient, serverName, "realtime");
1✔
304
    initEventSubscription(yamcsClient, serverName);
1✔
305
    initLinkSubscription(yamcsClient, serverName);
1✔
306
    initMDBParameterRDequest(yamcsClient, serverName);
1✔
307

308
    missionDatabase = loadMissionDatabase(yamcsClient);
1✔
309

310
    try {
311
      initCommandOptions(yamcsClient);
1✔
312
    } catch (InterruptedException e) {
×
313
      // TODO Auto-generated catch block
314
      e.printStackTrace();
×
315
      return;
×
316
    } catch (ExecutionException e) {
×
317
      // TODO Auto-generated catch block
318
      e.printStackTrace();
×
319
      return;
×
320
    }
1✔
321
    instanceState = CMDR_YamcsInstanceState.ACTIVATED;
1✔
322
  }
1✔
323

324
  public void subscribeTMStats(YamcsClient yamcsClient, Consumer<ProcessingStatistics> consumer) {
325
    //    TODO:Don't use the YAMCS thread pool. Use the Java one.
326
    timer = YamcsServer.getServer().getThreadPoolExecutor();
×
327

328
    //    Make "rf_replay configurable"
329

330
    timer.scheduleAtFixedRate(
×
331
        () -> {
332
          ProcessingStatistics ps =
333
              YamcsServer.getServer()
×
334
                  .getInstance(getName())
×
335
                  .getProcessor("rf_replay")
×
336
                  .getTmProcessor()
×
337
                  .getStatistics();
×
338
          consumer.accept(ps);
×
339
        },
×
340
        1,
341
        1,
342
        TimeUnit.SECONDS);
343
  }
×
344

345
  private void initPacketSubscription(YamcsClient yamcsClient) {
346
    PacketSubscription subscription = yamcsClient.createPacketSubscription();
×
347
    subscription.addMessageListener(
×
348
        new MessageListener<TmPacketData>() {
×
349

350
          @Override
351
          public void onMessage(TmPacketData message) {
352
            TmPacket pwt =
×
353
                new TmPacket(
354
                    TimeEncoding.fromProtobufTimestamp(message.getReceptionTime()),
×
355
                    TimeEncoding.fromProtobufTimestamp(message.getGenerationTime()),
×
356
                    message.getSequenceNumber(),
×
357
                    message.getPacket().toByteArray());
×
358
            //            packetsTable.packetReceived(pwt);
359
          }
×
360

361
          @Override
362
          public void onError(Throwable t) {
363
            //            showError("Error subscribing: " + t.getMessage());
364
          }
×
365
        });
366

367
    //    subscription.sendMessage(SubscribeTMStatisticsRequest.newBuilder()
368
    //            .setInstance(getName())
369
    ////            .setStream(connectData.streamName)
370
    //            .build());
371
  }
×
372

373
  public MissionDatabase getMissionDatabase() {
374
    return missionDatabase;
×
375
  }
376

377
  private void initCommandOptions(YamcsClient yamcsClient)
378
      throws InterruptedException, ExecutionException {
379
    GetServerInfoResponse info = yamcsClient.getServerInfo().get();
1✔
380
    System.out.println("initCommandOptions-->1");
1✔
381
    for (CommandOptionInfo o : info.getCommandOptionsList()) {
1✔
382
      extraCommandArgs.put(o.getId(), o);
×
383

384
      // Eventually check the type and create Commandoption accordingly
385
      optionsList.add(new CommandOption(o.getId(), ""));
×
386
      System.out.println("initCommandOptions-->2" + optionsList);
×
387
    }
×
388
    System.out.println("initCommandOptions-->3");
1✔
389
  }
1✔
390

391
  public void deActivate(YamcsClient yamcsClient, String serverName) {
392
    // TODO:unInit resources...
393
    instanceState = CMDR_YamcsInstanceState.DEACTIVATED;
1✔
394
    if (eventSubscription != null) {
1✔
395
      eventSubscription.cancel(true);
1✔
396
      paramSubscriptionService.destroy();
1✔
397
    }
398
  }
1✔
399

400
  public EventSubscription getEventSubscription() {
401
    return eventSubscription;
1✔
402
  }
403

404
  public void subscribePV(YamcsPV pv) {
405
    // TODO:Have to let the caller know whether were able to successfully subscribe
406
    // to this pv or not.
407
    paramSubscriptionService.register(pv);
×
408
  }
×
409

410
  /** Creates and publishes an event to YAMCS instance. */
411
  public void publishEvent(String message, YamcsClient yamcsClient) {
412
    yamcsClient.createEvent(
×
413
        CreateEventRequest.newBuilder()
×
414
            .setInstance(getName())
×
415
            .setMessage(message)
×
416
            .setSource("Commander")
×
417
            .build());
×
418
  }
×
419

420
  public ArrayList<String> getProcessors(YamcsClient yamcsClient) {
421

422
    ArrayList<String> processors = new ArrayList<String>();
×
423
    try {
424
      yamcsClient
×
425
          .listProcessors(getName())
×
426
          .get()
×
427
          .forEach(
×
428
              p -> {
429
                processors.add(p.getName());
×
430
              });
×
431
    } catch (InterruptedException | ExecutionException e) {
×
432
      // TODO Auto-generated catch block
433
      e.printStackTrace();
×
434
    }
×
435

436
    return processors;
×
437
  }
438

439
  public void switchProcessor(YamcsClient yamcsClient, String serverName, String processorName) {
440
    //          This seems redundant....
441
    paramSubscriptionService.destroy();
×
442
    initYamcsSubscriptionService(yamcsClient, serverName, processorName);
×
443
  }
×
444

445
  public void getParameters(
446
      YamcsClient yamcsClient,
447
      List<String> parameters,
448
      Instant start,
449
      Instant end,
450
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
451

452
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
453
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
454
    for (var p : parameters) {
×
455
      try {
456
        pages.add(this.getYamcsArchiveClient().listValues(p, start, end).get());
×
457
      } catch (InterruptedException | ExecutionException e) {
×
458
        // TODO Auto-generated catch block
459
        e.printStackTrace();
×
460
      }
×
461
    }
×
462

463
    consumer.accept(pages);
×
464
  }
×
465

466
  public void getParameter(
467
      YamcsClient yamcsClient,
468
      String parameter,
469
      Instant start,
470
      Instant end,
471
      Consumer<ArrayList<Page<ParameterValue>>> consumer) {
472

473
    //    this.getYamcsArchiveClient().streamValues(parameters, consumer, start, end);
474
    ArrayList<Page<ParameterValue>> pages = new ArrayList<Page<ParameterValue>>();
×
475
    try {
476
      pages.add(this.getYamcsArchiveClient().listValues(parameter, start, end).get());
×
477
    } catch (InterruptedException | ExecutionException e) {
×
478
      // TODO Auto-generated catch block
479
      e.printStackTrace();
×
480
    }
×
481

482
    consumer.accept(pages);
×
483
  }
×
484

485
  public boolean isLinkActive(String linkName) {
486
    return Duration.between(Instant.now(), LastUpdateLinks.get(linkName)).toMillis() < 1000;
×
487
  }
488
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc