• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #84

pending completion
#84

push

lorenzo-gomez-windhover
-Add StreamTmDataUdpOutLink

73 of 73 new or added lines in 1 file covered. (100.0%)

0 of 4478 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/StreamTmDataUdpOutLink.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import java.io.IOException;
4
import java.net.DatagramPacket;
5
import java.net.DatagramSocket;
6
import java.net.InetAddress;
7
import java.net.SocketException;
8
import java.net.UnknownHostException;
9
import java.util.Collection;
10
import java.util.List;
11
import java.util.concurrent.atomic.AtomicLong;
12
import org.yamcs.ConfigurationException;
13
import org.yamcs.YConfiguration;
14
import org.yamcs.events.EventProducer;
15
import org.yamcs.events.EventProducerFactory;
16
import org.yamcs.parameter.ParameterValue;
17
import org.yamcs.parameter.SystemParametersService;
18
import org.yamcs.protobuf.Yamcs.Value.Type;
19
import org.yamcs.tctm.AbstractTmDataLink;
20
import org.yamcs.tctm.ParameterDataLink;
21
import org.yamcs.tctm.ParameterSink;
22
import org.yamcs.xtce.SystemParameter;
23
import org.yamcs.yarch.ColumnDefinition;
24
import org.yamcs.yarch.DataType;
25
import org.yamcs.yarch.Stream;
26
import org.yamcs.yarch.StreamSubscriber;
27
import org.yamcs.yarch.Tuple;
28
import org.yamcs.yarch.TupleDefinition;
29
import org.yamcs.yarch.YarchDatabase;
30
import org.yamcs.yarch.YarchDatabaseInstance;
31

32
/**
33
 * Sends the packets read from a Yamcs stream out via UDP. One tuple = one UDP datagram.
34
 *
35
 * <p>Options:
36
 *
37
 * <ul>
38
 *   <li>{@code host} - the host name or IP address the datagrams are sent to
39
 *   <li>{@code port} - the UDP port the datagrams are sent to
40
 *   <li>{@code in_stream} - the name of the Yamcs stream the tuples are read from. If the stream
41
 *       does not exist it is created with the columns {@code rectime} (TIMESTAMP) and
42
 *       {@code data} (BINARY). The {@code data} column of every tuple is sent as the payload
43
 *       of one datagram
44
 * </ul>
45
 */
46
public class StreamTmDataUdpOutLink extends AbstractTmDataLink
×
47
    implements StreamSubscriber, ParameterDataLink {
48
  private volatile int invalidDatagramCount = 0;
×
49

50
  static final int MAX_LENGTH = 1500;
51
  private int port;
52
  private String host;
53
  InetAddress address;
54
  protected Stream stream;
55
  protected Stream crcStream;
56
  static TupleDefinition gftdef;
57
  private DatagramSocket socket;
58

59
  static final String RECTIME_CNAME = "rectime";
60
  static final String DATA_CNAME = "data";
61

62
  private int checksumSuccessCount = 0;
×
63
  protected AtomicLong crcSuccessCountAtomic = new AtomicLong(0);
×
64

65
  EventProducer eventProducer =
×
66
      EventProducerFactory.getEventProducer(null, this.getClass().getSimpleName(), 10000);
×
67

68
  private SystemParameter parametercrcSucessCountParameter;
69

70
  private ParameterSink parameterSink;
71

72
  static {
    // Tuple layout used when the input stream has to be created: (rectime, data).
    TupleDefinition definition = new TupleDefinition();
    definition.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
    definition.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
    gftdef = definition;
  }
×
77

78
  /**
79
   * Creates a new UDP TM Data Link
80
   *
81
   * @throws ConfigurationException if port is not defined in the configuration
82
   */
83
  @Override
84
  public void init(String instance, String name, YConfiguration config)
85
      throws ConfigurationException {
86
    super.init(instance, name, config);
×
87
    host = config.getString("host");
×
88
    port = config.getInt("port");
×
89

90
    String streamName = config.getString("in_stream");
×
91
    this.linkName = name;
×
92

93
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
94
    this.stream = getStream(ydb, streamName);
×
95

96
    try {
97
      address = InetAddress.getByName(host);
×
98
    } catch (UnknownHostException e) {
×
99
      // TODO Auto-generated catch block
100
      e.printStackTrace();
×
101
    }
×
102

103
    this.stream.addSubscriber(this);
×
104
  }
×
105

106
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
107
    Stream stream = ydb.getStream(streamName);
×
108
    if (stream == null) {
×
109
      try {
110
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
111
        // ydb.execute("create stream " + streamName);
112
      } catch (Exception e) {
×
113
        throw new ConfigurationException(e);
×
114
      }
×
115
      stream = ydb.getStream(streamName);
×
116
    }
117
    return stream;
×
118
  }
119

120
  @Override
121
  public void doStart() {
122
    try {
123
      socket = new DatagramSocket();
×
124
    } catch (SocketException e) {
×
125
      // TODO Auto-generated catch block
126
      e.printStackTrace();
×
127
    }
×
128
    notifyStarted();
×
129
  }
×
130

131
  @Override
132
  public void doStop() {
133
    socket.close();
×
134

135
    notifyStopped();
×
136
  }
×
137

138
  /** returns statistics with the number of datagram received and the number of invalid datagrams */
139
  @Override
140
  public String getDetailedStatus() {
141
    if (isDisabled()) {
×
142
      return "DISABLED";
×
143
    } else {
144
      return String.format(
×
145
          "OK %nValid datagrams received: %d%nInvalid datagrams received: %d",
146
          packetCount.get(), invalidDatagramCount);
×
147
    }
148
  }
149

150
  /** Sets the disabled to true such that getNextPacket ignores the received datagrams */
151
  @Override
152
  public void doDisable() {}
×
153

154
  /**
155
   * Sets the disabled to false such that getNextPacket does not ignore the received datagrams
156
   *
157
   * @throws SocketException
158
   */
159
  @Override
160
  public void doEnable() {}
×
161

162
  @Override
163
  protected Status connectionStatus() {
164
    return Status.OK;
×
165
  }
166

167
  @Override
168
  public void onTuple(Stream arg0, Tuple tuple) {
169
    if (isRunningAndEnabled()) {
×
170
      byte[] pktData = tuple.getColumn(DATA_CNAME);
×
171
      if (pktData == null) {
×
172
        throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
173
      } else {
174
        DatagramPacket dtg = new DatagramPacket(pktData, pktData.length, address, port);
×
175
        try {
176
          socket.send(dtg);
×
177
          updateStats(pktData.length);
×
178
        } catch (IOException e) {
×
179
          log.warn("Error sending datagram", e);
×
180
          notifyFailed(e);
×
181
          return;
×
182
        }
×
183
      }
184
    }
185
  }
×
186

187
  @Override
188
  public void setupSystemParameters(SystemParametersService sysParamService) {
189
    super.setupSystemParameters(sysParamService);
×
190
    parametercrcSucessCountParameter =
×
191
        sysParamService.createSystemParameter(
×
192
            linkName + "/checksumSuccessCount",
193
            Type.SINT32,
194
            "Number of successful checksum checks");
195
  }
×
196

197
  @Override
198
  protected void collectSystemParameters(long time, List<ParameterValue> list) {
199
    super.collectSystemParameters(time, list);
×
200
    list.add(
×
201
        SystemParametersService.getPV(
×
202
            parametercrcSucessCountParameter, time, checksumSuccessCount));
203
  }
×
204

205
  @Override
206
  public void setParameterSink(ParameterSink parameterSink) {
207
    this.parameterSink = parameterSink;
×
208
  }
×
209

210
  protected void updateParameters(
211
      long gentime, String group, int seqNum, Collection<ParameterValue> params) {
212
    crcSuccessCountAtomic.set(checksumSuccessCount);
×
213

214
    parameterSink.updateParameters(gentime, group, seqNum, params);
×
215
  }
×
216
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc