• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #165

11 Dec 2024 11:21PM UTC coverage: 0.0%. Remained the same
#165

push

lorenzo-gomez-windhover
-Cleanup

0 of 2 new or added lines in 1 file covered. (0.0%)

5 existing lines in 1 file now uncovered.

0 of 8298 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/com/video/VideoDataLink.java
1
package com.windhoverlabs.com.video;
2

3
import static org.bytedeco.ffmpeg.global.avcodec.*;
4
import static org.bytedeco.ffmpeg.global.avformat.*;
5
import static org.bytedeco.ffmpeg.global.avutil.*;
6
import static org.bytedeco.ffmpeg.global.swscale.*;
7

8
import java.io.*;
9
import java.io.IOException;
10
import java.net.DatagramPacket;
11
import java.net.DatagramSocket;
12
import java.net.SocketException;
13
import java.util.ArrayList;
14
import java.util.List;
15
import java.util.concurrent.Executors;
16
import java.util.concurrent.ScheduledExecutorService;
17
import java.util.function.Supplier;
18
import org.bytedeco.ffmpeg.avcodec.*;
19
import org.bytedeco.ffmpeg.avformat.*;
20
import org.bytedeco.ffmpeg.avutil.*;
21
import org.bytedeco.ffmpeg.global.avdevice;
22
import org.bytedeco.ffmpeg.swscale.*;
23
import org.bytedeco.javacpp.*;
24
import org.yamcs.ConfigurationException;
25
import org.yamcs.StandardTupleDefinitions;
26
import org.yamcs.TmPacket;
27
import org.yamcs.YConfiguration;
28
import org.yamcs.YamcsServer;
29
import org.yamcs.mdb.Mdb;
30
import org.yamcs.parameter.ParameterValue;
31
import org.yamcs.protobuf.Event.EventSeverity;
32
import org.yamcs.protobuf.Yamcs.Value.Type;
33
import org.yamcs.tctm.AbstractTmDataLink;
34
import org.yamcs.utils.ValueUtility;
35
import org.yamcs.xtce.BooleanParameterType;
36
import org.yamcs.xtce.FloatParameterType;
37
import org.yamcs.xtce.IntegerParameterType;
38
import org.yamcs.xtce.NameDescription;
39
import org.yamcs.xtce.Parameter;
40
import org.yamcs.xtce.ParameterType;
41
import org.yamcs.xtce.StringParameterType;
42
import org.yamcs.xtce.XtceDb;
43
import org.yamcs.yarch.DataType;
44
import org.yamcs.yarch.Stream;
45
import org.yamcs.yarch.Tuple;
46
import org.yamcs.yarch.TupleDefinition;
47
import org.yamcs.yarch.YarchDatabase;
48
import org.yamcs.yarch.YarchDatabaseInstance;
49
import org.yamcs.yarch.protobuf.Db.Event;
50

51
/**
52
 * Receives telemetry packets via UDP. One UDP datagram = one TM packet.
53
 *
54
 * <p>Options:
55
 *
56
 * <ul>
57
 *   <li>{@code port} - the UDP port to listen to
58
 *   <li>{@code maxLength} - the maximum length of the datagram (and thus the TM packet length +
59
 *       initialBytesToStrip). If a datagram longer than this size will be received, it will be
60
 *       truncated. Default: 1500 (bytes)
61
 *   <li>{@code initialBytesToStrip} - if configured, skip that number of bytes from the beginning
62
 *       of the datagram. Default: 0
63
 * </ul>
64
 */
65
public class VideoDataLink extends AbstractTmDataLink implements Runnable {
×
66
  private volatile int invalidDatagramCount = 0;
×
67

68
  private DatagramSocket tmSocket;
69
  private int port;
70

71
  static final int MAX_LENGTH = 1500;
72
  DatagramPacket datagram;
73
  int maxLength;
74
  int initialBytesToStrip;
75
  int rcvBufferSize;
76

77
  Mdb mdb;
78

79
  private VariableParam frameParam;
80

81
  private Stream videoStream;
82

83
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();
×
84

85
  ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
×
86

87
  /**
88
   * Creates a new UDP TM Data Link
89
   *
90
   * @throws ConfigurationException if port is not defined in the configuration
91
   */
92
  @Override
93
  public void init(String instance, String name, YConfiguration config)
94
      throws ConfigurationException {
95
    super.init(instance, name, config);
×
96
    port = config.getInt("port");
×
97
    maxLength = config.getInt("maxLength", MAX_LENGTH);
×
98
    initialBytesToStrip = config.getInt("initialBytesToStrip", 0);
×
99
    rcvBufferSize = config.getInt("rcvBufferSize", 0);
×
100
    datagram = new DatagramPacket(new byte[maxLength], maxLength);
×
101

102
    this.mdb = YamcsServer.getServer().getInstance(yamcsInstance).getMdb();
×
103

104
    frameParam = VariableParam.getForFullyQualifiedName("/yamcs/pop-os/links/Video/frameData");
×
105

106
    ParameterType ptype = getBasicType(mdb, Type.BINARY);
×
107

108
    //    ParameterType ptype = getBasicType(mdb, Type.FLOAT);
109

110
    frameParam.setParameterType(ptype);
×
111

112
    if (mdb.getParameter(frameParam.getQualifiedName()) == null) {
×
113
      log.debug("Adding OPCUA object as parameter to mdb:{}", frameParam.getQualifiedName());
×
114
      try {
115
        mdb.addParameter(frameParam, true, true);
×
116
      } catch (Exception e) {
×
117
        // TODO Auto-generated catch block
118
        //        internalLogger.info(e.toString());
119
        //        internalLogger.info("Failed to add PV:" + p.getQualifiedName());
120
        org.yamcs.yarch.protobuf.Db.Event ev =
121
            Event.newBuilder()
×
122
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
123
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
124
                .setSource(this.linkName)
×
125
                .setType(this.linkName)
×
126
                .setMessage("Failed to add PV:" + frameParam.getQualifiedName())
×
127
                .setSeverity(EventSeverity.ERROR)
×
128
                .build();
×
129
        eventProducer.sendEvent(ev);
×
130
      }
×
131
    } else {
132
      frameParam = (VariableParam) mdb.getParameter(frameParam.getQualifiedName());
×
133
    }
134

135
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
×
136

137
    //    this.opcuaStreamName = config.getString("opcuaStream");
138
    this.videoStream = getStream(ydb, "video_frames_stream");
×
139
  }
×
140

141
  @Override
142
  public void doStart() {
143
    if (!isDisabled()) {
×
144
      //      try {
145
      //        //        readVideo();
146
      //        streamVideoOverRTP();
147
      //      } catch (IOException e) {
148
      //        // TODO Auto-generated catch block
149
      //        e.printStackTrace();
150
      //      }
151
      try {
152
        tmSocket = new DatagramSocket(port);
×
153
        if (rcvBufferSize > 0) {
×
154
          tmSocket.setReceiveBufferSize(rcvBufferSize);
×
155
        }
156
        Thread thread = new Thread(this);
×
157
        thread.setName("UdpTmDataLink-" + linkName);
×
158
        thread.start();
×
159
      } catch (SocketException e) {
×
160
        notifyFailed(e);
×
161
      }
×
162
    }
163
    notifyStarted();
×
164
  }
×
165

166
  @Override
167
  public void doStop() {
168
    if (tmSocket != null) {
×
169
      tmSocket.close();
×
170
    }
171
    notifyStopped();
×
172
  }
×
173

174
  EReturnCode Initialize() {
UNCOV
175
    avformat_network_init();
×
NEW
176
    avdevice.avdevice_register_all();
×
177

178
    //          av_log_set_callback(MMC_CustomLogCallback);
179

180
    //          av_log_set_level(ConfigTable->AVLogLevel);
181

NEW
182
    return EReturnCode.OK;
×
183
  }
184

185
  private static ParameterType getOrCreateType(
186
      XtceDb mdb, String name, Supplier<ParameterType.Builder<?>> supplier) {
187

188
    String fqn = XtceDb.YAMCS_SPACESYSTEM_NAME + NameDescription.PATH_SEPARATOR + name;
×
189
    ParameterType ptype = mdb.getParameterType(fqn);
×
190
    if (ptype != null) {
×
191
      return ptype;
×
192
    }
193
    ParameterType.Builder<?> typeb = supplier.get().setName(name);
×
194

195
    ptype = typeb.build();
×
196
    ((NameDescription) ptype).setQualifiedName(fqn);
×
197

198
    return ((Mdb) mdb).addSystemParameterType(ptype);
×
199
  }
200

201
  public static ParameterType getBasicType(XtceDb mdb, Type type) {
202
    ParameterType pType = null;
×
203
    switch (type) {
×
204
      case BOOLEAN:
205
        return getOrCreateType(mdb, "boolean", () -> new BooleanParameterType.Builder());
×
206
      case STRING:
207
        return getOrCreateType(mdb, "string", () -> new StringParameterType.Builder());
×
208

209
      case FLOAT:
210
        return getOrCreateType(
×
211
            mdb, "float32", () -> new FloatParameterType.Builder().setSizeInBits(32));
×
212
      case DOUBLE:
213
        return getOrCreateType(
×
214
            mdb, "float64", () -> new FloatParameterType.Builder().setSizeInBits(64));
×
215
      case SINT32:
216
        return getOrCreateType(
×
217
            mdb,
218
            "sint32",
219
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(true));
×
220
      case SINT64:
221
        return getOrCreateType(
×
222
            mdb,
223
            "sint64",
224
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(true));
×
225
      case UINT32:
226
        return getOrCreateType(
×
227
            mdb,
228
            "uint32",
229
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(false));
×
230
      case UINT64:
231
        return getOrCreateType(
×
232
            mdb,
233
            "uint64",
234
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(false));
×
235
      default:
236
        break;
237
    }
238

239
    return pType;
×
240
  }
241

242
  private void writeFrameToDB(byte data[]) {
243
    TupleDefinition tdef = gftdef.copy();
×
244

245
    int numberfRecords = 1;
×
246

247
    List<Object> cols = new ArrayList<>(4 + numberfRecords);
×
248

249
    tdef = gftdef.copy();
×
250
    long gentime = timeService.getMissionTime();
×
251
    cols.add(gentime);
×
252
    cols.add("/yamcs/pop-os/");
×
253
    cols.add(0);
×
254
    cols.add(gentime);
×
255

256
    tdef.addColumn(frameParam.getQualifiedName(), DataType.PARAMETER_VALUE);
×
257

UNCOV
258
    cols.add(getPV(frameParam, gentime, data));
×
259

260
    pushTuple(tdef, cols);
×
261
  }
×
262

263
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
264
    Stream stream = ydb.getStream(streamName);
×
265
    if (stream == null) {
×
266
      try {
267
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
268
      } catch (Exception e) {
×
269
        throw new ConfigurationException(e);
×
270
      }
×
271

272
      stream = ydb.getStream(streamName);
×
273
    }
274
    return stream;
×
275
  }
276

277
  private synchronized void pushTuple(TupleDefinition tdef, List<Object> cols) {
278
    Tuple t;
279
    t = new Tuple(tdef, cols);
×
280
    videoStream.emitTuple(t);
×
281
  }
×
282

283
  public ParameterValue getNewPv(Parameter parameter, long time) {
284
    ParameterValue pv = new ParameterValue(parameter);
×
285
    pv.setAcquisitionTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime());
×
286
    pv.setGenerationTime(time);
×
287
    return pv;
×
288
  }
289

290
  public ParameterValue getPV(Parameter parameter, long time, double v) {
291
    ParameterValue pv = getNewPv(parameter, time);
×
292
    pv.setEngValue(ValueUtility.getDoubleValue(v));
×
293
    pv.setRawValue(ValueUtility.getDoubleValue(v));
×
294
    return pv;
×
295
  }
296

297
  public ParameterValue getPV(Parameter parameter, long time, byte v[]) {
298
    ParameterValue pv = getNewPv(parameter, time);
×
299
    pv.setEngValue(ValueUtility.getBinaryValue(v));
×
300
    pv.setRawValue(ValueUtility.getBinaryValue(v));
×
301
    return pv;
×
302
  }
303

304
  static void save_frame(AVFrame pFrame, int width, int height, int f_idx) throws IOException {
305
    // Open file
306
    String szFilename = String.format("frame%d_.ppm", f_idx);
×
307
    OutputStream pFile = new FileOutputStream(szFilename);
×
308

309
    // Write header
310
    pFile.write(String.format("P6\n%d %d\n255\n", width, height).getBytes());
×
311

312
    // Write pixel data
313
    BytePointer data = pFrame.data(0);
×
314
    byte[] bytes = new byte[width * 3];
×
315
    int l = pFrame.linesize(0);
×
316
    for (int y = 0; y < height; y++) {
×
317
      data.position(y * l).get(bytes);
×
318
      pFile.write(bytes);
×
319
    }
320

321
    // Close file
322
    pFile.close();
×
323
  }
×
324

325
  /**
326
   * Concatenates the root with the subsystems and returns a qualified name
327
   *
328
   * @param root
329
   */
330
  public static String qualifiedName(String root, String... subsystems) {
331
    if (root.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
332
      throw new IllegalArgumentException(
×
333
          "root has to start with " + NameDescription.PATH_SEPARATOR);
334
    }
335
    StringBuilder sb = new StringBuilder();
×
336
    sb.append(root);
×
337
    for (String s : subsystems) {
×
338
      if (s.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
339
        sb.append(NameDescription.PATH_SEPARATOR);
×
340
      }
341
      sb.append(s);
×
342
    }
343
    return sb.toString();
×
344
  }
345

346
  @Override
347
  public void run() {
UNCOV
348
    while (isRunningAndEnabled()) {
×
UNCOV
349
      TmPacket tmpkt = getNextPacket();
×
UNCOV
350
      if (tmpkt != null) {
×
351
        processPacket(tmpkt);
×
352
      }
353
    }
×
354
  }
×
355

356
  /**
357
   * Called to retrieve the next packet. It blocks in readining on the multicast socket
358
   *
359
   * @return anything that looks as a valid packet, just the size is taken into account to decide if
360
   *     it's valid or not
361
   */
362
  public TmPacket getNextPacket() {
363
    byte[] packet = null;
×
364

365
    while (isRunning()) {
×
366
      try {
367
        tmSocket.receive(datagram);
×
368
        int pktLength = datagram.getLength() - initialBytesToStrip;
×
369

370
        if (pktLength <= 0) {
×
371
          log.warn(
×
372
              "received datagram of size {} <= {} (initialBytesToStrip); ignored.",
373
              datagram.getLength(),
×
374
              initialBytesToStrip);
×
375
          invalidDatagramCount++;
×
376
          continue;
×
377
        }
378

379
        updateStats(datagram.getLength());
×
380
        packet = new byte[pktLength];
×
381
        System.arraycopy(
×
382
            datagram.getData(), datagram.getOffset() + initialBytesToStrip, packet, 0, pktLength);
×
383
        break;
×
384
      } catch (IOException e) {
×
385
        if (!isRunning()
×
386
            || isDisabled()) { // the shutdown or disable will close the socket and that will
×
387
          // generate an exception
388
          // which we ignore here
389
          return null;
×
390
        }
391
        log.warn("exception thrown when reading from the UDP socket at port {}", port, e);
×
392
      }
×
393
    }
394

395
    if (packet != null) {
×
396
      TmPacket tmPacket = new TmPacket(timeService.getMissionTime(), packet);
×
397
      tmPacket.setEarthRceptionTime(timeService.getHresMissionTime());
×
398
      return packetPreprocessor.process(tmPacket);
×
399
    } else {
400
      return null;
×
401
    }
402
  }
403

404
  /** returns statistics with the number of datagram received and the number of invalid datagrams */
405
  @Override
406
  public String getDetailedStatus() {
407
    if (isDisabled()) {
×
408
      return "DISABLED";
×
409
    } else {
410
      return String.format(
×
411
          "OK (%s) %nValid datagrams received: %d%nInvalid datagrams received: %d",
412
          port, packetCount.get(), invalidDatagramCount);
×
413
    }
414
  }
415

416
  /** Sets the disabled to true such that getNextPacket ignores the received datagrams */
417
  @Override
418
  public void doDisable() {
419
    if (tmSocket != null) {
×
420
      tmSocket.close();
×
421
      tmSocket = null;
×
422
    }
423
  }
×
424

425
  /**
426
   * Sets the disabled to false such that getNextPacket does not ignore the received datagrams
427
   *
428
   * @throws SocketException
429
   */
430
  @Override
431
  public void doEnable() throws SocketException {
432
    tmSocket = new DatagramSocket(port);
×
433
    new Thread(this).start();
×
434
  }
×
435

436
  @Override
437
  protected Status connectionStatus() {
438
    return Status.OK;
×
439
  }
440
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc