• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #162

28 Nov 2024 05:03PM UTC coverage: 0.0%. Remained the same
#162

push

lorenzo-gomez-windhover
-Updates for yamcs 5.9.8

0 of 4 new or added lines in 1 file covered. (0.0%)

614 existing lines in 11 files now uncovered.

0 of 6789 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/SlipStreamEncoder.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.io.ByteArrayOutputStream;
5
import java.io.IOException;
6
import java.util.Arrays;
7
import java.util.concurrent.TimeUnit;
8
import java.util.concurrent.atomic.AtomicBoolean;
9
import java.util.concurrent.atomic.AtomicLong;
10
import org.yamcs.AbstractYamcsService;
11
import org.yamcs.ConfigurationException;
12
import org.yamcs.InitException;
13
import org.yamcs.Spec;
14
import org.yamcs.TmPacket;
15
import org.yamcs.YConfiguration;
16
import org.yamcs.YamcsServer;
17
import org.yamcs.events.EventProducer;
18
import org.yamcs.events.EventProducerFactory;
19
import org.yamcs.logging.Log;
20
import org.yamcs.tctm.Link;
21
import org.yamcs.time.SimulationTimeService;
22
import org.yamcs.time.TimeService;
23
import org.yamcs.utils.DataRateMeter;
24
import org.yamcs.utils.StringConverter;
25
import org.yamcs.yarch.ColumnDefinition;
26
import org.yamcs.yarch.DataType;
27
import org.yamcs.yarch.Stream;
28
import org.yamcs.yarch.StreamSubscriber;
29
import org.yamcs.yarch.Tuple;
30
import org.yamcs.yarch.TupleDefinition;
31
import org.yamcs.yarch.YarchDatabase;
32
import org.yamcs.yarch.YarchDatabaseInstance;
33

34
/**
35
 * Link that SLIP-encodes (RFC 1055) the packets arriving on one stream and emits them on another.
36
 *
37
 * @author nm
38
 */
UNCOV
39
public class SlipStreamEncoder extends AbstractYamcsService implements Link, StreamSubscriber {
×
40
  private final byte END = (byte) 0xc0;
×
UNCOV
41
  private final byte ESC = (byte) 0xdb;
×
42
  private final byte ESC_END = (byte) 0xdc;
×
43
  private final byte ESC_ESC = (byte) 0xdd;
×
44
  RateLimiter outRateLimiter;
45
  protected YConfiguration config;
46
  protected String linkName;
UNCOV
47
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
48
  protected Log log;
49
  protected EventProducer eventProducer;
50
  protected TimeService timeService;
UNCOV
51
  protected AtomicLong inPacketCount = new AtomicLong(0);
×
UNCOV
52
  protected AtomicLong outPacketCount = new AtomicLong(0);
×
53
  protected boolean updateSimulationTime;
54
  DataRateMeter inPacketRateMeter = new DataRateMeter();
×
UNCOV
55
  DataRateMeter outPacketRateMeter = new DataRateMeter();
×
56
  DataRateMeter inDataRateMeter = new DataRateMeter();
×
57
  DataRateMeter outDataRateMeter = new DataRateMeter();
×
58
  protected PacketPreprocessor packetPreprocessor;
59
  protected Stream inStream;
60
  protected Stream outStream;
61
  protected Boolean prefaceSlipEnd;
62

63
  // FIXME:Temporary. Don't want to be exposing this packet so easily.
64
  private byte[] packet;
65

66
  static TupleDefinition gftdef;
67

68
  static final String RECTIME_CNAME = "rectime";
69
  static final String DATA_CNAME = "data";
70

71
  static {
UNCOV
72
    gftdef = new TupleDefinition();
×
UNCOV
73
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
74
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
75
  }
×
76

77
  /**
78
   * Creates a new UDP Frame Data Link
79
   *
80
   * @throws ConfigurationException if port is not defined in the configuration
81
   */
82
  public void init(String instance, String name, YConfiguration config) {
83
    try {
UNCOV
84
      super.init(instance, name, config);
×
UNCOV
85
    } catch (InitException e1) {
×
86
      // TODO Auto-generated catch block
87
      e1.printStackTrace();
×
UNCOV
88
    }
×
89

90
    this.linkName = name;
×
UNCOV
91
    this.config = config;
×
92
    log = new Log(getClass(), instance);
×
93
    log.setContext(name);
×
94
    eventProducer = EventProducerFactory.getEventProducer(instance, name, 10000);
×
95
    this.timeService = YamcsServer.getTimeService(instance);
×
96
    String inStreamName = config.getString("in_stream");
×
97
    String outStreamName = config.getString("out_stream");
×
98

99
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
UNCOV
100
    this.inStream = getStream(ydb, inStreamName);
×
101
    this.outStream = getStream(ydb, outStreamName);
×
102

103
    this.inStream.addSubscriber(this);
×
104

105
    if (config.containsKey("frameMaxRate")) {
×
UNCOV
106
      outRateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
107
    }
108

UNCOV
109
    prefaceSlipEnd = config.getBoolean("prefaceSlipEnd", false);
×
110

111
    updateSimulationTime = config.getBoolean("updateSimulationTime", false);
×
UNCOV
112
    if (updateSimulationTime) {
×
113
      if (timeService instanceof SimulationTimeService) {
×
114
        SimulationTimeService sts = (SimulationTimeService) timeService;
×
115
        sts.setTime0(0);
×
116
      } else {
×
117
        throw new ConfigurationException(
×
118
            "updateSimulationTime can only be used together with SimulationTimeService "
119
                + "(add 'timeService: org.yamcs.time.SimulationTimeService' in yamcs.<instance>.yaml)");
120
      }
121
    }
UNCOV
122
  }
×
123

124
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
UNCOV
125
    Stream stream = ydb.getStream(streamName);
×
UNCOV
126
    if (stream == null) {
×
127
      try {
128
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
129
        // ydb.execute("create stream " + streamName);
130
      } catch (Exception e) {
×
UNCOV
131
        throw new ConfigurationException(e);
×
132
      }
×
133
      stream = ydb.getStream(streamName);
×
134
    }
135
    return stream;
×
136
  }
137

138
  @Override
139
  public YConfiguration getConfig() {
UNCOV
140
    return config;
×
141
  }
142

143
  @Override
144
  public String getName() {
UNCOV
145
    return linkName;
×
146
  }
147

148
  @Override
149
  public void resetCounters() {
UNCOV
150
    inPacketCount.set(0);
×
UNCOV
151
    outPacketCount.set(0);
×
152
  }
×
153

154
  @Override
155
  public void doStart() {
UNCOV
156
    if (!isDisabled()) {
×
157
      // new Thread(this).start();
158
    }
UNCOV
159
    notifyStarted();
×
UNCOV
160
  }
×
161

162
  @Override
163
  public void doStop() {
UNCOV
164
    notifyStopped();
×
UNCOV
165
  }
×
166

167
  public boolean isRunningAndEnabled() {
UNCOV
168
    State state = state();
×
UNCOV
169
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
170
  }
171

172
  /**
173
   * Sends the packet downstream for processing.
174
   *
175
   * <p>Starting in Yamcs 5.2, if the updateSimulationTime option is set on the link configuration,
176
   *
177
   * <ul>
178
   *   <li>the timeService is expected to be SimulationTimeService
179
   *   <li>at initialization, the time0 is set to 0
180
   *   <li>upon each packet received, the generationTime (as set by the pre-processor) is used to
181
   *       update the simulation elapsed time
182
   * </ul>
183
   *
184
   * <p>Should be called by all sub-classes (instead of directly calling {@link
185
   * TmSink#processPacket(TmPacket)}
186
   *
187
   * @param tmpkt
188
   */
189
  protected void processPacket(TmPacket tmpkt) {
UNCOV
190
    long rectime = tmpkt.getReceptionTime();
×
UNCOV
191
    byte byteArray[] = tmpkt.getPacket();
×
192

193
    inStream.emitTuple(new Tuple(gftdef, Arrays.asList(rectime, byteArray)));
×
194

195
    if (updateSimulationTime) {
×
UNCOV
196
      SimulationTimeService sts = (SimulationTimeService) timeService;
×
197
      if (!tmpkt.isInvalid()) {
×
198
        sts.setSimElapsedTime(tmpkt.getGenerationTime());
×
199
      }
200
    }
UNCOV
201
  }
×
202

203
  /**
204
   * called when a new packet is received to update the statistics
205
   *
206
   * @param packetSize
207
   */
208
  protected void updateInStats(int packetSize) {
UNCOV
209
    inPacketCount.getAndIncrement();
×
UNCOV
210
    inPacketRateMeter.mark(1);
×
211
    inDataRateMeter.mark(packetSize);
×
212
  }
×
213

214
  /**
215
   * called when a new packet is sent to update the statistics
216
   *
217
   * @param packetSize
218
   */
219
  protected void updateOutStats(int packetSize) {
UNCOV
220
    outPacketCount.getAndIncrement();
×
UNCOV
221
    outPacketRateMeter.mark(1);
×
222
    outDataRateMeter.mark(packetSize);
×
223
  }
×
224

225
  @Override
226
  public void disable() {
UNCOV
227
    boolean b = disabled.getAndSet(true);
×
UNCOV
228
    if (!b) {
×
229
      try {
230
        /* TODO */
231
        // doDisable();
232
      } catch (Exception e) {
233
        disabled.set(false);
234
        log.warn("Failed to disable link", e);
235
      }
236
    }
UNCOV
237
  }
×
238

239
  /**
240
   * This implements the receiving side of RFC 1055. For more information on the standard, go to
241
   * https://datatracker.ietf.org/doc/html/rfc1055 This method is based on the snippet from RFC
242
   * 1055, Page 5.
243
   *
244
   * <p>WARNING: Do not use this code yet. It needs plenty of refactoring.
245
   */
246
  protected byte[] encodeMessage(byte[] pktData) throws IOException {
UNCOV
247
    ByteArrayOutputStream payload = new ByteArrayOutputStream();
×
248

249
    byte[] temp = new byte[1];
×
250

251
    if (this.prefaceSlipEnd) {
×
UNCOV
252
      payload.write(END);
×
253
    }
254

UNCOV
255
    for (byte character : pktData) {
×
UNCOV
256
      switch (character) {
×
257
          /* if it's the same code as an END character, we send a
258
           * special two character code so as not to make the
259
           * receiver think we sent an END
260
           */
261
        case END:
UNCOV
262
          temp[0] = (byte) (ESC);
×
UNCOV
263
          payload.write(temp[0]);
×
264
          temp[0] = (byte) (ESC_END);
×
265
          payload.write(temp[0]);
×
266
          break;
×
267

268
          /* if it's the same code as an ESC character,
269
           * we send a special two character code so as not
270
           * to make the receiver think we sent an ESC
271
           */
272
        case ESC:
UNCOV
273
          temp[0] = (byte) (ESC);
×
UNCOV
274
          payload.write(temp);
×
275
          temp[0] = (byte) ESC_ESC;
×
276
          payload.write(temp);
×
277
          break;
×
278

279
          /* otherwise, we just send the character
280
           */
281
        default:
UNCOV
282
          payload.write(Byte.toUnsignedInt(character));
×
283
          break;
284
      }
285
    }
286

UNCOV
287
    payload.write(END);
×
288

289
    return payload.toByteArray();
×
290
  }
291

292
  public String getPacket() {
UNCOV
293
    return StringConverter.arrayToHexString(packet);
×
294
  }
295

296
  @Override
297
  public void enable() {
UNCOV
298
    boolean b = disabled.getAndSet(false);
×
UNCOV
299
    if (b) {
×
300
      try {
301
        /* TODO */
302
        // doEnable();
303
      } catch (Exception e) {
304
        disabled.set(true);
305
        log.warn("Failed to enable link", e);
306
      }
307
    }
UNCOV
308
  }
×
309

310
  @Override
311
  public long getDataInCount() {
UNCOV
312
    return inPacketCount.get();
×
313
  }
314

315
  @Override
316
  public long getDataOutCount() {
UNCOV
317
    return outPacketCount.get();
×
318
  }
319

320
  @Override
321
  public Status getLinkStatus() {
UNCOV
322
    if (isDisabled()) {
×
UNCOV
323
      return Status.DISABLED;
×
324
    }
325
    if (state() == State.FAILED) {
×
UNCOV
326
      return Status.FAILED;
×
327
    }
328

UNCOV
329
    return connectionStatus();
×
330
  }
331

332
  @Override
333
  public boolean isDisabled() {
UNCOV
334
    return disabled.get();
×
335
  }
336

337
  protected Status connectionStatus() {
UNCOV
338
    return Status.OK;
×
339
  }
340

341
  @Override
342
  public Spec getSpec() {
343
    // TODO Auto-generated method stub
UNCOV
344
    return super.getSpec();
×
345
  }
346

347
  @Override
348
  public void onTuple(Stream arg0, Tuple tuple) {
UNCOV
349
    if (isRunningAndEnabled()) {
×
350

351
      byte[] packet;
352
      try {
UNCOV
353
        packet = encodeMessage(tuple.getColumn(DATA_CNAME));
×
354

355
        // long recTime = tuple.getColumn(PreparedCommand.CNAME_GENTIME);
UNCOV
356
        if (packet == null) {
×
UNCOV
357
          throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
358
        } else {
359
          outStream.emitTuple(
×
UNCOV
360
              new Tuple(gftdef, Arrays.asList(tuple.getColumn(RECTIME_CNAME), packet)));
×
361

362
          updateOutStats(packet.length);
×
363
        }
364
      } catch (IOException e) {
×
365
        // TODO Auto-generated catch block
366
        e.printStackTrace();
×
UNCOV
367
      }
×
368
    }
369
  }
×
370
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc