• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #167

12 Dec 2024 12:23AM UTC coverage: 0.0%. Remained the same
#167

push

lorenzo-gomez-windhover
-Load pipeline config. WIP.

0 of 411 new or added lines in 4 files covered. (0.0%)

1 existing line in 1 file now uncovered.

0 of 8703 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/com/video/VideoDataLink.java
1
package com.windhoverlabs.com.video;
2

3
import static org.bytedeco.ffmpeg.global.avcodec.*;
4
import static org.bytedeco.ffmpeg.global.avformat.*;
5
import static org.bytedeco.ffmpeg.global.avutil.*;
6
import static org.bytedeco.ffmpeg.global.swscale.*;
7

8
import java.io.*;
9
import java.io.IOException;
10
import java.net.DatagramPacket;
11
import java.net.DatagramSocket;
12
import java.net.SocketException;
13
import java.util.ArrayList;
14
import java.util.List;
15
import java.util.concurrent.Executors;
16
import java.util.concurrent.ScheduledExecutorService;
17
import java.util.function.Supplier;
18
import org.bytedeco.ffmpeg.avcodec.*;
19
import org.bytedeco.ffmpeg.avformat.*;
20
import org.bytedeco.ffmpeg.avutil.*;
21
import org.bytedeco.ffmpeg.global.avdevice;
22
import org.bytedeco.ffmpeg.swscale.*;
23
import org.bytedeco.javacpp.*;
24
import org.yamcs.ConfigurationException;
25
import org.yamcs.StandardTupleDefinitions;
26
import org.yamcs.TmPacket;
27
import org.yamcs.YConfiguration;
28
import org.yamcs.YamcsServer;
29
import org.yamcs.mdb.Mdb;
30
import org.yamcs.parameter.ParameterValue;
31
import org.yamcs.protobuf.Event.EventSeverity;
32
import org.yamcs.protobuf.Yamcs.Value.Type;
33
import org.yamcs.tctm.AbstractTmDataLink;
34
import org.yamcs.utils.ValueUtility;
35
import org.yamcs.xtce.BooleanParameterType;
36
import org.yamcs.xtce.FloatParameterType;
37
import org.yamcs.xtce.IntegerParameterType;
38
import org.yamcs.xtce.NameDescription;
39
import org.yamcs.xtce.Parameter;
40
import org.yamcs.xtce.ParameterType;
41
import org.yamcs.xtce.StringParameterType;
42
import org.yamcs.xtce.XtceDb;
43
import org.yamcs.yarch.DataType;
44
import org.yamcs.yarch.Stream;
45
import org.yamcs.yarch.Tuple;
46
import org.yamcs.yarch.TupleDefinition;
47
import org.yamcs.yarch.YarchDatabase;
48
import org.yamcs.yarch.YarchDatabaseInstance;
49
import org.yamcs.yarch.protobuf.Db.Event;
50

51
/**
52
 * Receives telemetry packets via UDP. One UDP datagram = one TM packet.
53
 *
54
 * <p>Options:
55
 *
56
 * <ul>
57
 *   <li>{@code port} - the UDP port to listen to
58
 *   <li>{@code maxLength} - the maximum length of the datagram (and thus the TM packet length +
59
 *       initialBytesToStrip). If a datagram longer than this size will be received, it will be
60
 *       truncated. Default: 1500 (bytes)
61
 *   <li>{@code initialBytesToStrip} - if configured, skip that number of bytes from the beginning
62
 *       of the datagram. Default: 0
63
 * </ul>
64
 */
65
public class VideoDataLink extends AbstractTmDataLink implements Runnable {
×
66
  private volatile int invalidDatagramCount = 0;
×
67

68
  private DatagramSocket tmSocket;
69
  private int port;
70

71
  static final int MAX_LENGTH = 1500;
72
  DatagramPacket datagram;
73
  int maxLength;
74
  int initialBytesToStrip;
75
  int rcvBufferSize;
76

77
  Mdb mdb;
78

79
  private VariableParam frameParam;
80

81
  private Stream videoStream;
82

83
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();
×
84

85
  ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
×
86

87
  /**
88
   * Creates a new UDP TM Data Link
89
   *
90
   * @throws ConfigurationException if port is not defined in the configuration
91
   */
92
  @Override
93
  public void init(String instance, String name, YConfiguration config)
94
      throws ConfigurationException {
95
    super.init(instance, name, config);
×
96
    port = config.getInt("port");
×
97
    maxLength = config.getInt("maxLength", MAX_LENGTH);
×
98
    initialBytesToStrip = config.getInt("initialBytesToStrip", 0);
×
99
    rcvBufferSize = config.getInt("rcvBufferSize", 0);
×
100
    datagram = new DatagramPacket(new byte[maxLength], maxLength);
×
101

102
    this.mdb = YamcsServer.getServer().getInstance(yamcsInstance).getMdb();
×
103

104
    frameParam = VariableParam.getForFullyQualifiedName("/yamcs/pop-os/links/Video/frameData");
×
105

106
    ParameterType ptype = getBasicType(mdb, Type.BINARY);
×
107

108
    frameParam.setParameterType(ptype);
×
109

110
    if (mdb.getParameter(frameParam.getQualifiedName()) == null) {
×
111
      log.debug("Adding OPCUA object as parameter to mdb:{}", frameParam.getQualifiedName());
×
112
      try {
113
        mdb.addParameter(frameParam, true, true);
×
114
      } catch (Exception e) {
×
115
        // TODO Auto-generated catch block
116
        //        internalLogger.info(e.toString());
117
        //        internalLogger.info("Failed to add PV:" + p.getQualifiedName());
118
        org.yamcs.yarch.protobuf.Db.Event ev =
119
            Event.newBuilder()
×
120
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
121
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
122
                .setSource(this.linkName)
×
123
                .setType(this.linkName)
×
124
                .setMessage("Failed to add PV:" + frameParam.getQualifiedName())
×
125
                .setSeverity(EventSeverity.ERROR)
×
126
                .build();
×
127
        eventProducer.sendEvent(ev);
×
128
      }
×
129
    } else {
130
      frameParam = (VariableParam) mdb.getParameter(frameParam.getQualifiedName());
×
131
    }
132

133
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
×
134

135
    this.videoStream = getStream(ydb, "video_frames_stream");
×
136

NEW
137
    List<Object> pipelines = config.getList("Pipelines");
×
138

NEW
139
    System.out.println("pipelines-->" + pipelines);
×
UNCOV
140
  }
×
141

142
  @Override
143
  public void doStart() {
144
    if (!isDisabled()) {
×
145
      //      try {
146
      //        //        readVideo();
147
      //        streamVideoOverRTP();
148
      //      } catch (IOException e) {
149
      //        // TODO Auto-generated catch block
150
      //        e.printStackTrace();
151
      //      }
152
      try {
153
        tmSocket = new DatagramSocket(port);
×
154
        if (rcvBufferSize > 0) {
×
155
          tmSocket.setReceiveBufferSize(rcvBufferSize);
×
156
        }
157
        Thread thread = new Thread(this);
×
158
        thread.setName("UdpTmDataLink-" + linkName);
×
159
        thread.start();
×
160
      } catch (SocketException e) {
×
161
        notifyFailed(e);
×
162
      }
×
163
    }
164
    notifyStarted();
×
165
  }
×
166

167
  @Override
168
  public void doStop() {
169
    if (tmSocket != null) {
×
170
      tmSocket.close();
×
171
    }
172
    notifyStopped();
×
173
  }
×
174

175
  EReturnCode Initialize() {
176
    avformat_network_init();
×
177
    avdevice.avdevice_register_all();
×
178

179
    //          av_log_set_callback(MMC_CustomLogCallback);
180

181
    //          av_log_set_level(ConfigTable->AVLogLevel);
182

183
    return EReturnCode.OK;
×
184
  }
185

186
  private static ParameterType getOrCreateType(
187
      XtceDb mdb, String name, Supplier<ParameterType.Builder<?>> supplier) {
188

189
    String fqn = XtceDb.YAMCS_SPACESYSTEM_NAME + NameDescription.PATH_SEPARATOR + name;
×
190
    ParameterType ptype = mdb.getParameterType(fqn);
×
191
    if (ptype != null) {
×
192
      return ptype;
×
193
    }
194
    ParameterType.Builder<?> typeb = supplier.get().setName(name);
×
195

196
    ptype = typeb.build();
×
197
    ((NameDescription) ptype).setQualifiedName(fqn);
×
198

199
    return ((Mdb) mdb).addSystemParameterType(ptype);
×
200
  }
201

202
  public static ParameterType getBasicType(XtceDb mdb, Type type) {
203
    ParameterType pType = null;
×
204
    switch (type) {
×
205
      case BOOLEAN:
206
        return getOrCreateType(mdb, "boolean", () -> new BooleanParameterType.Builder());
×
207
      case STRING:
208
        return getOrCreateType(mdb, "string", () -> new StringParameterType.Builder());
×
209

210
      case FLOAT:
211
        return getOrCreateType(
×
212
            mdb, "float32", () -> new FloatParameterType.Builder().setSizeInBits(32));
×
213
      case DOUBLE:
214
        return getOrCreateType(
×
215
            mdb, "float64", () -> new FloatParameterType.Builder().setSizeInBits(64));
×
216
      case SINT32:
217
        return getOrCreateType(
×
218
            mdb,
219
            "sint32",
220
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(true));
×
221
      case SINT64:
222
        return getOrCreateType(
×
223
            mdb,
224
            "sint64",
225
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(true));
×
226
      case UINT32:
227
        return getOrCreateType(
×
228
            mdb,
229
            "uint32",
230
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(false));
×
231
      case UINT64:
232
        return getOrCreateType(
×
233
            mdb,
234
            "uint64",
235
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(false));
×
236
      default:
237
        break;
238
    }
239

240
    return pType;
×
241
  }
242

243
  private void writeFrameToDB(byte data[]) {
244
    TupleDefinition tdef = gftdef.copy();
×
245

246
    int numberfRecords = 1;
×
247

248
    List<Object> cols = new ArrayList<>(4 + numberfRecords);
×
249

250
    tdef = gftdef.copy();
×
251
    long gentime = timeService.getMissionTime();
×
252
    cols.add(gentime);
×
253
    cols.add("/yamcs/pop-os/");
×
254
    cols.add(0);
×
255
    cols.add(gentime);
×
256

257
    tdef.addColumn(frameParam.getQualifiedName(), DataType.PARAMETER_VALUE);
×
258

259
    cols.add(getPV(frameParam, gentime, data));
×
260

261
    pushTuple(tdef, cols);
×
262
  }
×
263

264
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
265
    Stream stream = ydb.getStream(streamName);
×
266
    if (stream == null) {
×
267
      try {
268
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
269
      } catch (Exception e) {
×
270
        throw new ConfigurationException(e);
×
271
      }
×
272

273
      stream = ydb.getStream(streamName);
×
274
    }
275
    return stream;
×
276
  }
277

278
  private synchronized void pushTuple(TupleDefinition tdef, List<Object> cols) {
279
    Tuple t;
280
    t = new Tuple(tdef, cols);
×
281
    videoStream.emitTuple(t);
×
282
  }
×
283

284
  public ParameterValue getNewPv(Parameter parameter, long time) {
285
    ParameterValue pv = new ParameterValue(parameter);
×
286
    pv.setAcquisitionTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime());
×
287
    pv.setGenerationTime(time);
×
288
    return pv;
×
289
  }
290

291
  public ParameterValue getPV(Parameter parameter, long time, double v) {
292
    ParameterValue pv = getNewPv(parameter, time);
×
293
    pv.setEngValue(ValueUtility.getDoubleValue(v));
×
294
    pv.setRawValue(ValueUtility.getDoubleValue(v));
×
295
    return pv;
×
296
  }
297

298
  public ParameterValue getPV(Parameter parameter, long time, byte v[]) {
299
    ParameterValue pv = getNewPv(parameter, time);
×
300
    pv.setEngValue(ValueUtility.getBinaryValue(v));
×
301
    pv.setRawValue(ValueUtility.getBinaryValue(v));
×
302
    return pv;
×
303
  }
304

305
  static void save_frame(AVFrame pFrame, int width, int height, int f_idx) throws IOException {
306
    // Open file
307
    String szFilename = String.format("frame%d_.ppm", f_idx);
×
308
    OutputStream pFile = new FileOutputStream(szFilename);
×
309

310
    // Write header
311
    pFile.write(String.format("P6\n%d %d\n255\n", width, height).getBytes());
×
312

313
    // Write pixel data
314
    BytePointer data = pFrame.data(0);
×
315
    byte[] bytes = new byte[width * 3];
×
316
    int l = pFrame.linesize(0);
×
317
    for (int y = 0; y < height; y++) {
×
318
      data.position(y * l).get(bytes);
×
319
      pFile.write(bytes);
×
320
    }
321

322
    // Close file
323
    pFile.close();
×
324
  }
×
325

326
  /**
327
   * Concatenates the root with the subsystems and returns a qualified name
328
   *
329
   * @param root
330
   */
331
  public static String qualifiedName(String root, String... subsystems) {
332
    if (root.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
333
      throw new IllegalArgumentException(
×
334
          "root has to start with " + NameDescription.PATH_SEPARATOR);
335
    }
336
    StringBuilder sb = new StringBuilder();
×
337
    sb.append(root);
×
338
    for (String s : subsystems) {
×
339
      if (s.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
340
        sb.append(NameDescription.PATH_SEPARATOR);
×
341
      }
342
      sb.append(s);
×
343
    }
344
    return sb.toString();
×
345
  }
346

347
  @Override
348
  public void run() {
349
    while (isRunningAndEnabled()) {
×
350
      TmPacket tmpkt = getNextPacket();
×
351
      if (tmpkt != null) {
×
352
        processPacket(tmpkt);
×
353
      }
354
    }
×
355
  }
×
356

357
  /**
358
   * Called to retrieve the next packet. It blocks in readining on the multicast socket
359
   *
360
   * @return anything that looks as a valid packet, just the size is taken into account to decide if
361
   *     it's valid or not
362
   */
363
  public TmPacket getNextPacket() {
364
    byte[] packet = null;
×
365

366
    while (isRunning()) {
×
367
      try {
368
        tmSocket.receive(datagram);
×
369
        int pktLength = datagram.getLength() - initialBytesToStrip;
×
370

371
        if (pktLength <= 0) {
×
372
          log.warn(
×
373
              "received datagram of size {} <= {} (initialBytesToStrip); ignored.",
374
              datagram.getLength(),
×
375
              initialBytesToStrip);
×
376
          invalidDatagramCount++;
×
377
          continue;
×
378
        }
379

380
        updateStats(datagram.getLength());
×
381
        packet = new byte[pktLength];
×
382
        System.arraycopy(
×
383
            datagram.getData(), datagram.getOffset() + initialBytesToStrip, packet, 0, pktLength);
×
384
        break;
×
385
      } catch (IOException e) {
×
386
        if (!isRunning()
×
387
            || isDisabled()) { // the shutdown or disable will close the socket and that will
×
388
          // generate an exception
389
          // which we ignore here
390
          return null;
×
391
        }
392
        log.warn("exception thrown when reading from the UDP socket at port {}", port, e);
×
393
      }
×
394
    }
395

396
    if (packet != null) {
×
397
      TmPacket tmPacket = new TmPacket(timeService.getMissionTime(), packet);
×
398
      tmPacket.setEarthRceptionTime(timeService.getHresMissionTime());
×
399
      return packetPreprocessor.process(tmPacket);
×
400
    } else {
401
      return null;
×
402
    }
403
  }
404

405
  /** returns statistics with the number of datagram received and the number of invalid datagrams */
406
  @Override
407
  public String getDetailedStatus() {
408
    if (isDisabled()) {
×
409
      return "DISABLED";
×
410
    } else {
411
      return String.format(
×
412
          "OK (%s) %nValid datagrams received: %d%nInvalid datagrams received: %d",
413
          port, packetCount.get(), invalidDatagramCount);
×
414
    }
415
  }
416

417
  /** Sets the disabled to true such that getNextPacket ignores the received datagrams */
418
  @Override
419
  public void doDisable() {
420
    if (tmSocket != null) {
×
421
      tmSocket.close();
×
422
      tmSocket = null;
×
423
    }
424
  }
×
425

426
  /**
427
   * Sets the disabled to false such that getNextPacket does not ignore the received datagrams
428
   *
429
   * @throws SocketException
430
   */
431
  @Override
432
  public void doEnable() throws SocketException {
433
    tmSocket = new DatagramSocket(port);
×
434
    new Thread(this).start();
×
435
  }
×
436

437
  @Override
438
  protected Status connectionStatus() {
439
    return Status.OK;
×
440
  }
441
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc