• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #162

28 Nov 2024 05:03PM UTC coverage: 0.0%. Remained the same
#162

push

lorenzo-gomez-windhover
-Updates for yamcs 5.9.8

0 of 4 new or added lines in 1 file covered. (0.0%)

614 existing lines in 11 files now uncovered.

0 of 6789 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/IpUdpWrapperStream.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.util.Arrays;
5
import java.util.concurrent.TimeUnit;
6
import java.util.concurrent.atomic.AtomicBoolean;
7
import java.util.concurrent.atomic.AtomicLong;
8
import org.yamcs.AbstractYamcsService;
9
import org.yamcs.ConfigurationException;
10
import org.yamcs.InitException;
11
import org.yamcs.Spec;
12
import org.yamcs.TmPacket;
13
import org.yamcs.YConfiguration;
14
import org.yamcs.YamcsServer;
15
import org.yamcs.events.EventProducer;
16
import org.yamcs.events.EventProducerFactory;
17
import org.yamcs.logging.Log;
18
import org.yamcs.tctm.Link;
19
import org.yamcs.time.SimulationTimeService;
20
import org.yamcs.time.TimeService;
21
import org.yamcs.utils.DataRateMeter;
22
import org.yamcs.utils.StringConverter;
23
import org.yamcs.yarch.ColumnDefinition;
24
import org.yamcs.yarch.DataType;
25
import org.yamcs.yarch.Stream;
26
import org.yamcs.yarch.StreamSubscriber;
27
import org.yamcs.yarch.Tuple;
28
import org.yamcs.yarch.TupleDefinition;
29
import org.yamcs.yarch.YarchDatabase;
30
import org.yamcs.yarch.YarchDatabaseInstance;
31

32
/**
33
 * Receives telemetry frames via UDP. One UDP datagram = one TM frame.
34
 *
35
 * @author nm
36
 */
UNCOV
37
public class IpUdpWrapperStream extends AbstractYamcsService implements Link, StreamSubscriber {
×
38
  /** Created in init() only when the {@code frameMaxRate} option is present; otherwise null. */
  RateLimiter outRateLimiter;

  /** Link configuration as handed to init(); returned by getConfig(). */
  protected YConfiguration config;

  /** Link name as handed to init(); returned by getName(). */
  protected String linkName;

  /** Set by disable() and cleared by enable(); read by isDisabled(). */
  protected AtomicBoolean disabled = new AtomicBoolean(false);

  protected Log log;
  protected EventProducer eventProducer;
  protected TimeService timeService;

  /** Incremented by updateInStats(); exposed through getDataInCount(). */
  protected AtomicLong inPacketCount = new AtomicLong(0);

  /** Incremented by updateOutStats(); exposed through getDataOutCount(). */
  protected AtomicLong outPacketCount = new AtomicLong(0);

  /** Value of the {@code updateSimulationTime} option (default false). */
  protected boolean updateSimulationTime;

  /* Rate meters marked by updateInStats() and updateOutStats(). */
  DataRateMeter inPacketRateMeter = new DataRateMeter();
  DataRateMeter outPacketRateMeter = new DataRateMeter();
  DataRateMeter inDataRateMeter = new DataRateMeter();
  DataRateMeter outDataRateMeter = new DataRateMeter();

  /* NOTE(review): not referenced by any method of this class as far as this file shows. */
  protected PacketPreprocessor packetPreprocessor;

  /** Stream this object subscribes to; its tuples carry the payloads to wrap. */
  protected Stream inStream;

  /** Stream on which the wrapped IP/UDP packets are emitted. */
  protected Stream outStream;

  /** IPv4 identification value; incremented for every tuple handled by onTuple(). */
  int ipIdentification;

  /* Addressing values written into the generated headers. init() replaces every one of these
   * initial values with the configured (or default) option value. */
  String srcAddress = "192.168.1.55";
  String dstAddress = "192.168.3.2";
  int srcPort = 42001;
  int dstPort = 42000;
  int ttl = 64;

  // FIXME:Temporary. Don't want to be exposing this packet so easily.
  private byte[] packet;

  /** Tuple layout shared by the input and output streams: (rectime TIMESTAMP, data BINARY). */
  static TupleDefinition gftdef;

  static final String RECTIME_CNAME = "rectime";
  static final String DATA_CNAME = "data";

  static {
    TupleDefinition definition = new TupleDefinition();
    definition.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
    definition.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
    gftdef = definition;
  }
×
75

76
  /**
77
   * Creates a new UDP Frame Data Link
78
   *
79
   * @throws ConfigurationException if port is not defined in the configuration
80
   */
81
  public void init(String instance, String name, YConfiguration config) {
82
    try {
UNCOV
83
      super.init(instance, name, config);
×
UNCOV
84
    } catch (InitException e1) {
×
85
      // TODO Auto-generated catch block
86
      e1.printStackTrace();
×
UNCOV
87
    }
×
88

89
    this.ipIdentification = 1;
×
UNCOV
90
    this.linkName = name;
×
91
    this.config = config;
×
92
    log = new Log(getClass(), instance);
×
93
    log.setContext(name);
×
94
    eventProducer = EventProducerFactory.getEventProducer(instance, name, 10000);
×
95
    this.timeService = YamcsServer.getTimeService(instance);
×
96
    String inStreamName = config.getString("in_stream");
×
97
    String outStreamName = config.getString("out_stream");
×
98

99
    srcAddress = config.getString("srcAddress", "127.0.0.1");
×
UNCOV
100
    dstAddress = config.getString("dstAddress", "127.0.0.1");
×
101
    srcPort = config.getInt("srcPort");
×
102
    if (srcPort > 0xffff) {
×
103
      throw new ConfigurationException("Source Port must be less than 65536");
×
104
    }
105
    dstPort = config.getInt("dstPort");
×
UNCOV
106
    if (dstPort > 0xffff) {
×
107
      throw new ConfigurationException("Destination Port must be less than 65536");
×
108
    }
109
    ttl = config.getInt("ttl", 64);
×
UNCOV
110
    if (ttl > 0xff) {
×
111
      throw new ConfigurationException("TTL must be less than 256");
×
112
    }
113

UNCOV
114
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
UNCOV
115
    this.inStream = getStream(ydb, inStreamName);
×
116
    this.outStream = getStream(ydb, outStreamName);
×
117

118
    this.inStream.addSubscriber(this);
×
119

120
    if (config.containsKey("frameMaxRate")) {
×
UNCOV
121
      outRateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
122
    }
123

UNCOV
124
    updateSimulationTime = config.getBoolean("updateSimulationTime", false);
×
UNCOV
125
    if (updateSimulationTime) {
×
126
      if (timeService instanceof SimulationTimeService) {
×
127
        SimulationTimeService sts = (SimulationTimeService) timeService;
×
128
        sts.setTime0(0);
×
129
      } else {
×
130
        throw new ConfigurationException(
×
131
            "updateSimulationTime can only be used together with SimulationTimeService "
132
                + "(add 'timeService: org.yamcs.time.SimulationTimeService' in yamcs.<instance>.yaml)");
133
      }
134
    }
UNCOV
135
  }
×
136

137
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
UNCOV
138
    Stream stream = ydb.getStream(streamName);
×
UNCOV
139
    if (stream == null) {
×
140
      try {
141
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
142
        // ydb.execute("create stream " + streamName);
143
      } catch (Exception e) {
×
UNCOV
144
        throw new ConfigurationException(e);
×
145
      }
×
146
      stream = ydb.getStream(streamName);
×
147
    }
148
    return stream;
×
149
  }
150

151
  @Override
152
  public YConfiguration getConfig() {
UNCOV
153
    return config;
×
154
  }
155

156
  @Override
157
  public String getName() {
UNCOV
158
    return linkName;
×
159
  }
160

161
  @Override
162
  public void resetCounters() {
UNCOV
163
    inPacketCount.set(0);
×
UNCOV
164
    outPacketCount.set(0);
×
165
  }
×
166

167
  @Override
168
  public void doStart() {
UNCOV
169
    if (!isDisabled()) {
×
170
      // new Thread(this).start();
171
    }
UNCOV
172
    notifyStarted();
×
UNCOV
173
  }
×
174

175
  @Override
176
  public void doStop() {
UNCOV
177
    notifyStopped();
×
UNCOV
178
  }
×
179

180
  public boolean isRunningAndEnabled() {
UNCOV
181
    State state = state();
×
UNCOV
182
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
183
  }
184

185
  /**
186
   * Sends the packet downstream for processing.
187
   *
188
   * <p>Starting in Yamcs 5.2, if the updateSimulationTime option is set on the link configuration,
189
   *
190
   * <ul>
191
   *   <li>the timeService is expected to be SimulationTimeService
192
   *   <li>at initialization, the time0 is set to 0
193
   *   <li>upon each packet received, the generationTime (as set by the pre-processor) is used to
194
   *       update the simulation elapsed time
195
   * </ul>
196
   *
197
   * <p>Should be called by all sub-classes (instead of directly calling {@link
198
   * TmSink#processPacket(TmPacket)}
199
   *
200
   * @param tmpkt
201
   */
202
  protected void processPacket(TmPacket tmpkt) {
UNCOV
203
    long rectime = tmpkt.getReceptionTime();
×
UNCOV
204
    byte byteArray[] = tmpkt.getPacket();
×
205

206
    inStream.emitTuple(new Tuple(gftdef, Arrays.asList(rectime, byteArray)));
×
207

208
    if (updateSimulationTime) {
×
UNCOV
209
      SimulationTimeService sts = (SimulationTimeService) timeService;
×
210
      if (!tmpkt.isInvalid()) {
×
211
        sts.setSimElapsedTime(tmpkt.getGenerationTime());
×
212
      }
213
    }
UNCOV
214
  }
×
215

216
  /**
217
   * called when a new packet is received to update the statistics
218
   *
219
   * @param packetSize
220
   */
221
  protected void updateInStats(int packetSize) {
UNCOV
222
    inPacketCount.getAndIncrement();
×
UNCOV
223
    inPacketRateMeter.mark(1);
×
224
    inDataRateMeter.mark(packetSize);
×
225
  }
×
226

227
  /**
228
   * called when a new packet is sent to update the statistics
229
   *
230
   * @param packetSize
231
   */
232
  protected void updateOutStats(int packetSize) {
UNCOV
233
    outPacketCount.getAndIncrement();
×
UNCOV
234
    outPacketRateMeter.mark(1);
×
235
    outDataRateMeter.mark(packetSize);
×
236
  }
×
237

238
  @Override
239
  public void disable() {
UNCOV
240
    boolean b = disabled.getAndSet(true);
×
UNCOV
241
    if (!b) {
×
242
      try {
243
        /* TODO */
244
        // doDisable();
245
      } catch (Exception e) {
246
        disabled.set(false);
247
        log.warn("Failed to disable link", e);
248
      }
249
    }
UNCOV
250
  }
×
251

252
  public static byte[] genUdpPacket(
253
      byte[] payload,
254
      int identification,
255
      String srcAddress,
256
      String dstAddress,
257
      int srcPort,
258
      int dstPort,
259
      int TTL) {
260
    /* The total length is the size of the IP header (20 bytes) + UDP header (8 bytes) + payload size */
UNCOV
261
    int totalLength = 20 + 8 + payload.length;
×
UNCOV
262
    byte[] packet = new byte[totalLength];
×
263

264
    /* NOTE: The IP protocol numbers bits left to right, not right to left, so the comments
265
    below are following the same ordering. */
266

267
    /* Version ID - Word 0 bits 0-3 (byte 0, bits 0-3 ... so shift left 4 */
268
    /* This is IPv4 so its always 4.  This is NOT configurable. */
UNCOV
269
    packet[0] = (byte) (4 << 4);
×
270

271
    /* Set IHL - Word 0 bits 4-7 (byte 0, bits 4-7 ... so NO shift
272
     * This is number of 32 bit words. If the options flag is not set, there is no options
273
     * field and the header will be 5 32 bit words in size.  We are not including options, so
274
     * set this to 5. */
UNCOV
275
    packet[0] = (byte) (packet[0] | 5);
×
276

277
    /* Differentiated Services Field.  Word 0 bits 8-15. (byte 1)  We aren't doing any thing
278
     * crazy here. Just set this to zero.
279
     */
UNCOV
280
    packet[1] = 0;
×
281

282
    /* Total Length - Word 0 bits 16-31 (bytes 2-3)
283
     * The total length is the size of the IP header (20 bytes) + UDP header (8 bytes) + payload
284
     */
UNCOV
285
    packet[2] = getHighByteFromInt(totalLength);
×
UNCOV
286
    packet[3] = getLowByteFromInt(totalLength);
×
287

288
    /* Identification.  Word 1 bits 0-15 (bytes 4-5)
289
     */
UNCOV
290
    packet[4] = getHighByteFromInt(identification);
×
UNCOV
291
    packet[5] = getLowByteFromInt(identification);
×
292

293
    /* Flags and fragment offset.  Word 1 bits 16 - 31 (bytes 6-7)
294
     * We're just going to set the "Don't Fragment" bit set because reasons.  This equates to
295
     * 0x4000
296
     */
UNCOV
297
    packet[6] = getHighByteFromInt(0x0000);
×
UNCOV
298
    packet[7] = getLowByteFromInt(0x0000);
×
299

300
    /* TTL. Word 2 bits 0-7 (byte 8) */
UNCOV
301
    packet[8] = getLowByteFromInt(TTL);
×
302

303
    /* Protocol. Word 2 bits 8-15 (byte 9)
304
     * Obviously, we're going to set this to UDP. "UDP" is 17.
305
     */
UNCOV
306
    packet[9] = 17;
×
307

308
    /* Checksum. Word 2 bits 16-31 (bytes 10-11)
309
     * We don't have the packet populated yet, so we can't calculate the checksum just yet. Set
310
     * this to zero for now.
311
     */
UNCOV
312
    packet[10] = 0;
×
UNCOV
313
    packet[11] = 0;
×
314

315
    /* Source IP address. Word 3 bits 0-31 (bytes 12-15)
316
     * First we need to convert the string to a byte array.  Start with splitting the string with "." as
317
     * the token. Then convert each array element to an integer.
318
     */
UNCOV
319
    String[] splitSrcAddress = srcAddress.split("\\.");
×
UNCOV
320
    packet[12] = (byte) (Integer.parseInt(splitSrcAddress[0]));
×
321
    packet[13] = (byte) (Integer.parseInt(splitSrcAddress[1]));
×
322
    packet[14] = (byte) (Integer.parseInt(splitSrcAddress[2]));
×
323
    packet[15] = (byte) (Integer.parseInt(splitSrcAddress[3]));
×
324

325
    /* Destination IP address. Word 4 bits 0-31 (bytes 16-19)
326
     * First we need to convert the string to a byte array.  Start with splitting the string with "." as
327
     * the token. Then convert each array element to an integer.
328
     */
UNCOV
329
    String[] splitDstAddress = dstAddress.split("\\.");
×
UNCOV
330
    packet[16] = (byte) (Integer.parseInt(splitDstAddress[0]));
×
331
    packet[17] = (byte) (Integer.parseInt(splitDstAddress[1]));
×
332
    packet[18] = (byte) (Integer.parseInt(splitDstAddress[2]));
×
333
    packet[19] = (byte) (Integer.parseInt(splitDstAddress[3]));
×
334

335
    /* Now we can populate the checksum. First calculate it. */
UNCOV
336
    int checksum = 0;
×
UNCOV
337
    for (int i = 0; i < 20; i = i + 2) {
×
338
      int value = convertBytesToInt(packet[i], packet[i + 1]);
×
339
      checksum = addWithCarry(checksum, value);
×
340
    }
341
    /* Take the ones complement  */
UNCOV
342
    checksum = ~checksum;
×
343
    /* Store the IP checksum. */
344
    packet[10] = getHighByteFromInt(checksum);
×
UNCOV
345
    packet[11] = getLowByteFromInt(checksum);
×
346

347
    /* UDP */
348
    /* Source Port */
UNCOV
349
    packet[20] = getHighByteFromInt(srcPort);
×
UNCOV
350
    packet[21] = getLowByteFromInt(srcPort);
×
351

352
    /* Destination Port */
UNCOV
353
    packet[22] = getHighByteFromInt(dstPort);
×
UNCOV
354
    packet[23] = getLowByteFromInt(dstPort);
×
355

356
    /* Length */
UNCOV
357
    int udpLength = 8 + payload.length;
×
UNCOV
358
    packet[24] = getHighByteFromInt(udpLength);
×
359
    packet[25] = getLowByteFromInt(udpLength);
×
360

361
    /* Store checksum temporarily */
UNCOV
362
    packet[26] = 0;
×
UNCOV
363
    packet[27] = 0;
×
364

365
    /* Copy payload */
UNCOV
366
    for (int i = 0; i < payload.length; ++i) {
×
UNCOV
367
      packet[28 + i] = payload[i];
×
368
    }
369

370
    /* UDP Checksum
371
     * Start by clearing the checksum
372
     */
UNCOV
373
    checksum = 0;
×
374

375
    /* Now sum the 'pseudo header' starting with the protocol field. */
UNCOV
376
    checksum = addWithCarry(checksum, 0x0011);
×
377

378
    /* Add in the UDP Length field. */
UNCOV
379
    checksum = addWithCarry(checksum, udpLength);
×
380

381
    /* Now add in the UDP header and the payload. */
382
    int i;
UNCOV
383
    for (i = 12; i < (totalLength - 1); i = i + 2) {
×
UNCOV
384
      int value = convertBytesToInt(packet[i], packet[i + 1]);
×
385
      checksum = addWithCarry(checksum, value);
×
386
    }
387
    /* Check for an odd length message.  If the total length is an odd number we have one
388
     * last byte to add in.  The spec states that when this happens, to just add the
389
     * remaining byte as is.  No padding to make it 16 bits.
390
     */
UNCOV
391
    if (i < totalLength) {
×
UNCOV
392
      checksum = addWithCarry(checksum, packet[i] << 16);
×
393
    }
394

395
    /* Get the ones complement */
UNCOV
396
    checksum = ~checksum;
×
UNCOV
397
    packet[26] = getHighByteFromInt(checksum);
×
398
    packet[27] = getLowByteFromInt(checksum);
×
399

400
    return packet;
×
401
  }
402

403
  public static byte getHighByteFromInt(int intValue) {
404
    byte result;
405

406
    /* Get the high byte from the network order integer. */
UNCOV
407
    result = (byte) ((intValue & 0x0000ff00) >> 8);
×
408

409
    return result;
×
410
  }
411

412
  public static byte getLowByteFromInt(int intValue) {
413
    byte result;
414

415
    /* Get the low byte from the network order integer. */
UNCOV
416
    result = (byte) (intValue & 0x000000ff);
×
417

418
    return result;
×
419
  }
420

421
  public static int convertBytesToInt(byte HI, byte LO) {
422
    int result;
423

UNCOV
424
    result = ((int) (HI << 8) & 0x0000ff00) | (int) (LO & 0x000000ff);
×
425

426
    return result;
×
427
  }
428

429
  public static int addWithCarry(int A, int B) {
430
    int result;
431

432
    /* Add the two values together as 32 bit integers.  We need to do this so we
433
     * can get the carry digit next.
434
     */
UNCOV
435
    int temp = (A & 0x0000ffff) + (B & 0x0000ffff);
×
436

437
    /* Get the carry digit */
UNCOV
438
    int carry = ((temp & 0x00010000) >>> 16);
×
439

440
    /* Add the carry digit to the summed result. */
UNCOV
441
    result = (temp + carry) & 0x0000ffff;
×
442

443
    return result;
×
444
  }
445

446
  public static int addWithCarry(int A, byte B) {
447
    int result;
448

449
    /* Add the two values together as 32 bit integers.  We need to do this so we
450
     * can get the carry digit next.
451
     */
UNCOV
452
    int temp = (A & 0x0000ffff) + (B & 0x000000ff);
×
453

454
    /* Get the carry digit */
UNCOV
455
    int carry = ((temp & 0xffff0000) >>> 16);
×
456

457
    /* Add the carry digit to the summed result. */
UNCOV
458
    result = (temp + carry) & 0x0000ffff;
×
459

460
    return result;
×
461
  }
462

463
  public String getPacket() {
UNCOV
464
    return StringConverter.arrayToHexString(packet);
×
465
  }
466

467
  @Override
468
  public void enable() {
UNCOV
469
    boolean b = disabled.getAndSet(false);
×
UNCOV
470
    if (b) {
×
471
      try {
472
        /* TODO */
473
        // doEnable();
474
      } catch (Exception e) {
475
        disabled.set(true);
476
        log.warn("Failed to enable link", e);
477
      }
478
    }
UNCOV
479
  }
×
480

481
  @Override
482
  public long getDataInCount() {
UNCOV
483
    return inPacketCount.get();
×
484
  }
485

486
  @Override
487
  public long getDataOutCount() {
UNCOV
488
    return outPacketCount.get();
×
489
  }
490

491
  @Override
492
  public Status getLinkStatus() {
UNCOV
493
    if (isDisabled()) {
×
UNCOV
494
      return Status.DISABLED;
×
495
    }
496
    if (state() == State.FAILED) {
×
UNCOV
497
      return Status.FAILED;
×
498
    }
499

UNCOV
500
    return connectionStatus();
×
501
  }
502

503
  @Override
504
  public boolean isDisabled() {
UNCOV
505
    return disabled.get();
×
506
  }
507

508
  protected Status connectionStatus() {
UNCOV
509
    return Status.OK;
×
510
  }
511

512
  @Override
513
  public Spec getSpec() {
514
    // TODO Auto-generated method stub
UNCOV
515
    return super.getSpec();
×
516
  }
517

518
  @Override
519
  public void onTuple(Stream arg0, Tuple tuple) {
UNCOV
520
    if (isRunningAndEnabled()) {
×
521

522
      byte[] packet;
523

UNCOV
524
      this.ipIdentification++;
×
UNCOV
525
      packet =
×
526
          genUdpPacket(
×
527
              tuple.getColumn(DATA_CNAME),
×
528
              this.ipIdentification,
529
              this.srcAddress,
530
              this.dstAddress,
531
              this.srcPort,
532
              this.dstPort,
533
              this.ttl);
534

535
      // long recTime = tuple.getColumn(PreparedCommand.CNAME_GENTIME);
UNCOV
536
      if (packet == null) {
×
UNCOV
537
        throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
538
      } else {
539
        outStream.emitTuple(
×
UNCOV
540
            new Tuple(gftdef, Arrays.asList(tuple.getColumn(RECTIME_CNAME), packet)));
×
541

542
        updateOutStats(packet.length);
×
543
      }
544
    }
UNCOV
545
  }
×
546
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc