• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #162

28 Nov 2024 05:03PM UTC coverage: 0.0%. Remained the same
#162

push

lorenzo-gomez-windhover
-Updates for yamcs 5.9.8

0 of 4 new or added lines in 1 file covered. (0.0%)

614 existing lines in 11 files now uncovered.

0 of 6789 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/yamcs/util/UdpToTcpProxy.java
1
package com.windhoverlabs.yamcs.util;
2

3
import java.io.IOException;
4
import java.net.DatagramPacket;
5
import java.net.DatagramSocket;
6
import java.net.InetSocketAddress;
7
import java.net.ServerSocket;
8
import java.net.Socket;
9
import java.net.SocketException;
10
import java.util.ArrayList;
11
import java.util.Collections;
12
import java.util.HashSet;
13
import java.util.List;
14
import java.util.Set;
15
import java.util.concurrent.Executors;
16
import java.util.concurrent.ThreadFactory;
17
import java.util.concurrent.ThreadPoolExecutor;
18
import java.util.concurrent.atomic.AtomicBoolean;
19
import org.yamcs.ConfigurationException;
20
import org.yamcs.Spec;
21
import org.yamcs.StandardTupleDefinitions;
22
import org.yamcs.ValidationException;
23
import org.yamcs.YConfiguration;
24
import org.yamcs.cmdhistory.CommandHistoryPublisher;
25
import org.yamcs.commanding.PreparedCommand;
26
import org.yamcs.logging.Log;
27
import org.yamcs.parameter.ParameterValue;
28
import org.yamcs.parameter.SystemParametersService;
29
import org.yamcs.protobuf.Yamcs.Value.Type;
30
import org.yamcs.tctm.AbstractThreadedTcDataLink;
31
import org.yamcs.tctm.Link.Status;
32
import org.yamcs.xtce.Parameter;
33
import org.yamcs.yarch.TupleDefinition;
34

35
// public class UdpToTcpProxy extends AbstractThreadedTcDataLink implements SystemParametersProducer
36
// {
37
public class UdpToTcpProxy extends AbstractThreadedTcDataLink {
×
38

39
  private Log log;
40
  protected YConfiguration config;
41
  protected String linkName;
42
  protected AtomicBoolean disabled = new AtomicBoolean(false);
×
43

44
  private int udpPort;
45
  private int tcpPort;
46
  private Parameter udpPortParam;
47
  private Parameter tcpPortParam;
48

49
  private DatagramSocket udpSocket;
50
  private ServerSocket tcpServerSocket;
51

52
  private Thread udpListenerThread;
53
  private Thread tcpServerThread;
54
  private ThreadPoolExecutor clientHandlerExecutor;
55

56
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();
×
57

58
  private Set<Socket> tcpClients = Collections.synchronizedSet(new HashSet<>());
×
59

60
  private CommandHistoryPublisher commandHistoryPublisher;
61

62
  @Override
63
  public void init(String instance, String name, YConfiguration config)
64
      throws ConfigurationException {
65
    super.init(instance, name, config);
×
66

67
    this.log = new Log(getClass(), instance);
×
68
    this.config = config;
×
69

70
    /* Validate the configuration that the user passed us. */
71
    try {
72
      config = getSpec().validate(config);
×
73
    } catch (ValidationException e) {
×
74
      log.error("Failed configuration validation.", e);
×
75
      notifyFailed(e);
×
76
    }
×
77

78
    this.linkName = name;
×
79

80
    // Read configuration
81
    this.udpPort = config.getInt("udpPort");
×
82
    this.tcpPort = config.getInt("tcpPort");
×
83
  }
×
84

85
  @Override
86
  protected void doStart() {
87
    log.info("Starting UdpToTcpProxy: " + getName());
×
88

89
    try {
90
      // Initialize UDP socket
91
      udpSocket = new DatagramSocket(udpPort);
×
92
      udpListenerThread = new Thread(this::udpListener);
×
93
      udpListenerThread.setName("UdpListenerThread");
×
94
      udpListenerThread.start();
×
95
      log.info("UDP listener started on port " + udpPort);
×
96

97
      // Initialize TCP server
98
      tcpServerSocket = new ServerSocket();
×
99
      tcpServerSocket.bind(new InetSocketAddress(tcpPort));
×
100
      tcpServerThread = new Thread(this::tcpServer);
×
101
      tcpServerThread.setName("TcpServerThread");
×
102
      tcpServerThread.start();
×
103
      log.info("TCP server started on port " + tcpPort);
×
104

105
      // Executor for handling client connections
106
      clientHandlerExecutor =
×
107
          (ThreadPoolExecutor)
108
              Executors.newCachedThreadPool(
×
109
                  new ThreadFactory() {
×
110
                    private int count = 0;
×
111

112
                    @Override
113
                    public Thread newThread(Runnable r) {
114
                      return new Thread(r, "TcpClientHandler-" + count++);
×
115
                    }
116
                  });
117

118
      super.doStart();
×
119
    } catch (IOException e) {
×
120
      log.error("Failed to start UdpToTcpProxy: " + e.getMessage());
×
121
      notifyFailed(e);
×
122
    }
×
123
  }
×
124

125
  @Override
126
  protected void doStop() {
127
    log.info("Stopping UdpToTcpProxy: " + getName());
×
128

129
    disabled.set(true);
×
130

131
    // Close UDP socket
132
    if (udpSocket != null && !udpSocket.isClosed()) {
×
133
      udpSocket.close();
×
134
    }
135

136
    // Close TCP server socket
137
    try {
138
      if (tcpServerSocket != null && !tcpServerSocket.isClosed()) {
×
139
        tcpServerSocket.close();
×
140
      }
141
    } catch (IOException e) {
×
142
      log.warn("Error closing TCP server socket", e);
×
143
    }
×
144

145
    // Close all client sockets
146
    synchronized (tcpClients) {
×
147
      for (Socket client : tcpClients) {
×
148
        try {
149
          client.close();
×
150
        } catch (IOException e) {
×
151
          log.warn("Error closing client socket", e);
×
152
        }
×
153
      }
×
154
      tcpClients.clear();
×
155
    }
×
156

157
    // Shutdown executor
158
    if (clientHandlerExecutor != null && !clientHandlerExecutor.isShutdown()) {
×
159
      clientHandlerExecutor.shutdownNow();
×
160
    }
161

162
    super.doStop();
×
163
  }
×
164

165
  private void udpListener() {
166
    byte[] buffer = new byte[65535];
×
167
    DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
×
168

169
    while (!disabled.get() && !udpSocket.isClosed()) {
×
170
      try {
171
        udpSocket.receive(packet);
×
172
        int length = packet.getLength();
×
173
        byte[] data = new byte[length];
×
174
        System.arraycopy(packet.getData(), packet.getOffset(), data, 0, length);
×
175

176
        /* Update PV telemetry */
177
        TupleDefinition tdef = gftdef.copy();
×
178
        // pushTuple(tdef, cols);
179

180
        // Forward data to all connected TCP clients
181
        forwardToTcpClients(data);
×
182
      } catch (SocketException e) {
×
183
        if (!disabled.get()) {
×
184
          log.error("UDP socket error", e);
×
185
        }
186
        break;
×
187
      } catch (IOException e) {
×
188
        log.error("Error receiving UDP packet", e);
×
189
      }
×
190
    }
191
  }
×
192

193
  private void tcpServer() {
194
    while (!disabled.get() && !tcpServerSocket.isClosed()) {
×
195
      try {
196
        Socket clientSocket = tcpServerSocket.accept();
×
197
        tcpClients.add(clientSocket);
×
198
        log.info("New TCP client connected: " + clientSocket.getRemoteSocketAddress());
×
199

200
        // Handle client disconnection asynchronously
201
        clientHandlerExecutor.submit(() -> handleClient(clientSocket));
×
202
      } catch (SocketException e) {
×
203
        if (!disabled.get()) {
×
204
          log.error("TCP server socket error", e);
×
205
        }
206
        break;
×
207
      } catch (IOException e) {
×
208
        log.error("Error accepting TCP client connection", e);
×
209
      }
×
210
    }
211
  }
×
212

213
  private void handleClient(Socket clientSocket) {
214
    try {
215
      while (!clientSocket.isClosed()) {
×
216
        if (clientSocket.getInputStream().read() == -1) {
×
217
          break;
×
218
        }
219
      }
220
    } catch (IOException e) {
×
221
      log.warn("Client connection error", e);
×
222
    } finally {
223
      try {
224
        clientSocket.close();
×
225
      } catch (IOException e) {
×
226
        log.warn("Error closing client socket", e);
×
227
      }
×
228
      tcpClients.remove(clientSocket);
×
229
      log.info("TCP client disconnected: " + clientSocket.getRemoteSocketAddress());
×
230
    }
231
  }
×
232

233
  private void forwardToTcpClients(byte[] data) {
234
    synchronized (tcpClients) {
×
235
      for (Socket client : tcpClients) {
×
236
        try {
237
          client.getOutputStream().write(data);
×
238
          client.getOutputStream().flush();
×
239
        } catch (IOException e) {
×
240
          log.warn("Error sending data to client: " + client.getRemoteSocketAddress(), e);
×
241
          try {
242
            client.close();
×
243
          } catch (IOException ex) {
×
244
            log.warn("Error closing client socket", ex);
×
245
          }
×
246
          tcpClients.remove(client);
×
247
        }
×
248
      }
×
249
    }
×
250
  }
×
251

252
  //  @Override
253
  //  public YConfiguration getConfig() {
254
  //    return config;
255
  //  }
256
  //
257
  //  @Override
258
  //  public String getName() {
259
  //    return linkName;
260
  //  }
261
  //
262
  //  @Override
263
  //  public void resetCounters() {
264
  //    // TODO
265
  //  }
266
  //
267
  //  @Override
268
  //  public long getDataInCount() {
269
  //    // TODO
270
  //    return 0;
271
  //  }
272
  //
273
  //  @Override
274
  //  public long getDataOutCount() {
275
  //    // TODO
276
  //    return 0;
277
  //  }
278

279
  //  @Override
280
  //  public boolean isDisabled() {
281
  //    return disabled.get();
282
  //  }
283

284
  //  @Override
285
  //  public void disable() {
286
  //    boolean b = disabled.getAndSet(true);
287
  //    if (!b) {
288
  //      try {
289
  //        /* TODO */
290
  //        // doDisable();
291
  //      } catch (Exception e) {
292
  //        disabled.set(false);
293
  //        log.warn("Failed to disable link", e);
294
  //      }
295
  //    }
296
  //  }
297
  //
298
  //  @Override
299
  //  public void enable() {
300
  //    boolean b = disabled.getAndSet(false);
301
  //    if (b) {
302
  //      try {
303
  //        /* TODO */
304
  //        // doEnable();
305
  //      } catch (Exception e) {
306
  //        disabled.set(true);
307
  //        log.warn("Failed to enable link", e);
308
  //      }
309
  //    }
310
  //  }
311

312
  @Override
313
  public Status getLinkStatus() {
314
    if (isDisabled()) {
×
315
      return Status.DISABLED;
×
316
    }
317
    if (state() == State.FAILED) {
×
318
      return Status.FAILED;
×
319
    }
320

321
    return Status.OK;
×
322
  }
323

324
  @Override
325
  public Spec getSpec() {
326
    Spec spec = getDefaultSpec();
×
327
    spec.addOption("udpPort", Spec.OptionType.INTEGER).withRequired(true);
×
328
    spec.addOption("tcpPort", Spec.OptionType.INTEGER).withRequired(true);
×
329
    return spec;
×
330
  }
331

332
  @Override
333
  public void setupSystemParameters(SystemParametersService sysParamService) {
334
    super.setupSystemParameters(sysParamService);
×
335

336
    udpPortParam =
×
337
        sysParamService.createSystemParameter(
×
338
            linkName + "/udpPort", Type.UINT64, "The current UDP port the plugin is listening to.");
339

340
    tcpPortParam =
×
341
        sysParamService.createSystemParameter(
×
342
            linkName + "/tcpPort", Type.UINT64, "The current TCP port the plugin is listening to.");
343
  }
×
344

345
  @Override
346
  public List<ParameterValue> getSystemParameters(long gentime) {
347
    ArrayList<ParameterValue> list = new ArrayList<>();
×
348

NEW
349
    list.add(org.yamcs.parameter.SystemParametersService.getPV(udpPortParam, gentime, udpPort));
×
350

NEW
351
    list.add(org.yamcs.parameter.SystemParametersService.getPV(tcpPortParam, gentime, tcpPort));
×
352

353
    try {
NEW
354
      super.collectSystemParameters(gentime, list);
×
355
    } catch (Exception e) {
×
356
      log.error("Exception caught when collecting link system parameters", e);
×
357
    }
×
358

359
    return list;
×
360
  }
361

362
  @Override
363
  protected Status connectionStatus() {
364
    return Status.OK;
×
365
  }
366

367
  @Override
368
  public void uplinkCommand(PreparedCommand pc) throws IOException {
369
    log.info("Received command.");
×
NEW
370
    dataOut(1, pc.getBinary().length);
×
371
    ackCommand(pc.getCommandId());
×
372
  }
×
373

374
  @Override
375
  protected void startUp() throws Exception {
376
    // TODO Auto-generated method stub
377
  }
×
378

379
  @Override
380
  protected void shutDown() throws Exception {
381
    // TODO Auto-generated method stub
382
  }
×
383

384
  @Override
385
  public String getDetailedStatus() {
386
    return String.format("OK");
×
387
  }
388
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc