• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #100

pending completion
#100

push

web-flow
Merge pull request #37 from WindhoverLabs/realtime_udp_replay

-Add realtime support to ReplayToUDPService

24 of 24 new or added lines in 1 file covered. (100.0%)

0 of 5283 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/archive/ReplayToUDPService.java
1
package com.windhoverlabs.yamcs.archive;
2

3
import com.google.protobuf.Timestamp;
4
import com.google.protobuf.util.Timestamps;
5
import com.windhoverlabs.yamcs.archive.api.SimMessage;
6
import java.io.IOException;
7
import java.net.DatagramPacket;
8
import java.net.DatagramSocket;
9
import java.net.InetAddress;
10
import java.net.SocketException;
11
import java.net.UnknownHostException;
12
import java.text.ParseException;
13
import java.util.HashMap;
14
import java.util.List;
15
import java.util.Map;
16
import java.util.Map.Entry;
17
import java.util.stream.Collectors;
18
import org.yamcs.AbstractYamcsService;
19
import org.yamcs.ConfigurationException;
20
import org.yamcs.InitException;
21
import org.yamcs.Processor;
22
import org.yamcs.ProcessorException;
23
import org.yamcs.ProcessorFactory;
24
import org.yamcs.ValidationException;
25
import org.yamcs.YConfiguration;
26
import org.yamcs.archive.ReplayOptions;
27
import org.yamcs.client.ClientException;
28
import org.yamcs.client.ConnectionListener;
29
import org.yamcs.client.ParameterSubscription;
30
import org.yamcs.client.YamcsClient;
31
import org.yamcs.logging.Log;
32
import org.yamcs.protobuf.Pvalue.ParameterValue;
33
import org.yamcs.protobuf.SubscribeParametersRequest;
34
import org.yamcs.protobuf.SubscribeParametersRequest.Action;
35
import org.yamcs.protobuf.Yamcs.EndAction;
36
import org.yamcs.protobuf.Yamcs.NamedObjectId;
37
import org.yamcs.protobuf.Yamcs.ReplayRequest;
38

39
public class ReplayToUDPService extends AbstractYamcsService
×
40
    implements ParameterSubscription.Listener, ConnectionListener {
41
  private ReplayOptions replayOptions;
42
  private String processorName;
43
  private Processor processor;
44

45
  private String start;
46
  private boolean reverse; // TODO: Add to config
47
  private String stop;
48
  private Timestamp startTimeStamp;
49
  private Timestamp stopTimeStamp;
50

51
  private ParameterSubscription subscription;
52

53
  private HashMap<String, ParameterValue> paramsToSend = new HashMap<String, ParameterValue>();
×
54

55
  private YamcsClient yclient;
56

57
  private DatagramSocket outSocket;
58
  private Map<String, String> pvMap;
59
  private String yamcsHost;
60
  private int yamcsPort;
61

62
  private String udpHost;
63
  private int udpPort;
64
  private InetAddress udpAddress;
65

66
  private boolean realtime;
67

68
  @Override
69
  public void init(String yamcsInstance, String serviceName, YConfiguration config)
70
      throws InitException {
71
    this.yamcsInstance = yamcsInstance;
×
72
    this.serviceName = serviceName;
×
73
    this.config = config;
×
74
    //    TODO:Probably a much easier way of doing this...
75
    //    YamcsServer.getServer().getInstance(yamcsInstance).addProcessor(processor);
76
    //    YamcsServer.getServer().getInstance(yamcsInstance)
77

78
    realtime = this.config.getBoolean("realtime", false);
×
79
    start = this.config.getString("start");
×
80
    stop = this.config.getString("stop");
×
81

82
    pvMap = this.config.getMap("pvMap");
×
83

84
    yamcsHost = this.getConfig().getString("yamcsHost", "http://localhost");
×
85
    yamcsPort = this.getConfig().getInt("yamcsPort", 8090);
×
86

87
    udpHost = this.getConfig().getString("udpHost", "172.16.100.237");
×
88
    udpPort = this.getConfig().getInt("udpPort", 8000);
×
89

90
    log = new Log(getClass(), yamcsInstance);
×
91

92
    try {
93
      startTimeStamp = Timestamps.parse(start);
×
94
      stopTimeStamp = Timestamps.parse(stop);
×
95
    } catch (ParseException e) {
×
96
      // TODO Auto-generated catch block
97
      e.printStackTrace();
×
98
    }
×
99

100
    //    TODO:Add these options to YAML
101

102
    if (!this.realtime) {
×
103
      processorName = this.config.getString("processorName", "ReplayToUDPService");
×
104
      replayOptions =
×
105
          new ReplayOptions(
106
              ReplayRequest.newBuilder()
×
107
                  .setStart(startTimeStamp)
×
108
                  .setStop(stopTimeStamp)
×
109
                  .setEndAction(EndAction.LOOP)
×
110
                  .setAutostart(true)
×
111
                  .build());
×
112

113
      try {
114
        processor =
×
115
            ProcessorFactory.create(
×
116
                yamcsInstance,
117
                processorName,
118
                "Archive",
119
                ReplayToUDPService.class.toString(),
×
120
                replayOptions);
121
      } catch (ProcessorException
×
122
          | ConfigurationException
123
          | ValidationException
124
          | InitException e) {
125
        // TODO Auto-generated catch block
126
        e.printStackTrace();
×
127
      }
×
128
    } else {
129
      processorName = "realtime";
×
130
    }
131
  }
×
132

133
  @Override
134
  protected void doStart() {
135
    // TODO Auto-generated method stub
136
    if (!realtime) {
×
137
      log.info("Starting new processor '{}'", processor.getName());
×
138
      processor.startAsync();
×
139
      processor.awaitRunning();
×
140
    }
141

142
    //    TODO: This is unnecessarily complicated
143
    yclient =
×
144
        YamcsClient.newBuilder(yamcsHost + ":" + yamcsPort)
×
145
            //            .withConnectionAttempts(config.getInt("connectionAttempts", 20))
146
            //            .withRetryDelay(reconnectionDelay)
147
            //            .withVerifyTls(config.getBoolean("verifyTls", true))
148
            .build();
×
149
    yclient.addConnectionListener(this);
×
150

151
    try {
152
      yclient.connectWebSocket();
×
153
    } catch (ClientException e) {
×
154
      // TODO Auto-generated catch block
155
      e.printStackTrace();
×
156
    }
×
157

158
    try {
159
      outSocket = new DatagramSocket();
×
160
    } catch (SocketException e) {
×
161
      // TODO Auto-generated catch block
162
      e.printStackTrace();
×
163
    }
×
164

165
    try {
166
      udpAddress = InetAddress.getByName(udpHost);
×
167
    } catch (UnknownHostException e) {
×
168
      // TODO Auto-generated catch block
169
      e.printStackTrace();
×
170
    }
×
171
    ;
172

173
    notifyStarted();
×
174
  }
×
175

176
  public static NamedObjectId identityOf(String pvName) {
177
    return NamedObjectId.newBuilder().setName(pvName).build();
×
178
  }
179

180
  /** Async adds a Yamcs PV for receiving updates. */
181
  public void register(String pvName) {
182
    NamedObjectId id = identityOf(pvName);
×
183
    try {
184
      subscription.sendMessage(
×
185
          SubscribeParametersRequest.newBuilder()
×
186
              .setInstance(this.yamcsInstance)
×
187
              .setProcessor(processorName)
×
188
              .setSendFromCache(true)
×
189
              .setAbortOnInvalid(false)
×
190
              .setUpdateOnExpiration(false)
×
191
              .addId(id)
×
192
              .setAction(Action.ADD)
×
193
              .build());
×
194
    } catch (Exception e) {
×
195
      System.out.println("e:" + e);
×
196
    }
×
197
  }
×
198

199
  @Override
200
  protected void doStop() {
201
    // TODO Auto-generated method stub
202
    if (!realtime) {
×
203
      processor.doStop();
×
204
      outSocket.close();
×
205
    }
206
    notifyStopped();
×
207
  }
×
208

209
  @Override
210
  public void onData(List<ParameterValue> values) {
211
    // TODO Auto-generated method stub
212
    for (ParameterValue p : values) {
×
213
      if (pvMap.containsValue(p.getId().getName())) {
×
214
        String pvLabel =
×
215
            pvMap.entrySet().stream()
×
216
                .filter(entry -> entry.getValue().equals(p.getId().getName()))
×
217
                .map(Entry::getKey)
×
218
                .collect(Collectors.toList())
×
219
                .get(0);
×
220
        paramsToSend.put(pvLabel, p);
×
221
      }
222
    }
×
223
    SimMessage.VehicleStateMessage.Builder msgBuilder = SimMessage.VehicleStateMessage.newBuilder();
×
224
    for (Map.Entry<String, ParameterValue> pSet : paramsToSend.entrySet()) {
×
225
      org.yamcs.protobuf.Yamcs.Value pv = pSet.getValue().getEngValue();
×
226
      switch (pv.getType()) {
×
227
        case AGGREGATE:
228
          break;
×
229
        case ARRAY:
230
          break;
×
231
        case BINARY:
232
          break;
×
233
        case BOOLEAN:
234
          break;
×
235
        case DOUBLE:
236
          msgBuilder.setField(
×
237
              SimMessage.VehicleStateMessage.getDescriptor().findFieldByName(pSet.getKey()),
×
238
              pv.getDoubleValue());
×
239
          break;
×
240
        case ENUMERATED:
241
          break;
×
242
        case FLOAT:
243
          msgBuilder.setField(
×
244
              SimMessage.VehicleStateMessage.getDescriptor().findFieldByName(pSet.getKey()),
×
245
              pv.getFloatValue());
×
246
          break;
×
247
        case NONE:
248
          break;
×
249
        case SINT32:
250
          break;
×
251
        case SINT64:
252
          break;
×
253
        case STRING:
254
          break;
×
255
        case TIMESTAMP:
256
          break;
×
257
        case UINT32:
258
          break;
×
259
        case UINT64:
260
          break;
×
261
        default:
262
          break;
263
      }
264
    }
×
265
    SimMessage.VehicleStateMessage msg = msgBuilder.build();
×
266
    DatagramPacket dtg =
×
267
        new DatagramPacket(msg.toByteArray(), msg.toByteArray().length, udpAddress, udpPort);
×
268

269
    try {
270
      outSocket.send(dtg);
×
271
    } catch (IOException e) {
×
272
      // TODO Auto-generated catch block
273
      e.printStackTrace();
×
274
    }
×
275
  }
×
276

277
  @Override
278
  public void connecting() {
279
    // TODO Auto-generated method stub
280

281
  }
×
282

283
  public void connected() {
284
    //          TODO:Send Event instead?
285
    //    System.out.println("*****connected*****");
286
    subscription = yclient.createParameterSubscription();
×
287
    subscription.addListener(this);
×
288
    // TODO:Make this configurable
289
    for (Map.Entry<String, String> pvName : pvMap.entrySet()) {
×
290
      register(pvName.getValue());
×
291
    }
×
292
  }
×
293

294
  @Override
295
  public void connectionFailed(Throwable cause) {
296
    // TODO Auto-generated method stub
297

298
  }
×
299

300
  @Override
301
  public void disconnected() {
302
    // TODO Auto-generated method stub
303

304
  }
×
305
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc