• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #109

22 Sep 2023 07:20PM UTC coverage: 0.0%. Remained the same
#109

push

web-flow
Merge pull request #40 from WindhoverLabs/udp_plugin

-Minimally functional UDP plugin.

60 of 60 new or added lines in 3 files covered. (100.0%)

0 of 6268 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/UdpStreamOutProvider.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.io.IOException;
5
import java.net.DatagramPacket;
6
import java.net.DatagramSocket;
7
import java.net.InetAddress;
8
import java.net.SocketException;
9
import java.net.UnknownHostException;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.atomic.AtomicLong;
13
import org.yamcs.AbstractYamcsService;
14
import org.yamcs.ConfigurationException;
15
import org.yamcs.InitException;
16
import org.yamcs.Spec;
17
import org.yamcs.YConfiguration;
18
import org.yamcs.events.EventProducer;
19
import org.yamcs.events.EventProducerFactory;
20
import org.yamcs.parameter.SystemParametersProducer;
21
import org.yamcs.tctm.Link;
22
import org.yamcs.utils.DataRateMeter;
23
import org.yamcs.yarch.ColumnDefinition;
24
import org.yamcs.yarch.DataType;
25
import org.yamcs.yarch.Stream;
26
import org.yamcs.yarch.StreamSubscriber;
27
import org.yamcs.yarch.Tuple;
28
import org.yamcs.yarch.TupleDefinition;
29
import org.yamcs.yarch.YarchDatabase;
30
import org.yamcs.yarch.YarchDatabaseInstance;
31

32
/**
33
 * Receives telemetry fames via UDP. One UDP datagram = one TM frame.
34
 *
35
 * @author nm
36
 */
37
public class UdpStreamOutProvider extends AbstractYamcsService
×
38
    implements Link, StreamSubscriber, SystemParametersProducer {
39
  String host;
40
  int port;
41
  DatagramSocket socket;
42
  InetAddress address;
43
  Thread thread;
44
  RateLimiter rateLimiter;
45
  protected Stream stream;
46
  protected AtomicLong packetCount = new AtomicLong(0);
×
47
  DataRateMeter packetRateMeter = new DataRateMeter();
×
48
  DataRateMeter dataRateMeter = new DataRateMeter();
×
49
  protected String linkName;
50
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
51
  private boolean receiving;
52
  protected EventProducer eventProducer;
53

54
  static TupleDefinition gftdef;
55

56
  static final String RECTIME_CNAME = "rectime";
57
  static final String DATA_CNAME = "data";
58

59
  static {
60
    gftdef = new TupleDefinition();
×
61
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
62
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
63
  }
×
64

65
  /**
66
   * Creates a new UDP Frame Data Link
67
   *
68
   * @throws ConfigurationException if port is not defined in the configuration
69
   */
70
  public void init(String instance, String name, YConfiguration config) {
71
    try {
72
      super.init(instance, name, config);
×
73
    } catch (InitException e1) {
×
74
      // TODO Auto-generated catch block
75
      e1.printStackTrace();
×
76
    }
×
77
    host = config.getString("host");
×
78
    port = config.getInt("port");
×
79
    String streamName = config.getString("stream");
×
80
    this.linkName = name;
×
81

82
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
83
    this.stream = getStream(ydb, streamName);
×
84

85
    this.stream.addSubscriber(this);
×
86

87
    try {
88
      address = InetAddress.getByName(host);
×
89
    } catch (UnknownHostException e) {
×
90
      throw new ConfigurationException("Cannot resolve host '" + host + "'", e);
×
91
    }
×
92
    if (config.containsKey("frameMaxRate")) {
×
93
      rateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
94
    }
95
    receiving = true;
×
96

97
    eventProducer =
×
98
        EventProducerFactory.getEventProducer(instance, this.getClass().getSimpleName(), 10000);
×
99
  }
×
100

101
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
102
    Stream stream = ydb.getStream(streamName);
×
103
    if (stream == null) {
×
104
      try {
105
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
106
        // ydb.execute("create stream " + streamName);
107
      } catch (Exception e) {
×
108
        throw new ConfigurationException(e);
×
109
      }
×
110
      stream = ydb.getStream(streamName);
×
111
    }
112
    return stream;
×
113
  }
114

115
  @Override
116
  public YConfiguration getConfig() {
117
    return config;
×
118
  }
119

120
  @Override
121
  public String getName() {
122
    return linkName;
×
123
  }
124

125
  @Override
126
  public void resetCounters() {
127
    packetCount.set(0);
×
128
  }
×
129

130
  @Override
131
  public void doStart() {
132
    if (!isDisabled()) {
×
133
      try {
134
        socket = new DatagramSocket();
×
135
      } catch (SocketException e) {
×
136
        notifyFailed(e);
×
137
      }
×
138
    }
139
    notifyStarted();
×
140
  }
×
141

142
  @Override
143
  public void doStop() {
144
    socket.close();
×
145
    notifyStopped();
×
146
  }
×
147

148
  public boolean isRunningAndEnabled() {
149
    State state = state();
×
150
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
151
  }
152

153
  /**
154
   * called when a new packet is received to update the statistics
155
   *
156
   * @param packetSize
157
   */
158
  protected void updateStats(int packetSize) {
159
    packetCount.getAndIncrement();
×
160
    packetRateMeter.mark(1);
×
161
    dataRateMeter.mark(packetSize);
×
162
  }
×
163

164
  @Override
165
  public void disable() {
166
    boolean b = disabled.getAndSet(true);
×
167
    if (!b) {
×
168
      try {
169
        /* TODO */
170
        // doDisable();
171
      } catch (Exception e) {
172
        disabled.set(false);
173
        log.warn("Failed to disable link", e);
174
      }
175
    }
176
  }
×
177

178
  @Override
179
  public void enable() {
180
    /* TODO */
181
  }
×
182

183
  @Override
184
  public long getDataInCount() {
185
    return 0;
×
186
  }
187

188
  @Override
189
  public long getDataOutCount() {
190
    return packetCount.get();
×
191
  }
192

193
  @Override
194
  public Status getLinkStatus() {
195
    if (isDisabled()) {
×
196
      return Status.DISABLED;
×
197
    }
198
    if (state() == State.FAILED) {
×
199
      return Status.FAILED;
×
200
    }
201

202
    return connectionStatus();
×
203
  }
204

205
  @Override
206
  public boolean isDisabled() {
207
    return disabled.get();
×
208
  }
209

210
  protected Status connectionStatus() {
211
    return Status.OK;
×
212
  }
213

214
  @Override
215
  public void onTuple(Stream arg0, Tuple tuple) {
216
    if (isRunningAndEnabled() && receiving) {
×
217
      byte[] pktData = tuple.getColumn(DATA_CNAME);
×
218
      if (pktData == null) {
×
219
        throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
220
      } else {
221
        DatagramPacket dtg = new DatagramPacket(pktData, pktData.length, address, port);
×
222
        try {
223
          //                synchronized (socket)
224
          //                {
225
          //                socket.send(dtg);
226
          //                }
227

228
          socket.send(dtg);
×
229
          updateStats(pktData.length);
×
230
        } catch (IOException e) {
×
231
          log.warn("Error sending datagram", e);
×
232
          notifyFailed(e);
×
233
          return;
×
234
        }
×
235
      }
236
    }
237
  }
×
238

239
  @Override
240
  public Spec getSpec() {
241
    // TODO Auto-generated method stub
242
    return super.getSpec();
×
243
  }
244

245
  //  Not using the regular YAMCS "doStop" since I don't think that's meant to be called by API
246
  // users
247
  //   and I don't know if it'll have any side effects.
248
  public void stopReceving() {
249
    closeSockets();
×
250
    receiving = false;
×
251
  }
×
252

253
  public void startReceving() {
254
    if (receiving) {
×
255
      eventProducer.sendInfo("Link is already receving.");
×
256
      return;
×
257
    }
258
    try {
259
      socket = new DatagramSocket(port);
×
260
    } catch (SocketException e) {
×
261
      // TODO Auto-generated catch block
262
      e.printStackTrace();
×
263
    }
×
264
    receiving = true;
×
265
  }
×
266

267
  private void closeSockets() {
268
    //          synchronized (socket)
269
    //          {
270
    //            socket.close();
271
    //
272
    //          }
273
    socket.close();
×
274
  }
×
275
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc