• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 28232587735

26 Jun 2026 10:32AM UTC coverage: 69.216%. First build
28232587735

Pull #1472

github

web-flow
Merge b6330b8fb into dcbd69002
Pull Request #1472: Fix FFMPEG commands by adding spaces

22 of 43 new or added lines in 2 files covered. (51.16%)

13684 of 19770 relevant lines covered (69.22%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.04
/pod/video_encode_transcript/Encoding_video.py
1
"""Esup-Pod video encoding."""
2

3
import argparse
1✔
4
import json
1✔
5
import logging
1✔
6
import os
1✔
7
import time
1✔
8
import unicodedata
1✔
9

10
from webvtt import Caption, WebVTT
1✔
11

12
if __name__ == "__main__":
1✔
13
    from encoding_settings import (
×
14
        FFMPEG_AUDIO_BITRATE,
15
        FFMPEG_CMD,
16
        FFMPEG_CREATE_OVERVIEW,
17
        FFMPEG_CREATE_THUMBNAIL,
18
        FFMPEG_CRF,
19
        FFMPEG_DRESSING_AUDIO,
20
        FFMPEG_DRESSING_CONCAT,
21
        FFMPEG_DRESSING_FILTER_COMPLEX,
22
        FFMPEG_DRESSING_INPUT,
23
        FFMPEG_DRESSING_OUTPUT,
24
        FFMPEG_DRESSING_SCALE,
25
        FFMPEG_DRESSING_SILENT,
26
        FFMPEG_DRESSING_WATERMARK,
27
        FFMPEG_EXTRACT_SUBTITLE,
28
        FFMPEG_EXTRACT_THUMBNAIL,
29
        FFMPEG_HLS_COMMON_PARAMS,
30
        FFMPEG_HLS_ENCODE_PARAMS,
31
        FFMPEG_HLS_TIME,
32
        FFMPEG_INPUT,
33
        FFMPEG_LEVEL,
34
        FFMPEG_LIBX,
35
        FFMPEG_M4A_ENCODE,
36
        FFMPEG_MP3_ENCODE,
37
        FFMPEG_MP4_ENCODE,
38
        FFMPEG_NB_THREADS,
39
        FFMPEG_NB_THUMBNAIL,
40
        FFMPEG_PRESET,
41
        FFMPEG_PROFILE,
42
        FFPROBE_CMD,
43
        FFPROBE_GET_INFO,
44
    )
45
    from encoding_utils import (
×
46
        check_file,
47
        get_dressing_position_value,
48
        get_info_from_video,
49
        get_list_rendition,
50
        launch_cmd,
51
    )
52
else:
53
    from .encoding_settings import (
1✔
54
        FFMPEG_AUDIO_BITRATE,
55
        FFMPEG_CMD,
56
        FFMPEG_CREATE_OVERVIEW,
57
        FFMPEG_CREATE_THUMBNAIL,
58
        FFMPEG_CRF,
59
        FFMPEG_DRESSING_AUDIO,
60
        FFMPEG_DRESSING_CONCAT,
61
        FFMPEG_DRESSING_FILTER_COMPLEX,
62
        FFMPEG_DRESSING_INPUT,
63
        FFMPEG_DRESSING_OUTPUT,
64
        FFMPEG_DRESSING_SCALE,
65
        FFMPEG_DRESSING_SILENT,
66
        FFMPEG_DRESSING_WATERMARK,
67
        FFMPEG_EXTRACT_SUBTITLE,
68
        FFMPEG_EXTRACT_THUMBNAIL,
69
        FFMPEG_HLS_COMMON_PARAMS,
70
        FFMPEG_HLS_ENCODE_PARAMS,
71
        FFMPEG_HLS_TIME,
72
        FFMPEG_INPUT,
73
        FFMPEG_LEVEL,
74
        FFMPEG_LIBX,
75
        FFMPEG_M4A_ENCODE,
76
        FFMPEG_MP3_ENCODE,
77
        FFMPEG_MP4_ENCODE,
78
        FFMPEG_NB_THREADS,
79
        FFMPEG_NB_THUMBNAIL,
80
        FFMPEG_PRESET,
81
        FFMPEG_PROFILE,
82
        FFPROBE_CMD,
83
        FFPROBE_GET_INFO,
84
    )
85
    from .encoding_utils import (
1✔
86
        check_file,
87
        get_dressing_position_value,
88
        get_info_from_video,
89
        get_list_rendition,
90
        launch_cmd,
91
    )
92

93
__author__ = "Nicolas CAN <nicolas.can@univ-lille.fr>"
1✔
94
__license__ = "LGPL v3"
1✔
95

96
image_codec = ["jpeg", "gif", "png", "bmp", "jpg"]
1✔
97

98
"""
1✔
99
 - Get video source
100
 - get alls track from video source
101
 - encode tracks in HLS and mp4
102
 - save it
103
"""
104

105
try:
1✔
106
    from django.conf import settings
1✔
107

108
    FFMPEG_CMD = getattr(settings, "FFMPEG_CMD", FFMPEG_CMD)
1✔
109
    FFPROBE_CMD = getattr(settings, "FFPROBE_CMD", FFPROBE_CMD)
1✔
110
    FFPROBE_GET_INFO = getattr(settings, "FFPROBE_GET_INFO", FFPROBE_GET_INFO)
1✔
111
    FFMPEG_CRF = getattr(settings, "FFMPEG_CRF", FFMPEG_CRF)
1✔
112
    FFMPEG_PRESET = getattr(settings, "FFMPEG_PRESET", FFMPEG_PRESET)
1✔
113
    FFMPEG_PROFILE = getattr(settings, "FFMPEG_PROFILE", FFMPEG_PROFILE)
1✔
114
    FFMPEG_LEVEL = getattr(settings, "FFMPEG_LEVEL", FFMPEG_LEVEL)
1✔
115
    FFMPEG_HLS_TIME = getattr(settings, "FFMPEG_HLS_TIME", FFMPEG_HLS_TIME)
1✔
116
    FFMPEG_INPUT = getattr(settings, "FFMPEG_INPUT", FFMPEG_INPUT)
1✔
117
    FFMPEG_LIBX = getattr(settings, "FFMPEG_LIBX", FFMPEG_LIBX)
1✔
118
    FFMPEG_MP4_ENCODE = getattr(settings, "FFMPEG_MP4_ENCODE", FFMPEG_MP4_ENCODE)
1✔
119
    FFMPEG_HLS_COMMON_PARAMS = getattr(
1✔
120
        settings, "FFMPEG_HLS_COMMON_PARAMS", FFMPEG_HLS_COMMON_PARAMS
121
    )
122
    FFMPEG_HLS_ENCODE_PARAMS = getattr(
1✔
123
        settings, "FFMPEG_HLS_ENCODE_PARAMS", FFMPEG_HLS_ENCODE_PARAMS
124
    )
125
    FFMPEG_MP3_ENCODE = getattr(settings, "FFMPEG_MP3_ENCODE", FFMPEG_MP3_ENCODE)
1✔
126
    FFMPEG_M4A_ENCODE = getattr(settings, "FFMPEG_M4A_ENCODE", FFMPEG_M4A_ENCODE)
1✔
127
    FFMPEG_NB_THREADS = getattr(settings, "FFMPEG_NB_THREADS", FFMPEG_NB_THREADS)
1✔
128
    FFMPEG_AUDIO_BITRATE = getattr(settings, "FFMPEG_AUDIO_BITRATE", FFMPEG_AUDIO_BITRATE)
1✔
129
    FFMPEG_EXTRACT_THUMBNAIL = getattr(
1✔
130
        settings, "FFMPEG_EXTRACT_THUMBNAIL", FFMPEG_EXTRACT_THUMBNAIL
131
    )
132
    FFMPEG_NB_THUMBNAIL = getattr(settings, "FFMPEG_NB_THUMBNAIL", FFMPEG_NB_THUMBNAIL)
1✔
133
    FFMPEG_CREATE_THUMBNAIL = getattr(
1✔
134
        settings, "FFMPEG_CREATE_THUMBNAIL", FFMPEG_CREATE_THUMBNAIL
135
    )
136
    FFMPEG_EXTRACT_SUBTITLE = getattr(
1✔
137
        settings, "FFMPEG_EXTRACT_SUBTITLE", FFMPEG_EXTRACT_SUBTITLE
138
    )
139
    FFMPEG_DRESSING_INPUT = getattr(
1✔
140
        settings, "FFMPEG_DRESSING_INPUT", FFMPEG_DRESSING_INPUT
141
    )
142
    FFMPEG_DRESSING_OUTPUT = getattr(
1✔
143
        settings, "FFMPEG_DRESSING_OUTPUT", FFMPEG_DRESSING_OUTPUT
144
    )
145
    FFMPEG_DRESSING_WATERMARK = getattr(
1✔
146
        settings, "FFMPEG_DRESSING_WATERMARK", FFMPEG_DRESSING_WATERMARK
147
    )
148
    FFMPEG_DRESSING_FILTER_COMPLEX = getattr(
1✔
149
        settings, "FFMPEG_DRESSING_FILTER_COMPLEX", FFMPEG_DRESSING_FILTER_COMPLEX
150
    )
151
    FFMPEG_DRESSING_SCALE = getattr(
1✔
152
        settings, "FFMPEG_DRESSING_SCALE", FFMPEG_DRESSING_SCALE
153
    )
154
    FFMPEG_DRESSING_CONCAT = getattr(
1✔
155
        settings, "FFMPEG_DRESSING_CONCAT", FFMPEG_DRESSING_CONCAT
156
    )
157
    FFMPEG_DRESSING_SILENT = getattr(
1✔
158
        settings, "FFMPEG_DRESSING_SILENT", FFMPEG_DRESSING_SILENT
159
    )
160
    FFMPEG_DRESSING_AUDIO = getattr(
1✔
161
        settings, "FFMPEG_DRESSING_AUDIO", FFMPEG_DRESSING_AUDIO
162
    )
163
    DEBUG = getattr(settings, "DEBUG", True)
1✔
164
except ImportError:  # pragma: no cover
165
    DEBUG = True
166
    pass
167

168
logger = logging.getLogger(__name__)
1✔
169
if DEBUG:
1✔
170
    logger.setLevel(logging.DEBUG)
1✔
171

172

173
class Encoding_video:
1✔
174
    """Encoding video object."""
175

176
    id = 0
1✔
177
    video_file = ""
1✔
178
    duration = 0
1✔
179
    list_video_track = {}
1✔
180
    list_audio_track = {}
1✔
181
    list_subtitle_track = {}
1✔
182
    list_image_track = {}
1✔
183
    list_mp4_files = {}
1✔
184
    list_hls_files = {}
1✔
185
    list_mp3_files = {}
1✔
186
    list_m4a_files = {}
1✔
187
    list_thumbnail_files = {}
1✔
188
    list_overview_files = {}
1✔
189
    list_subtitle_files = {}
1✔
190
    encoding_log = {}
1✔
191
    output_dir = ""
1✔
192
    start = 0
1✔
193
    stop = 0
1✔
194
    error_encoding = False
1✔
195
    cutting_start = 0
1✔
196
    cutting_stop = 0
1✔
197
    json_dressing = None
1✔
198
    dressing_input = ""
1✔
199

200
    def __init__(
1✔
201
        self,
202
        id=0,
203
        video_file="",
204
        start=0,
205
        stop=0,
206
        json_dressing=None,
207
        dressing_input="",
208
    ) -> None:
209
        """Initialize a new Encoding_video object."""
210
        self.id = id
1✔
211
        self.video_file = video_file
1✔
212
        self.duration = 0
1✔
213
        self.list_video_track = {}
1✔
214
        self.list_audio_track = {}
1✔
215
        self.list_subtitle_track = {}
1✔
216
        self.list_image_track = {}
1✔
217
        self.list_mp4_files = {}
1✔
218
        self.list_hls_files = {}
1✔
219
        self.list_mp3_files = {}
1✔
220
        self.list_m4a_files = {}
1✔
221
        self.list_thumbnail_files = {}
1✔
222
        self.list_overview_files = {}
1✔
223
        self.encoding_log = {}
1✔
224
        self.list_subtitle_files = {}
1✔
225
        self.output_dir = ""
1✔
226
        self.start = 0
1✔
227
        self.stop = 0
1✔
228
        self.error_encoding = False
1✔
229
        self.cutting_start = start or 0
1✔
230
        self.cutting_stop = stop or 0
1✔
231
        self.json_dressing = json_dressing
1✔
232
        self.dressing_input = dressing_input
1✔
233

234
    def is_video(self) -> bool:
1✔
235
        """Check if current encoding correspond to a video."""
236
        return len(self.list_video_track) > 0
1✔
237

238
    def get_subtime(self, clip_begin, clip_end) -> str:
1✔
239
        subtime = ""
1✔
240
        if clip_begin != 0 or clip_end != 0:
1✔
241
            subtime += "-ss %s " % str(clip_begin) + "-to %s " % str(clip_end)
×
242
        return subtime
1✔
243

244
    def get_video_data(self) -> None:
1✔
245
        """Get alls tracks from video source and put it in object passed in parameter."""
246
        msg = "--> get_info_video\n"
1✔
247
        probe_cmd = FFPROBE_GET_INFO % {
1✔
248
            "ffprobe": FFPROBE_CMD,
249
            "select_streams": "",
250
            "source": '"' + self.video_file + '" ',
251
        }
252
        msg += probe_cmd + "\n"
1✔
253
        duration = 0
1✔
254
        info, return_msg = get_info_from_video(probe_cmd)
1✔
255
        msg += json.dumps(info, indent=2)
1✔
256
        msg += " \n"
1✔
257
        msg += return_msg + "\n"
1✔
258
        self.add_encoding_log("probe_cmd", probe_cmd, True, msg)
1✔
259
        try:
1✔
260
            fmt = info.get("format") if info else None
1✔
261
            if fmt and fmt.get("duration") is not None:
1✔
262
                duration = int(float("%s" % fmt.get("duration")))
1✔
263
            else:
264
                raise KeyError("duration")
×
265
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
×
266
            msg = "\nUnexpected error: {0}".format(err)
×
267
            self.add_encoding_log("duration", "", True, msg)
×
268
        if self.cutting_start != 0 or self.cutting_stop != 0:
1✔
269
            duration = self.cutting_stop - self.cutting_start
×
270
        self.duration = duration
1✔
271
        streams = info.get("streams", [])
1✔
272
        for stream in streams:
1✔
273
            self.add_stream(stream)
1✔
274

275
    def fix_duration(self, input_file) -> None:
1✔
276
        msg = "--> get_info_video\n"
×
277
        probe_cmd = 'ffprobe -v quiet -show_entries format=duration -hide_banner  \
×
278
                    -of default=noprint_wrappers=1:nokey=1 -print_format json -i \
279
                    "{}"'.format(input_file)
280
        info, return_msg = get_info_from_video(probe_cmd)
×
281
        msg += json.dumps(info, indent=2)
×
282
        msg += " \n"
×
283
        msg += return_msg + "\n"
×
284
        duration = 0
×
285
        try:
×
286
            fmt = info.get("format") if info else None
×
287
            if fmt and fmt.get("duration") is not None:
×
288
                duration = int(float("%s" % fmt.get("duration")))
×
289
            else:
290
                raise KeyError("duration")
×
291
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
×
292
            msg += "\nUnexpected error: {0}".format(err)
×
293
        self.add_encoding_log("fix_duration", "", True, msg)
×
294
        if self.cutting_start != 0 or self.cutting_stop != 0:
×
295
            duration = self.cutting_stop - self.cutting_start
×
296
        self.duration = duration
×
297

298
    def add_stream(self, stream):
1✔
299
        codec_type = stream.get("codec_type", "unknown")
1✔
300
        # https://ffmpeg.org/doxygen/3.2/group__lavu__misc.html#ga9a84bba4713dfced21a1a56163be1f48
301
        if codec_type == "audio":
1✔
302
            codec = stream.get("codec_name", "unknown")
1✔
303
            self.list_audio_track["%s" % stream.get("index")] = {
1✔
304
                "sample_rate": stream.get("sample_rate", 0),
305
                "channels": stream.get("channels", 0),
306
            }
307
        if codec_type == "video":
1✔
308
            codec = stream.get("codec_name", "unknown")
1✔
309
            if any(ext in codec.lower() for ext in image_codec):
1✔
310
                self.list_image_track["%s" % stream.get("index")] = {
×
311
                    "width": stream.get("width", 0),
312
                    "height": stream.get("height", 0),
313
                }
314
            else:
315
                self.list_video_track["%s" % stream.get("index")] = {
1✔
316
                    "width": stream.get("width", 0),
317
                    "height": stream.get("height", 0),
318
                }
319
        if codec_type == "subtitle":
1✔
320
            codec = stream.get("codec_name", "unknown")
×
321
            language = ""
×
322
            if stream.get("tags"):
×
323
                language = stream.get("tags").get("language", "")
×
324
            self.list_subtitle_track["%s" % stream.get("index")] = {"language": language}
×
325

326
    def get_output_dir(self) -> str:
1✔
327
        dirname = os.path.dirname(self.video_file)
1✔
328
        return os.path.join(dirname, "%04d" % int(self.id))
1✔
329

330
    def create_output_dir(self) -> None:
1✔
331
        output_dir = self.get_output_dir()
1✔
332
        if not os.path.exists(output_dir):
1✔
333
            os.makedirs(output_dir)
1✔
334
        self.output_dir = output_dir
1✔
335

336
    def get_mp4_command(self) -> str:
1✔
337
        mp4_cmd_parts = [FFMPEG_CMD]
1✔
338
        list_rendition = get_list_rendition()
1✔
339
        # remove rendition if encode_mp4 == False
340
        for rend in list_rendition.copy():
1✔
341
            if list_rendition[rend]["encode_mp4"] is False:
1✔
342
                list_rendition.pop(rend)
1✔
343
        if len(list_rendition) == 0:
1✔
344
            return ""
×
345
        first_item = list_rendition.popitem(last=False)
1✔
346
        mp4_cmd_parts.append(
1✔
347
            FFMPEG_INPUT
348
            % {
349
                "input": self.video_file,
350
                "nb_threads": FFMPEG_NB_THREADS,
351
            }
352
        )
353
        output_file = os.path.join(self.output_dir, "%sp.mp4" % first_item[0])
1✔
354
        mp4_cmd_parts.append(
1✔
355
            FFMPEG_MP4_ENCODE
356
            % {
357
                "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
358
                "map_audio": "-map 0:a:0" if len(self.list_audio_track) > 0 else "",
359
                "libx": FFMPEG_LIBX,
360
                "height": first_item[0],
361
                "preset": FFMPEG_PRESET,
362
                "profile": FFMPEG_PROFILE,
363
                "level": FFMPEG_LEVEL,
364
                "crf": FFMPEG_CRF,
365
                "maxrate": first_item[1]["maxrate"],
366
                "bufsize": first_item[1]["maxrate"],
367
                "ba": first_item[1]["audio_bitrate"],
368
                "output": output_file,
369
            }
370
        )
371
        self.list_mp4_files[first_item[0]] = output_file
1✔
372
        """
1✔
373
        il est possible de faire ainsi :
374
        mp4_command += FFMPEG_MP4_ENCODE.format(
375
            height=first_item[0],
376
            [...]
377
        )
378
        """
379
        in_height = list(self.list_video_track.items())[0][1]["height"]
1✔
380
        for rend in list_rendition:
1✔
381
            resolution_threshold = rend - rend * (
1✔
382
                list_rendition[rend]["encoding_resolution_threshold"] / 100
383
            )
384
            if in_height >= resolution_threshold:
1✔
385
                output_file = os.path.join(self.output_dir, "%sp.mp4" % rend)
×
NEW
386
                mp4_cmd_parts.append(
×
387
                    FFMPEG_MP4_ENCODE
388
                    % {
389
                        "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
390
                        "map_audio": (
391
                            "-map 0:a:0" if len(self.list_audio_track) > 0 else ""
392
                        ),
393
                        "libx": FFMPEG_LIBX,
394
                        "height": min(rend, in_height),
395
                        "preset": FFMPEG_PRESET,
396
                        "profile": FFMPEG_PROFILE,
397
                        "level": FFMPEG_LEVEL,
398
                        "crf": FFMPEG_CRF,
399
                        "maxrate": list_rendition[rend]["maxrate"],
400
                        "bufsize": list_rendition[rend]["maxrate"],
401
                        "ba": list_rendition[rend]["audio_bitrate"],
402
                        "output": output_file,
403
                    }
404
                )
405
                self.list_mp4_files[rend] = output_file
×
406
        return " ".join(mp4_cmd_parts)
1✔
407

408
    def get_hls_command(self) -> str:
1✔
409
        hls_cmd_parts = [FFMPEG_CMD]
1✔
410
        list_rendition = get_list_rendition()
1✔
411
        hls_cmd_parts.append(
1✔
412
            FFMPEG_INPUT
413
            % {
414
                "input": self.video_file,
415
                "nb_threads": FFMPEG_NB_THREADS,
416
            }
417
        )
418
        hls_common_params = FFMPEG_HLS_COMMON_PARAMS % {
1✔
419
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
420
            "libx": FFMPEG_LIBX,
421
            "preset": FFMPEG_PRESET,
422
            "profile": FFMPEG_PROFILE,
423
            "level": FFMPEG_LEVEL,
424
            "crf": FFMPEG_CRF,
425
        }
426
        hls_cmd_parts.append(hls_common_params)
1✔
427
        in_height = list(self.list_video_track.items())[0][1]["height"]
1✔
428
        for index, rend in enumerate(list_rendition):
1✔
429
            resolution_threshold = rend - rend * (
1✔
430
                list_rendition[rend]["encoding_resolution_threshold"] / 100
431
            )
432
            if in_height >= resolution_threshold or index == 0:
1✔
433
                output_file = os.path.join(self.output_dir, "%sp.m3u8" % rend)
1✔
434
                hls_cmd_parts.append(hls_common_params)
1✔
435
                hls_cmd_parts.append(
1✔
436
                    FFMPEG_HLS_ENCODE_PARAMS
437
                    % {
438
                        "height": min(rend, in_height),
439
                        "maxrate": list_rendition[rend]["maxrate"],
440
                        "bufsize": list_rendition[rend]["maxrate"],
441
                        "ba": list_rendition[rend]["audio_bitrate"],
442
                        "hls_time": FFMPEG_HLS_TIME,
443
                        "output": output_file,
444
                    }
445
                )
446
                self.list_hls_files[rend] = output_file
1✔
447
        return " ".join(hls_cmd_parts)
1✔
448

449
    def get_dressing_file(self) -> str:
1✔
450
        """Create or replace the dressed video file."""
451
        dirname = os.path.dirname(self.video_file)
×
452
        filename, ext = os.path.splitext(os.path.basename(self.video_file))
×
453
        output_file = os.path.join(dirname, filename + "_dressing" + ext)
×
454
        return output_file
×
455

456
    def get_dressing_command(self) -> str:
1✔
457
        """
458
        Generate the FFMPEG command based on the dressing object parameters.
459

460
        Returns:
461
            A string representing the complete FFMPEG command.
462
        """
463
        height = str(list(self.list_video_track.items())[0][1]["height"])
×
NEW
464
        dressing_cmd_parts = [FFMPEG_CMD]
×
NEW
465
        dressing_cmd_parts.append(
×
466
            FFMPEG_INPUT
467
            % {
468
                "input": self.video_file,
469
                "nb_threads": FFMPEG_NB_THREADS,
470
            }
471
        )
472

NEW
473
        dressing_cmd_parts.append(self.dressing_input)
×
474

475
        # Handle opening and ending credits
NEW
476
        dressing_cmd_parts.append(self.handle_dressing_credits())
×
477

478
        # Apply filters
479
        dressing_command_filter, dressing_command_params = self.build_dressing_filters(
×
480
            height
481
        )
482

NEW
483
        dressing_cmd_parts.append(
×
484
            FFMPEG_DRESSING_FILTER_COMPLEX
485
            % {
486
                "filter": ";".join(dressing_command_filter),
487
            }
488
        )
489

490
        if self.json_dressing.get("opening_credits") or self.json_dressing.get(
×
491
            "ending_credits"
492
        ):
NEW
493
            dressing_cmd_parts.append("-map '[v]' -map '[a]'")
×
494

495
        output_file = self.get_dressing_file()
×
NEW
496
        dressing_cmd_parts.append(FFMPEG_DRESSING_OUTPUT % {"output": output_file})
×
497

NEW
498
        return " ".join(dressing_cmd_parts)
×
499

500
    def handle_dressing_credits(self) -> str:
1✔
501
        """
502
        Handle the addition of opening and ending credits to the command.
503

504
        Returns:
505
            A string with the FFMPEG commands for silent credits if required.
506
        """
507
        command = ""
×
508
        for credit_type in ["opening_credits", "ending_credits"]:
×
509
            if self.json_dressing.get(credit_type):
×
510
                has_audio_key = f"{credit_type}_video_hasaudio"
×
511
                duration_key = f"{credit_type}_video_duration"
×
512

513
                if not self.json_dressing.get(has_audio_key, True):
×
514
                    duration = self.json_dressing.get(duration_key)
×
515

516
                    try:
×
517
                        duration = int(duration) if duration and int(duration) > 0 else 1
×
518
                    except (ValueError, TypeError):
×
519
                        duration = 1
×
520

521
                    command += FFMPEG_DRESSING_SILENT % {"duration": duration}
×
522

523
        return command
×
524

525
    def build_dressing_filters(self, height: str):
1✔
526
        """
527
        Build the filters for video processing.
528

529
        Args:
530
            height: The height of the video for scaling purposes.
531

532
        Returns:
533
            A tuple containing the list of filters and the filter parameters.
534
        """
535
        filters = []
×
536
        params = ""
×
537
        order = 0
×
538
        interval_silent = 0
×
539
        concat_number = 1
×
540

541
        # Base video track
542
        filters.append(
×
543
            FFMPEG_DRESSING_SCALE % {"number": "0", "height": height, "name": "vid"}
544
        )
545
        params = "[vid][0:a]"
×
546
        order = order + 1
×
547

548
        # Watermark if present
549
        if self.json_dressing.get("watermark"):
×
550
            filters.append(self.apply_dressing_watermark(height))
×
551
            params = "[video][0:a]"
×
552
            order = order + 1
×
553

554
        interval_silent = order
×
555
        # Opening credits
556
        if self.json_dressing.get("opening_credits"):
×
557
            if self.json_dressing.get("ending_credits"):
×
558
                interval_silent = interval_silent + 1
×
559
            filters, params, interval_silent = self.add_dressing_credits(
×
560
                filters,
561
                params,
562
                height,
563
                "opening_credits",
564
                "debut",
565
                order,
566
                interval_silent,
567
            )
568
            order = order + 1
×
569
            concat_number = concat_number + 1
×
570

571
        # Ending credits
572
        if self.json_dressing.get("ending_credits"):
×
573
            filters, params, interval_silent = self.add_dressing_credits(
×
574
                filters, params, height, "ending_credits", "fin", order, interval_silent
575
            )
576
            concat_number = concat_number + 1
×
577

578
        # Concatenate if needed
579
        if self.json_dressing.get("opening_credits") or self.json_dressing.get(
×
580
            "ending_credits"
581
        ):
582
            filters.append(
×
583
                FFMPEG_DRESSING_CONCAT % {"params": params, "number": concat_number}
584
            )
585

586
        return filters, params
×
587

588
    def apply_dressing_watermark(self, height: str) -> str:
1✔
589
        """
590
        Apply the watermark to the video.
591

592
        Args:
593
            height: The height of the video to position the watermark correctly.
594

595
        Returns:
596
            A string representing the FFMPEG command for applying the watermark.
597
        """
598
        if not self.json_dressing:
×
599
            return ""
×
600
        opacity = self.json_dressing.get("opacity", 100) / 100.0
×
601
        position = get_dressing_position_value(
×
602
            self.json_dressing.get("position_orig", "center"), height
603
        )
604
        name_out = (
×
605
            "[video]"
606
            if self.json_dressing.get("opening_credits")
607
            or self.json_dressing.get("ending_credits")
608
            else ""
609
        )
610

611
        return FFMPEG_DRESSING_WATERMARK % {
×
612
            "opacity": opacity,
613
            "position": position,
614
            "name_out": name_out,
615
        }
616

617
    def add_dressing_credits(
1✔
618
        self,
619
        filters: list,
620
        params: str,
621
        height: str,
622
        credit_type: str,
623
        name: str,
624
        order: int,
625
        interval_silent: int,
626
    ):
627
        """
628
        Add opening or ending credits to the FFmpeg command by updating the filters and parameters.
629

630
        Args:
631
            filters (list): A list of existing FFmpeg filters to which new filters will be appended.
632
            params (str): The current filter parameters string that will be updated to include the credits.
633
            height (str): The height of the video used for scaling the credit overlay.
634
            credit_type (str): Specifies the type of credits to add, either 'opening_credits' or 'ending_credits'.
635
            name (str): The identifier for the credit video or overlay to be used in the FFmpeg filter graph.
636
            order (int): The position identifier for the audio stream, used to sync audio with the credits.
637
            interval_silent (int): Counter indicating the number of silent audio intervals inserted, used when adding silent audio tracks.
638

639
        Returns:
640
            tuple:
641
                - Updated list of filters with the added credit filters.
642
                - Updated filter parameter string reflecting the new credit positioning.
643
                - Updated interval_silent value if a silent audio track was added.
644
        """
645
        audio_out = f"{order}:a"
×
646

647
        if not self.json_dressing.get(f"{credit_type}_video_hasaudio"):
×
648
            audio_out = f"a{order}"
×
649
            filters.append(
×
650
                FFMPEG_DRESSING_AUDIO
651
                % {
652
                    "param_in": f"{interval_silent + 1}:a",
653
                    "param_out": audio_out,
654
                }
655
            )
656
            interval_silent = interval_silent + 1
×
657

658
        if credit_type == "opening_credits":
×
659
            params = f"[{name}][{audio_out}]{params}"
×
660
        else:
661
            params = f"{params}[{name}][{audio_out}]"
×
662

663
        filters.append(
×
664
            FFMPEG_DRESSING_SCALE % {"number": str(order), "height": height, "name": name}
665
        )
666

667
        return filters, params, interval_silent
×
668

669
    def encode_video_dressing(self) -> None:
1✔
670
        """Encode the dressed video."""
671
        dressing_command = self.get_dressing_command()
×
672
        return_value, return_msg = launch_cmd(dressing_command)
×
673
        self.add_encoding_log(
×
674
            "dressing_command", dressing_command, return_value, return_msg
675
        )
676
        self.video_file = self.get_dressing_file()
×
677

678
    def encode_video_part(self) -> None:
1✔
679
        """Encode the video part of a file."""
680
        mp4_command = self.get_mp4_command()
1✔
681
        return_value, return_msg = launch_cmd(mp4_command)
1✔
682
        self.add_encoding_log("mp4_command", mp4_command, return_value, return_msg)
1✔
683
        if not return_value:
1✔
684
            self.error_encoding = True
×
685
        if self.duration == 0:
1✔
686
            list_rendition = get_list_rendition()
×
687
            first_item = list_rendition.popitem(last=False)
×
688
            self.fix_duration(self.list_mp4_files[first_item[0]])
×
689
        hls_command = self.get_hls_command()
1✔
690
        return_value, return_msg = launch_cmd(hls_command)
1✔
691
        if return_value:
1✔
692
            self.create_main_livestream()
1✔
693
        self.add_encoding_log("hls_command", hls_command, return_value, return_msg)
1✔
694

695
    def create_main_livestream(self) -> None:
1✔
696
        list_rendition = get_list_rendition()
1✔
697
        livestream_content = ""
1✔
698
        for index, rend in enumerate(list_rendition):
1✔
699
            rend_livestream = os.path.join(
1✔
700
                self.get_output_dir(), "livestream%s.m3u8" % rend
701
            )
702
            if os.path.exists(rend_livestream):
1✔
703
                with open(rend_livestream, "r") as file:
1✔
704
                    data = file.read()
1✔
705
                if index == 0:
1✔
706
                    livestream_content += data
1✔
707
                else:
708
                    livestream_content += "\n".join(data.split("\n")[2:])
×
709
                os.remove(rend_livestream)
1✔
710
        livestream_file = open(
1✔
711
            os.path.join(self.get_output_dir(), "livestream.m3u8"), "w"
712
        )
713
        livestream_file.write(livestream_content.replace("\n\n", "\n"))
1✔
714
        livestream_file.close()
1✔
715

716
    def get_mp3_command(self) -> str:
1✔
717
        mp3_cmd_parts = [FFMPEG_CMD]
1✔
718
        mp3_cmd_parts.append(
1✔
719
            FFMPEG_INPUT
720
            % {
721
                "input": self.video_file,
722
                "nb_threads": FFMPEG_NB_THREADS,
723
            }
724
        )
725
        output_file = os.path.join(self.output_dir, "audio_%s.mp3" % FFMPEG_AUDIO_BITRATE)
1✔
726
        mp3_cmd_parts.append(
1✔
727
            FFMPEG_MP3_ENCODE
728
            % {
729
                # "audio_bitrate": AUDIO_BITRATE,
730
                "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
731
                "output": output_file,
732
            }
733
        )
734
        self.list_mp3_files[FFMPEG_AUDIO_BITRATE] = output_file
1✔
735
        return " ".join(mp3_cmd_parts)
1✔
736

737
    def get_m4a_command(self) -> str:
1✔
738
        m4a_cmd_parts = [FFMPEG_CMD]
1✔
739
        m4a_cmd_parts.append(
1✔
740
            FFMPEG_INPUT
741
            % {
742
                "input": self.video_file,
743
                "nb_threads": FFMPEG_NB_THREADS,
744
            }
745
        )
746
        output_file = os.path.join(self.output_dir, "audio_%s.m4a" % FFMPEG_AUDIO_BITRATE)
1✔
747
        m4a_cmd_parts.append(
1✔
748
            FFMPEG_M4A_ENCODE
749
            % {
750
                "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
751
                "audio_bitrate": FFMPEG_AUDIO_BITRATE,
752
                "output": output_file,
753
            }
754
        )
755
        self.list_m4a_files[FFMPEG_AUDIO_BITRATE] = output_file
1✔
756
        return " ".join(m4a_cmd_parts)
1✔
757

758
    def encode_audio_part(self) -> None:
1✔
759
        """Encode the audio part of a video."""
760
        mp3_command = self.get_mp3_command()
1✔
761
        return_value, return_msg = launch_cmd(mp3_command)
1✔
762
        self.add_encoding_log("mp3_command", mp3_command, return_value, return_msg)
1✔
763
        if self.duration == 0:
1✔
764
            new_k = list(self.list_mp3_files)[0]
×
765
            self.fix_duration(self.list_mp3_files[new_k])
×
766
        if not self.is_video():
1✔
767
            m4a_command = self.get_m4a_command()
1✔
768
            return_value, return_msg = launch_cmd(m4a_command)
1✔
769
            self.add_encoding_log("m4a_command", m4a_command, return_value, return_msg)
1✔
770

771
    def get_extract_thumbnail_command(self) -> str:
1✔
NEW
772
        thumbnail_cmd_parts = [FFMPEG_CMD]
×
NEW
773
        thumbnail_cmd_parts.append(
×
774
            FFMPEG_INPUT
775
            % {
776
                "input": self.video_file,
777
                "nb_threads": FFMPEG_NB_THREADS,
778
            }
779
        )
780
        for img in self.list_image_track:
×
781
            output_file = os.path.join(self.output_dir, "thumbnail_%s.png" % img)
×
NEW
782
            thumbnail_cmd_parts.append(
×
783
                FFMPEG_EXTRACT_THUMBNAIL
784
                % {
785
                    "index": img,
786
                    "output": output_file,
787
                }
788
            )
789
            self.list_thumbnail_files[img] = output_file
×
NEW
790
        return " ".join(thumbnail_cmd_parts)
×
791

792
    def get_create_thumbnail_command(self) -> str:
1✔
793
        thumbnail_cmd_parts = [FFMPEG_CMD]
1✔
794
        first_item = self.get_first_item()
1✔
795
        if not first_item or first_item[0] not in self.list_mp4_files:
1✔
796
            logger.error("No MP4 rendition available to create thumbnails.")
×
797
            return ""
×
798
        input_file = self.list_mp4_files[first_item[0]]
1✔
799
        thumbnail_cmd_parts.append(
1✔
800
            FFMPEG_INPUT
801
            % {
802
                "input": input_file,
803
                "nb_threads": FFMPEG_NB_THREADS,
804
            }
805
        )
806
        output_file = os.path.join(self.output_dir, "thumbnail")
1✔
807
        thumbnail_cmd_parts.append(
1✔
808
            FFMPEG_CREATE_THUMBNAIL
809
            % {
810
                "duration": self.duration,
811
                "nb_thumbnail": FFMPEG_NB_THUMBNAIL,
812
                "output": output_file,
813
            }
814
        )
815
        for nb in range(0, FFMPEG_NB_THUMBNAIL):
1✔
816
            num_thumb = str(nb + 1)
1✔
817
            self.list_thumbnail_files[num_thumb] = "%s_000%s.png" % (
1✔
818
                output_file,
819
                num_thumb,
820
            )
821
        return " ".join(thumbnail_cmd_parts)
1✔
822

823
    def get_first_item(self):
1✔
824
        """Get the first mp4 render from setting."""
825
        list_rendition = get_list_rendition()
1✔
826
        for rend in list_rendition.copy():
1✔
827
            if list_rendition[rend]["encode_mp4"] is False:
1✔
828
                list_rendition.pop(rend)
1✔
829
        if len(list_rendition) == 0:
1✔
830
            return None
×
831
        else:
832
            return list_rendition.popitem(last=False)
1✔
833

834
    def create_overview(self) -> None:
1✔
835
        first_item = self.get_first_item()
1✔
836
        if not first_item or first_item[0] not in self.list_mp4_files:
1✔
837
            logger.error("No MP4 rendition available to create overview.")
×
838
            return
×
839
        # overview combine for 160x90
840
        in_height = list(self.list_video_track.items())[0][1]["height"]
1✔
841
        in_width = list(self.list_video_track.items())[0][1]["width"]
1✔
842
        image_height = 90
1✔
843
        coef = in_height / image_height
1✔
844
        image_width = int(in_width / coef)
1✔
845
        input_file = self.list_mp4_files[first_item[0]]
1✔
846
        nb_img = 100 if self.duration >= 100 else 10
1✔
847

848
        overviewimagefilename = os.path.join(self.output_dir, "overview.png")
1✔
849
        overview_image_command = (
1✔
850
            FFMPEG_CMD
851
            + " "
852
            + FFMPEG_INPUT
853
            % {
854
                "input": input_file,
855
                "nb_threads": FFMPEG_NB_THREADS,
856
            }
857
            + FFMPEG_CREATE_OVERVIEW
858
            % {
859
                "duration": self.duration,
860
                "image_count": nb_img,
861
                "width": image_width,
862
                "height": image_height,
863
                "output": overviewimagefilename,
864
            }
865
        )
866
        return_value, output_message = launch_cmd(overview_image_command)
1✔
867
        if not return_value or not check_file(overviewimagefilename):
1✔
868
            logger.error(f"FFmpeg failed with output: {output_message}")
×
869

870
        overviewfilename = os.path.join(self.output_dir, "overview.vtt")
1✔
871
        image_url = os.path.basename(overviewimagefilename)
1✔
872
        webvtt = WebVTT()
1✔
873
        for i in range(0, nb_img):
1✔
874
            start = format(float(self.duration * i / nb_img), ".3f")
1✔
875
            end = format(float(self.duration * (i + 1) / nb_img), ".3f")
1✔
876
            start_time = time.strftime(
1✔
877
                "%H:%M:%S", time.gmtime(int(str(start).split(".")[0]))
878
            )
879
            start_time += ".%s" % (str(start).split(".")[1])
1✔
880
            end_time = time.strftime(
1✔
881
                "%H:%M:%S", time.gmtime(int(str(end).split(".")[0]))
882
            ) + ".%s" % (str(end).split(".")[1])
883
            caption = Caption(
1✔
884
                "%s" % start_time,
885
                "%s" % end_time,
886
                "%s#xywh=%d,%d,%d,%d"
887
                % (image_url, image_width * i, 0, image_width, image_height),
888
            )
889
            webvtt.captions.append(caption)
1✔
890
        webvtt.save(overviewfilename)
1✔
891
        if check_file(overviewfilename) and check_file(overviewimagefilename):
1✔
892
            self.list_overview_files["0"] = overviewimagefilename
1✔
893
            self.list_overview_files["1"] = overviewfilename
1✔
894
            # self.encoding_log += "\n- overviewfilename:\n%s" % overviewfilename
895
        else:
896
            self.add_encoding_log("create_overview", "", False, "")
×
897

898
    def encode_image_part(self) -> None:
1✔
899
        if len(self.list_image_track) > 0:
1✔
900
            thumbnail_command = self.get_extract_thumbnail_command()
×
901
            return_value, return_msg = launch_cmd(thumbnail_command)
×
902
            self.add_encoding_log(
×
903
                "extract_thumbnail_command", thumbnail_command, return_value, return_msg
904
            )
905
        elif self.is_video():
1✔
906
            thumbnail_command = self.get_create_thumbnail_command()
1✔
907
            return_value, return_msg = launch_cmd(thumbnail_command)
1✔
908
            self.add_encoding_log(
1✔
909
                "create_thumbnail_command", thumbnail_command, return_value, return_msg
910
            )
911
        # on ne fait pas d'overview pour les videos de moins de 10 secondes
912
        # (laisser les 10sec inclus pour laisser les tests passer) --> OK
913
        if self.is_video() and self.duration >= 10:
1✔
914
            self.create_overview()
1✔
915

916
    def get_extract_subtitle_command(self) -> str:
1✔
NEW
917
        subtitle_cmd_parts = [FFMPEG_CMD]
×
NEW
918
        subtitle_cmd_parts.append(
×
919
            FFMPEG_INPUT
920
            % {
921
                "input": self.video_file,
922
                "nb_threads": FFMPEG_NB_THREADS,
923
            }
924
        )
925
        for sub in self.list_subtitle_track:
×
926
            lang = self.list_subtitle_track[sub]["language"]
×
927
            output_file = os.path.join(self.output_dir, "subtitle_%s.vtt" % lang)
×
NEW
928
            subtitle_cmd_parts.append(
×
929
                FFMPEG_EXTRACT_SUBTITLE
930
                % {
931
                    "index": sub,
932
                    "output": output_file,
933
                }
934
            )
935
            self.list_subtitle_files[sub] = [lang, output_file]
×
NEW
936
        return " ".subtitle_cmd_parts
×
937

938
    def get_subtitle_part(self) -> None:
1✔
939
        if len(self.list_subtitle_track) > 0:
×
940
            subtitle_command = self.get_extract_subtitle_command()
×
941
            return_value, return_msg = launch_cmd(subtitle_command)
×
942
            self.add_encoding_log(
×
943
                "subtitle_command", subtitle_command, return_value, return_msg
944
            )
945

946
    def export_to_json(self) -> None:
1✔
947
        data_to_dump = {}
1✔
948
        for attribute, value in self.__dict__.items():
1✔
949
            data_to_dump[attribute] = value
1✔
950
        with open(self.output_dir + "/info_video.json", "w") as outfile:
1✔
951
            json.dump(data_to_dump, outfile, indent=2)
1✔
952

953
    def add_encoding_log(self, title, command, result, msg) -> None:
1✔
954
        """Add Encoding step to the encoding_log dict."""
955
        self.encoding_log[title] = {"command": command, "result": result, "msg": msg}
1✔
956
        if result is False and self.error_encoding is False:
1✔
957
            self.error_encoding = True
×
958

959
    def start_encode(self) -> None:
1✔
960
        self.start = time.ctime()
1✔
961
        self.create_output_dir()
1✔
962
        self.get_video_data()
1✔
963
        if self.json_dressing is not None:
1✔
964
            self.encode_video_dressing()
×
965
        logger.info(
1✔
966
            "start_encode {id: %s, file: %s, duration: %s}"
967
            % (self.id, self.video_file, self.duration)
968
        )
969
        if self.is_video():
1✔
970
            logger.debug("* encode_video_part")
1✔
971
            self.encode_video_part()
1✔
972
        if len(self.list_audio_track) > 0:
1✔
973
            logger.debug("* encode_audio_part")
1✔
974
            self.encode_audio_part()
1✔
975
        logger.debug("* encode_image_part")
1✔
976
        self.encode_image_part()
1✔
977
        if len(self.list_subtitle_track) > 0:
1✔
978
            logger.debug("* get_subtitle_part")
×
979
            self.get_subtitle_part()
×
980
        self.stop = time.ctime()
1✔
981
        self.export_to_json()
1✔
982

983

984
def fix_input(input) -> str:
1✔
985
    filename = ""
×
986
    if args.input.startswith("/"):
×
987
        path_file = args.input
×
988
    else:
989
        path_file = os.path.join(os.getcwd(), args.input)
×
990
    if os.access(path_file, os.F_OK) and os.stat(path_file).st_size > 0:
×
991
        # remove accent and space
992
        filename = "".join(
×
993
            (
994
                c
995
                for c in unicodedata.normalize("NFD", path_file)
996
                if unicodedata.category(c) != "Mn"
997
            )
998
        )
999
        filename = filename.replace(" ", "_")
×
1000
        os.rename(
×
1001
            path_file,
1002
            filename,
1003
        )
1004
        logger.info("Encoding file {} \n".format(filename))
×
1005
    return filename
×
1006

1007

1008
"""
1✔
1009
  remote encode???
1010
"""
1011
if __name__ == "__main__":
1✔
1012
    start = "Start at: %s" % time.ctime()
×
1013
    parser = argparse.ArgumentParser(description="Running encoding video.")
×
1014
    parser.add_argument("--id", required=True, help="the ID of the video")
×
1015
    parser.add_argument("--start", required=False, help="Start cut")
×
1016
    parser.add_argument("--stop", required=False, help="Stop cut")
×
1017
    parser.add_argument("--input", required=True, help="name of input file to encode")
×
1018
    parser.add_argument("--dressing", required=False, help="Dressing for the video")
×
1019

1020
    args = parser.parse_args()
×
1021
    logger.debug(args.start)
×
1022
    filename = fix_input(args.input)
×
1023
    dressing_data = None
×
1024
    if args.dressing:
×
1025
        try:
×
1026
            dressing_data = json.loads(args.dressing)
×
1027
        except json.JSONDecodeError:
×
1028
            logger.error("Invalid dressing JSON provided, ignoring it.")
×
1029
    encoding_video = Encoding_video(
×
1030
        args.id, filename, args.start, args.stop, dressing_data
1031
    )
1032
    # error if uncommented
1033
    # encoding_video.encoding_log += start
1034
    # AttributeError: 'NoneType' object has no attribute 'get'
1035
    encoding_video.start_encode()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc