• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #94

pending completion
#94

Pull #36

lorenzo-gomez-windhover
-Bump version
Pull Request #36: Archive to sim

126 of 126 new or added lines in 1 file covered. (100.0%)

0 of 5278 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/archive/ReplayToUDPService.java
1
package com.windhoverlabs.yamcs.archive;
2

3
import com.google.protobuf.Timestamp;
4
import com.google.protobuf.util.Timestamps;
5
import com.windhoverlabs.yamcs.archive.api.SimMessage;
6
import java.io.IOException;
7
import java.net.DatagramPacket;
8
import java.net.DatagramSocket;
9
import java.net.InetAddress;
10
import java.net.SocketException;
11
import java.net.UnknownHostException;
12
import java.text.ParseException;
13
import java.util.HashMap;
14
import java.util.List;
15
import java.util.Map;
16
import java.util.Map.Entry;
17
import java.util.stream.Collectors;
18
import org.yamcs.AbstractYamcsService;
19
import org.yamcs.ConfigurationException;
20
import org.yamcs.InitException;
21
import org.yamcs.Processor;
22
import org.yamcs.ProcessorException;
23
import org.yamcs.ProcessorFactory;
24
import org.yamcs.ValidationException;
25
import org.yamcs.YConfiguration;
26
import org.yamcs.archive.ReplayOptions;
27
import org.yamcs.client.ClientException;
28
import org.yamcs.client.ConnectionListener;
29
import org.yamcs.client.ParameterSubscription;
30
import org.yamcs.client.YamcsClient;
31
import org.yamcs.logging.Log;
32
import org.yamcs.protobuf.Pvalue.ParameterValue;
33
import org.yamcs.protobuf.SubscribeParametersRequest;
34
import org.yamcs.protobuf.SubscribeParametersRequest.Action;
35
import org.yamcs.protobuf.Yamcs.EndAction;
36
import org.yamcs.protobuf.Yamcs.NamedObjectId;
37
import org.yamcs.protobuf.Yamcs.ReplayRequest;
38

39
public class ReplayToUDPService extends AbstractYamcsService
×
40
    implements ParameterSubscription.Listener, ConnectionListener {
41
  private ReplayOptions replayOptions;
42
  private String processorName;
43
  private Processor processor;
44

45
  private String start;
46
  private boolean reverse; // TODO: Add to conifg
47
  private String stop;
48
  private Timestamp startTimeStamp;
49
  private Timestamp stopTimeStamp;
50

51
  private ParameterSubscription subscription;
52

53
  private HashMap<String, ParameterValue> paramsToSend = new HashMap<String, ParameterValue>();
×
54

55
  private YamcsClient yclient;
56

57
  private DatagramSocket outSocket;
58
  private Map<String, String> pvMap;
59
  private String yamcsHost;
60
  private int yamcsPort;
61

62
  private String udpHost;
63
  private int udpPort;
64
  private InetAddress udpAddress;
65

66
  @Override
67
  public void init(String yamcsInstance, String serviceName, YConfiguration config)
68
      throws InitException {
69
    this.yamcsInstance = yamcsInstance;
×
70
    this.serviceName = serviceName;
×
71
    this.config = config;
×
72
    processorName = this.config.getString("processorName", "ReplayToUDPService");
×
73
    //    TODO:Probably a much easier way of doing this...
74
    //    YamcsServer.getServer().getInstance(yamcsInstance).addProcessor(processor);
75
    //    YamcsServer.getServer().getInstance(yamcsInstance)
76

77
    start = this.config.getString("start");
×
78
    stop = this.config.getString("stop");
×
79

80
    pvMap = this.config.getMap("pvMap");
×
81

82
    yamcsHost = this.getConfig().getString("yamcsHost", "http://localhost");
×
83
    yamcsPort = this.getConfig().getInt("yamcsPort", 8090);
×
84

85
    udpHost = this.getConfig().getString("udpHost", "172.16.100.237");
×
86
    udpPort = this.getConfig().getInt("udpPort", 8000);
×
87

88
    log = new Log(getClass(), yamcsInstance);
×
89
    try {
90
      startTimeStamp = Timestamps.parse(start);
×
91
      stopTimeStamp = Timestamps.parse(stop);
×
92
    } catch (ParseException e) {
×
93
      // TODO Auto-generated catch block
94
      e.printStackTrace();
×
95
    }
×
96

97
    //    TODO:Add these options to YAML
98
    replayOptions =
×
99
        new ReplayOptions(
100
            ReplayRequest.newBuilder()
×
101
                .setStart(startTimeStamp)
×
102
                .setStop(stopTimeStamp)
×
103
                .setEndAction(EndAction.LOOP)
×
104
                .setAutostart(true)
×
105
                .build());
×
106

107
    try {
108
      processor =
×
109
          ProcessorFactory.create(
×
110
              yamcsInstance,
111
              processorName,
112
              "Archive",
113
              ReplayToUDPService.class.toString(),
×
114
              replayOptions);
115
    } catch (ProcessorException | ConfigurationException | ValidationException | InitException e) {
×
116
      // TODO Auto-generated catch block
117
      e.printStackTrace();
×
118
    }
×
119
  }
×
120

121
  @Override
122
  protected void doStart() {
123
    // TODO Auto-generated method stub
124

125
    log.info("Starting new processor '{}'", processor.getName());
×
126
    processor.startAsync();
×
127
    processor.awaitRunning();
×
128

129
    //    TODO: This is unnecessarily complicated
130
    yclient =
×
131
        YamcsClient.newBuilder(yamcsHost + ":" + yamcsPort)
×
132
            //            .withConnectionAttempts(config.getInt("connectionAttempts", 20))
133
            //            .withRetryDelay(reconnectionDelay)
134
            //            .withVerifyTls(config.getBoolean("verifyTls", true))
135
            .build();
×
136
    yclient.addConnectionListener(this);
×
137

138
    try {
139
      yclient.connectWebSocket();
×
140
    } catch (ClientException e) {
×
141
      // TODO Auto-generated catch block
142
      e.printStackTrace();
×
143
    }
×
144

145
    try {
146
      outSocket = new DatagramSocket();
×
147
    } catch (SocketException e) {
×
148
      // TODO Auto-generated catch block
149
      e.printStackTrace();
×
150
    }
×
151

152
    try {
153
      udpAddress = InetAddress.getByName(udpHost);
×
154
    } catch (UnknownHostException e) {
×
155
      // TODO Auto-generated catch block
156
      e.printStackTrace();
×
157
    }
×
158
    ;
159

160
    notifyStarted();
×
161
  }
×
162

163
  public static NamedObjectId identityOf(String pvName) {
164
    return NamedObjectId.newBuilder().setName(pvName).build();
×
165
  }
166

167
  /** Async adds a Yamcs PV for receiving updates. */
168
  public void register(String pvName) {
169
    NamedObjectId id = identityOf(pvName);
×
170
    try {
171
      subscription.sendMessage(
×
172
          SubscribeParametersRequest.newBuilder()
×
173
              .setInstance(this.yamcsInstance)
×
174
              .setProcessor(processorName)
×
175
              .setSendFromCache(true)
×
176
              .setAbortOnInvalid(false)
×
177
              .setUpdateOnExpiration(false)
×
178
              .addId(id)
×
179
              .setAction(Action.ADD)
×
180
              .build());
×
181
    } catch (Exception e) {
×
182
      System.out.println("e:" + e);
×
183
    }
×
184
  }
×
185

186
  @Override
187
  protected void doStop() {
188
    // TODO Auto-generated method stub
189
    processor.doStop();
×
190
    outSocket.close();
×
191
    notifyStopped();
×
192
  }
×
193

194
  @Override
195
  public void onData(List<ParameterValue> values) {
196
    // TODO Auto-generated method stub
197
    for (ParameterValue p : values) {
×
198
      if (pvMap.containsValue(p.getId().getName())) {
×
199
        String pvLabel =
×
200
            pvMap.entrySet().stream()
×
201
                .filter(entry -> entry.getValue().equals(p.getId().getName()))
×
202
                .map(Entry::getKey)
×
203
                .collect(Collectors.toList())
×
204
                .get(0);
×
205
        paramsToSend.put(pvLabel, p);
×
206
      }
207
    }
×
208
    SimMessage.VehicleStateMessage.Builder msgBuilder = SimMessage.VehicleStateMessage.newBuilder();
×
209
    for (Map.Entry<String, ParameterValue> pSet : paramsToSend.entrySet()) {
×
210
      org.yamcs.protobuf.Yamcs.Value pv = pSet.getValue().getEngValue();
×
211
      switch (pv.getType()) {
×
212
        case AGGREGATE:
213
          break;
×
214
        case ARRAY:
215
          break;
×
216
        case BINARY:
217
          break;
×
218
        case BOOLEAN:
219
          break;
×
220
        case DOUBLE:
221
          msgBuilder.setField(
×
222
              SimMessage.VehicleStateMessage.getDescriptor().findFieldByName(pSet.getKey()),
×
223
              pv.getDoubleValue());
×
224
          break;
×
225
        case ENUMERATED:
226
          break;
×
227
        case FLOAT:
228
          msgBuilder.setField(
×
229
              SimMessage.VehicleStateMessage.getDescriptor().findFieldByName(pSet.getKey()),
×
230
              pv.getFloatValue());
×
231
          break;
×
232
        case NONE:
233
          break;
×
234
        case SINT32:
235
          break;
×
236
        case SINT64:
237
          break;
×
238
        case STRING:
239
          break;
×
240
        case TIMESTAMP:
241
          break;
×
242
        case UINT32:
243
          break;
×
244
        case UINT64:
245
          break;
×
246
        default:
247
          break;
248
      }
249
    }
×
250
    SimMessage.VehicleStateMessage msg = msgBuilder.build();
×
251
    DatagramPacket dtg =
×
252
        new DatagramPacket(msg.toByteArray(), msg.toByteArray().length, udpAddress, udpPort);
×
253

254
    try {
255
      outSocket.send(dtg);
×
256
    } catch (IOException e) {
×
257
      // TODO Auto-generated catch block
258
      e.printStackTrace();
×
259
    }
×
260
  }
×
261

262
  @Override
263
  public void connecting() {
264
    // TODO Auto-generated method stub
265

266
  }
×
267

268
  public void connected() {
269
    //          TODO:Send Event instead?
270
    //    System.out.println("*****connected*****");
271
    subscription = yclient.createParameterSubscription();
×
272
    subscription.addListener(this);
×
273
    // TODO:Make this configurable
274
    for (Map.Entry<String, String> pvName : pvMap.entrySet()) {
×
275
      register(pvName.getValue());
×
276
    }
×
277
  }
×
278

279
  @Override
280
  public void connectionFailed(Throwable cause) {
281
    // TODO Auto-generated method stub
282

283
  }
×
284

285
  @Override
286
  public void disconnected() {
287
    // TODO Auto-generated method stub
288

289
  }
×
290
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc