• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #166

11 Dec 2024 11:23PM UTC coverage: 0.0%. Remained the same
#166

push

lorenzo-gomez-windhover
-Cleanup

0 of 8298 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/com/video/VideoDataLink.java
1
package com.windhoverlabs.com.video;
2

3
import static org.bytedeco.ffmpeg.global.avcodec.*;
4
import static org.bytedeco.ffmpeg.global.avformat.*;
5
import static org.bytedeco.ffmpeg.global.avutil.*;
6
import static org.bytedeco.ffmpeg.global.swscale.*;
7

8
import java.io.*;
9
import java.io.IOException;
10
import java.net.DatagramPacket;
11
import java.net.DatagramSocket;
12
import java.net.SocketException;
13
import java.util.ArrayList;
14
import java.util.List;
15
import java.util.concurrent.Executors;
16
import java.util.concurrent.ScheduledExecutorService;
17
import java.util.function.Supplier;
18
import org.bytedeco.ffmpeg.avcodec.*;
19
import org.bytedeco.ffmpeg.avformat.*;
20
import org.bytedeco.ffmpeg.avutil.*;
21
import org.bytedeco.ffmpeg.global.avdevice;
22
import org.bytedeco.ffmpeg.swscale.*;
23
import org.bytedeco.javacpp.*;
24
import org.yamcs.ConfigurationException;
25
import org.yamcs.StandardTupleDefinitions;
26
import org.yamcs.TmPacket;
27
import org.yamcs.YConfiguration;
28
import org.yamcs.YamcsServer;
29
import org.yamcs.mdb.Mdb;
30
import org.yamcs.parameter.ParameterValue;
31
import org.yamcs.protobuf.Event.EventSeverity;
32
import org.yamcs.protobuf.Yamcs.Value.Type;
33
import org.yamcs.tctm.AbstractTmDataLink;
34
import org.yamcs.utils.ValueUtility;
35
import org.yamcs.xtce.BooleanParameterType;
36
import org.yamcs.xtce.FloatParameterType;
37
import org.yamcs.xtce.IntegerParameterType;
38
import org.yamcs.xtce.NameDescription;
39
import org.yamcs.xtce.Parameter;
40
import org.yamcs.xtce.ParameterType;
41
import org.yamcs.xtce.StringParameterType;
42
import org.yamcs.xtce.XtceDb;
43
import org.yamcs.yarch.DataType;
44
import org.yamcs.yarch.Stream;
45
import org.yamcs.yarch.Tuple;
46
import org.yamcs.yarch.TupleDefinition;
47
import org.yamcs.yarch.YarchDatabase;
48
import org.yamcs.yarch.YarchDatabaseInstance;
49
import org.yamcs.yarch.protobuf.Db.Event;
50

51
/**
52
 * Receives telemetry packets via UDP. One UDP datagram = one TM packet.
53
 *
54
 * <p>Options:
55
 *
56
 * <ul>
57
 *   <li>{@code port} - the UDP port to listen to
58
 *   <li>{@code maxLength} - the maximum length of the datagram (and thus the TM packet length +
59
 *       initialBytesToStrip). A datagram longer than this size is truncated when it is received.
60
 *       Default: 1500 (bytes)
61
 *   <li>{@code initialBytesToStrip} - if configured, skip that number of bytes from the beginning
62
 *       of the datagram. Default: 0
63
 * </ul>
64
 */
65
public class VideoDataLink extends AbstractTmDataLink implements Runnable {
×
66
  private volatile int invalidDatagramCount = 0;
×
67

68
  private DatagramSocket tmSocket;
69
  private int port;
70

71
  static final int MAX_LENGTH = 1500;
72
  DatagramPacket datagram;
73
  int maxLength;
74
  int initialBytesToStrip;
75
  int rcvBufferSize;
76

77
  Mdb mdb;
78

79
  private VariableParam frameParam;
80

81
  private Stream videoStream;
82

83
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();
×
84

85
  ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
×
86

87
  /**
88
   * Creates a new UDP TM Data Link
89
   *
90
   * @throws ConfigurationException if port is not defined in the configuration
91
   */
92
  @Override
93
  public void init(String instance, String name, YConfiguration config)
94
      throws ConfigurationException {
95
    super.init(instance, name, config);
×
96
    port = config.getInt("port");
×
97
    maxLength = config.getInt("maxLength", MAX_LENGTH);
×
98
    initialBytesToStrip = config.getInt("initialBytesToStrip", 0);
×
99
    rcvBufferSize = config.getInt("rcvBufferSize", 0);
×
100
    datagram = new DatagramPacket(new byte[maxLength], maxLength);
×
101

102
    this.mdb = YamcsServer.getServer().getInstance(yamcsInstance).getMdb();
×
103

104
    frameParam = VariableParam.getForFullyQualifiedName("/yamcs/pop-os/links/Video/frameData");
×
105

106
    ParameterType ptype = getBasicType(mdb, Type.BINARY);
×
107

108
    frameParam.setParameterType(ptype);
×
109

110
    if (mdb.getParameter(frameParam.getQualifiedName()) == null) {
×
111
      log.debug("Adding OPCUA object as parameter to mdb:{}", frameParam.getQualifiedName());
×
112
      try {
113
        mdb.addParameter(frameParam, true, true);
×
114
      } catch (Exception e) {
×
115
        // TODO Auto-generated catch block
116
        //        internalLogger.info(e.toString());
117
        //        internalLogger.info("Failed to add PV:" + p.getQualifiedName());
118
        org.yamcs.yarch.protobuf.Db.Event ev =
119
            Event.newBuilder()
×
120
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
121
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
122
                .setSource(this.linkName)
×
123
                .setType(this.linkName)
×
124
                .setMessage("Failed to add PV:" + frameParam.getQualifiedName())
×
125
                .setSeverity(EventSeverity.ERROR)
×
126
                .build();
×
127
        eventProducer.sendEvent(ev);
×
128
      }
×
129
    } else {
130
      frameParam = (VariableParam) mdb.getParameter(frameParam.getQualifiedName());
×
131
    }
132

133
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
×
134

135
    this.videoStream = getStream(ydb, "video_frames_stream");
×
136
  }
×
137

138
  @Override
139
  public void doStart() {
140
    if (!isDisabled()) {
×
141
      //      try {
142
      //        //        readVideo();
143
      //        streamVideoOverRTP();
144
      //      } catch (IOException e) {
145
      //        // TODO Auto-generated catch block
146
      //        e.printStackTrace();
147
      //      }
148
      try {
149
        tmSocket = new DatagramSocket(port);
×
150
        if (rcvBufferSize > 0) {
×
151
          tmSocket.setReceiveBufferSize(rcvBufferSize);
×
152
        }
153
        Thread thread = new Thread(this);
×
154
        thread.setName("UdpTmDataLink-" + linkName);
×
155
        thread.start();
×
156
      } catch (SocketException e) {
×
157
        notifyFailed(e);
×
158
      }
×
159
    }
160
    notifyStarted();
×
161
  }
×
162

163
  @Override
164
  public void doStop() {
165
    if (tmSocket != null) {
×
166
      tmSocket.close();
×
167
    }
168
    notifyStopped();
×
169
  }
×
170

171
  EReturnCode Initialize() {
172
    avformat_network_init();
×
173
    avdevice.avdevice_register_all();
×
174

175
    //          av_log_set_callback(MMC_CustomLogCallback);
176

177
    //          av_log_set_level(ConfigTable->AVLogLevel);
178

179
    return EReturnCode.OK;
×
180
  }
181

182
  private static ParameterType getOrCreateType(
183
      XtceDb mdb, String name, Supplier<ParameterType.Builder<?>> supplier) {
184

185
    String fqn = XtceDb.YAMCS_SPACESYSTEM_NAME + NameDescription.PATH_SEPARATOR + name;
×
186
    ParameterType ptype = mdb.getParameterType(fqn);
×
187
    if (ptype != null) {
×
188
      return ptype;
×
189
    }
190
    ParameterType.Builder<?> typeb = supplier.get().setName(name);
×
191

192
    ptype = typeb.build();
×
193
    ((NameDescription) ptype).setQualifiedName(fqn);
×
194

195
    return ((Mdb) mdb).addSystemParameterType(ptype);
×
196
  }
197

198
  public static ParameterType getBasicType(XtceDb mdb, Type type) {
199
    ParameterType pType = null;
×
200
    switch (type) {
×
201
      case BOOLEAN:
202
        return getOrCreateType(mdb, "boolean", () -> new BooleanParameterType.Builder());
×
203
      case STRING:
204
        return getOrCreateType(mdb, "string", () -> new StringParameterType.Builder());
×
205

206
      case FLOAT:
207
        return getOrCreateType(
×
208
            mdb, "float32", () -> new FloatParameterType.Builder().setSizeInBits(32));
×
209
      case DOUBLE:
210
        return getOrCreateType(
×
211
            mdb, "float64", () -> new FloatParameterType.Builder().setSizeInBits(64));
×
212
      case SINT32:
213
        return getOrCreateType(
×
214
            mdb,
215
            "sint32",
216
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(true));
×
217
      case SINT64:
218
        return getOrCreateType(
×
219
            mdb,
220
            "sint64",
221
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(true));
×
222
      case UINT32:
223
        return getOrCreateType(
×
224
            mdb,
225
            "uint32",
226
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(false));
×
227
      case UINT64:
228
        return getOrCreateType(
×
229
            mdb,
230
            "uint64",
231
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(false));
×
232
      default:
233
        break;
234
    }
235

236
    return pType;
×
237
  }
238

239
  private void writeFrameToDB(byte data[]) {
240
    TupleDefinition tdef = gftdef.copy();
×
241

242
    int numberfRecords = 1;
×
243

244
    List<Object> cols = new ArrayList<>(4 + numberfRecords);
×
245

246
    tdef = gftdef.copy();
×
247
    long gentime = timeService.getMissionTime();
×
248
    cols.add(gentime);
×
249
    cols.add("/yamcs/pop-os/");
×
250
    cols.add(0);
×
251
    cols.add(gentime);
×
252

253
    tdef.addColumn(frameParam.getQualifiedName(), DataType.PARAMETER_VALUE);
×
254

255
    cols.add(getPV(frameParam, gentime, data));
×
256

257
    pushTuple(tdef, cols);
×
258
  }
×
259

260
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
261
    Stream stream = ydb.getStream(streamName);
×
262
    if (stream == null) {
×
263
      try {
264
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
265
      } catch (Exception e) {
×
266
        throw new ConfigurationException(e);
×
267
      }
×
268

269
      stream = ydb.getStream(streamName);
×
270
    }
271
    return stream;
×
272
  }
273

274
  private synchronized void pushTuple(TupleDefinition tdef, List<Object> cols) {
275
    Tuple t;
276
    t = new Tuple(tdef, cols);
×
277
    videoStream.emitTuple(t);
×
278
  }
×
279

280
  public ParameterValue getNewPv(Parameter parameter, long time) {
281
    ParameterValue pv = new ParameterValue(parameter);
×
282
    pv.setAcquisitionTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime());
×
283
    pv.setGenerationTime(time);
×
284
    return pv;
×
285
  }
286

287
  public ParameterValue getPV(Parameter parameter, long time, double v) {
288
    ParameterValue pv = getNewPv(parameter, time);
×
289
    pv.setEngValue(ValueUtility.getDoubleValue(v));
×
290
    pv.setRawValue(ValueUtility.getDoubleValue(v));
×
291
    return pv;
×
292
  }
293

294
  public ParameterValue getPV(Parameter parameter, long time, byte v[]) {
295
    ParameterValue pv = getNewPv(parameter, time);
×
296
    pv.setEngValue(ValueUtility.getBinaryValue(v));
×
297
    pv.setRawValue(ValueUtility.getBinaryValue(v));
×
298
    return pv;
×
299
  }
300

301
  static void save_frame(AVFrame pFrame, int width, int height, int f_idx) throws IOException {
302
    // Open file
303
    String szFilename = String.format("frame%d_.ppm", f_idx);
×
304
    OutputStream pFile = new FileOutputStream(szFilename);
×
305

306
    // Write header
307
    pFile.write(String.format("P6\n%d %d\n255\n", width, height).getBytes());
×
308

309
    // Write pixel data
310
    BytePointer data = pFrame.data(0);
×
311
    byte[] bytes = new byte[width * 3];
×
312
    int l = pFrame.linesize(0);
×
313
    for (int y = 0; y < height; y++) {
×
314
      data.position(y * l).get(bytes);
×
315
      pFile.write(bytes);
×
316
    }
317

318
    // Close file
319
    pFile.close();
×
320
  }
×
321

322
  /**
323
   * Concatenates the root with the subsystems and returns a qualified name
324
   *
325
   * @param root
326
   */
327
  public static String qualifiedName(String root, String... subsystems) {
328
    if (root.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
329
      throw new IllegalArgumentException(
×
330
          "root has to start with " + NameDescription.PATH_SEPARATOR);
331
    }
332
    StringBuilder sb = new StringBuilder();
×
333
    sb.append(root);
×
334
    for (String s : subsystems) {
×
335
      if (s.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
336
        sb.append(NameDescription.PATH_SEPARATOR);
×
337
      }
338
      sb.append(s);
×
339
    }
340
    return sb.toString();
×
341
  }
342

343
  @Override
344
  public void run() {
345
    while (isRunningAndEnabled()) {
×
346
      TmPacket tmpkt = getNextPacket();
×
347
      if (tmpkt != null) {
×
348
        processPacket(tmpkt);
×
349
      }
350
    }
×
351
  }
×
352

353
  /**
354
   * Called to retrieve the next packet. It blocks in readining on the multicast socket
355
   *
356
   * @return anything that looks as a valid packet, just the size is taken into account to decide if
357
   *     it's valid or not
358
   */
359
  public TmPacket getNextPacket() {
360
    byte[] packet = null;
×
361

362
    while (isRunning()) {
×
363
      try {
364
        tmSocket.receive(datagram);
×
365
        int pktLength = datagram.getLength() - initialBytesToStrip;
×
366

367
        if (pktLength <= 0) {
×
368
          log.warn(
×
369
              "received datagram of size {} <= {} (initialBytesToStrip); ignored.",
370
              datagram.getLength(),
×
371
              initialBytesToStrip);
×
372
          invalidDatagramCount++;
×
373
          continue;
×
374
        }
375

376
        updateStats(datagram.getLength());
×
377
        packet = new byte[pktLength];
×
378
        System.arraycopy(
×
379
            datagram.getData(), datagram.getOffset() + initialBytesToStrip, packet, 0, pktLength);
×
380
        break;
×
381
      } catch (IOException e) {
×
382
        if (!isRunning()
×
383
            || isDisabled()) { // the shutdown or disable will close the socket and that will
×
384
          // generate an exception
385
          // which we ignore here
386
          return null;
×
387
        }
388
        log.warn("exception thrown when reading from the UDP socket at port {}", port, e);
×
389
      }
×
390
    }
391

392
    if (packet != null) {
×
393
      TmPacket tmPacket = new TmPacket(timeService.getMissionTime(), packet);
×
394
      tmPacket.setEarthRceptionTime(timeService.getHresMissionTime());
×
395
      return packetPreprocessor.process(tmPacket);
×
396
    } else {
397
      return null;
×
398
    }
399
  }
400

401
  /** returns statistics with the number of datagram received and the number of invalid datagrams */
402
  @Override
403
  public String getDetailedStatus() {
404
    if (isDisabled()) {
×
405
      return "DISABLED";
×
406
    } else {
407
      return String.format(
×
408
          "OK (%s) %nValid datagrams received: %d%nInvalid datagrams received: %d",
409
          port, packetCount.get(), invalidDatagramCount);
×
410
    }
411
  }
412

413
  /** Sets the disabled to true such that getNextPacket ignores the received datagrams */
414
  @Override
415
  public void doDisable() {
416
    if (tmSocket != null) {
×
417
      tmSocket.close();
×
418
      tmSocket = null;
×
419
    }
420
  }
×
421

422
  /**
423
   * Sets the disabled to false such that getNextPacket does not ignore the received datagrams
424
   *
425
   * @throws SocketException
426
   */
427
  @Override
428
  public void doEnable() throws SocketException {
429
    tmSocket = new DatagramSocket(port);
×
430
    new Thread(this).start();
×
431
  }
×
432

433
  @Override
434
  protected Status connectionStatus() {
435
    return Status.OK;
×
436
  }
437
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc