• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #162

28 Nov 2024 05:03PM UTC coverage: 0.0%. Remained the same
#162

push

lorenzo-gomez-windhover
-Updates for yamcs 5.9.8

0 of 4 new or added lines in 1 file covered. (0.0%)

614 existing lines in 11 files now uncovered.

0 of 6789 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/tctm/UdpStreamOutProvider.java
1
package com.windhoverlabs.yamcs.tctm;
2

3
import com.google.common.util.concurrent.RateLimiter;
4
import java.io.IOException;
5
import java.net.DatagramPacket;
6
import java.net.DatagramSocket;
7
import java.net.InetAddress;
8
import java.net.SocketException;
9
import java.net.UnknownHostException;
10
import java.util.concurrent.TimeUnit;
11
import java.util.concurrent.atomic.AtomicBoolean;
12
import java.util.concurrent.atomic.AtomicLong;
13
import org.yamcs.AbstractYamcsService;
14
import org.yamcs.ConfigurationException;
15
import org.yamcs.InitException;
16
import org.yamcs.Spec;
17
import org.yamcs.YConfiguration;
18
import org.yamcs.events.EventProducer;
19
import org.yamcs.events.EventProducerFactory;
20
import org.yamcs.tctm.Link;
21
import org.yamcs.utils.DataRateMeter;
22
import org.yamcs.yarch.ColumnDefinition;
23
import org.yamcs.yarch.DataType;
24
import org.yamcs.yarch.Stream;
25
import org.yamcs.yarch.StreamSubscriber;
26
import org.yamcs.yarch.Tuple;
27
import org.yamcs.yarch.TupleDefinition;
28
import org.yamcs.yarch.YarchDatabase;
29
import org.yamcs.yarch.YarchDatabaseInstance;
30

31
/**
32
 * Receives telemetry fames via UDP. One UDP datagram = one TM frame.
33
 *
34
 * @author nm
35
 */
UNCOV
36
public class UdpStreamOutProvider extends AbstractYamcsService implements Link, StreamSubscriber {
×
37
  String host;
38
  int port;
39
  DatagramSocket socket;
40
  InetAddress address;
41
  Thread thread;
42
  RateLimiter rateLimiter;
43
  protected Stream stream;
UNCOV
44
  protected AtomicLong packetCount = new AtomicLong(0);
×
UNCOV
45
  DataRateMeter packetRateMeter = new DataRateMeter();
×
46
  DataRateMeter dataRateMeter = new DataRateMeter();
×
47
  protected String linkName;
48
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
49
  private boolean receiving;
50
  private boolean enabledAtStartup = true;
×
51
  protected EventProducer eventProducer;
52

53
  static TupleDefinition gftdef;
54

55
  static final String RECTIME_CNAME = "rectime";
56
  static final String DATA_CNAME = "data";
57

58
  static {
UNCOV
59
    gftdef = new TupleDefinition();
×
UNCOV
60
    gftdef.addColumn(new ColumnDefinition(RECTIME_CNAME, DataType.TIMESTAMP));
×
61
    gftdef.addColumn(new ColumnDefinition(DATA_CNAME, DataType.BINARY));
×
62
  }
×
63

64
  /**
65
   * Creates a new UDP Frame Data Link
66
   *
67
   * @throws ConfigurationException if port is not defined in the configuration
68
   */
69
  public void init(String instance, String name, YConfiguration config) {
70
    try {
UNCOV
71
      super.init(instance, name, config);
×
UNCOV
72
    } catch (InitException e1) {
×
73
      // TODO Auto-generated catch block
74
      e1.printStackTrace();
×
UNCOV
75
    }
×
76
    host = config.getString("host");
×
77
    port = config.getInt("port");
×
78
    enabledAtStartup = config.getBoolean("enabledAtStartup");
×
79
    String streamName = config.getString("stream");
×
80
    this.linkName = name;
×
81

82
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(instance);
×
UNCOV
83
    this.stream = getStream(ydb, streamName);
×
84

85
    this.stream.addSubscriber(this);
×
86

87
    try {
UNCOV
88
      address = InetAddress.getByName(host);
×
UNCOV
89
    } catch (UnknownHostException e) {
×
90
      throw new ConfigurationException("Cannot resolve host '" + host + "'", e);
×
91
    }
×
92
    if (config.containsKey("frameMaxRate")) {
×
93
      rateLimiter = RateLimiter.create(config.getDouble("frameMaxRate"), 1, TimeUnit.SECONDS);
×
94
    }
95
    receiving = true;
×
96

97
    eventProducer =
×
UNCOV
98
        EventProducerFactory.getEventProducer(instance, this.getClass().getSimpleName(), 10000);
×
99

100
    if (!enabledAtStartup) {
×
UNCOV
101
      disabled.getAndSet(true);
×
102
    }
103
  }
×
104

105
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
UNCOV
106
    Stream stream = ydb.getStream(streamName);
×
UNCOV
107
    if (stream == null) {
×
108
      try {
109
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
110
        // ydb.execute("create stream " + streamName);
111
      } catch (Exception e) {
×
UNCOV
112
        throw new ConfigurationException(e);
×
113
      }
×
114
      stream = ydb.getStream(streamName);
×
115
    }
116
    return stream;
×
117
  }
118

119
  @Override
120
  public YConfiguration getConfig() {
UNCOV
121
    return config;
×
122
  }
123

124
  @Override
125
  public String getName() {
UNCOV
126
    return linkName;
×
127
  }
128

129
  @Override
130
  public void resetCounters() {
UNCOV
131
    packetCount.set(0);
×
UNCOV
132
  }
×
133

134
  @Override
135
  public void doStart() {
UNCOV
136
    if (!isDisabled()) {
×
137
      try {
138
        socket = new DatagramSocket();
×
UNCOV
139
      } catch (SocketException e) {
×
140
        notifyFailed(e);
×
141
      }
×
142
    }
143
    notifyStarted();
×
UNCOV
144
  }
×
145

146
  @Override
147
  public void doStop() {
UNCOV
148
    socket.close();
×
UNCOV
149
    notifyStopped();
×
150
  }
×
151

152
  public boolean isRunningAndEnabled() {
UNCOV
153
    State state = state();
×
UNCOV
154
    return (state == State.RUNNING || state == State.STARTING) && !disabled.get();
×
155
  }
156

157
  /**
158
   * called when a new packet is received to update the statistics
159
   *
160
   * @param packetSize
161
   */
162
  protected void updateStats(int packetSize) {
UNCOV
163
    packetCount.getAndIncrement();
×
UNCOV
164
    packetRateMeter.mark(1);
×
165
    dataRateMeter.mark(packetSize);
×
166
  }
×
167

168
  @Override
169
  public void disable() {
UNCOV
170
    boolean b = disabled.getAndSet(true);
×
UNCOV
171
    if (!b) {
×
172
      try {
173
        /* TODO */
174
        // doDisable();
175
      } catch (Exception e) {
176
        disabled.set(false);
177
        log.warn("Failed to disable link", e);
178
      }
179
    }
UNCOV
180
  }
×
181

182
  @Override
183
  public void enable() {
184
    /* TODO */
UNCOV
185
  }
×
186

187
  @Override
188
  public long getDataInCount() {
UNCOV
189
    return 0;
×
190
  }
191

192
  @Override
193
  public long getDataOutCount() {
UNCOV
194
    return packetCount.get();
×
195
  }
196

197
  @Override
198
  public Status getLinkStatus() {
UNCOV
199
    if (isDisabled()) {
×
UNCOV
200
      return Status.DISABLED;
×
201
    }
202
    if (state() == State.FAILED) {
×
UNCOV
203
      return Status.FAILED;
×
204
    }
205

UNCOV
206
    return connectionStatus();
×
207
  }
208

209
  @Override
210
  public boolean isDisabled() {
UNCOV
211
    return disabled.get();
×
212
  }
213

214
  protected Status connectionStatus() {
UNCOV
215
    return Status.OK;
×
216
  }
217

218
  @Override
219
  public void onTuple(Stream arg0, Tuple tuple) {
UNCOV
220
    if (isRunningAndEnabled() && receiving) {
×
UNCOV
221
      byte[] pktData = tuple.getColumn(DATA_CNAME);
×
222
      if (pktData == null) {
×
223
        throw new ConfigurationException("no column named '%s' in the tuple", DATA_CNAME);
×
224
      } else {
225
        DatagramPacket dtg = new DatagramPacket(pktData, pktData.length, address, port);
×
226
        try {
227
          //                synchronized (socket)
228
          //                {
229
          //                socket.send(dtg);
230
          //                }
UNCOV
231
          if (!socket.isClosed()) {
×
UNCOV
232
            socket.send(dtg);
×
233
          }
234
          updateStats(pktData.length);
×
UNCOV
235
        } catch (IOException e) {
×
236
          eventProducer.sendInfo("Error sending datagram:" + e.toString());
×
237
          log.warn("Error sending datagram", e);
×
238
          //          notifyFailed(e);
239
          return;
×
UNCOV
240
        }
×
241
      }
242
    }
UNCOV
243
  }
×
244

245
  @Override
246
  public Spec getSpec() {
247
    // TODO Auto-generated method stub
UNCOV
248
    return super.getSpec();
×
249
  }
250

251
  //  Not using the regular YAMCS "doStop" since I don't think that's meant to be called by API
252
  // users
253
  //   and I don't know if it'll have any side effects.
254
  public void stopReceving() {
UNCOV
255
    closeSockets();
×
UNCOV
256
    receiving = false;
×
257
  }
×
258

259
  public void startReceving() {
UNCOV
260
    if (receiving) {
×
UNCOV
261
      eventProducer.sendInfo("Link is already receving.");
×
262
      return;
×
263
    }
264
    if (disabled.get()) {
×
UNCOV
265
      disabled.getAndSet(false);
×
266
    }
267
    try {
UNCOV
268
      socket = new DatagramSocket(port);
×
UNCOV
269
    } catch (SocketException e) {
×
270
      // TODO Auto-generated catch block
271
      e.printStackTrace();
×
UNCOV
272
    }
×
273
    receiving = true;
×
274
  }
×
275

276
  private void closeSockets() {
277
    //          synchronized (socket)
278
    //          {
279
    //            socket.close();
280
    //
281
    //          }
UNCOV
282
    if (socket != null) {
×
UNCOV
283
      socket.close();
×
284
    }
285
  }
×
286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc