• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #162

28 Nov 2024 05:03PM UTC coverage: 0.0%. Remained the same
#162

push

lorenzo-gomez-windhover
-Updates for yamcs 5.9.8

0 of 4 new or added lines in 1 file covered. (0.0%)

614 existing lines in 11 files now uncovered.

0 of 6789 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/UdpStreamInProvider.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import java.io.IOException;
4
import java.net.DatagramPacket;
5
import java.net.DatagramSocket;
6
import java.net.SocketException;
7
import java.nio.ByteBuffer;
8
import java.util.Arrays;
9
import java.util.concurrent.atomic.AtomicBoolean;
10
import java.util.concurrent.atomic.AtomicLong;
11
import org.yamcs.AbstractYamcsService;
12
import org.yamcs.ConfigurationException;
13
import org.yamcs.Spec;
14
import org.yamcs.TmPacket;
15
import org.yamcs.YConfiguration;
16
import org.yamcs.YamcsServer;
17
import org.yamcs.events.EventProducer;
18
import org.yamcs.events.EventProducerFactory;
19
import org.yamcs.logging.Log;
20
import org.yamcs.tctm.Link;
21
import org.yamcs.time.SimulationTimeService;
22
import org.yamcs.time.TimeService;
23
import org.yamcs.utils.DataRateMeter;
24
import org.yamcs.utils.YObjectLoader;
25
import org.yamcs.yarch.ColumnDefinition;
26
import org.yamcs.yarch.DataType;
27
import org.yamcs.yarch.Stream;
28
import org.yamcs.yarch.Tuple;
29
import org.yamcs.yarch.TupleDefinition;
30
import org.yamcs.yarch.YarchDatabase;
31
import org.yamcs.yarch.YarchDatabaseInstance;
32

33
/**
34
 * Receives telemetry fames via UDP. One UDP datagram = one TM frame.
35
 *
36
 * @author nm
37
 */
UNCOV
38
public class UdpStreamInProvider extends AbstractYamcsService implements Link, Runnable {
×
39
  protected YConfiguration config;
40
  protected String linkName;
UNCOV
41
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
42
  protected Log log;
43
  protected EventProducer eventProducer;
44
  protected TimeService timeService;
UNCOV
45
  protected AtomicLong packetCount = new AtomicLong(0);
×
46
  protected boolean updateSimulationTime;
47
  DataRateMeter packetRateMeter = new DataRateMeter();
×
UNCOV
48
  DataRateMeter dataRateMeter = new DataRateMeter();
×
49
  protected org.yamcs.tctm.PacketPreprocessor packetPreprocessor;
50
  protected boolean hasPreprocessor = false;
×
51
  protected Stream stream;
52

53
  static TupleDefinition gftdef;
54

55
  static final String RECTIME_CNAME = "rectime";
56
  static final String DATA_CNAME = "data";
57

58
  static {
UNCOV
59
    gftdef = new TupleDefinition();
×
UNCOV
60
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
61
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
62
  }
×
63

64
  static final String CFG_PREPRO_CLASS = "packetPreprocessorClassName";
65

66
  static final int MAX_LENGTH = 1500;
67

68
  private DatagramSocket tmSocket;
69
  private int port;
70
  private int offset;
71
  private int rightTrim;
72
  private int rcvBufferSize;
73
  int maxLength;
74
  //  private volatile int invalidDatagramCount = 0;
75

76
  DatagramPacket datagram;
77
  String packetPreprocessorClassName;
78
  Object packetPreprocessorArgs;
79
  Thread thread;
80

81
  /**
82
   * Creates a new UDP Frame Data Link
83
   *
84
   * @throws ConfigurationException if port is not defined in the configuration
85
   */
86
  public void init(String instance, String name, YConfiguration config)
87
      throws ConfigurationException {
88
    // super.init(instance, name, config);
UNCOV
89
    this.config = config;
×
UNCOV
90
    this.linkName = name;
×
91
    port = config.getInt("port");
×
92
    offset = config.getInt("offset", 0);
×
93
    rcvBufferSize = config.getInt("rcvBufferSize", 0);
×
94
    rightTrim = config.getInt("rightTrim", 0);
×
95
    maxLength = config.getInt("maxLength", MAX_LENGTH);
×
96
    log = new Log(getClass(), instance);
×
97
    log.setContext(name);
×
98
    eventProducer = EventProducerFactory.getEventProducer(instance, name, 10000);
×
99
    this.timeService = YamcsServer.getTimeService(instance);
×
100
    String streamName = config.getString("stream");
×
101

102
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
UNCOV
103
    this.stream = getStream(ydb, streamName);
×
104

105
    if (config.containsKey(CFG_PREPRO_CLASS)) {
×
UNCOV
106
      this.hasPreprocessor = true;
×
107

108
      this.packetPreprocessorClassName = config.getString(CFG_PREPRO_CLASS);
×
109

110
      if (config.containsKey("packetPreprocessorArgs")) {
×
UNCOV
111
        this.packetPreprocessorArgs = config.getConfig("packetPreprocessorArgs");
×
112
      }
113

114
      try {
UNCOV
115
        if (packetPreprocessorArgs != null) {
×
UNCOV
116
          packetPreprocessor =
×
117
              YObjectLoader.loadObject(
×
118
                  packetPreprocessorClassName, instance, packetPreprocessorArgs);
119
        } else {
UNCOV
120
          packetPreprocessor = YObjectLoader.loadObject(packetPreprocessorClassName, instance);
×
121
        }
122
      } catch (ConfigurationException e) {
×
UNCOV
123
        log.error("Cannot instantiate the packet preprocessor", e);
×
124
        throw e;
×
125
      }
×
126
    }
127

UNCOV
128
    datagram = new DatagramPacket(new byte[maxLength], maxLength);
×
129

130
    updateSimulationTime = config.getBoolean("updateSimulationTime", false);
×
UNCOV
131
    if (updateSimulationTime) {
×
132
      if (timeService instanceof SimulationTimeService) {
×
133
        SimulationTimeService sts = (SimulationTimeService) timeService;
×
134
        sts.setTime0(0);
×
135
      } else {
×
136
        throw new ConfigurationException(
×
137
            "updateSimulationTime can only be used together with SimulationTimeService "
138
                + "(add 'timeService: org.yamcs.time.SimulationTimeService' in yamcs.<instance>.yaml)");
139
      }
140
    }
UNCOV
141
  }
×
142

143
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
UNCOV
144
    Stream stream = ydb.getStream(streamName);
×
UNCOV
145
    if (stream == null) {
×
146
      try {
147
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
148
        // ydb.execute("create stream " + streamName);
149
      } catch (Exception e) {
×
UNCOV
150
        throw new ConfigurationException(e);
×
151
      }
×
152
      stream = ydb.getStream(streamName);
×
153
    }
154
    return stream;
×
155
  }
156

157
  @Override
158
  public YConfiguration getConfig() {
UNCOV
159
    return config;
×
160
  }
161

162
  @Override
163
  public String getName() {
UNCOV
164
    return linkName;
×
165
  }
166

167
  @Override
UNCOV
168
  public void resetCounters() {}
×
169

170
  @Override
171
  public void doStart() {
UNCOV
172
    if (!isDisabled()) {
×
173
      try {
174
        tmSocket = new DatagramSocket(port);
×
UNCOV
175
        if (rcvBufferSize > 0) {
×
176
          tmSocket.setReceiveBufferSize(rcvBufferSize);
×
177
        }
178
        new Thread(this).start();
×
UNCOV
179
      } catch (SocketException e) {
×
180
        notifyFailed(e);
×
181
      }
×
182
    }
183
    notifyStarted();
×
UNCOV
184
  }
×
185

186
  @Override
187
  public void doStop() {
UNCOV
188
    tmSocket.close();
×
UNCOV
189
    notifyStopped();
×
190
  }
×
191

192
  public boolean isRunningAndEnabled() {
UNCOV
193
    State state = state();
×
UNCOV
194
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
195
  }
196

197
  /**
198
   * Sends the packet downstream for processing.
199
   *
200
   * <p>Starting in Yamcs 5.2, if the updateSimulationTime option is set on the link configuration,
201
   *
202
   * <ul>
203
   *   <li>the timeService is expected to be SimulationTimeService
204
   *   <li>at initialization, the time0 is set to 0
205
   *   <li>upon each packet received, the generationTime (as set by the pre-processor) is used to
206
   *       update the simulation elapsed time
207
   * </ul>
208
   *
209
   * <p>Should be called by all sub-classes (instead of directly calling {@link
210
   * TmSink#processPacket(TmPacket)}
211
   *
212
   * @param tmpkt
213
   */
214
  protected void processPacket(TmPacket tmpkt) {
UNCOV
215
    long rectime = tmpkt.getReceptionTime();
×
UNCOV
216
    byte byteArray[] = tmpkt.getPacket();
×
217

218
    byte[] trimmedByteArray =
×
UNCOV
219
        Arrays.copyOfRange(byteArray, this.offset, byteArray.length - this.rightTrim);
×
220

221
    stream.emitTuple(new Tuple(gftdef, Arrays.asList(rectime, trimmedByteArray)));
×
222

223
    if (updateSimulationTime) {
×
UNCOV
224
      SimulationTimeService sts = (SimulationTimeService) timeService;
×
225
      if (!tmpkt.isInvalid()) {
×
226
        sts.setSimElapsedTime(tmpkt.getGenerationTime());
×
227
      }
228
    }
UNCOV
229
  }
×
230

231
  @Override
232
  public void run() {
UNCOV
233
    while (isRunningAndEnabled()) {
×
UNCOV
234
      TmPacket tmpkt = getNextPacket();
×
235
      if (tmpkt != null) {
×
236
        processPacket(tmpkt);
×
237
      }
238
    }
×
UNCOV
239
  }
×
240

241
  /**
242
   * called when a new packet is received to update the statistics
243
   *
244
   * @param packetSize
245
   */
246
  protected void updateStats(int packetSize) {
UNCOV
247
    packetCount.getAndIncrement();
×
UNCOV
248
    packetRateMeter.mark(1);
×
249
    dataRateMeter.mark(packetSize);
×
250
  }
×
251

252
  /**
253
   * Called to retrieve the next packet. It blocks in reading on the multicast socket
254
   *
255
   * @return anything that looks as a valid packet, just the size is taken into account to decide if
256
   *     it's valid or not
257
   */
258
  public TmPacket getNextPacket() {
UNCOV
259
    ByteBuffer packet = null;
×
260

261
    while (isRunning()) {
×
262
      try {
263
        tmSocket.receive(datagram);
×
UNCOV
264
        updateStats(datagram.getLength());
×
265
        packet = ByteBuffer.allocate(datagram.getLength());
×
266
        packet.put(datagram.getData(), datagram.getOffset(), datagram.getLength());
×
267
        break;
×
268
      } catch (IOException e) {
×
269
        if (!isRunning()
×
270
            || isDisabled()) { // the shutdown or disable will close the socket and that will
×
271
          // generate an exception
272
          // which we ignore here
UNCOV
273
          return null;
×
274
        }
275
        log.warn("exception thrown when reading from the UDP socket at port {}", port, e);
×
UNCOV
276
      }
×
277
    }
278

UNCOV
279
    if (packet != null) {
×
UNCOV
280
      TmPacket tmPacket = new TmPacket(timeService.getMissionTime(), packet.array());
×
281
      tmPacket.setEarthRceptionTime(timeService.getHresMissionTime());
×
282
      if (this.hasPreprocessor) {
×
283
        return packetPreprocessor.process(tmPacket);
×
284
      } else {
285
        return tmPacket;
×
286
      }
287
    } else {
UNCOV
288
      return null;
×
289
    }
290
  }
291

292
  @Override
293
  public void disable() {
UNCOV
294
    boolean b = disabled.getAndSet(true);
×
UNCOV
295
    if (!b) {
×
296
      try {
297
        /* TODO */
298
        // doDisable();
299
      } catch (Exception e) {
300
        disabled.set(false);
301
        log.warn("Failed to disable link", e);
302
      }
303
    }
UNCOV
304
  }
×
305

306
  @Override
307
  public void enable() {
UNCOV
308
    boolean b = disabled.getAndSet(false);
×
UNCOV
309
    if (b) {
×
310
      try {
311
        /* TODO */
312
        // doEnable();
313
      } catch (Exception e) {
314
        disabled.set(true);
315
        log.warn("Failed to enable link", e);
316
      }
317
    }
UNCOV
318
  }
×
319

320
  @Override
321
  public long getDataInCount() {
UNCOV
322
    return packetCount.get();
×
323
  }
324

325
  @Override
326
  public long getDataOutCount() {
UNCOV
327
    return 0;
×
328
  }
329

330
  @Override
331
  public Status getLinkStatus() {
UNCOV
332
    if (isDisabled()) {
×
UNCOV
333
      return Status.DISABLED;
×
334
    }
335
    if (state() == State.FAILED) {
×
UNCOV
336
      return Status.FAILED;
×
337
    }
338

UNCOV
339
    return connectionStatus();
×
340
  }
341

342
  @Override
343
  public boolean isDisabled() {
UNCOV
344
    return disabled.get();
×
345
  }
346

347
  protected Status connectionStatus() {
UNCOV
348
    return Status.OK;
×
349
  }
350

351
  @Override
352
  public Spec getSpec() {
353
    // TODO Auto-generated method stub
UNCOV
354
    return super.getSpec();
×
355
  }
356
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc