• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #162

28 Nov 2024 05:03PM UTC coverage: 0.0%. Remained the same
#162

push

lorenzo-gomez-windhover
-Updates for yamcs 5.9.8

0 of 4 new or added lines in 1 file covered. (0.0%)

614 existing lines in 11 files now uncovered.

0 of 6789 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/SlipStreamDecoder.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import java.io.ByteArrayInputStream;
4
import java.io.ByteArrayOutputStream;
5
import java.io.DataInputStream;
6
import java.io.IOException;
7
import java.util.Arrays;
8
import java.util.Collection;
9
import java.util.concurrent.atomic.AtomicLong;
10
import org.yamcs.AbstractYamcsService;
11
import org.yamcs.ConfigurationException;
12
import org.yamcs.InitException;
13
import org.yamcs.YConfiguration;
14
import org.yamcs.YamcsServer;
15
import org.yamcs.events.EventProducer;
16
import org.yamcs.events.EventProducerFactory;
17
import org.yamcs.logging.Log;
18
import org.yamcs.parameter.ParameterValue;
19
import org.yamcs.parameter.SystemParametersProducer;
20
import org.yamcs.time.TimeService;
21
import org.yamcs.utils.DataRateMeter;
22
import org.yamcs.yarch.ColumnDefinition;
23
import org.yamcs.yarch.DataType;
24
import org.yamcs.yarch.Stream;
25
import org.yamcs.yarch.StreamSubscriber;
26
import org.yamcs.yarch.Tuple;
27
import org.yamcs.yarch.TupleDefinition;
28
import org.yamcs.yarch.YarchDatabase;
29
import org.yamcs.yarch.YarchDatabaseInstance;
30

31
/**
32
 * Receives telemetry frames via UDP. One UDP datagram = one TM frame.
33
 *
34
 * @author nm
35
 */
36
public class SlipStreamDecoder extends AbstractYamcsService
×
37
    implements StreamSubscriber, SystemParametersProducer {
38
  /** SLIP frame delimiter byte (the "END" character of RFC 1055). */
  private static final byte END = (byte) 0xc0;

  /** SLIP escape byte (the "ESC" character of RFC 1055); announces an escaped END or ESC. */
  private static final byte ESC = (byte) 0xdb;

  /** Byte that, when it follows {@link #ESC}, is decoded as a literal {@link #END}. */
  private static final byte ESC_END = (byte) 0xdc;

  /** Byte that, when it follows {@link #ESC}, is decoded as a literal {@link #ESC}. */
  private static final byte ESC_ESC = (byte) 0xdd;

  protected YConfiguration config;
  protected Log log;
  protected EventProducer eventProducer;
  protected TimeService timeService;

  /** Packet counter incremented by {@code updateInStats}. */
  protected AtomicLong inPacketCount = new AtomicLong(0);

  /** Packet counter incremented by {@code updateOutStats}. */
  protected AtomicLong outPacketCount = new AtomicLong(0);

  /** Never assigned by the live code of this class, so it keeps its default value. */
  protected boolean updateSimulationTime;

  DataRateMeter inPacketRateMeter = new DataRateMeter();
  DataRateMeter outPacketRateMeter = new DataRateMeter();
  DataRateMeter inDataRateMeter = new DataRateMeter();
  DataRateMeter outDataRateMeter = new DataRateMeter();

  /** Stream this decoder subscribes to; its tuples carry the SLIP-encoded data. */
  protected Stream inStream;

  /** Stream on which the decoded and trimmed payloads are emitted. */
  protected Stream outStream;

  // NOTE(review): only referenced from commented-out code in this class; candidate for removal.
  private byte[] packet;

  /** Number of bytes dropped from the start of each decoded payload (config key "offset"). */
  private int offset;

  /** Number of bytes dropped from the end of each decoded payload (config key "rightTrim"). */
  private int rightTrim;

  /** Tuple layout used for the streams created by this class: (rectime, data). */
  static TupleDefinition gftdef;

  static final String RECTIME_CNAME = "rectime";
  static final String DATA_CNAME = "data";

  static {
    gftdef = new TupleDefinition();
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
  }
×
72

73
  /**
74
   * Creates a new UDP Frame Data Link
75
   *
76
   * @throws ConfigurationException if port is not defined in the configuration
77
   */
78
  public void init(String instance, String name, YConfiguration config) {
79
    try {
UNCOV
80
      super.init(instance, name, config);
×
81
    } catch (InitException e1) {
×
82
      // TODO Auto-generated catch block
UNCOV
83
      e1.printStackTrace();
×
84
    }
×
85

86
    this.config = config;
×
87
    log = new Log(getClass(), instance);
×
88
    log.setContext(name);
×
89
    eventProducer = EventProducerFactory.getEventProducer(instance, name, 10000);
×
90
    this.timeService = YamcsServer.getTimeService(instance);
×
UNCOV
91
    String inStreamName = config.getString("in_stream");
×
92
    String outStreamName = config.getString("out_stream");
×
93

UNCOV
94
    this.offset = config.getInt("offset", 0);
×
95
    this.rightTrim = config.getInt("rightTrim", 0);
×
96

97
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
UNCOV
98
    this.inStream = getStream(ydb, inStreamName);
×
99
    this.outStream = getStream(ydb, outStreamName);
×
100

UNCOV
101
    this.inStream.addSubscriber(this);
×
102

103
    // if (config.containsKey("frameMaxRate")) {
104
    //  outRateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
105
    // }
106

107
    // if (config.containsKey(CFG_PREPRO_CLASS)) {
108
    //  this.hasPreprocessor = true;
109
    // }
110

111
    // updateSimulationTime = config.getBoolean("updateSimulationTime", false);
112
    // if (updateSimulationTime) {
113
    //  if (timeService instanceof SimulationTimeService) {
114
    //    SimulationTimeService sts = (SimulationTimeService) timeService;
115
    //    sts.setTime0(0);
116
    //  } else {
117
    //    throw new ConfigurationException(
118
    //        "updateSimulationTime can only be used together with SimulationTimeService "
119
    //            + "(add 'timeService: org.yamcs.time.SimulationTimeService' in
120
    // yamcs.<instance>.yaml)");
121
    //  }
122
    // }
123

124
    // if (maxLength < 0) {
125
    //  throw new ConfigurationException("'maxLength' must be defined.");
126
    // }
127

128
    // if (maxLength < minLength) {
129
    //  throw new ConfigurationException(
130
    //      "'maxLength' (" + maxLength + ") must not be less than 'minLength' (" + minLength +
131
    // ").");
132
    // }
133

134
    // if (maxLength < 0) {
135
    //  throw new ConfigurationException(
136
    //      "'maxLength' (" + maxLength + ") must be greater than zero.");
137
    // }
138

139
    // if (minLength < 0) {
140
    //  throw new ConfigurationException(
141
    //      "'minLength' (" + maxLength + ") must be greater than zero.");
142
    // }
143

144
    // if (dropMalformed && (maxLength < 0)) {
145
    //  throw new ConfigurationException(
146
    //      "'dropMalformed' must not be 'true' unless 'maxLength' is defined.");
147
    // }
148

149
    // asm = fromHexString(asmString);
150

151
    // if (minLength == maxLength) {
152
    //  fixedLength = minLength;
153
    // }
154

155
    // outOfSyncByteCount = 0;
156
    // inSyncByteCount = 0;
157
    // rcvdCaduCount = 0;
158
    // rcvdFatCaduCount = 0;
159
    // parserState = ParserState.OUT_OF_SYNC;
UNCOV
160
  }
×
161

162
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
UNCOV
163
    Stream stream = ydb.getStream(streamName);
×
164
    if (stream == null) {
×
165
      try {
166
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
167
        // ydb.execute("create stream " + streamName);
168
      } catch (Exception e) {
×
169
        throw new ConfigurationException(e);
×
UNCOV
170
      }
×
171
      stream = ydb.getStream(streamName);
×
172
    }
UNCOV
173
    return stream;
×
174
  }
175

176
  @Override
177
  public YConfiguration getConfig() {
UNCOV
178
    return config;
×
179
  }
180

181
  // @Override
182
  // public String getName() {
183
  //  return linkName;
184
  // }
185

186
  @Override
187
  public void doStart() {
188
    // if (!isDisabled()) {
189
    //  // new Thread(this).start();
190
    // }
UNCOV
191
    notifyStarted();
×
UNCOV
192
  }
×
193

194
  @Override
195
  public void doStop() {
UNCOV
196
    notifyStopped();
×
UNCOV
197
  }
×
198

199
  public boolean isRunningAndEnabled() {
UNCOV
200
    State state = state();
×
UNCOV
201
    return (state == State.RUNNING || state == State.STARTING);
×
202
  }
203

204
  /**
205
   * Sends the packet downstream for processing.
206
   *
207
   * <p>Starting in Yamcs 5.2, if the updateSimulationTime option is set on the link configuration,
208
   *
209
   * <ul>
210
   *   <li>the timeService is expected to be SimulationTimeService
211
   *   <li>at initialization, the time0 is set to 0
212
   *   <li>upon each packet received, the generationTime (as set by the pre-processor) is used to
213
   *       update the simulation elapsed time
214
   * </ul>
215
   *
216
   * <p>Should be called by all sub-classes (instead of directly calling {@link
217
   * TmSink#processPacket(TmPacket)}
218
   *
219
   * @param tmpkt
220
   */
221
  /*
222
  protected void processPacket(TmPacket tmpkt) {
223
    long rectime = tmpkt.getReceptionTime();
224
    byte byteArray[] = tmpkt.getPacket();
225

226
    int payloadSize = byteArray.length - this.offset - this.rightTrim;
227

228
    if(byteArray.length < payloadSize) {
229
      log.warn("Ignoring partial packet");
230
    } else {
231
      byte[] trimmedByteArray =
232
          Arrays.copyOfRange(byteArray, this.offset, payloadSize);
233

234
      inStream.emitTuple(new Tuple(gftdef, Arrays.asList(rectime, trimmedByteArray)));
235

236
      if (updateSimulationTime) {
237
        SimulationTimeService sts = (SimulationTimeService) timeService;
238
        if (!tmpkt.isInvalid()) {
239
          sts.setSimElapsedTime(tmpkt.getGenerationTime());
240
        }
241
      }
242
    }
243
  }
244
  */
245

246
  /**
247
   * called when a new packet is received to update the statistics
248
   *
249
   * @param packetSize
250
   */
251
  protected void updateInStats(int packetSize) {
252
    inPacketCount.getAndIncrement();
×
253
    inPacketRateMeter.mark(1);
×
UNCOV
254
    inDataRateMeter.mark(packetSize);
×
UNCOV
255
  }
×
256

257
  /**
258
   * called when a new packet is sent to update the statistics
259
   *
260
   * @param packetSize
261
   */
262
  protected void updateOutStats(int packetSize) {
263
    outPacketCount.getAndIncrement();
×
264
    outPacketRateMeter.mark(1);
×
UNCOV
265
    outDataRateMeter.mark(packetSize);
×
UNCOV
266
  }
×
267

268
  /**
269
   * This implements the receiving side of RFC 1055. For more information on the standard, go to
270
   * https://datatracker.ietf.org/doc/html/rfc1055 This method is based on the snippet from RFC
271
   * 1055, Page 5.
272
   *
273
   * <p>WARNING: Do not use this code yet. It needs plenty of refactoring.
274
   */
275
  protected byte[] getPayload(byte[] pktData) throws IOException {
276
    DataInputStream data = new DataInputStream(new ByteArrayInputStream(pktData));
×
277

278
    byte[] nextByte = new byte[1];
×
279

UNCOV
280
    ByteArrayOutputStream payload = new ByteArrayOutputStream();
×
281

282
    /* sit in a loop reading bytes until we put together
283
     * a whole packet.
284
     * Make sure not to copy them into the packet if we
285
     * run out of room.
286
     */
287
    // TODO:Add a MAX_PACKET_SIZE configuration arg.
UNCOV
288
    while (data.available() > 0) {
×
289
      /* get a character to process
290
       */
UNCOV
291
      data.readFully(nextByte, 0, 1);
×
292

293
      /* handle bytestuffing if necessary
294
       */
295

UNCOV
296
      switch (nextByte[0]) {
×
297

298
          /* if it's an END character then we're done with
299
           * the packet
300
           */
301
        case END:
302
          /* a minor optimization: if there is no
303
           * data in the packet, ignore it. This is
304
           * meant to avoid bothering IP with all
305
           * the empty packets generated by the
306
           * duplicate END characters which are in
307
           * turn sent to try to detect line noise.
308
           *   if(received)
309
           *     return received;
310
           *    else
311
           *     break;
312
           */
UNCOV
313
          return payload.toByteArray();
×
314

315
          /* if it's the same code as an ESC character, wait
316
           * and get another character and then figure out
317
           * what to store in the packet based on that.
318
           */
319
          /*Fallthrough*/
320
        case ESC:
UNCOV
321
          data.readFully(nextByte, 0, 1);
×
322
          /* if "c" is not one of these two, then we
323
           * have a protocol violation.  The best bet
324
           * seems to be to leave the byte alone and
325
           * just stuff it into the packet
326
           */
327
          switch (nextByte[0]) {
×
328
            case ESC_END:
UNCOV
329
              nextByte[0] = END;
×
330
              break;
×
331
            case ESC_ESC:
UNCOV
332
              nextByte[0] = ESC;
×
333
              break;
334
          }
335

336
          /* here we fall into the default handler and let
337
           * it store the byte for us
338
           */
339
        default:
UNCOV
340
          payload.write(nextByte[0]);
×
341
      }
342
    }
343

UNCOV
344
    return payload.toByteArray();
×
345
  }
346

347
  /**
348
   * Getter methods for understanding the state of the parser. Very useful when exposed to the
349
   * server as system parameters.
350
   *
351
   * @return
352
   */
353
  // public int getOutOfSyncByteCount() {
354
  //  return outOfSyncByteCount;
355
  // }
356

357
  // public int getInSyncByteCount() {
358
  //  return inSyncByteCount;
359
  // }
360

361
  // public int getAsmCursor() {
362
  //  return asmCursor;
363
  // }
364

365
  // public int getFatFrameBytes() {
366
  //  return fatFrameBytes;
367
  // }
368

369
  // public int getCaduLength() {
370
  //  return caduLength;
371
  // }
372

373
  // public int getFixedLength() {
374
  //  return fixedLength;
375
  // }
376

377
  // public int getFatFrameCount() {
378
  //  return fatFrameCount;
379
  // }
380

381
  // public String getPacket() {
382
  //  return StringConverter.arrayToHexString(packet);
383
  // }
384

385
  // public int getRcvdCaduCount() {
386
  //  return rcvdCaduCount;
387
  // }
388

389
  /** Resets rcvdCaduCount and fatFrameCount to 0. */
390
  // public void resetCounts() {
391
  //  rcvdCaduCount = 0;
392
  //  fatFrameCount = 0;
393
  // }
394
  //
395
  // @Override
396
  // public void enable() {
397
  //  boolean b = disabled.getAndSet(false);
398
  //  if (b) {
399
  //    try {
400
  //      /* TODO */
401
  //      // doEnable();
402
  //    } catch (Exception e) {
403
  //      disabled.set(true);
404
  //      log.warn("Failed to enable link", e);
405
  //    }
406
  //  }
407
  // }
408

409
  // @Override
410
  // public long getDataInCount() {
411
  //  return inPacketCount.get();
412
  // }
413

414
  // @Override
415
  // public long getDataOutCount() {
416
  //  return outPacketCount.get();
417
  // }
418

419
  // @Override
420
  // public Status getLinkStatus() {
421
  //  if (isDisabled()) {
422
  //    return Status.DISABLED;
423
  //  }
424
  //  if (state() == State.FAILED) {
425
  //    return Status.FAILED;
426
  //  }
427
  //
428
  //  return connectionStatus();
429
  // }
430

431
  // @Override
432
  // public boolean isDisabled() {
433
  //  return disabled.get();
434
  // }
435

436
  // protected Status connectionStatus() {
437
  //  return Status.OK;
438
  // }
439

440
  // @Override
441
  // public Spec getSpec() {
442
  //  // TODO Auto-generated method stub
443
  //  return super.getSpec();
444
  // }
445

446
  @Override
447
  public void onTuple(Stream arg0, Tuple tuple) {
UNCOV
448
    if (isRunningAndEnabled()) {
×
449

450
      byte[] packet;
451

452
      try {
453
        packet = getPayload(tuple.getColumn(DATA_CNAME));
×
454

UNCOV
455
        if (packet == null) {
×
456
          log.error("Parsed an empty SLIP message. Ignoring message.");
×
457
        } else {
458
          int trimmedPacketSize = packet.length - this.offset - this.rightTrim;
×
UNCOV
459
          byte[] trimmedPacket =
×
460
              Arrays.copyOfRange(packet, this.offset, packet.length - this.rightTrim);
×
461

UNCOV
462
          if (trimmedPacket.length <= 0) {
×
463
            log.error("Packet length is <= 0");
×
464
          } else {
UNCOV
465
            outStream.emitTuple(
×
466
                new Tuple(gftdef, Arrays.asList(tuple.getColumn(RECTIME_CNAME), trimmedPacket)));
×
467

UNCOV
468
            updateOutStats(trimmedPacket.length);
×
469
          }
470
        }
471
      } catch (IOException e) {
×
472
        // TODO Auto-generated catch block
UNCOV
473
        e.printStackTrace();
×
474
      }
×
475
    }
UNCOV
476
  }
×
477

478
  @Override
479
  public Collection<ParameterValue> getSystemParameters(long gentime) {
480
    // TODO Auto-generated method stub
UNCOV
481
    return null;
×
482
  }
483
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc