• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #162

28 Nov 2024 05:03PM UTC coverage: 0.0%. Remained the same
#162

push

lorenzo-gomez-windhover
-Updates for yamcs 5.9.8

0 of 4 new or added lines in 1 file covered. (0.0%)

614 existing lines in 11 files now uncovered.

0 of 6789 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/UdpParameterDataLink.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import static org.yamcs.parameter.SystemParametersService.getPV;
4

5
import com.google.protobuf.util.JsonFormat;
6
import java.io.ByteArrayInputStream;
7
import java.io.IOException;
8
import java.io.InputStreamReader;
9
import java.io.Reader;
10
import java.net.DatagramPacket;
11
import java.net.DatagramSocket;
12
import java.net.SocketException;
13
import java.util.ArrayList;
14
import java.util.HashMap;
15
import java.util.LinkedHashMap;
16
import java.util.List;
17
import java.util.Map;
18
import java.util.Map.Entry;
19
import org.yamcs.YConfiguration;
20
import org.yamcs.YamcsServer;
21
import org.yamcs.logging.Log;
22
import org.yamcs.parameter.BasicParameterValue;
23
import org.yamcs.parameter.ParameterValue;
24
import org.yamcs.parameter.SystemParametersService;
25
import org.yamcs.protobuf.Pvalue;
26
import org.yamcs.protobuf.Pvalue.ParameterData;
27
import org.yamcs.protobuf.Yamcs.NamedObjectId;
28
import org.yamcs.tctm.ParameterDataLink;
29
import org.yamcs.tctm.ParameterSink;
30
import org.yamcs.time.TimeService;
31
import org.yamcs.xtce.SystemParameter;
32

33
/**
34
 * Very similar to org.yamcs.tctm.UdpParameterDataLink, except that the data is exposed to the
35
 * database as params. Receives PP data via UDP.
36
 *
37
 * <p>The UDP packets are protobuf encoded ParameterData. We don't use any checksum, assume it's
38
 * done by UDP.
39
 *
40
 * @author lgomez
41
 */
UNCOV
42
public class UdpParameterDataLink extends AbstractParameterDataLink
×
43
    implements ParameterDataLink, Runnable {
44

UNCOV
45
  private volatile int validDatagramCount = 0;
×
46
  private volatile int invalidDatagramCount = 0;
×
47
  private volatile boolean disabled = false;
×
48

UNCOV
49
  private int sequenceCount = 0;
×
50

51
  private TimeService timeService;
52
  private DatagramSocket udpSocket;
UNCOV
53
  private int port = 31002;
×
54
  private String defaultRecordingGroup;
55
  private Format format;
56

57
  ParameterSink parameterSink;
58

59
  private HashMap<String, ParameterValue> nameObjectIdtoParamValue;
60

61
  private Log log;
UNCOV
62
  int MAX_LENGTH = 10 * 1024;
×
63

UNCOV
64
  DatagramPacket datagram = new DatagramPacket(new byte[MAX_LENGTH], MAX_LENGTH);
×
65
  YConfiguration config;
66
  String name;
67

68
  private SystemParametersService collector;
69

70
  @Override
71
  public void init(String instance, String name, YConfiguration config) {
UNCOV
72
    super.init(instance, name, config);
×
73
    this.config = config;
×
74
    this.name = name;
×
75
    log = new Log(getClass(), instance);
×
76
    log.setContext(name);
×
77
    timeService = YamcsServer.getTimeService(instance);
×
78
    port = config.getInt("port");
×
79
    defaultRecordingGroup = config.getString("recordingGroup", "DEFAULT");
×
80
    format = config.getBoolean("json", false) ? Format.JSON : Format.PROTOBUF;
×
81
    nameObjectIdtoParamValue = new HashMap<String, ParameterValue>();
×
82
  }
×
83

84
  @Override
85
  protected void doStart() {
UNCOV
86
    if (!isDisabled()) {
×
87
      try {
UNCOV
88
        udpSocket = new DatagramSocket(port);
×
89
        new Thread(this).start();
×
90
      } catch (SocketException e) {
×
91
        notifyFailed(e);
×
92
      }
×
93
    }
94

UNCOV
95
    collector = SystemParametersService.getInstance(yamcsInstance);
×
96

UNCOV
97
    notifyStarted();
×
98
  }
×
99

100
  @Override
101
  protected void doStop() {
UNCOV
102
    if (udpSocket != null) {
×
103
      udpSocket.close();
×
104
    }
UNCOV
105
    notifyStopped();
×
106
  }
×
107

108
  public boolean isRunningAndEnabled() {
UNCOV
109
    State state = state();
×
110
    return (state == State.RUNNING || state == State.STARTING) && !disabled;
×
111
  }
112

113
  @Override
114
  public void run() {
UNCOV
115
    while (isRunningAndEnabled()) {
×
116
      ParameterData pdata = getNextData();
×
117
      if (pdata == null) {
×
118
        continue;
×
119
      }
120

UNCOV
121
      if (pdata.hasGenerationTime()) {
×
122
        log.error("Generation time must be specified for each parameter separately");
×
123
        continue;
×
124
      }
125

UNCOV
126
      long now = timeService.getMissionTime();
×
127
      String recgroup = pdata.hasGroup() ? pdata.getGroup() : defaultRecordingGroup;
×
128
      int sequenceNumber = pdata.hasSeqNum() ? pdata.getSeqNum() : sequenceCount++;
×
129

130
      // Regroup by gentime, just in case multiple parameters are submitted with different times.
UNCOV
131
      Map<Long, List<ParameterValue>> valuesByTime = new LinkedHashMap<>();
×
132

UNCOV
133
      for (Pvalue.ParameterValue gpv : pdata.getParameterList()) {
×
134
        NamedObjectId id = gpv.getId();
×
135
        if (id == null) {
×
136
          log.warn("parameter without id, skipping");
×
137
          continue;
×
138
        }
UNCOV
139
        String fqn = id.getName();
×
140
        if (id.hasNamespace()) {
×
141
          log.trace(
×
142
              "Using namespaced name for parameter {} because fully qualified name not available.",
143
              id);
144
        }
145

146
        //        System.out.println("fqn-->" + fqn);
UNCOV
147
        ParameterValue pv = BasicParameterValue.fromGpb(fqn, gpv);
×
148
        long gentime = gpv.hasGenerationTime() ? pv.getGenerationTime() : now;
×
149
        pv.setGenerationTime(gentime);
×
150

151
        //                pv.setParameter(SystemParameter.getForFullyQualifiedName(fqn));
152

UNCOV
153
        List<ParameterValue> pvals = valuesByTime.computeIfAbsent(gentime, x -> new ArrayList<>());
×
154

155
        //                These are all hacks, we should really be using something like
156
        //                org.yamcs.parameter.SystemParametersService.createSystemParameter(XtceDb,
157
        // String, Value, UnitType)
UNCOV
158
        if (nameObjectIdtoParamValue.get(fqn) == null) {
×
159
          // For now we are doing this ourselves since we want to be able  to add params dynamically
160
          // and not let the link manage it.
161
          //          System.out.println(
162
          //              "pv.getParameter().getParameterType()-->" + pv.getEngValue().getType());
UNCOV
163
          SystemParameter p =
×
164
              collector.createSystemParameter(
×
165
                  "paradigm/" + fqn, pv.getEngValue().getType(), "Data from SIM.");
×
166
          pv.setParameter(p);
×
167
          nameObjectIdtoParamValue.put(fqn, pv);
×
168
        } else {
×
169
          pv.setParameter(nameObjectIdtoParamValue.get(fqn).getParameter());
×
170
          nameObjectIdtoParamValue.put(fqn, pv);
×
171
        }
172

UNCOV
173
        pvals.add(pv);
×
174
      }
×
175

UNCOV
176
      for (Entry<Long, List<ParameterValue>> group : valuesByTime.entrySet()) {
×
177
        parameterSink.updateParameters(
×
178
            (long) group.getKey(), recgroup, sequenceNumber, group.getValue());
×
179
      }
×
180
    }
×
181
  }
×
182

183
  @Override
184
  public List<ParameterValue> getSystemParameters(long gentime) {
UNCOV
185
    super.getSystemParameters(gentime);
×
186
    List<ParameterValue> pvlist = new ArrayList<>();
×
187

UNCOV
188
    for (ParameterValue val : nameObjectIdtoParamValue.values()) {
×
189
      //                System.out.println("val.getParameter()-->" + val.getParameter());
190
      //      System.out.println("val.getEngValue()-->" + val.getEngValue());
191
      //      System.out.println("val.getParameter()-->" + val.getParameter());
192
      //      System.out.println("time-->" + gentime);
UNCOV
193
      pvlist.add(getPV(val.getParameter(), gentime, val.getEngValue()));
×
194
    }
×
195

UNCOV
196
    return pvlist;
×
197
  }
198

199
  /**
200
   * adds system parameters link status and data in/out to the list.
201
   *
202
   * <p>The inheriting classes should call super.collectSystemParameters and then add their own
203
   * parameters to the list
204
   *
205
   * @param time
206
   * @param list
207
   */
208
  protected void collectSystemParameters(long time, List<ParameterValue> list) {
UNCOV
209
    super.collectSystemParameters(time, list);
×
210
    for (ParameterValue val : nameObjectIdtoParamValue.values()) {
×
211
      //                    System.out.println("val.getParameter()-->" + val.getParameter());
212
      //                    System.out.println("val.getEngValue()-->" + val.getEngValue());
213
      //                    System.out.println("val.getParameter()-->" + val.getParameter());
214
      //                    System.out.println("time-->" + time);
UNCOV
215
      list.add(getPV(val.getParameter(), time, 12.5));
×
216
    }
×
217
  }
×
218

219
  /**
220
   * Called to retrieve the next packet. It blocks in reading on the UDP socket.
221
   *
222
   * @return anything that looks as a valid packet, just the size is taken into account to decide if
223
   *     it's valid or not
224
   */
225
  public ParameterData getNextData() {
UNCOV
226
    while (isRunning()) {
×
227
      try {
UNCOV
228
        udpSocket.receive(datagram);
×
229
        validDatagramCount++;
×
230
        return decodeDatagram(datagram.getData(), datagram.getOffset(), datagram.getLength());
×
231
      } catch (IOException e) {
×
232
        // Shutdown or disable will close the socket. That generates an exception
233
        // which we ignore here.
UNCOV
234
        if (!isRunning() || isDisabled()) {
×
235
          return null;
×
236
        }
UNCOV
237
        log.warn("Exception when receiving parameter data: {}'", e.toString());
×
238
        invalidDatagramCount++;
×
239
      }
×
240
    }
UNCOV
241
    return null;
×
242
  }
243

244
  /**
245
   * Decode {@link ParameterData} from the content of a single received UDP Datagram.
246
   *
247
   * <p>{@link UdpParameterDataLink} has configurable support for either Protobuf or JSON-encoded
248
   * data. Extending links may provide a custom decoder by overriding this method.
249
   *
250
   * @param data data buffer. The data received starts from {@code offset} and runs for {@code
251
   *     length} long.
252
   * @param offset offset of the data received
253
   * @param length length of the data received
254
   */
255
  public ParameterData decodeDatagram(byte[] data, int offset, int length) throws IOException {
UNCOV
256
    switch (format) {
×
257
      case JSON:
UNCOV
258
        try (Reader reader =
×
259
            new InputStreamReader(new ByteArrayInputStream(data, offset, length))) {
UNCOV
260
          ParameterData.Builder builder = ParameterData.newBuilder();
×
261
          JsonFormat.parser().merge(reader, builder);
×
262
          return builder.build();
×
263
        }
264
      case PROTOBUF:
UNCOV
265
        return ParameterData.newBuilder().mergeFrom(data, offset, length).build();
×
266
      default:
UNCOV
267
        throw new IllegalStateException("Unexpected format " + format);
×
268
    }
269
  }
270

271
  @Override
272
  public Status getLinkStatus() {
UNCOV
273
    return disabled ? Status.DISABLED : Status.OK;
×
274
  }
275

276
  /**
277
   * Returns statistics with the number of datagrams received and the number of invalid datagrams
278
   */
279
  @Override
280
  public String getDetailedStatus() {
UNCOV
281
    if (disabled) {
×
282
      return "DISABLED";
×
283
    } else {
UNCOV
284
      return String.format(
×
285
          "OK (%s)\nValid datagrams received: %d\nInvalid datagrams received: %d",
UNCOV
286
          port, validDatagramCount, invalidDatagramCount);
×
287
    }
288
  }
289

290
  /** Sets the disabled to true such that getNextPacket ignores the received datagrams */
291
  @Override
292
  public void disable() {
UNCOV
293
    disabled = true;
×
294
    if (udpSocket != null) {
×
295
      udpSocket.close();
×
296
      udpSocket = null;
×
297
    }
UNCOV
298
  }
×
299

300
  /** Sets the disabled to false such that getNextPacket does not ignore the received datagrams */
301
  @Override
302
  public void enable() {
UNCOV
303
    disabled = false;
×
304
    try {
UNCOV
305
      udpSocket = new DatagramSocket(port);
×
306
      new Thread(this).start();
×
307
    } catch (SocketException e) {
×
308
      disabled = false;
×
309
      log.warn("Failed to enable link", e);
×
310
    }
×
311
  }
×
312

313
  @Override
314
  public boolean isDisabled() {
UNCOV
315
    return disabled;
×
316
  }
317

318
  @Override
319
  public long getDataInCount() {
UNCOV
320
    return validDatagramCount;
×
321
  }
322

323
  @Override
324
  public long getDataOutCount() {
UNCOV
325
    return 0;
×
326
  }
327

328
  @Override
329
  public void resetCounters() {
UNCOV
330
    validDatagramCount = 0;
×
331
    invalidDatagramCount = 0;
×
332
  }
×
333

334
  @Override
335
  public void setParameterSink(ParameterSink parameterSink) {
UNCOV
336
    this.parameterSink = parameterSink;
×
337
  }
×
338

339
  @Override
340
  public YConfiguration getConfig() {
UNCOV
341
    return config;
×
342
  }
343

344
  @Override
345
  public String getName() {
UNCOV
346
    return name;
×
347
  }
348

349
  /** Default supported data formats */
UNCOV
350
  private static enum Format {
×
351
    JSON,
×
352
    PROTOBUF;
×
353
  }
354

355
  @Override
356
  protected Status connectionStatus() {
357
    // TODO Auto-generated method stub
UNCOV
358
    return Status.OK;
×
359
  }
360
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc