• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

WindhoverLabs / yamcs-cfs / #167

12 Dec 2024 12:23AM UTC coverage: 0.0%. Remained the same
#167

push

lorenzo-gomez-windhover
-Load pipeline config. WIP.

0 of 411 new or added lines in 4 files covered. (0.0%)

1 existing line in 1 file now uncovered.

0 of 8703 relevant lines covered (0.0%)

0.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/main/java/com/windhoverlabs/com/video/VideoDataLinkHardCoded.java
1
package com.windhoverlabs.com.video;
2

3
import static org.bytedeco.ffmpeg.global.avcodec.*;
4
import static org.bytedeco.ffmpeg.global.avformat.*;
5
import static org.bytedeco.ffmpeg.global.avutil.*;
6
import static org.bytedeco.ffmpeg.global.swscale.*;
7

8
import java.io.*;
9
import java.io.IOException;
10
import java.net.DatagramPacket;
11
import java.net.DatagramSocket;
12
import java.net.SocketException;
13
import java.util.ArrayList;
14
import java.util.List;
15
import java.util.concurrent.Executors;
16
import java.util.concurrent.ScheduledExecutorService;
17
import java.util.function.Supplier;
18
import org.bytedeco.ffmpeg.avcodec.*;
19
import org.bytedeco.ffmpeg.avformat.*;
20
import org.bytedeco.ffmpeg.avutil.*;
21
import org.bytedeco.ffmpeg.global.avutil;
22
import org.bytedeco.ffmpeg.swscale.*;
23
import org.bytedeco.javacpp.*;
24
import org.yamcs.ConfigurationException;
25
import org.yamcs.StandardTupleDefinitions;
26
import org.yamcs.TmPacket;
27
import org.yamcs.YConfiguration;
28
import org.yamcs.YamcsServer;
29
import org.yamcs.mdb.Mdb;
30
import org.yamcs.parameter.ParameterValue;
31
import org.yamcs.protobuf.Event.EventSeverity;
32
import org.yamcs.protobuf.Yamcs.Value.Type;
33
import org.yamcs.tctm.AbstractTmDataLink;
34
import org.yamcs.utils.ValueUtility;
35
import org.yamcs.xtce.BooleanParameterType;
36
import org.yamcs.xtce.FloatParameterType;
37
import org.yamcs.xtce.IntegerParameterType;
38
import org.yamcs.xtce.NameDescription;
39
import org.yamcs.xtce.Parameter;
40
import org.yamcs.xtce.ParameterType;
41
import org.yamcs.xtce.StringParameterType;
42
import org.yamcs.xtce.XtceDb;
43
import org.yamcs.yarch.DataType;
44
import org.yamcs.yarch.Stream;
45
import org.yamcs.yarch.Tuple;
46
import org.yamcs.yarch.TupleDefinition;
47
import org.yamcs.yarch.YarchDatabase;
48
import org.yamcs.yarch.YarchDatabaseInstance;
49
import org.yamcs.yarch.protobuf.Db.Event;
50

51
/**
52
 * Receives telemetry packets via UDP. One UDP datagram = one TM packet.
53
 *
54
 * <p>Options:
55
 *
56
 * <ul>
57
 *   <li>{@code port} - the UDP port to listen to
58
 *   <li>{@code maxLength} - the maximum length of the datagram (and thus the TM packet length +
59
 *       initialBytesToStrip). If a datagram longer than this size is received, it will be
 *       truncated. Default: 1500 (bytes)
61
 *   <li>{@code initialBytesToStrip} - if configured, skip that number of bytes from the beginning
62
 *       of the datagram. Default: 0
63
 * </ul>
64
 */
NEW
65
public class VideoDataLinkHardCoded extends AbstractTmDataLink implements Runnable {
  // Count of datagrams rejected as invalid; volatile so other threads (e.g. link
  // status reporting) see updates. NOTE(review): not incremented anywhere in the
  // visible portion of this file — confirm it is updated in run().
  private volatile int invalidDatagramCount = 0;

  // UDP socket the TM packets arrive on; opened in doStart(), closed in doStop().
  private DatagramSocket tmSocket;
  private int port;

  // Default maximum datagram size in bytes, used when "maxLength" is not configured.
  static final int MAX_LENGTH = 1500;
  // Reusable receive buffer, sized to maxLength in init().
  DatagramPacket datagram;
  int maxLength;
  int initialBytesToStrip;
  int rcvBufferSize;

  // Mission database handle, resolved in init().
  Mdb mdb;

  // System parameter under which encoded video frame data is published.
  private VariableParam frameParam;

  // Yarch stream ("video_frames_stream") that frame tuples are emitted on.
  private Stream videoStream;

  // Tuple layout for frame tuples: a copy of the standard parameter tuple definition.
  private static TupleDefinition gftdef = StandardTupleDefinitions.PARAMETER.copy();

  // NOTE(review): allocated but never used in the visible code — confirm before removing.
  ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
86

87
  /**
88
   * Creates a new UDP TM Data Link
89
   *
90
   * @throws ConfigurationException if port is not defined in the configuration
91
   */
92
  @Override
93
  public void init(String instance, String name, YConfiguration config)
94
      throws ConfigurationException {
NEW
95
    super.init(instance, name, config);
×
NEW
96
    port = config.getInt("port");
×
NEW
97
    maxLength = config.getInt("maxLength", MAX_LENGTH);
×
NEW
98
    initialBytesToStrip = config.getInt("initialBytesToStrip", 0);
×
NEW
99
    rcvBufferSize = config.getInt("rcvBufferSize", 0);
×
NEW
100
    datagram = new DatagramPacket(new byte[maxLength], maxLength);
×
101

NEW
102
    this.mdb = YamcsServer.getServer().getInstance(yamcsInstance).getMdb();
×
103

NEW
104
    frameParam = VariableParam.getForFullyQualifiedName("/yamcs/pop-os/links/Video/frameData");
×
105

NEW
106
    ParameterType ptype = getBasicType(mdb, Type.BINARY);
×
107

NEW
108
    frameParam.setParameterType(ptype);
×
109

NEW
110
    if (mdb.getParameter(frameParam.getQualifiedName()) == null) {
×
NEW
111
      log.debug("Adding OPCUA object as parameter to mdb:{}", frameParam.getQualifiedName());
×
112
      try {
NEW
113
        mdb.addParameter(frameParam, true, true);
×
NEW
114
      } catch (Exception e) {
×
115
        // TODO Auto-generated catch block
116
        //        internalLogger.info(e.toString());
117
        //        internalLogger.info("Failed to add PV:" + p.getQualifiedName());
118
        org.yamcs.yarch.protobuf.Db.Event ev =
NEW
119
            Event.newBuilder()
×
NEW
120
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
121
                .setGenerationTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime())
×
NEW
122
                .setSource(this.linkName)
×
NEW
123
                .setType(this.linkName)
×
NEW
124
                .setMessage("Failed to add PV:" + frameParam.getQualifiedName())
×
NEW
125
                .setSeverity(EventSeverity.ERROR)
×
NEW
126
                .build();
×
NEW
127
        eventProducer.sendEvent(ev);
×
NEW
128
      }
×
129
    } else {
NEW
130
      frameParam = (VariableParam) mdb.getParameter(frameParam.getQualifiedName());
×
131
    }
132

NEW
133
    YarchDatabaseInstance ydb = YarchDatabase.getInstance(yamcsInstance);
×
134

NEW
135
    this.videoStream = getStream(ydb, "video_frames_stream");
×
NEW
136
  }
×
137

138
  @Override
139
  public void doStart() {
NEW
140
    if (!isDisabled()) {
×
141
      try {
NEW
142
        tmSocket = new DatagramSocket(port);
×
NEW
143
        if (rcvBufferSize > 0) {
×
NEW
144
          tmSocket.setReceiveBufferSize(rcvBufferSize);
×
145
        }
NEW
146
        Thread thread = new Thread(this);
×
NEW
147
        thread.setName("UdpTmDataLink-" + linkName);
×
NEW
148
        thread.start();
×
NEW
149
      } catch (SocketException e) {
×
NEW
150
        notifyFailed(e);
×
NEW
151
      }
×
152
    }
NEW
153
    notifyStarted();
×
NEW
154
  }
×
155

156
  @Override
157
  public void doStop() {
NEW
158
    if (tmSocket != null) {
×
NEW
159
      tmSocket.close();
×
160
    }
NEW
161
    notifyStopped();
×
NEW
162
  }
×
163

164
  /**
   * One-time FFmpeg initialization, ported from a C++ MMC implementation.
   *
   * <p>Currently only brings up FFmpeg's network layer; the per-channel setup from the
   * original C++ code is retained below, commented out, as a porting reference.
   *
   * @return always {@code EReturnCode.OK} in the current implementation
   */
  EReturnCode Initialize() {
    avformat_network_init();
    //          avdevice_register_all();

    //          av_log_set_callback(MMC_CustomLogCallback);

    //          av_log_set_level(ConfigTable->AVLogLevel);
    //
    //          /* First set the channel ID for all the channels.  This is used for
    //           * logging and error reporting. */
    //          for(uint32 i = 0; i < MMC_CHANNEL_MAX; ++i)
    //          {
    //                  Channel[i].SetChannelID(i);
    //          }
    //
    //          /* Next, initialize all channels. */
    //          for(uint32 i = 0; i < MMC_CHANNEL_MAX; ++i)
    //          {
    //                  Channel[i].Initialize();
    //          }

    return EReturnCode.OK;
  }
187

188
  public void delayByPts(long pts, AVRational timeBase, int startTime) {
189
    // Convert PTS to microseconds
190

NEW
191
    AVRational r = new AVRational();
×
NEW
192
    r.num(1);
×
NEW
193
    r.den(avutil.AV_TIME_BASE);
×
NEW
194
    int playbackTime = (int) av_rescale_q(pts, timeBase, r);
×
NEW
195
    int currentTime = (int) (av_gettime_relative() - startTime);
×
196

NEW
197
    if (playbackTime > currentTime) {
×
198
      // Sleep until it's time to display/send this frame
NEW
199
      int sleepTime = playbackTime - currentTime;
×
NEW
200
      System.out.println("Sleeping for " + sleepTime + " us");
×
201
      //                        Thread.sleep(sleepTime);
NEW
202
      av_usleep(sleepTime);
×
203
    }
NEW
204
  }
×
205

206
  /**
   * Reads a hard-coded local MP4 file, decodes/re-encodes it and streams it as
   * {@code rtp_mpegts} to {@code rtp://127.0.0.1:5005}, while also publishing each
   * encoded packet to the mission database via {@link #writeFrame}.
   *
   * <p>NOTE(review): input path and output URL are hard-coded developer values and the
   * method is clearly work-in-progress (debug prints, contradictory codecpar writes
   * below) — confirm intended configuration before relying on this.
   *
   * @throws IOException if the input cannot be opened/probed, no video stream exists,
   *     or the encoder/RTP output cannot be set up
   */
  private void streamVideoOverRTP() throws IOException {
    System.out.println("Starting video streaming over RTP...");

    av_log_set_level(AV_LOG_DEBUG);

    //    av_log_set_callback();

    avformat_network_init();
    //        avdevice_register_all();

    int ret, v_stream_idx = -1;
    // Hard-coded test inputs; the second assignment immediately overrides the first.
    String inputFile = "/home/lgomez/Downloads/217115_small.mp4";
    inputFile =
        "/home/lgomez/Downloads/vecteezy_vancouver-canada-september-16-2023-flight-by-fpv-drone_37202565.mp4";
    String outputURL = "rtp://127.0.0.1:5005";

    //    outputURL =
    //
    // "/home/lgomez/projects/viper_sitl/squeaky-weasel/software/airliner/build/venus_aero/sassie/sitl_commander_workspace/new_video.mp4";

    AVFormatContext inputCtx = avformat_alloc_context();

    AVFormatContext outputCtx = avformat_alloc_context();

    // Open input video file
    System.out.println("Opening input file: " + inputFile);
    if (avformat_open_input(inputCtx, inputFile, null, null) < 0) {
      throw new IOException("Failed to open input file");
    }

    System.out.println("Finding stream info...");
    if (avformat_find_stream_info(inputCtx, (PointerPointer) null) < 0) {
      throw new IOException("Failed to retrieve stream info");
    }

    // Find video stream
    System.out.println("Searching for video stream...");
    for (int i = 0; i < inputCtx.nb_streams(); i++) {
      if (inputCtx.streams(i).codecpar().codec_type() == AVMEDIA_TYPE_VIDEO) {
        v_stream_idx = i;
        System.out.println("Video stream found at index: " + v_stream_idx);
        break;
      }
    }
    if (v_stream_idx == -1) {
      throw new IOException("Video stream not found");
    }

    // Decoder setup for the located video stream.
    // NOTE(review): the null check below runs AFTER codec is dereferenced by the
    // debug prints — if the codec is unsupported those calls see null first.
    AVStream inputStream = inputCtx.streams(v_stream_idx);
    AVCodec codec = avcodec_find_decoder(inputStream.codecpar().codec_id());
    System.out.println("av_codec_is_decoder-->" + av_codec_is_decoder(codec));

    System.out.println("av_codec_is_encoder-->" + av_codec_is_encoder(codec));
    if (codec == null) {
      throw new IOException("Unsupported codec");
    }

    System.out.println("Allocating codec context...");
    AVCodecContext decoderCtx = avcodec_alloc_context3(codec);

    // Copy stream parameters into the decoder context, then open it.
    // NOTE(review): return codes of these two calls are not checked.
    avcodec_parameters_to_context(decoderCtx, inputStream.codecpar());
    avcodec_open2(decoderCtx, codec, (PointerPointer) null);

    // Set up RTP output
    System.out.println("Setting up RTP output: " + outputURL);

    //    if (avformat_alloc_output_context2(outputCtx, null, "rtp_mpegts", outputURL) < 0) {
    //      throw new IOException("Failed to create RTP output context");
    //    }

    //    if (avformat_alloc_output_context2(outputCtx, null, null, outputURL) < 0) {
    //      throw new IOException("Failed to create RTP output context");
    //    }

    // MPEG-TS over RTP muxer.
    if (avformat_alloc_output_context2(outputCtx, null, "rtp_mpegts", outputURL) < 0) {
      throw new IOException("Failed to create RTP output context");
    }

    AVCodec encoderCodec = avcodec_find_encoder_by_name("libx264");

    //  AVCodec encoderCodec = avcodec_find_encoder_by_name("h264");

    if (encoderCodec == null) {
      throw new IOException("h264 codec not found");
    }

    AVStream outputStream = avformat_new_stream(outputCtx, encoderCodec);
    if (outputStream == null) {
      throw new IOException("Failed to create output stream");
    }

    // 1/30 time base; currently unused (see the commented time_base(q) below).
    AVRational q = new AVRational();

    q.num(1);
    q.den(30);

    System.out.println("Configuring encoder...");
    AVCodecContext encoderCtx = avcodec_alloc_context3(encoderCodec);
    System.out.println("Configuring encoder2...");
    System.out.println("encoderCodec.id()-->" + encoderCodec.id());
    System.out.println("codec.id()-->" + decoderCtx.width());
    System.out.println("codec.id()-->" + decoderCtx.height());
    System.out.println("inputStream.time_num()-->" + inputStream.time_base().num());
    System.out.println("inputStream.time_den()-->" + inputStream.time_base().den());
    System.out.println("av_inv_q(inputStream.time_base())-->" + av_inv_q(inputStream.time_base()));

    // Reference: the C++ implementation this encoder setup was ported from.
    //        Context->width = Config->Width;
    //        Context->height = Config->Height;
    //        Context->time_base = (AVRational){1, (int)Config->FramesPerSecond};
    //        Context->pix_fmt = Config->PixelFormat;
    //        Context->framerate = (AVRational){(int)Config->FramesPerSecond, 1};
    //        Context->bit_rate = Config->BitRate;
    //        Context->gop_size = Config->GopSize;
    //        Context->max_b_frames = Config->MaxBFrames;
    //        Context->gop_size = Config->GopSize;
    //        Context->flags |= Config->Flags;

    encoderCtx.codec_id(encoderCodec.id());
    System.out.println("Configuring encoder3...");
    encoderCtx.codec_type(AVMEDIA_TYPE_VIDEO);
    encoderCtx.pix_fmt(AV_PIX_FMT_YUV420P);
    System.out.println("Configuring encoder4...");

    //    encoderCtx.width(decoderCtx.width());
    //    encoderCtx.height(decoderCtx.height());
    //
    //    encoderCtx.width(640);
    //    encoderCtx.height(480);
    //
    // Encode at the source resolution.
    encoderCtx.width(decoderCtx.width());
    encoderCtx.height(decoderCtx.height());

    //    AV_TIME_BASE

    //    avutil.av_get_time_base_q()

    //    AVMEDIA_TYPE_VIDEO

    //    av_opt_set()
    //
    //    av_opt_find()
    //
    //    avcodec.av_packet_ref()
    //
    //    av_frame_ref();
    //
    // NOTE(review): the allocated packet is never captured or freed — likely leftover.
    av_packet_alloc();

    System.out.println("Configuring encoder5...");
    encoderCtx.time_base(av_inv_q(inputStream.time_base()));
    //    encoderCtx.time_base(q);
    AVRational fr = new AVRational();

    // 30/1 fps; immediately overridden by the decoder's frame rate on the next line.
    fr.den(1);
    fr.num(30);
    encoderCtx.framerate(fr);
    encoderCtx.framerate(decoderCtx.framerate());
    System.out.println("Configuring encoder6...");
    encoderCtx.bit_rate(400000);
    encoderCtx.gop_size(30);
    // No B-frames: keeps encode latency low for live streaming.
    encoderCtx.max_b_frames(0);

    // Software encoding only; the hardware-acceleration experiment below is disabled.
    encoderCtx.hw_device_ctx(null);

    AVBufferRef HWAccelDeviceContext = new AVBufferRef(null);
    //    int avRC =
    //        av_hwdevice_ctx_create(
    //            HWAccelDeviceContext, AV_HWDEVICE_TYPE_CUDA, "0", new AVDictionary(null), 0);

    //    int avRC =
    //        av_hwdevice_ctx_create(
    //            HWAccelDeviceContext,
    //            AV_HWDEVICE_TYPE_VAAPI,
    //            new BytePointer(),
    //            new AVDictionary(null),
    //            0);
    //    if (avRC < 0) {
    //      throw new IOException("Failed to create hw device. Error code:" + avRC);
    //    }

    // Publish the encoder settings to the output stream, then open the encoder.
    avcodec_parameters_from_context(outputStream.codecpar(), encoderCtx);
    if (avcodec_open2(encoderCtx, encoderCodec, new PointerPointer()) < 0) {
      throw new IOException("Failed to open encoder");
    }

    System.out.println("Configuring encoder7...");

    System.out.println("Configuring encoder8...");

    //    if (avio_open2(outputCtx.pb(), outputURL, AVIO_FLAG_WRITE, null, null) < 0) {
    //      throw new IOException("Failed to open RTP output");
    //    }

    // NOTE:This pattern seems to fix it as per
    // https://github.com/bytedeco/javacpp-presets/issues/408#issuecomment-291711924

    AVIOContext pb = new AVIOContext(null);

    //    if (avio_open(pb, outputURL, AVIO_FLAG_WRITE) < 0) {
    //      throw new IOException("Failed to open RTP output");
    //    }

    // NOTE(review): this overwrites the encoder parameters copied to the output stream
    // above with the INPUT stream's parameters — confirm which is intended.
    avcodec_parameters_copy(outputStream.codecpar(), inputStream.codecpar());

    AVDictionary options = new AVDictionary(null);
    if (avio_open2(pb, outputURL, AVIO_FLAG_WRITE, null, options) < 0) {
      throw new IOException("Failed to open RTP output");
    }
    outputCtx.pb(pb);

    System.out.println("Configuring encoder9...");

    avformat_write_header(outputCtx, new AVDictionary(null));

    System.out.println("RTP streaming setup complete, starting frame processing...");

    AVFrame frame = av_frame_alloc();
    AVPacket packet = new AVPacket();

    //    TODO:Move StartTime assignment to an "Init" method

    long StartTime = av_gettime_relative();

    // Demux loop: decode+re-encode each video packet, pace by PTS, drop the rest.
    while (av_read_frame(inputCtx, packet) >= 0) {
      if (packet.stream_index() == v_stream_idx) {
        System.out.println("Decoding frame...");

        // TODO:The commented section will execute when demux mode is on. This needs to be made
        // configurable.
        writeFrame(outputCtx, decoderCtx, outputStream, encoderCtx, frame, packet);
        delayByPts(packet.pts(), inputStream.time_base(), ((int) StartTime));

        // NOTE(review): packet is unreffed here AND again just below (harmless but redundant).
        av_packet_unref(packet);
      }
      av_packet_unref(packet);
    }

    System.out.println("Finalizing RTP stream...");
    av_write_trailer(outputCtx);

    // Teardown of decoder, encoder, input and output contexts.
    avcodec_close(decoderCtx);
    avcodec_close(encoderCtx);
    avformat_close_input(inputCtx);
    avio_closep(outputCtx.pb());
    avformat_free_context(outputCtx);

    System.out.println("Streaming finished successfully");
  }
455

456
  private EReturnCode RestartInputSource(AVFormatContext Context) {
NEW
457
    EReturnCode rc = EReturnCode.OK;
×
458
    int avRC;
459

460
    //    if (!(Context.flags() & AVFMTCTX_UNSEEKABLE)) {
461
    //      /* Seek back to the beginning of the file */
462
    //      avRC = av_seek_frame(Context, -1, 0, AVSEEK_FLAG_BACKWARD);
463
    //      if (avRC < 0) {
464
    //        ReportAVError("CInputFormat::GetPacket", "av_seek_frame", avRC, __LINE__);
465
    //        rc = EReturnCode.FAILED_EXECUTE;
466
    //        return rc;
467
    //      }
468
    //    }
469

NEW
470
    return rc;
×
471
  }
472

473
  //  private void CInputFormat::Restart()
474
  private void Restart(AVFormatContext Context) {
NEW
475
    EReturnCode rc = EReturnCode.OK;
×
476
    int avRC;
477

478
    //    rc = RestartInputSource(Context);
479
    //
480
    //    /* Reset demuxer state */
481
    //    avRC = avformat_flush(Context);
482
    //    if (avRC < 0) {
483
    //      ReportAVError("CInputFormat::Restart", "avformat_flush", avRC, __LINE__);
484
    //      rc = EReturnCode.FAILED_EXECUTE;
485
    //    }
486
    //
487
    //    StartTime = av_gettime_relative();
488
    //    PtsOffset = NextPts;
489
    //
490
    //    end_of_function:
491
    //    return rc;
NEW
492
  }
×
493

494
  /**
   * Decodes one demuxed packet, re-encodes every frame it yields, writes the encoded
   * packets to the RTP output and mirrors each packet's bytes into the mission
   * database via {@link #writeFrameToDB}.
   *
   * <p>Flow: send the packet to the decoder, drain frames with
   * {@code avcodec_receive_frame}; for each frame, feed the encoder and drain encoded
   * packets with {@code avcodec_receive_packet}. The drain loops exit when the codecs
   * return a negative code (e.g. EAGAIN/EOF — not distinguished from real errors here).
   *
   * @param outputCtx muxer the encoded packets are written to
   * @param decoderCtx open decoder matching the input stream
   * @param outputStream output stream the packets are tagged with
   * @param encoderCtx open encoder producing the output packets
   * @param frame scratch frame reused across calls
   * @param packet the demuxed input packet to process
   */
  private void writeFrame(
      AVFormatContext outputCtx,
      AVCodecContext decoderCtx,
      AVStream outputStream,
      AVCodecContext encoderCtx,
      AVFrame frame,
      AVPacket packet) {
    int ret;
    if (avcodec_send_packet(decoderCtx, packet) >= 0) {
      ret = avcodec_receive_frame(decoderCtx, frame);
      // One iteration per decoded frame; ret is reused for send_frame below and then
      // reassigned by the next receive_frame at the bottom of the loop.
      while (ret >= 0) {
        System.out.println("Encoding frame...");
        ret = avcodec_send_frame(encoderCtx, frame);

        System.out.println("ret for avcodec_send_frame-->" + ret);
        if (ret >= 0) {
          AVPacket outPacket = new AVPacket();
          int recv_packets = avcodec_receive_packet(encoderCtx, outPacket);

          System.out.println("recv_packets-->" + recv_packets);
          // Drain all packets the encoder produced for this frame.
          while (recv_packets >= 0) {
            System.out.println("Writing encoded packet to RTP stream...");
            outPacket.stream_index(outputStream.index());

            System.out.println("Length of packet:" + outPacket.size());

            // Copy the packet payload out of native memory so it can be stored.
            byte[] outPacketData = new byte[outPacket.size()];
            System.out.println("Read data:" + outPacket.data().get(outPacketData));

            System.out.println("Length of array:" + outPacketData.length);

            // Publish the encoded frame bytes as a parameter value.
            writeFrameToDB(outPacketData);

            av_write_frame(outputCtx, outPacket);
            av_packet_unref(outPacket);

            recv_packets = avcodec_receive_packet(encoderCtx, outPacket);
          }
        }
        System.out.println("ret for avcodec_receive_frame1 -->" + ret);

        ret = avcodec_receive_frame(decoderCtx, frame);
        System.out.println("ret for avcodec_receive_frame2 -->" + ret);
      }
    }

    //        av_write_frame(outputCtx, packet);
  }
542

543
  private static ParameterType getOrCreateType(
544
      XtceDb mdb, String name, Supplier<ParameterType.Builder<?>> supplier) {
545

NEW
546
    String fqn = XtceDb.YAMCS_SPACESYSTEM_NAME + NameDescription.PATH_SEPARATOR + name;
×
NEW
547
    ParameterType ptype = mdb.getParameterType(fqn);
×
NEW
548
    if (ptype != null) {
×
NEW
549
      return ptype;
×
550
    }
NEW
551
    ParameterType.Builder<?> typeb = supplier.get().setName(name);
×
552

NEW
553
    ptype = typeb.build();
×
NEW
554
    ((NameDescription) ptype).setQualifiedName(fqn);
×
555

NEW
556
    return ((Mdb) mdb).addSystemParameterType(ptype);
×
557
  }
558

559
  public static ParameterType getBasicType(XtceDb mdb, Type type) {
NEW
560
    ParameterType pType = null;
×
NEW
561
    switch (type) {
×
562
      case BOOLEAN:
NEW
563
        return getOrCreateType(mdb, "boolean", () -> new BooleanParameterType.Builder());
×
564
      case STRING:
NEW
565
        return getOrCreateType(mdb, "string", () -> new StringParameterType.Builder());
×
566

567
      case FLOAT:
NEW
568
        return getOrCreateType(
×
NEW
569
            mdb, "float32", () -> new FloatParameterType.Builder().setSizeInBits(32));
×
570
      case DOUBLE:
NEW
571
        return getOrCreateType(
×
NEW
572
            mdb, "float64", () -> new FloatParameterType.Builder().setSizeInBits(64));
×
573
      case SINT32:
NEW
574
        return getOrCreateType(
×
575
            mdb,
576
            "sint32",
NEW
577
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(true));
×
578
      case SINT64:
NEW
579
        return getOrCreateType(
×
580
            mdb,
581
            "sint64",
NEW
582
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(true));
×
583
      case UINT32:
NEW
584
        return getOrCreateType(
×
585
            mdb,
586
            "uint32",
NEW
587
            () -> new IntegerParameterType.Builder().setSizeInBits(32).setSigned(false));
×
588
      case UINT64:
NEW
589
        return getOrCreateType(
×
590
            mdb,
591
            "uint64",
NEW
592
            () -> new IntegerParameterType.Builder().setSizeInBits(64).setSigned(false));
×
593
      default:
594
        break;
595
    }
596

NEW
597
    return pType;
×
598
  }
599

600
  private void writeFrameToDB(byte data[]) {
NEW
601
    TupleDefinition tdef = gftdef.copy();
×
602

NEW
603
    int numberfRecords = 1;
×
604

NEW
605
    List<Object> cols = new ArrayList<>(4 + numberfRecords);
×
606

NEW
607
    tdef = gftdef.copy();
×
NEW
608
    long gentime = timeService.getMissionTime();
×
NEW
609
    cols.add(gentime);
×
NEW
610
    cols.add("/yamcs/pop-os/");
×
NEW
611
    cols.add(0);
×
NEW
612
    cols.add(gentime);
×
613

NEW
614
    tdef.addColumn(frameParam.getQualifiedName(), DataType.PARAMETER_VALUE);
×
615

616
    //    String filePath =
617
    // "/home/lgomez/projects/viper_sitl/squeaky-weasel/software/airliner/build/venus_aero/sassie/sitl_commander_workspace/hello_world.txt";
618
    //
619
    //    try (FileInputStream fis = new FileInputStream(filePath)) {
620
    //      // Create a byte array large enough to hold the file contents
621
    //      byte[] fileBytes = new byte[fis.available()];
622
    //
623
    //      // Read the bytes into the array
624
    //      fis.read(fileBytes);
625
    //
626
    //      // Print the bytes (optional)
627
    //      //        for (byte b : fileBytes) {
628
    //      //            System.out.print(b + " ");
629
    //
630
    //      cols.add(getPV(frameParam, gentime, data));
631
    //
632
    //      //        }
633
    //    } catch (IOException e) {
634
    //      e.printStackTrace();
635
    //    }
636

637
    //    cols.add(getPV(frameParam, gentime, ByteBuffer.allocate(4).putFloat(47.0f).array()));
638

639
    //    cols.add(getPV(frameParam, gentime, 47.0 ));
640

NEW
641
    cols.add(getPV(frameParam, gentime, data));
×
642

NEW
643
    pushTuple(tdef, cols);
×
NEW
644
  }
×
645

646
  private static Stream getStream(YarchDatabaseInstance ydb, String streamName) {
NEW
647
    Stream stream = ydb.getStream(streamName);
×
NEW
648
    if (stream == null) {
×
649
      try {
NEW
650
        ydb.execute("create stream " + streamName + gftdef.getStringDefinition());
×
NEW
651
      } catch (Exception e) {
×
NEW
652
        throw new ConfigurationException(e);
×
NEW
653
      }
×
654

NEW
655
      stream = ydb.getStream(streamName);
×
656
    }
NEW
657
    return stream;
×
658
  }
659

660
  private synchronized void pushTuple(TupleDefinition tdef, List<Object> cols) {
661
    Tuple t;
NEW
662
    t = new Tuple(tdef, cols);
×
NEW
663
    videoStream.emitTuple(t);
×
NEW
664
  }
×
665

666
  public ParameterValue getNewPv(Parameter parameter, long time) {
NEW
667
    ParameterValue pv = new ParameterValue(parameter);
×
NEW
668
    pv.setAcquisitionTime(YamcsServer.getTimeService(yamcsInstance).getMissionTime());
×
NEW
669
    pv.setGenerationTime(time);
×
NEW
670
    return pv;
×
671
  }
672

673
  public ParameterValue getPV(Parameter parameter, long time, double v) {
NEW
674
    ParameterValue pv = getNewPv(parameter, time);
×
NEW
675
    pv.setEngValue(ValueUtility.getDoubleValue(v));
×
NEW
676
    pv.setRawValue(ValueUtility.getDoubleValue(v));
×
NEW
677
    return pv;
×
678
  }
679

680
  public ParameterValue getPV(Parameter parameter, long time, byte v[]) {
NEW
681
    ParameterValue pv = getNewPv(parameter, time);
×
NEW
682
    pv.setEngValue(ValueUtility.getBinaryValue(v));
×
NEW
683
    pv.setRawValue(ValueUtility.getBinaryValue(v));
×
NEW
684
    return pv;
×
685
  }
686

687
  static void save_frame(AVFrame pFrame, int width, int height, int f_idx) throws IOException {
688
    // Open file
NEW
689
    String szFilename = String.format("frame%d_.ppm", f_idx);
×
NEW
690
    OutputStream pFile = new FileOutputStream(szFilename);
×
691

692
    // Write header
NEW
693
    pFile.write(String.format("P6\n%d %d\n255\n", width, height).getBytes());
×
694

695
    // Write pixel data
NEW
696
    BytePointer data = pFrame.data(0);
×
NEW
697
    byte[] bytes = new byte[width * 3];
×
NEW
698
    int l = pFrame.linesize(0);
×
NEW
699
    for (int y = 0; y < height; y++) {
×
NEW
700
      data.position(y * l).get(bytes);
×
NEW
701
      pFile.write(bytes);
×
702
    }
703

704
    // Close file
NEW
705
    pFile.close();
×
NEW
706
  }
×
707

708
  /**
709
   * Concatenates the root with the subsystems and returns a qualified name
710
   *
711
   * @param root
712
   */
713
  public static String qualifiedName(String root, String... subsystems) {
NEW
714
    if (root.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
NEW
715
      throw new IllegalArgumentException(
×
716
          "root has to start with " + NameDescription.PATH_SEPARATOR);
717
    }
NEW
718
    StringBuilder sb = new StringBuilder();
×
NEW
719
    sb.append(root);
×
NEW
720
    for (String s : subsystems) {
×
NEW
721
      if (s.charAt(0) != NameDescription.PATH_SEPARATOR) {
×
NEW
722
        sb.append(NameDescription.PATH_SEPARATOR);
×
723
      }
NEW
724
      sb.append(s);
×
725
    }
NEW
726
    return sb.toString();
×
727
  }
728

729
  private void readVideo() throws IOException {
NEW
730
    System.out.println("Read few frame and write to image");
×
731
    //      if (args.length < 1) {
732
    //          System.out.println("Missing input video file");
733
    //          System.exit(-1);
734
    //      }
NEW
735
    int ret = -1, i = 0, v_stream_idx = -1;
×
NEW
736
    String vf_path = "/home/lgomez/Downloads/ginger_man.mp4";
×
NEW
737
    AVFormatContext fmt_ctx = new AVFormatContext(null);
×
NEW
738
    AVPacket pkt = new AVPacket();
×
739

NEW
740
    ret = avformat_open_input(fmt_ctx, vf_path, null, null);
×
NEW
741
    if (ret < 0) {
×
NEW
742
      System.out.printf("Open video file %s failed \n", vf_path);
×
NEW
743
      throw new IllegalStateException();
×
744
    }
745

746
    // i dont know but without this function, sws_getContext does not work
NEW
747
    if (avformat_find_stream_info(fmt_ctx, (PointerPointer) null) < 0) {
×
NEW
748
      System.exit(-1);
×
749
    }
750

NEW
751
    av_dump_format(fmt_ctx, 0, vf_path, 0);
×
752

NEW
753
    for (i = 0; i < fmt_ctx.nb_streams(); i++) {
×
NEW
754
      if (fmt_ctx.streams(i).codecpar().codec_type() == AVMEDIA_TYPE_VIDEO) {
×
NEW
755
        v_stream_idx = i;
×
NEW
756
        break;
×
757
      }
758
    }
NEW
759
    if (v_stream_idx == -1) {
×
NEW
760
      System.out.println("Cannot find video stream");
×
NEW
761
      throw new IllegalStateException();
×
762
    } else {
NEW
763
      System.out.printf(
×
764
          "Video stream %d with resolution %dx%d\n",
NEW
765
          v_stream_idx,
×
NEW
766
          fmt_ctx.streams(i).codecpar().width(),
×
NEW
767
          fmt_ctx.streams(i).codecpar().height());
×
768
    }
769

NEW
770
    AVCodecContext codec_ctx = avcodec_alloc_context3(null);
×
NEW
771
    avcodec_parameters_to_context(codec_ctx, fmt_ctx.streams(v_stream_idx).codecpar());
×
772

NEW
773
    AVCodec codec = avcodec_find_decoder(codec_ctx.codec_id());
×
NEW
774
    if (codec == null) {
×
NEW
775
      System.out.println("Unsupported codec for video file");
×
NEW
776
      throw new IllegalStateException();
×
777
    }
NEW
778
    ret = avcodec_open2(codec_ctx, codec, (PointerPointer) null);
×
NEW
779
    if (ret < 0) {
×
NEW
780
      System.out.println("Can not open codec");
×
NEW
781
      throw new IllegalStateException();
×
782
    }
783

NEW
784
    AVFrame frm = av_frame_alloc();
×
785

786
    // Allocate an AVFrame structure
NEW
787
    AVFrame pFrameRGB = av_frame_alloc();
×
NEW
788
    if (pFrameRGB == null) {
×
NEW
789
      System.exit(-1);
×
790
    }
791

792
    // Determine required buffer size and allocate buffer
NEW
793
    int numBytes =
×
NEW
794
        av_image_get_buffer_size(AV_PIX_FMT_RGB24, codec_ctx.width(), codec_ctx.height(), 1);
×
NEW
795
    BytePointer buffer = new BytePointer(av_malloc(numBytes));
×
796

NEW
797
    SwsContext sws_ctx =
×
NEW
798
        sws_getContext(
×
NEW
799
            codec_ctx.width(),
×
NEW
800
            codec_ctx.height(),
×
NEW
801
            codec_ctx.pix_fmt(),
×
NEW
802
            codec_ctx.width(),
×
NEW
803
            codec_ctx.height(),
×
804
            AV_PIX_FMT_RGB24,
805
            SWS_BILINEAR,
806
            null,
807
            null,
808
            (DoublePointer) null);
809

NEW
810
    if (sws_ctx == null) {
×
NEW
811
      System.out.println("Can not use sws");
×
NEW
812
      throw new IllegalStateException();
×
813
    }
814

NEW
815
    av_image_fill_arrays(
×
NEW
816
        pFrameRGB.data(),
×
NEW
817
        pFrameRGB.linesize(),
×
818
        buffer,
819
        AV_PIX_FMT_RGB24,
NEW
820
        codec_ctx.width(),
×
NEW
821
        codec_ctx.height(),
×
822
        1);
823

NEW
824
    i = 0;
×
NEW
825
    int ret1 = -1, ret2 = -1, fi = -1;
×
NEW
826
    while (av_read_frame(fmt_ctx, pkt) >= 0) {
×
NEW
827
      if (pkt.stream_index() == v_stream_idx) {
×
NEW
828
        ret1 = avcodec_send_packet(codec_ctx, pkt);
×
NEW
829
        ret2 = avcodec_receive_frame(codec_ctx, frm);
×
NEW
830
        System.out.printf("ret1 %d ret2 %d\n", ret1, ret2);
×
831
        // avcodec_decode_video2(codec_ctx, frm, fi, pkt);
832
      }
833
      // if not check ret2, error occur [swscaler @ 0x1cb3c40] bad src image pointers
834
      // ret2 same as fi
835
      // if (fi && ++i <= 5) {
NEW
836
      if (ret2 >= 0 && ++i <= 100) {
×
NEW
837
        sws_scale(
×
838
            sws_ctx,
NEW
839
            frm.data(),
×
NEW
840
            frm.linesize(),
×
841
            0,
NEW
842
            codec_ctx.height(),
×
NEW
843
            pFrameRGB.data(),
×
NEW
844
            pFrameRGB.linesize());
×
845

NEW
846
        save_frame(pFrameRGB, codec_ctx.width(), codec_ctx.height(), i);
×
847
        // save_frame(frm, codec_ctx.width(), codec_ctx.height(), i);
848
      }
NEW
849
      av_packet_unref(pkt);
×
NEW
850
      if (i >= 100) {
×
NEW
851
        break;
×
852
      }
853
    }
854

NEW
855
    av_frame_free(frm);
×
856

NEW
857
    avcodec_close(codec_ctx);
×
NEW
858
    avcodec_free_context(codec_ctx);
×
859

NEW
860
    avformat_close_input(fmt_ctx);
×
861

NEW
862
    streamVideoOverRTP();
×
NEW
863
    System.out.println("Shutdown");
×
NEW
864
  }
×
865

866
  @Override
867
  public void run() {
868

869
    //    for (int i = 0; i < 10; i++) {
870
    //      writeFrameToDB(new byte[64]);
871
    //    }
872

873
    try {
NEW
874
      streamVideoOverRTP();
×
NEW
875
    } catch (IOException e) {
×
876
      // TODO Auto-generated catch block
NEW
877
      e.printStackTrace();
×
NEW
878
    }
×
879

NEW
880
    while (isRunningAndEnabled()) {
×
881

882
      //      scheduler.scheduleAtFixedRate(
883
      //          () -> {
884
      //            writeFrameToDB(new byte[(int) 1e9 ]);
885
      //          },
886
      //          1,
887
      //          1,
888
      //          TimeUnit.SECONDS);
NEW
889
      TmPacket tmpkt = getNextPacket();
×
890
      //      writeFrameToDB(new byte[64]);
NEW
891
      if (tmpkt != null) {
×
NEW
892
        processPacket(tmpkt);
×
893
      }
NEW
894
    }
×
NEW
895
  }
×
896

897
  /**
   * Called to retrieve the next packet. Blocks reading on the UDP socket until a datagram
   * arrives or the link is stopped/disabled (which closes the socket and unblocks the read).
   *
   * @return anything that looks like a valid packet — only the size (after stripping
   *     {@code initialBytesToStrip} leading bytes) is taken into account to decide whether
   *     it is valid; {@code null} when the link is shutting down or disabled
   */
  public TmPacket getNextPacket() {
    byte[] packet = null;

    while (isRunning()) {
      try {
        tmSocket.receive(datagram);
        // Effective payload length once the configured leading bytes are discarded.
        int pktLength = datagram.getLength() - initialBytesToStrip;

        if (pktLength <= 0) {
          // Datagram too short to contain any payload after stripping — count and drop it.
          log.warn(
              "received datagram of size {} <= {} (initialBytesToStrip); ignored.",
              datagram.getLength(),
              initialBytesToStrip);
          invalidDatagramCount++;
          continue;
        }

        updateStats(datagram.getLength());
        // Copy out only the payload; the datagram buffer is reused on the next receive().
        packet = new byte[pktLength];
        System.arraycopy(
            datagram.getData(), datagram.getOffset() + initialBytesToStrip, packet, 0, pktLength);
        break;
      } catch (IOException e) {
        if (!isRunning()
            || isDisabled()) { // the shutdown or disable will close the socket and that will
          // generate an exception
          // which we ignore here
          return null;
        }
        log.warn("exception thrown when reading from the UDP socket at port {}", port, e);
      }
    }

    if (packet != null) {
      // Timestamp with mission time and run the packet through the configured preprocessor
      // before handing it to the caller.
      TmPacket tmPacket = new TmPacket(timeService.getMissionTime(), packet);
      // NOTE(review): "setEarthRceptionTime" appears to be the upstream Yamcs API spelling —
      // confirm against the TmPacket class before "fixing" it.
      tmPacket.setEarthRceptionTime(timeService.getHresMissionTime());
      return packetPreprocessor.process(tmPacket);
    } else {
      return null;
    }
  }
944

945
  /** returns statistics with the number of datagram received and the number of invalid datagrams */
946
  @Override
947
  public String getDetailedStatus() {
NEW
948
    if (isDisabled()) {
×
NEW
949
      return "DISABLED";
×
950
    } else {
NEW
951
      return String.format(
×
952
          "OK (%s) %nValid datagrams received: %d%nInvalid datagrams received: %d",
NEW
953
          port, packetCount.get(), invalidDatagramCount);
×
954
    }
955
  }
956

957
  /** Sets the disabled to true such that getNextPacket ignores the received datagrams */
958
  @Override
959
  public void doDisable() {
NEW
960
    if (tmSocket != null) {
×
NEW
961
      tmSocket.close();
×
NEW
962
      tmSocket = null;
×
963
    }
NEW
964
  }
×
965

966
  /**
   * Re-enables the link: binds a fresh UDP socket on the configured port and starts a new
   * thread running {@link #run()} so getNextPacket resumes receiving datagrams.
   *
   * @throws SocketException if the socket cannot be bound to {@code port} (e.g. port in use)
   */
  @Override
  public void doEnable() throws SocketException {
    tmSocket = new DatagramSocket(port);
    // NOTE(review): every enable spawns a new raw Thread; presumably the previous receive
    // thread exits once doDisable() closes its socket — confirm repeated disable/enable
    // cycles do not leak threads.
    new Thread(this).start();
  }
×
976

977
  /**
   * Always reports {@link Status#OK}: the link reads from a connectionless
   * {@link java.net.DatagramSocket}, so there is no transport-level connection state
   * to inspect.
   */
  @Override
  protected Status connectionStatus() {
    return Status.OK;
  }
981
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc