• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #119

22 Sep 2023 10:58PM UTC coverage: 0.0%. Remained the same
#119

push

web-flow
Merge pull request #43 from WindhoverLabs/udp_fixes

Udp fixes

11 of 11 new or added lines in 1 file covered. (100.0%)

0 of 6276 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/UdpStreamOutProvider.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.io.IOException;
5
import java.net.DatagramPacket;
6
import java.net.DatagramSocket;
7
import java.net.InetAddress;
8
import java.net.SocketException;
9
import java.net.UnknownHostException;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.atomic.AtomicLong;
13
import org.yamcs.AbstractYamcsService;
14
import org.yamcs.ConfigurationException;
15
import org.yamcs.InitException;
16
import org.yamcs.Spec;
17
import org.yamcs.YConfiguration;
18
import org.yamcs.events.EventProducer;
19
import org.yamcs.events.EventProducerFactory;
20
import org.yamcs.parameter.SystemParametersProducer;
21
import org.yamcs.tctm.Link;
22
import org.yamcs.utils.DataRateMeter;
23
import org.yamcs.yarch.ColumnDefinition;
24
import org.yamcs.yarch.DataType;
25
import org.yamcs.yarch.Stream;
26
import org.yamcs.yarch.StreamSubscriber;
27
import org.yamcs.yarch.Tuple;
28
import org.yamcs.yarch.TupleDefinition;
29
import org.yamcs.yarch.YarchDatabase;
30
import org.yamcs.yarch.YarchDatabaseInstance;
31

32
/**
33
 * Receives telemetry fames via UDP. One UDP datagram = one TM frame.
34
 *
35
 * @author nm
36
 */
37
public class UdpStreamOutProvider extends AbstractYamcsService
×
38
    implements Link, StreamSubscriber, SystemParametersProducer {
39
  String host;
40
  int port;
41
  DatagramSocket socket;
42
  InetAddress address;
43
  Thread thread;
44
  RateLimiter rateLimiter;
45
  protected Stream stream;
46
  protected AtomicLong packetCount = new AtomicLong(0);
×
47
  DataRateMeter packetRateMeter = new DataRateMeter();
×
48
  DataRateMeter dataRateMeter = new DataRateMeter();
×
49
  protected String linkName;
50
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
51
  private boolean receiving;
52
  private boolean enabledAtStartup = true;
×
53
  protected EventProducer eventProducer;
54

55
  static TupleDefinition gftdef;
56

57
  static final String RECTIME_CNAME = "rectime";
58
  static final String DATA_CNAME = "data";
59

60
  static {
61
    gftdef = new TupleDefinition();
×
62
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
63
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
64
  }
×
65

66
  /**
67
   * Creates a new UDP Frame Data Link
68
   *
69
   * @throws ConfigurationException if port is not defined in the configuration
70
   */
71
  public void init(String instance, String name, YConfiguration config) {
72
    try {
73
      super.init(instance, name, config);
×
74
    } catch (InitException e1) {
×
75
      // TODO Auto-generated catch block
76
      e1.printStackTrace();
×
77
    }
×
78
    host = config.getString("host");
×
79
    port = config.getInt("port");
×
80
    enabledAtStartup = config.getBoolean("enabledAtStartup");
×
81
    String streamName = config.getString("stream");
×
82
    this.linkName = name;
×
83

84
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
85
    this.stream = getStream(ydb, streamName);
×
86

87
    this.stream.addSubscriber(this);
×
88

89
    try {
90
      address = InetAddress.getByName(host);
×
91
    } catch (UnknownHostException e) {
×
92
      throw new ConfigurationException("Cannot resolve host '" + host + "'", e);
×
93
    }
×
94
    if (config.containsKey("frameMaxRate")) {
×
95
      rateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
96
    }
97
    receiving = true;
×
98

99
    eventProducer =
×
100
        EventProducerFactory.getEventProducer(instance, this.getClass().getSimpleName(), 10000);
×
101

102
    if (!enabledAtStartup) {
×
103
      disabled.getAndSet(true);
×
104
    }
105
  }
×
106

107
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
108
    Stream stream = ydb.getStream(streamName);
×
109
    if (stream == null) {
×
110
      try {
111
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
112
        // ydb.execute("create stream " + streamName);
113
      } catch (Exception e) {
×
114
        throw new ConfigurationException(e);
×
115
      }
×
116
      stream = ydb.getStream(streamName);
×
117
    }
118
    return stream;
×
119
  }
120

121
  @Override
122
  public YConfiguration getConfig() {
123
    return config;
×
124
  }
125

126
  @Override
127
  public String getName() {
128
    return linkName;
×
129
  }
130

131
  @Override
132
  public void resetCounters() {
133
    packetCount.set(0);
×
134
  }
×
135

136
  @Override
137
  public void doStart() {
138
    if (!isDisabled()) {
×
139
      try {
140
        socket = new DatagramSocket();
×
141
      } catch (SocketException e) {
×
142
        notifyFailed(e);
×
143
      }
×
144
    }
145
    notifyStarted();
×
146
  }
×
147

148
  @Override
149
  public void doStop() {
150
    socket.close();
×
151
    notifyStopped();
×
152
  }
×
153

154
  public boolean isRunningAndEnabled() {
155
    State state = state();
×
156
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
157
  }
158

159
  /**
160
   * called when a new packet is received to update the statistics
161
   *
162
   * @param packetSize
163
   */
164
  protected void updateStats(int packetSize) {
165
    packetCount.getAndIncrement();
×
166
    packetRateMeter.mark(1);
×
167
    dataRateMeter.mark(packetSize);
×
168
  }
×
169

170
  @Override
171
  public void disable() {
172
    boolean b = disabled.getAndSet(true);
×
173
    if (!b) {
×
174
      try {
175
        /* TODO */
176
        // doDisable();
177
      } catch (Exception e) {
178
        disabled.set(false);
179
        log.warn("Failed to disable link", e);
180
      }
181
    }
182
  }
×
183

184
  @Override
185
  public void enable() {
186
    /* TODO */
187
  }
×
188

189
  @Override
190
  public long getDataInCount() {
191
    return 0;
×
192
  }
193

194
  @Override
195
  public long getDataOutCount() {
196
    return packetCount.get();
×
197
  }
198

199
  @Override
200
  public Status getLinkStatus() {
201
    if (isDisabled()) {
×
202
      return Status.DISABLED;
×
203
    }
204
    if (state() == State.FAILED) {
×
205
      return Status.FAILED;
×
206
    }
207

208
    return connectionStatus();
×
209
  }
210

211
  @Override
212
  public boolean isDisabled() {
213
    return disabled.get();
×
214
  }
215

216
  protected Status connectionStatus() {
217
    return Status.OK;
×
218
  }
219

220
  @Override
221
  public void onTuple(Stream arg0, Tuple tuple) {
222
    if (isRunningAndEnabled() && receiving) {
×
223
      byte[] pktData = tuple.getColumn(DATA_CNAME);
×
224
      if (pktData == null) {
×
225
        throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
226
      } else {
227
        DatagramPacket dtg = new DatagramPacket(pktData, pktData.length, address, port);
×
228
        try {
229
          //                synchronized (socket)
230
          //                {
231
          //                socket.send(dtg);
232
          //                }
233
          if (!socket.isClosed()) {
×
234
            socket.send(dtg);
×
235
          }
236
          updateStats(pktData.length);
×
237
        } catch (IOException e) {
×
238
          eventProducer.sendInfo("Error sending datagram:" + e.toString());
×
239
          log.warn("Error sending datagram", e);
×
240
          //          notifyFailed(e);
241
          return;
×
242
        }
×
243
      }
244
    }
245
  }
×
246

247
  @Override
248
  public Spec getSpec() {
249
    // TODO Auto-generated method stub
250
    return super.getSpec();
×
251
  }
252

253
  //  Not using the regular YAMCS "doStop" since I don't think that's meant to be called by API
254
  // users
255
  //   and I don't know if it'll have any side effects.
256
  public void stopReceving() {
257
    closeSockets();
×
258
    receiving = false;
×
259
  }
×
260

261
  public void startReceving() {
262
    if (receiving) {
×
263
      eventProducer.sendInfo("Link is already receving.");
×
264
      return;
×
265
    }
266
    if (disabled.get()) {
×
267
      disabled.getAndSet(false);
×
268
    }
269
    try {
270
      socket = new DatagramSocket(port);
×
271
    } catch (SocketException e) {
×
272
      // TODO Auto-generated catch block
273
      e.printStackTrace();
×
274
    }
×
275
    receiving = true;
×
276
  }
×
277

278
  private void closeSockets() {
279
    //          synchronized (socket)
280
    //          {
281
    //            socket.close();
282
    //
283
    //          }
284
    if (socket != null) {
×
285
      socket.close();
×
286
    }
287
  }
×
288
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc