• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #92

pending completion
#92

push

lorenzo-gomez-windhover
Make addresses and ports configurable in ReplayToUDPService.

29 of 29 new or added lines in 1 file covered. (100.0%)

0 of 5261 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/archive/ReplayToUDPService.java
1
package com.windhoverlabs.yamcs.archive;
2

3
import com.google.protobuf.Timestamp;
4
import com.google.protobuf.util.Timestamps;
5
import com.windhoverlabs.yamcs.archive.api.SimMessage;
6
import java.io.IOException;
7
import java.net.DatagramPacket;
8
import java.net.DatagramSocket;
9
import java.net.InetAddress;
10
import java.net.SocketException;
11
import java.net.UnknownHostException;
12
import java.text.ParseException;
13
import java.util.HashMap;
14
import java.util.LinkedHashMap;
15
import java.util.List;
16
import java.util.Map;
17
import java.util.Map.Entry;
18
import java.util.Set;
19
import java.util.stream.Collectors;
20
import org.yamcs.AbstractYamcsService;
21
import org.yamcs.ConfigurationException;
22
import org.yamcs.InitException;
23
import org.yamcs.Processor;
24
import org.yamcs.ProcessorException;
25
import org.yamcs.ProcessorFactory;
26
import org.yamcs.ValidationException;
27
import org.yamcs.YConfiguration;
28
import org.yamcs.archive.ReplayOptions;
29
import org.yamcs.client.ClientException;
30
import org.yamcs.client.ConnectionListener;
31
import org.yamcs.client.ParameterSubscription;
32
import org.yamcs.client.YamcsClient;
33
import org.yamcs.logging.Log;
34
import org.yamcs.protobuf.Pvalue.ParameterValue;
35
import org.yamcs.protobuf.SubscribeParametersRequest;
36
import org.yamcs.protobuf.SubscribeParametersRequest.Action;
37
import org.yamcs.protobuf.Yamcs.EndAction;
38
import org.yamcs.protobuf.Yamcs.NamedObjectId;
39
import org.yamcs.protobuf.Yamcs.ReplayRequest;
40

41
public class ReplayToUDPService extends AbstractYamcsService
×
42
    implements ParameterSubscription.Listener, ConnectionListener {
43
  private ReplayOptions replayOptions;
44
  private String processorName;
45
  private Processor processor;
46

47
  private String start;
48
  private boolean reverse; // TODO: Add to conifg
49
  private String stop;
50
  private Timestamp startTimeStamp;
51
  private Timestamp stopTimeStamp;
52

53
  private ParameterSubscription subscription;
54

55
  private Map<NamedObjectId, Set<String>> pvsById = new LinkedHashMap<>();
×
56

57
  private HashMap<String, ParameterValue> paramsToSend = new HashMap<String, ParameterValue>();
×
58

59
  private YamcsClient yclient;
60

61
  private DatagramSocket outSocket;
62
  private Map<String, String> pvMap;
63
  private String yamcsHost;
64
  private int yamcsPort;
65

66
  private String udpHost;
67
  private int udpPort;
68
  private InetAddress udpAddress;
69

70
  @Override
71
  public void init(String yamcsInstance, String serviceName, YConfiguration config)
72
      throws InitException {
73
    this.yamcsInstance = yamcsInstance;
×
74
    this.serviceName = serviceName;
×
75
    this.config = config;
×
76
    processorName = this.config.getString("processorName", "ReplayToUDPService");
×
77
    //    TODO:Probably a much easier way of doing this...
78
    //    YamcsServer.getServer().getInstance(yamcsInstance).addProcessor(processor);
79
    //    YamcsServer.getServer().getInstance(yamcsInstance)
80

81
    start = this.config.getString("start");
×
82
    stop = this.config.getString("stop");
×
83

84
    pvMap = this.config.getMap("pvMap");
×
85

86
    yamcsHost = this.getConfig().getString("yamcsHost", "http://localhost");
×
87
    yamcsPort = this.getConfig().getInt("yamcsPort", 8090);
×
88

89
    udpHost = this.getConfig().getString("udpHost", "172.16.100.237");
×
90
    udpPort = this.getConfig().getInt("udpPort", 8000);
×
91

92
    log = new Log(getClass(), yamcsInstance);
×
93
    try {
94
      startTimeStamp = Timestamps.parse(start);
×
95
      stopTimeStamp = Timestamps.parse(stop);
×
96
    } catch (ParseException e) {
×
97
      // TODO Auto-generated catch block
98
      e.printStackTrace();
×
99
    }
×
100

101
    //    TODO:Add these options to YAML
102
    replayOptions =
×
103
        new ReplayOptions(
104
            ReplayRequest.newBuilder()
×
105
                .setStart(startTimeStamp)
×
106
                .setStop(stopTimeStamp)
×
107
                .setEndAction(EndAction.LOOP)
×
108
                .setAutostart(true)
×
109
                .build());
×
110

111
    try {
112
      processor =
×
113
          ProcessorFactory.create(
×
114
              yamcsInstance,
115
              processorName,
116
              "Archive",
117
              ReplayToUDPService.class.toString(),
×
118
              replayOptions);
119
    } catch (ProcessorException | ConfigurationException | ValidationException | InitException e) {
×
120
      // TODO Auto-generated catch block
121
      e.printStackTrace();
×
122
    }
×
123
  }
×
124

125
  @Override
126
  protected void doStart() {
127
    // TODO Auto-generated method stub
128

129
    log.info("Starting new processor '{}'", processor.getName());
×
130
    processor.startAsync();
×
131
    processor.awaitRunning();
×
132

133
    //    TODO: This is unnecessarily complicated
134
    yclient =
×
135
        YamcsClient.newBuilder(yamcsHost + ":" + yamcsPort)
×
136
            //            .withConnectionAttempts(config.getInt("connectionAttempts", 20))
137
            //            .withRetryDelay(reconnectionDelay)
138
            //            .withVerifyTls(config.getBoolean("verifyTls", true))
139
            .build();
×
140
    yclient.addConnectionListener(this);
×
141

142
    try {
143
      yclient.connectWebSocket();
×
144
    } catch (ClientException e) {
×
145
      // TODO Auto-generated catch block
146
      e.printStackTrace();
×
147
    }
×
148

149
    try {
150
      outSocket = new DatagramSocket();
×
151
    } catch (SocketException e) {
×
152
      // TODO Auto-generated catch block
153
      e.printStackTrace();
×
154
    }
×
155

156
    try {
157
      udpAddress = InetAddress.getByName(udpHost);
×
158
    } catch (UnknownHostException e) {
×
159
      // TODO Auto-generated catch block
160
      e.printStackTrace();
×
161
    }
×
162
    ;
163

164
    notifyStarted();
×
165
  }
×
166

167
  public static NamedObjectId identityOf(String pvName) {
168
    return NamedObjectId.newBuilder().setName(pvName).build();
×
169
  }
170

171
  /** Async adds a Yamcs PV for receiving updates. */
172
  public void register(String pvName) {
173
    NamedObjectId id = identityOf(pvName);
×
174
    try {
175
      subscription.sendMessage(
×
176
          SubscribeParametersRequest.newBuilder()
×
177
              .setInstance(this.yamcsInstance)
×
178
              .setProcessor(processorName)
×
179
              .setSendFromCache(true)
×
180
              .setAbortOnInvalid(false)
×
181
              .setUpdateOnExpiration(false)
×
182
              .addId(id)
×
183
              .setAction(Action.ADD)
×
184
              .build());
×
185
    } catch (Exception e) {
×
186
      System.out.println("e:" + e);
×
187
    }
×
188
  }
×
189

190
  @Override
191
  protected void doStop() {
192
    // TODO Auto-generated method stub
193
    processor.doStop();
×
194
    outSocket.close();
×
195
    notifyStopped();
×
196
  }
×
197

198
  @Override
199
  public void onData(List<ParameterValue> values) {
200
    // TODO Auto-generated method stub
201
    for (ParameterValue p : values) {
×
202
      if (pvMap.containsValue(p.getId().getName())) {
×
203
        String pvLabel =
×
204
            pvMap.entrySet().stream()
×
205
                .filter(entry -> entry.getValue().equals(p.getId().getName()))
×
206
                .map(Entry::getKey)
×
207
                .collect(Collectors.toList())
×
208
                .get(0);
×
209
        paramsToSend.put(pvLabel, p);
×
210
      }
211
    }
×
212

213
    SimMessage.VehicleStateMessage msg =
214
        SimMessage.VehicleStateMessage.newBuilder()
×
215
            .setAlt(paramsToSend.get("Altitude").getEngValue().getFloatValue())
×
216
            .setLat(paramsToSend.get("Lat").getEngValue().getDoubleValue())
×
217
            .setLon(paramsToSend.get("Lon").getEngValue().getDoubleValue())
×
218
            .setPitch(Math.toRadians(paramsToSend.get("Pitch").getEngValue().getFloatValue()))
×
219
            .setRoll(Math.toRadians(paramsToSend.get("Roll").getEngValue().getFloatValue()))
×
220
            .setYaw(Math.toRadians(paramsToSend.get("Yaw").getEngValue().getFloatValue()))
×
221
            .build();
×
222
    DatagramPacket dtg =
×
223
        new DatagramPacket(msg.toByteArray(), msg.toByteArray().length, udpAddress, udpPort);
×
224

225
    try {
226
      outSocket.send(dtg);
×
227
    } catch (IOException e) {
×
228
      // TODO Auto-generated catch block
229
      e.printStackTrace();
×
230
    }
×
231
  }
×
232

233
  @Override
234
  public void connecting() {
235
    // TODO Auto-generated method stub
236

237
  }
×
238

239
  public void connected() {
240
    //          TODO:Send Event instead?
241
    //    System.out.println("*****connected*****");
242
    subscription = yclient.createParameterSubscription();
×
243
    subscription.addListener(this);
×
244
    // TODO:Make this configurable
245
    for (Map.Entry<String, String> pvName : pvMap.entrySet()) {
×
246
      register(pvName.getValue());
×
247
    }
×
248
  }
×
249

250
  @Override
251
  public void connectionFailed(Throwable cause) {
252
    // TODO Auto-generated method stub
253

254
  }
×
255

256
  @Override
257
  public void disconnected() {
258
    // TODO Auto-generated method stub
259

260
  }
×
261
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc