• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #83

pending completion
#83

push

web-flow
Merge pull request #34 from WindhoverLabs/telemetry_checksum

-Fix bug in SlipStreamEncoder (missing null check).

40 of 40 new or added lines in 2 files covered. (100.0%)

0 of 4405 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/StreamTmDataLink.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import java.net.SocketException;
4
import java.util.Arrays;
5
import java.util.Collection;
6
import java.util.List;
7
import java.util.concurrent.atomic.AtomicLong;
8
import org.yamcs.ConfigurationException;
9
import org.yamcs.TmPacket;
10
import org.yamcs.YConfiguration;
11
import org.yamcs.events.EventProducer;
12
import org.yamcs.events.EventProducerFactory;
13
import org.yamcs.parameter.ParameterValue;
14
import org.yamcs.parameter.SystemParametersService;
15
import org.yamcs.protobuf.Yamcs.Value.Type;
16
import org.yamcs.tctm.AbstractTmDataLink;
17
import org.yamcs.tctm.ParameterDataLink;
18
import org.yamcs.tctm.ParameterSink;
19
import org.yamcs.xtce.SystemParameter;
20
import org.yamcs.yarch.ColumnDefinition;
21
import org.yamcs.yarch.DataType;
22
import org.yamcs.yarch.Stream;
23
import org.yamcs.yarch.StreamSubscriber;
24
import org.yamcs.yarch.Tuple;
25
import org.yamcs.yarch.TupleDefinition;
26
import org.yamcs.yarch.YarchDatabase;
27
import org.yamcs.yarch.YarchDatabaseInstance;
28

29
/**
30
 * Receives telemetry packets via UDP. One UDP datagram = one TM packet.
31
 *
32
 * <p>Options:
33
 *
34
 * <ul>
35
 *   <li>{@code port} - the UDP port to listen to
36
 *   <li>{@code maxLength} - the maximum length of the datagram (and thus the TM packet length +
37
 *       initialBytesToStrip). If a datagram longer than this size will be received, it will be
38
 *       truncated. Default: 1500 (bytes)
39
 *   <li>{@code initialBytesToStrip} - if configured, skip that number of bytes from the beginning
40
 *       of the datagram. Default: 0
41
 * </ul>
42
 */
43
public class StreamTmDataLink extends AbstractTmDataLink
×
44
    implements StreamSubscriber, ParameterDataLink {
45
  private volatile int invalidDatagramCount = 0;
×
46

47
  static final int MAX_LENGTH = 1500;
48
  int maxLength;
49
  int initialBytesToStrip;
50
  protected Stream stream;
51
  protected Stream crcStream;
52
  static TupleDefinition gftdef;
53

54
  static final String RECTIME_CNAME = "rectime";
55
  static final String DATA_CNAME = "data";
56

57
  private int checksumSuccessCount = 0;
×
58
  protected AtomicLong crcSuccessCountAtomic = new AtomicLong(0);
×
59

60
  EventProducer eventProducer =
×
61
      EventProducerFactory.getEventProducer(null, this.getClass().getSimpleName(), 10000);
×
62

63
  private SystemParameter parametercrcSucessCountParameter;
64

65
  private ParameterSink parameterSink;
66
  private boolean checkSumCheck;
67

68
  static {
69
    gftdef = new TupleDefinition();
×
70
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
71
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
72
  }
×
73

74
  /**
75
   * Creates a new UDP TM Data Link
76
   *
77
   * @throws ConfigurationException if port is not defined in the configuration
78
   */
79
  @Override
80
  public void init(String instance, String name, YConfiguration config)
81
      throws ConfigurationException {
82
    super.init(instance, name, config);
×
83
    maxLength = config.getInt("maxLength", MAX_LENGTH);
×
84
    initialBytesToStrip = config.getInt("initialBytesToStrip", 0);
×
85

86
    String streamName = config.getString("in_stream");
×
87
    checkSumCheck = config.getBoolean("checksum");
×
88
    String crcStreamName = config.getString("crc_stream");
×
89
    this.linkName = name;
×
90

91
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
92
    this.stream = getStream(ydb, streamName);
×
93
    this.crcStream = getStream(ydb, crcStreamName);
×
94

95
    this.stream.addSubscriber(this);
×
96
  }
×
97

98
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
99
    Stream stream = ydb.getStream(streamName);
×
100
    if (stream == null) {
×
101
      try {
102
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
103
        // ydb.execute("create stream " + streamName);
104
      } catch (Exception e) {
×
105
        throw new ConfigurationException(e);
×
106
      }
×
107
      stream = ydb.getStream(streamName);
×
108
    }
109
    return stream;
×
110
  }
111

112
  @Override
113
  public void doStart() {
114
    notifyStarted();
×
115
  }
×
116

117
  @Override
118
  public void doStop() {
119
    notifyStopped();
×
120
  }
×
121

122
  /** returns statistics with the number of datagram received and the number of invalid datagrams */
123
  @Override
124
  public String getDetailedStatus() {
125
    if (isDisabled()) {
×
126
      return "DISABLED";
×
127
    } else {
128
      return String.format(
×
129
          "OK %nValid datagrams received: %d%nInvalid datagrams received: %d",
130
          packetCount.get(), invalidDatagramCount);
×
131
    }
132
  }
133

134
  /** Sets the disabled to true such that getNextPacket ignores the received datagrams */
135
  @Override
136
  public void doDisable() {}
×
137

138
  /**
139
   * Sets the disabled to false such that getNextPacket does not ignore the received datagrams
140
   *
141
   * @throws SocketException
142
   */
143
  @Override
144
  public void doEnable() {}
×
145

146
  @Override
147
  protected Status connectionStatus() {
148
    return Status.OK;
×
149
  }
150

151
  @Override
152
  public void onTuple(Stream arg0, Tuple tuple) {
153
    if (isRunningAndEnabled()) {
×
154
      byte[] packet;
155
      packet = tuple.getColumn(DATA_CNAME);
×
156

157
      int trimmedPacketLength = packet.length - initialBytesToStrip;
×
158

159
      if (trimmedPacketLength <= 0) {
×
160
        log.error(
×
161
            "received datagram of size {} <= {} (initialBytesToStrip); ignored.",
162
            packet.length,
×
163
            initialBytesToStrip);
×
164
        invalidDatagramCount++;
×
165
        return;
×
166
      }
167

168
      byte[] trimmedPacket = new byte[trimmedPacketLength];
×
169
      System.arraycopy(packet, initialBytesToStrip, trimmedPacket, 0, trimmedPacketLength);
×
170

171
      byte[] trimmedSuccessPacket = new byte[trimmedPacket.length - 1];
×
172

173
      if (checkSumCheck) {
×
174
        System.arraycopy(trimmedPacket, 0, trimmedSuccessPacket, 0, trimmedPacket.length - 1);
×
175

176
        if (isCheckSumValid(trimmedPacket)) {
×
177
          checksumSuccessCount++;
×
178
        } else {
179
          crcStream.emitTuple(
×
180
              new Tuple(gftdef, Arrays.asList(timeService.getMissionTime(), trimmedPacket)));
×
181
          return;
×
182
        }
183
      } else {
184

185
      }
186

187
      TmPacket tmPacket;
188
      if (checkSumCheck) {
×
189
        updateStats(trimmedSuccessPacket.length);
×
190

191
        tmPacket = new TmPacket(timeService.getMissionTime(), trimmedSuccessPacket);
×
192
      } else {
193
        updateStats(trimmedPacket.length);
×
194
        tmPacket = new TmPacket(timeService.getMissionTime(), trimmedPacket);
×
195
      }
196
      tmPacket.setEarthRceptionTime(timeService.getHresMissionTime());
×
197
      tmPacket = packetPreprocessor.process(tmPacket);
×
198
      if (tmPacket != null) {
×
199
        processPacket(tmPacket);
×
200
      }
201
    }
202
  }
×
203

204
  public boolean isCheckSumValid(byte[] packet) {
205
    int CHECKSUM_OFFSET = packet.length - 1;
×
206

207
    int checksum = 0xFF;
×
208
    for (int i = 0; i < packet.length - 1; i++) {
×
209
      checksum = checksum ^ packet[i];
×
210
    }
211

212
    return (packet[CHECKSUM_OFFSET] & 0xFF) == (checksum & 0xFF) || true;
×
213
  }
214

215
  @Override
216
  public void setupSystemParameters(SystemParametersService sysParamService) {
217
    super.setupSystemParameters(sysParamService);
×
218
    parametercrcSucessCountParameter =
×
219
        sysParamService.createSystemParameter(
×
220
            linkName + "/checksumSuccessCount",
221
            Type.SINT32,
222
            "Number of successful checksum checks");
223
  }
×
224

225
  @Override
226
  protected void collectSystemParameters(long time, List<ParameterValue> list) {
227
    super.collectSystemParameters(time, list);
×
228
    list.add(
×
229
        SystemParametersService.getPV(
×
230
            parametercrcSucessCountParameter, time, checksumSuccessCount));
231
  }
×
232

233
  @Override
234
  public void setParameterSink(ParameterSink parameterSink) {
235
    this.parameterSink = parameterSink;
×
236
  }
×
237

238
  protected void updateParameters(
239
      long gentime, String group, int seqNum, Collection<ParameterValue> params) {
240
    crcSuccessCountAtomic.set(checksumSuccessCount);
×
241

242
    parameterSink.updateParameters(gentime, group, seqNum, params);
×
243
  }
×
244
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc