• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #83

pending completion
#83

push

web-flow
Merge pull request #34 from WindhoverLabs/telemetry_checksum

-Fix bug in SlipStreamEncoder (missing null check).

40 of 40 new or added lines in 2 files covered. (100.0%)

0 of 4405 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/SlipStreamEncoder.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.io.ByteArrayOutputStream;
5
import java.io.IOException;
6
import java.util.Arrays;
7
import java.util.concurrent.TimeUnit;
8
import java.util.concurrent.atomic.AtomicBoolean;
9
import java.util.concurrent.atomic.AtomicLong;
10
import org.yamcs.AbstractYamcsService;
11
import org.yamcs.ConfigurationException;
12
import org.yamcs.InitException;
13
import org.yamcs.Spec;
14
import org.yamcs.TmPacket;
15
import org.yamcs.YConfiguration;
16
import org.yamcs.YamcsServer;
17
import org.yamcs.events.EventProducer;
18
import org.yamcs.events.EventProducerFactory;
19
import org.yamcs.logging.Log;
20
import org.yamcs.parameter.SystemParametersProducer;
21
import org.yamcs.tctm.Link;
22
import org.yamcs.time.SimulationTimeService;
23
import org.yamcs.time.TimeService;
24
import org.yamcs.utils.DataRateMeter;
25
import org.yamcs.utils.StringConverter;
26
import org.yamcs.yarch.ColumnDefinition;
27
import org.yamcs.yarch.DataType;
28
import org.yamcs.yarch.Stream;
29
import org.yamcs.yarch.StreamSubscriber;
30
import org.yamcs.yarch.Tuple;
31
import org.yamcs.yarch.TupleDefinition;
32
import org.yamcs.yarch.YarchDatabase;
33
import org.yamcs.yarch.YarchDatabaseInstance;
34

35
/**
36
 * Receives telemetry frames via UDP. One UDP datagram = one TM frame.
37
 *
38
 * @author nm
39
 */
40
public class SlipStreamEncoder extends AbstractYamcsService
×
41
    implements Link, StreamSubscriber, SystemParametersProducer {
42
  private final byte END = (byte) 0xc0;
×
43
  private final byte ESC = (byte) 0xdb;
×
44
  private final byte ESC_END = (byte) 0xdc;
×
45
  private final byte ESC_ESC = (byte) 0xdd;
×
46
  RateLimiter outRateLimiter;
47
  protected YConfiguration config;
48
  protected String linkName;
49
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
50
  protected Log log;
51
  protected EventProducer eventProducer;
52
  protected TimeService timeService;
53
  protected AtomicLong inPacketCount = new AtomicLong(0);
×
54
  protected AtomicLong outPacketCount = new AtomicLong(0);
×
55
  protected boolean updateSimulationTime;
56
  DataRateMeter inPacketRateMeter = new DataRateMeter();
×
57
  DataRateMeter outPacketRateMeter = new DataRateMeter();
×
58
  DataRateMeter inDataRateMeter = new DataRateMeter();
×
59
  DataRateMeter outDataRateMeter = new DataRateMeter();
×
60
  protected PacketPreprocessor packetPreprocessor;
61
  protected Stream inStream;
62
  protected Stream outStream;
63
  protected Boolean prefaceSlipEnd;
64

65
  // FIXME:Temporary. Don't want to be exposing this packet so easily.
66
  private byte[] packet;
67

68
  /** Tuple layout used for the streams this link creates: a timestamp plus a binary blob. */
  static TupleDefinition gftdef;

  /** Name of the timestamp column. */
  static final String RECTIME_CNAME = "rectime";

  /** Name of the binary payload column. */
  static final String DATA_CNAME = "data";

  static {
    // Assemble the layout locally, then publish it in one assignment.
    TupleDefinition layout = new TupleDefinition();
    layout.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
    layout.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
    gftdef = layout;
  }
×
78

79
  /**
80
   * Creates a new UDP Frame Data Link
81
   *
82
   * @throws ConfigurationException if port is not defined in the configuration
83
   */
84
  public void init(String instance, String name, YConfiguration config) {
85
    try {
86
      super.init(instance, name, config);
×
87
    } catch (InitException e1) {
×
88
      // TODO Auto-generated catch block
89
      e1.printStackTrace();
×
90
    }
×
91

92
    this.linkName = name;
×
93
    this.config = config;
×
94
    log = new Log(getClass(), instance);
×
95
    log.setContext(name);
×
96
    eventProducer = EventProducerFactory.getEventProducer(instance, name, 10000);
×
97
    this.timeService = YamcsServer.getTimeService(instance);
×
98
    String inStreamName = config.getString("in_stream");
×
99
    String outStreamName = config.getString("out_stream");
×
100

101
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
102
    this.inStream = getStream(ydb, inStreamName);
×
103
    this.outStream = getStream(ydb, outStreamName);
×
104

105
    this.inStream.addSubscriber(this);
×
106

107
    if (config.containsKey("frameMaxRate")) {
×
108
      outRateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
109
    }
110

111
    prefaceSlipEnd = config.getBoolean("prefaceSlipEnd", false);
×
112

113
    updateSimulationTime = config.getBoolean("updateSimulationTime", false);
×
114
    if (updateSimulationTime) {
×
115
      if (timeService instanceof SimulationTimeService) {
×
116
        SimulationTimeService sts = (SimulationTimeService) timeService;
×
117
        sts.setTime0(0);
×
118
      } else {
×
119
        throw new ConfigurationException(
×
120
            "updateSimulationTime can only be used together with SimulationTimeService "
121
                + "(add 'timeService: org.yamcs.time.SimulationTimeService' in yamcs.<instance>.yaml)");
122
      }
123
    }
124
  }
×
125

126
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
127
    Stream stream = ydb.getStream(streamName);
×
128
    if (stream == null) {
×
129
      try {
130
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
131
        // ydb.execute("create stream " + streamName);
132
      } catch (Exception e) {
×
133
        throw new ConfigurationException(e);
×
134
      }
×
135
      stream = ydb.getStream(streamName);
×
136
    }
137
    return stream;
×
138
  }
139

140
  @Override
141
  public YConfiguration getConfig() {
142
    return config;
×
143
  }
144

145
  @Override
146
  public String getName() {
147
    return linkName;
×
148
  }
149

150
  @Override
151
  public void resetCounters() {
152
    inPacketCount.set(0);
×
153
    outPacketCount.set(0);
×
154
  }
×
155

156
  @Override
157
  public void doStart() {
158
    if (!isDisabled()) {
×
159
      // new Thread(this).start();
160
    }
161
    notifyStarted();
×
162
  }
×
163

164
  @Override
165
  public void doStop() {
166
    notifyStopped();
×
167
  }
×
168

169
  public boolean isRunningAndEnabled() {
170
    State state = state();
×
171
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
172
  }
173

174
  /**
175
   * Sends the packet downstream for processing.
176
   *
177
   * <p>Starting in Yamcs 5.2, if the updateSimulationTime option is set on the link configuration,
178
   *
179
   * <ul>
180
   *   <li>the timeService is expected to be SimulationTimeService
181
   *   <li>at initialization, the time0 is set to 0
182
   *   <li>upon each packet received, the generationTime (as set by the pre-processor) is used to
183
   *       update the simulation elapsed time
184
   * </ul>
185
   *
186
   * <p>Should be called by all sub-classes (instead of directly calling {@link
187
   * TmSink#processPacket(TmPacket)}
188
   *
189
   * @param tmpkt
190
   */
191
  protected void processPacket(TmPacket tmpkt) {
192
    long rectime = tmpkt.getReceptionTime();
×
193
    byte byteArray[] = tmpkt.getPacket();
×
194

195
    inStream.emitTuple(new Tuple(gftdef, Arrays.asList(rectime, byteArray)));
×
196

197
    if (updateSimulationTime) {
×
198
      SimulationTimeService sts = (SimulationTimeService) timeService;
×
199
      if (!tmpkt.isInvalid()) {
×
200
        sts.setSimElapsedTime(tmpkt.getGenerationTime());
×
201
      }
202
    }
203
  }
×
204

205
  /**
206
   * called when a new packet is received to update the statistics
207
   *
208
   * @param packetSize
209
   */
210
  protected void updateInStats(int packetSize) {
211
    inPacketCount.getAndIncrement();
×
212
    inPacketRateMeter.mark(1);
×
213
    inDataRateMeter.mark(packetSize);
×
214
  }
×
215

216
  /**
217
   * called when a new packet is sent to update the statistics
218
   *
219
   * @param packetSize
220
   */
221
  protected void updateOutStats(int packetSize) {
222
    outPacketCount.getAndIncrement();
×
223
    outPacketRateMeter.mark(1);
×
224
    outDataRateMeter.mark(packetSize);
×
225
  }
×
226

227
  @Override
228
  public void disable() {
229
    boolean b = disabled.getAndSet(true);
×
230
    if (!b) {
×
231
      try {
232
        /* TODO */
233
        // doDisable();
234
      } catch (Exception e) {
235
        disabled.set(false);
236
        log.warn("Failed to disable link", e);
237
      }
238
    }
239
  }
×
240

241
  /**
242
   * This implements the receiving side of RFC 1055. For more information on the standard, go to
243
   * https://datatracker.ietf.org/doc/html/rfc1055 This method is based on the snippet from RFC
244
   * 1055, Page 5.
245
   *
246
   * <p>WARNING: Do not use this code yet. It needs plenty of refactoring.
247
   */
248
  protected byte[] encodeMessage(byte[] pktData) throws IOException {
249
    ByteArrayOutputStream payload = new ByteArrayOutputStream();
×
250

251
    byte[] temp = new byte[1];
×
252

253
    if (this.prefaceSlipEnd) {
×
254
      payload.write(END);
×
255
    }
256

257
    for (byte character : pktData) {
×
258
      switch (character) {
×
259
          /* if it's the same code as an END character, we send a
260
           * special two character code so as not to make the
261
           * receiver think we sent an END
262
           */
263
        case END:
264
          temp[0] = (byte) (ESC);
×
265
          payload.write(temp[0]);
×
266
          temp[0] = (byte) (ESC_END);
×
267
          payload.write(temp[0]);
×
268
          break;
×
269

270
          /* if it's the same code as an ESC character,
271
           * we send a special two character code so as not
272
           * to make the receiver think we sent an ESC
273
           */
274
        case ESC:
275
          temp[0] = (byte) (ESC);
×
276
          payload.write(temp);
×
277
          temp[0] = (byte) ESC_ESC;
×
278
          payload.write(temp);
×
279
          break;
×
280

281
          /* otherwise, we just send the character
282
           */
283
        default:
284
          payload.write(Byte.toUnsignedInt(character));
×
285
          break;
286
      }
287
    }
288

289
    payload.write(END);
×
290

291
    return payload.toByteArray();
×
292
  }
293

294
  public String getPacket() {
295
    return StringConverter.arrayToHexString(packet);
×
296
  }
297

298
  @Override
299
  public void enable() {
300
    boolean b = disabled.getAndSet(false);
×
301
    if (b) {
×
302
      try {
303
        /* TODO */
304
        // doEnable();
305
      } catch (Exception e) {
306
        disabled.set(true);
307
        log.warn("Failed to enable link", e);
308
      }
309
    }
310
  }
×
311

312
  @Override
313
  public long getDataInCount() {
314
    return inPacketCount.get();
×
315
  }
316

317
  @Override
318
  public long getDataOutCount() {
319
    return outPacketCount.get();
×
320
  }
321

322
  @Override
323
  public Status getLinkStatus() {
324
    if (isDisabled()) {
×
325
      return Status.DISABLED;
×
326
    }
327
    if (state() == State.FAILED) {
×
328
      return Status.FAILED;
×
329
    }
330

331
    return connectionStatus();
×
332
  }
333

334
  @Override
335
  public boolean isDisabled() {
336
    return disabled.get();
×
337
  }
338

339
  protected Status connectionStatus() {
340
    return Status.OK;
×
341
  }
342

343
  @Override
344
  public Spec getSpec() {
345
    // TODO Auto-generated method stub
346
    return super.getSpec();
×
347
  }
348

349
  @Override
350
  public void onTuple(Stream arg0, Tuple tuple) {
351
    if (isRunningAndEnabled()) {
×
352

353
      byte[] packet;
354
      try {
355
        packet = encodeMessage(tuple.getColumn(DATA_CNAME));
×
356

357
        // long recTime = tuple.getColumn(PreparedCommand.CNAME_GENTIME);
358
        if (packet == null) {
×
359
          throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
360
        } else {
361
          outStream.emitTuple(
×
362
              new Tuple(gftdef, Arrays.asList(tuple.getColumn(RECTIME_CNAME), packet)));
×
363

364
          updateOutStats(packet.length);
×
365
        }
366
      } catch (IOException e) {
×
367
        // TODO Auto-generated catch block
368
        e.printStackTrace();
×
369
      }
×
370
    }
371
  }
×
372
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc