• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #160

26 Nov 2024 06:52PM UTC coverage: 0.0%. Remained the same
#160

push

web-flow
Merge pull request #58 from WindhoverLabs/code_format

-Update code format

0 of 6640 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/UdpStreamInOutProvider.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.io.IOException;
5
import java.net.DatagramPacket;
6
import java.net.DatagramSocket;
7
import java.net.InetAddress;
8
import java.net.SocketException;
9
import java.net.UnknownHostException;
10
import java.nio.ByteBuffer;
11
import java.util.Arrays;
12
import java.util.concurrent.TimeUnit;
13
import java.util.concurrent.atomic.AtomicBoolean;
14
import java.util.concurrent.atomic.AtomicLong;
15
import org.yamcs.AbstractYamcsService;
16
import org.yamcs.ConfigurationException;
17
import org.yamcs.InitException;
18
import org.yamcs.Spec;
19
import org.yamcs.TmPacket;
20
import org.yamcs.YConfiguration;
21
import org.yamcs.YamcsServer;
22
import org.yamcs.events.EventProducer;
23
import org.yamcs.events.EventProducerFactory;
24
import org.yamcs.logging.Log;
25
import org.yamcs.tctm.Link;
26
import org.yamcs.time.SimulationTimeService;
27
import org.yamcs.time.TimeService;
28
import org.yamcs.utils.DataRateMeter;
29
import org.yamcs.utils.YObjectLoader;
30
import org.yamcs.yarch.ColumnDefinition;
31
import org.yamcs.yarch.DataType;
32
import org.yamcs.yarch.Stream;
33
import org.yamcs.yarch.StreamSubscriber;
34
import org.yamcs.yarch.Tuple;
35
import org.yamcs.yarch.TupleDefinition;
36
import org.yamcs.yarch.YarchDatabase;
37
import org.yamcs.yarch.YarchDatabaseInstance;
38

39
/**
40
 * Receives telemetry frames via UDP. One UDP datagram = one TM frame.
41
 *
42
 * @author nm
43
 */
44
public class UdpStreamInOutProvider extends AbstractYamcsService
×
45
    implements Link, StreamSubscriber, Runnable {
46
  String host;
47
  DatagramSocket socket;
48
  InetAddress address;
49
  Thread thread;
50
  RateLimiter outRateLimiter;
51
  protected YConfiguration config;
52
  protected String linkName;
53
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
54
  protected Log log;
55
  protected EventProducer eventProducer;
56
  protected TimeService timeService;
57
  protected AtomicLong inPacketCount = new AtomicLong(0);
×
58
  protected AtomicLong outPacketCount = new AtomicLong(0);
×
59
  protected boolean updateSimulationTime;
60
  DataRateMeter inPacketRateMeter = new DataRateMeter();
×
61
  DataRateMeter outPacketRateMeter = new DataRateMeter();
×
62
  DataRateMeter inDataRateMeter = new DataRateMeter();
×
63
  DataRateMeter outDataRateMeter = new DataRateMeter();
×
64
  protected PacketPreprocessor packetPreprocessor;
65
  protected boolean hasPreprocessor = false;
×
66
  protected Stream inStream;
67
  protected Stream outStream;
68

69
  static TupleDefinition gftdef;
70

71
  static final String RECTIME_CNAME = "rectime";
72
  static final String DATA_CNAME = "data";
73

74
  static {
75
    gftdef = new TupleDefinition();
×
76
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
77
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
78
  }
×
79

80
  static final String CFG_PREPRO_CLASS = "packetPreprocessorClassName";
81

82
  static final int MAX_LENGTH = 1500;
83

84
  private DatagramSocket inSocket;
85
  private DatagramSocket outSocket;
86
  private int inPort;
87
  private int outPort;
88
  private int offset;
89
  private int rightTrim;
90
  int maxLength;
91
  //  private volatile int invalidDatagramCount = 0;
92

93
  DatagramPacket datagram;
94
  String packetPreprocessorClassName;
95
  Object packetPreprocessorArgs;
96

97
  /**
98
   * Creates a new UDP Frame Data Link
99
   *
100
   * @throws ConfigurationException if port is not defined in the configuration
101
   */
102
  public void init(String instance, String name, YConfiguration config) {
103
    try {
104
      super.init(instance, name, config);
×
105
    } catch (InitException e1) {
×
106
      // TODO Auto-generated catch block
107
      e1.printStackTrace();
×
108
    }
×
109
    host = config.getString("host");
×
110
    this.linkName = name;
×
111
    this.config = config;
×
112
    this.inPort = config.getInt("in_port");
×
113
    this.outPort = config.getInt("out_port");
×
114
    offset = config.getInt("offset", 0);
×
115
    rightTrim = config.getInt("rightTrim", 0);
×
116
    maxLength = config.getInt("maxLength", MAX_LENGTH);
×
117
    log = new Log(getClass(), instance);
×
118
    log.setContext(name);
×
119
    eventProducer = EventProducerFactory.getEventProducer(instance, name, 10000);
×
120
    this.timeService = YamcsServer.getTimeService(instance);
×
121
    String inStreamName = config.getString("in_stream");
×
122
    String outStreamName = config.getString("out_stream");
×
123

124
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
125
    this.inStream = getStream(ydb, inStreamName);
×
126
    this.outStream = getStream(ydb, outStreamName);
×
127

128
    this.inStream.addSubscriber(this);
×
129

130
    try {
131
      address = InetAddress.getByName(host);
×
132
    } catch (UnknownHostException e) {
×
133
      throw new ConfigurationException("Cannot resolve host '" + host + "'", e);
×
134
    }
×
135
    if (config.containsKey("frameMaxRate")) {
×
136
      outRateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
137
    }
138

139
    if (config.containsKey(CFG_PREPRO_CLASS)) {
×
140
      this.hasPreprocessor = true;
×
141

142
      this.packetPreprocessorClassName = config.getString(CFG_PREPRO_CLASS);
×
143

144
      if (config.containsKey("packetPreprocessorArgs")) {
×
145
        this.packetPreprocessorArgs = config.getConfig("packetPreprocessorArgs");
×
146
      }
147

148
      try {
149
        if (packetPreprocessorArgs != null) {
×
150
          packetPreprocessor =
×
151
              YObjectLoader.loadObject(
×
152
                  packetPreprocessorClassName, instance, packetPreprocessorArgs);
153
        } else {
154
          packetPreprocessor = YObjectLoader.loadObject(packetPreprocessorClassName, instance);
×
155
        }
156
      } catch (ConfigurationException e) {
×
157
        log.error("Cannot instantiate the packet preprocessor", e);
×
158
        throw e;
×
159
      }
×
160
    }
161

162
    datagram = new DatagramPacket(new byte[maxLength], maxLength);
×
163

164
    updateSimulationTime = config.getBoolean("updateSimulationTime", false);
×
165
    if (updateSimulationTime) {
×
166
      if (timeService instanceof SimulationTimeService) {
×
167
        SimulationTimeService sts = (SimulationTimeService) timeService;
×
168
        sts.setTime0(0);
×
169
      } else {
×
170
        throw new ConfigurationException(
×
171
            "updateSimulationTime can only be used together with SimulationTimeService "
172
                + "(add 'timeService: org.yamcs.time.SimulationTimeService' in yamcs.<instance>.yaml)");
173
      }
174
    }
175
  }
×
176

177
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
178
    Stream stream = ydb.getStream(streamName);
×
179
    if (stream == null) {
×
180
      try {
181
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
182
        // ydb.execute("create stream " + streamName);
183
      } catch (Exception e) {
×
184
        throw new ConfigurationException(e);
×
185
      }
×
186
      stream = ydb.getStream(streamName);
×
187
    }
188
    return stream;
×
189
  }
190

191
  @Override
192
  public YConfiguration getConfig() {
193
    return config;
×
194
  }
195

196
  @Override
197
  public String getName() {
198
    return linkName;
×
199
  }
200

201
  @Override
202
  public void resetCounters() {
203
    inPacketCount.set(0);
×
204
    outPacketCount.set(0);
×
205
  }
×
206

207
  @Override
208
  public void doStart() {
209
    if (!isDisabled()) {
×
210
      try {
211
        inSocket = new DatagramSocket(inPort);
×
212
        outSocket = new DatagramSocket();
×
213
        new Thread(this).start();
×
214
      } catch (SocketException e) {
×
215
        log.error("Socket exception", e);
×
216
        notifyFailed(e);
×
217
      }
×
218
    }
219
    notifyStarted();
×
220
  }
×
221

222
  @Override
223
  public void doStop() {
224
    inSocket.close();
×
225
    outSocket.close();
×
226
    notifyStopped();
×
227
  }
×
228

229
  public boolean isRunningAndEnabled() {
230
    State state = state();
×
231
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
232
  }
233

234
  /**
235
   * Sends the packet downstream for processing.
236
   *
237
   * <p>Starting in Yamcs 5.2, if the updateSimulationTime option is set on the link configuration,
238
   *
239
   * <ul>
240
   *   <li>the timeService is expected to be SimulationTimeService
241
   *   <li>at initialization, the time0 is set to 0
242
   *   <li>upon each packet received, the generationTime (as set by the pre-processor) is used to
243
   *       update the simulation elapsed time
244
   * </ul>
245
   *
246
   * <p>Should be called by all sub-classes (instead of directly calling {@link
247
   * TmSink#processPacket(TmPacket)}
248
   *
249
   * @param tmpkt
250
   */
251
  protected void processPacket(TmPacket tmpkt) {
252
    long rectime = tmpkt.getReceptionTime();
×
253
    byte byteArray[] = tmpkt.getPacket();
×
254

255
    byte[] trimmedByteArray =
×
256
        Arrays.copyOfRange(byteArray, this.offset, byteArray.length - this.rightTrim);
×
257

258
    outStream.emitTuple(new Tuple(gftdef, Arrays.asList(rectime, trimmedByteArray)));
×
259

260
    if (updateSimulationTime) {
×
261
      SimulationTimeService sts = (SimulationTimeService) timeService;
×
262
      if (!tmpkt.isInvalid()) {
×
263
        sts.setSimElapsedTime(tmpkt.getGenerationTime());
×
264
      }
265
    }
266
  }
×
267

268
  @Override
269
  public void run() {
270
    while (isRunningAndEnabled()) {
×
271
      TmPacket tmpkt = getNextPacket();
×
272
      if (tmpkt != null) {
×
273
        processPacket(tmpkt);
×
274
      }
275
    }
×
276
  }
×
277

278
  /**
279
   * called when a new packet is received to update the statistics
280
   *
281
   * @param packetSize
282
   */
283
  protected void updateInStats(int packetSize) {
284
    inPacketCount.getAndIncrement();
×
285
    inPacketRateMeter.mark(1);
×
286
    inDataRateMeter.mark(packetSize);
×
287
  }
×
288

289
  /**
290
   * called when a new packet is sent to update the statistics
291
   *
292
   * @param packetSize
293
   */
294
  protected void updateOutStats(int packetSize) {
295
    outPacketCount.getAndIncrement();
×
296
    outPacketRateMeter.mark(1);
×
297
    outDataRateMeter.mark(packetSize);
×
298
  }
×
299

300
  /**
301
   * Called to retrieve the next packet. It blocks in reading on the multicast socket
302
   *
303
   * @return anything that looks as a valid packet, just the size is taken into account to decide if
304
   *     it's valid or not
305
   */
306
  public TmPacket getNextPacket() {
307
    ByteBuffer packet = null;
×
308

309
    while (isRunning()) {
×
310
      try {
311
        inSocket.receive(datagram);
×
312
        updateInStats(datagram.getLength());
×
313
        packet = ByteBuffer.allocate(datagram.getLength());
×
314
        packet.put(datagram.getData(), datagram.getOffset(), datagram.getLength());
×
315
        break;
×
316
      } catch (IOException e) {
×
317
        if (!isRunning()
×
318
            || isDisabled()) { // the shutdown or disable will close the socket and that will
×
319
          // generate an exception
320
          // which we ignore here
321
          return null;
×
322
        }
323
        log.warn("exception thrown when reading from the UDP socket at port {}", inPort, e);
×
324
      }
×
325
    }
326

327
    if (packet != null) {
×
328
      TmPacket tmPacket = new TmPacket(timeService.getMissionTime(), packet.array());
×
329
      tmPacket.setEarthRceptionTime(timeService.getHresMissionTime());
×
330
      if (this.hasPreprocessor) {
×
331
        return packetPreprocessor.process(tmPacket);
×
332
      } else {
333
        return tmPacket;
×
334
      }
335
    } else {
336
      return null;
×
337
    }
338
  }
339

340
  @Override
341
  public void disable() {
342
    boolean b = disabled.getAndSet(true);
×
343
    if (!b) {
×
344
      try {
345
        /* TODO */
346
        // doDisable();
347
      } catch (Exception e) {
348
        disabled.set(false);
349
        log.warn("Failed to disable link", e);
350
      }
351
    }
352
  }
×
353

354
  @Override
355
  public void enable() {
356
    boolean b = disabled.getAndSet(false);
×
357
    if (b) {
×
358
      try {
359
        /* TODO */
360
        // doEnable();
361
      } catch (Exception e) {
362
        disabled.set(true);
363
        log.warn("Failed to enable link", e);
364
      }
365
    }
366
  }
×
367

368
  @Override
369
  public long getDataInCount() {
370
    return inPacketCount.get();
×
371
  }
372

373
  @Override
374
  public long getDataOutCount() {
375
    return outPacketCount.get();
×
376
  }
377

378
  @Override
379
  public Status getLinkStatus() {
380
    if (isDisabled()) {
×
381
      return Status.DISABLED;
×
382
    }
383
    if (state() == State.FAILED) {
×
384
      return Status.FAILED;
×
385
    }
386

387
    return connectionStatus();
×
388
  }
389

390
  @Override
391
  public boolean isDisabled() {
392
    return disabled.get();
×
393
  }
394

395
  protected Status connectionStatus() {
396
    return Status.OK;
×
397
  }
398

399
  @Override
400
  public Spec getSpec() {
401
    // TODO Auto-generated method stub
402
    return super.getSpec();
×
403
  }
404

405
  @Override
406
  public void onTuple(Stream arg0, Tuple tuple) {
407
    if (isRunningAndEnabled()) {
×
408
      byte[] pktData = tuple.getColumn(DATA_CNAME);
×
409
      long recTime = tuple.getColumn(RECTIME_CNAME);
×
410
      if (pktData == null) {
×
411
        throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
412
      } else {
413
        DatagramPacket dtg = new DatagramPacket(pktData, pktData.length, address, outPort);
×
414
        try {
415
          outSocket.send(dtg);
×
416
          updateOutStats(pktData.length);
×
417
        } catch (IOException e) {
×
418
          log.warn("Error sending datagram", e);
×
419
          notifyFailed(e);
×
420
          return;
×
421
        }
×
422
      }
423
    }
424
  }
×
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc