• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 21389926431

27 Jan 2026 08:27AM UTC coverage: 70.284% (+0.05%) from 70.233%
21389926431

push

github

web-flow
Fixup. Format code with Black (#1396)

Co-authored-by: github-actions <github-actions@github.com>

4 of 9 new or added lines in 7 files covered. (44.44%)

7 existing lines in 5 files now uncovered.

12358 of 17583 relevant lines covered (70.28%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.68
/pod/video_encode_transcript/Encoding_video.py
1
"""Esup-Pod video encoding."""
2

3
import argparse
1✔
4
import json
1✔
5
import logging
1✔
6
import os
1✔
7
import time
1✔
8
import unicodedata
1✔
9
from webvtt import WebVTT, Caption
1✔
10

11
if __name__ == "__main__":
1✔
12
    from encoding_utils import (
×
13
        get_dressing_position_value,
14
        get_info_from_video,
15
        get_list_rendition,
16
        launch_cmd,
17
        check_file,
18
    )
19
    from encoding_settings import (
×
20
        FFMPEG_CMD,
21
        FFPROBE_CMD,
22
        FFPROBE_GET_INFO,
23
        FFMPEG_CRF,
24
        FFMPEG_PRESET,
25
        FFMPEG_PROFILE,
26
        FFMPEG_LEVEL,
27
        FFMPEG_HLS_TIME,
28
        FFMPEG_INPUT,
29
        FFMPEG_LIBX,
30
        FFMPEG_MP4_ENCODE,
31
        FFMPEG_HLS_COMMON_PARAMS,
32
        FFMPEG_HLS_ENCODE_PARAMS,
33
        FFMPEG_MP3_ENCODE,
34
        FFMPEG_M4A_ENCODE,
35
        FFMPEG_NB_THREADS,
36
        FFMPEG_AUDIO_BITRATE,
37
        FFMPEG_EXTRACT_THUMBNAIL,
38
        FFMPEG_NB_THUMBNAIL,
39
        FFMPEG_CREATE_THUMBNAIL,
40
        FFMPEG_CREATE_OVERVIEW,
41
        FFMPEG_EXTRACT_SUBTITLE,
42
        FFMPEG_DRESSING_INPUT,
43
        FFMPEG_DRESSING_OUTPUT,
44
        FFMPEG_DRESSING_WATERMARK,
45
        FFMPEG_DRESSING_FILTER_COMPLEX,
46
        FFMPEG_DRESSING_SCALE,
47
        FFMPEG_DRESSING_CONCAT,
48
        FFMPEG_DRESSING_SILENT,
49
        FFMPEG_DRESSING_AUDIO,
50
    )
51
else:
52
    from .encoding_utils import (
1✔
53
        get_dressing_position_value,
54
        get_info_from_video,
55
        get_list_rendition,
56
        launch_cmd,
57
        check_file,
58
    )
59
    from .encoding_settings import (
1✔
60
        FFMPEG_CMD,
61
        FFPROBE_CMD,
62
        FFPROBE_GET_INFO,
63
        FFMPEG_CRF,
64
        FFMPEG_PRESET,
65
        FFMPEG_PROFILE,
66
        FFMPEG_LEVEL,
67
        FFMPEG_HLS_TIME,
68
        FFMPEG_INPUT,
69
        FFMPEG_LIBX,
70
        FFMPEG_MP4_ENCODE,
71
        FFMPEG_HLS_COMMON_PARAMS,
72
        FFMPEG_HLS_ENCODE_PARAMS,
73
        FFMPEG_MP3_ENCODE,
74
        FFMPEG_M4A_ENCODE,
75
        FFMPEG_NB_THREADS,
76
        FFMPEG_AUDIO_BITRATE,
77
        FFMPEG_EXTRACT_THUMBNAIL,
78
        FFMPEG_NB_THUMBNAIL,
79
        FFMPEG_CREATE_THUMBNAIL,
80
        FFMPEG_CREATE_OVERVIEW,
81
        FFMPEG_EXTRACT_SUBTITLE,
82
        FFMPEG_DRESSING_INPUT,
83
        FFMPEG_DRESSING_OUTPUT,
84
        FFMPEG_DRESSING_WATERMARK,
85
        FFMPEG_DRESSING_FILTER_COMPLEX,
86
        FFMPEG_DRESSING_SCALE,
87
        FFMPEG_DRESSING_CONCAT,
88
        FFMPEG_DRESSING_SILENT,
89
        FFMPEG_DRESSING_AUDIO,
90
    )
91

92
__author__ = "Nicolas CAN <nicolas.can@univ-lille.fr>"
1✔
93
__license__ = "LGPL v3"
1✔
94

95
image_codec = ["jpeg", "gif", "png", "bmp", "jpg"]
1✔
96

97
"""
98
 - Get video source
99
 - get all tracks from video source
100
 - encode tracks in HLS and mp4
101
 - save it
102
"""
103

104
try:
    from django.conf import settings

    # Every default imported from encoding_settings can be overridden by a
    # same-named attribute of the Django settings object.
    FFMPEG_CMD = getattr(settings, "FFMPEG_CMD", FFMPEG_CMD)
    FFPROBE_CMD = getattr(settings, "FFPROBE_CMD", FFPROBE_CMD)
    FFPROBE_GET_INFO = getattr(settings, "FFPROBE_GET_INFO", FFPROBE_GET_INFO)
    FFMPEG_CRF = getattr(settings, "FFMPEG_CRF", FFMPEG_CRF)
    FFMPEG_PRESET = getattr(settings, "FFMPEG_PRESET", FFMPEG_PRESET)
    FFMPEG_PROFILE = getattr(settings, "FFMPEG_PROFILE", FFMPEG_PROFILE)
    FFMPEG_LEVEL = getattr(settings, "FFMPEG_LEVEL", FFMPEG_LEVEL)
    FFMPEG_HLS_TIME = getattr(settings, "FFMPEG_HLS_TIME", FFMPEG_HLS_TIME)
    FFMPEG_INPUT = getattr(settings, "FFMPEG_INPUT", FFMPEG_INPUT)
    FFMPEG_LIBX = getattr(settings, "FFMPEG_LIBX", FFMPEG_LIBX)
    FFMPEG_MP4_ENCODE = getattr(settings, "FFMPEG_MP4_ENCODE", FFMPEG_MP4_ENCODE)
    FFMPEG_HLS_COMMON_PARAMS = getattr(
        settings, "FFMPEG_HLS_COMMON_PARAMS", FFMPEG_HLS_COMMON_PARAMS
    )
    FFMPEG_HLS_ENCODE_PARAMS = getattr(
        settings, "FFMPEG_HLS_ENCODE_PARAMS", FFMPEG_HLS_ENCODE_PARAMS
    )
    FFMPEG_MP3_ENCODE = getattr(settings, "FFMPEG_MP3_ENCODE", FFMPEG_MP3_ENCODE)
    FFMPEG_M4A_ENCODE = getattr(settings, "FFMPEG_M4A_ENCODE", FFMPEG_M4A_ENCODE)
    FFMPEG_NB_THREADS = getattr(settings, "FFMPEG_NB_THREADS", FFMPEG_NB_THREADS)
    FFMPEG_AUDIO_BITRATE = getattr(settings, "FFMPEG_AUDIO_BITRATE", FFMPEG_AUDIO_BITRATE)
    FFMPEG_EXTRACT_THUMBNAIL = getattr(
        settings, "FFMPEG_EXTRACT_THUMBNAIL", FFMPEG_EXTRACT_THUMBNAIL
    )
    FFMPEG_NB_THUMBNAIL = getattr(settings, "FFMPEG_NB_THUMBNAIL", FFMPEG_NB_THUMBNAIL)
    FFMPEG_CREATE_THUMBNAIL = getattr(
        settings, "FFMPEG_CREATE_THUMBNAIL", FFMPEG_CREATE_THUMBNAIL
    )
    # FFMPEG_CREATE_OVERVIEW was the only imported constant without an
    # override; it is now handled like the others.
    FFMPEG_CREATE_OVERVIEW = getattr(
        settings, "FFMPEG_CREATE_OVERVIEW", FFMPEG_CREATE_OVERVIEW
    )
    FFMPEG_EXTRACT_SUBTITLE = getattr(
        settings, "FFMPEG_EXTRACT_SUBTITLE", FFMPEG_EXTRACT_SUBTITLE
    )
    FFMPEG_DRESSING_INPUT = getattr(
        settings, "FFMPEG_DRESSING_INPUT", FFMPEG_DRESSING_INPUT
    )
    FFMPEG_DRESSING_OUTPUT = getattr(
        settings, "FFMPEG_DRESSING_OUTPUT", FFMPEG_DRESSING_OUTPUT
    )
    FFMPEG_DRESSING_WATERMARK = getattr(
        settings, "FFMPEG_DRESSING_WATERMARK", FFMPEG_DRESSING_WATERMARK
    )
    FFMPEG_DRESSING_FILTER_COMPLEX = getattr(
        settings, "FFMPEG_DRESSING_FILTER_COMPLEX", FFMPEG_DRESSING_FILTER_COMPLEX
    )
    FFMPEG_DRESSING_SCALE = getattr(
        settings, "FFMPEG_DRESSING_SCALE", FFMPEG_DRESSING_SCALE
    )
    FFMPEG_DRESSING_CONCAT = getattr(
        settings, "FFMPEG_DRESSING_CONCAT", FFMPEG_DRESSING_CONCAT
    )
    FFMPEG_DRESSING_SILENT = getattr(
        settings, "FFMPEG_DRESSING_SILENT", FFMPEG_DRESSING_SILENT
    )
    FFMPEG_DRESSING_AUDIO = getattr(
        settings, "FFMPEG_DRESSING_AUDIO", FFMPEG_DRESSING_AUDIO
    )
    DEBUG = getattr(settings, "DEBUG", True)
except ImportError:  # pragma: no cover
    # django could not be imported: keep the defaults imported above.
    DEBUG = True

logger = logging.getLogger(__name__)
if DEBUG:
    logger.setLevel(logging.DEBUG)
170

171

172
class Encoding_video:
1✔
173
    """Encoding video object."""
174

175
    id = 0
1✔
176
    video_file = ""
1✔
177
    duration = 0
1✔
178
    list_video_track = {}
1✔
179
    list_audio_track = {}
1✔
180
    list_subtitle_track = {}
1✔
181
    list_image_track = {}
1✔
182
    list_mp4_files = {}
1✔
183
    list_hls_files = {}
1✔
184
    list_mp3_files = {}
1✔
185
    list_m4a_files = {}
1✔
186
    list_thumbnail_files = {}
1✔
187
    list_overview_files = {}
1✔
188
    list_subtitle_files = {}
1✔
189
    encoding_log = {}
1✔
190
    output_dir = ""
1✔
191
    start = 0
1✔
192
    stop = 0
1✔
193
    error_encoding = False
1✔
194
    cutting_start = 0
1✔
195
    cutting_stop = 0
1✔
196
    json_dressing = None
1✔
197
    dressing_input = ""
1✔
198

199
    def __init__(
1✔
200
        self, id=0, video_file="", start=0, stop=0, json_dressing=None, dressing_input=""
201
    ) -> None:
202
        """Initialize a new Encoding_video object."""
203
        self.id = id
1✔
204
        self.video_file = video_file
1✔
205
        self.duration = 0
1✔
206
        self.list_video_track = {}
1✔
207
        self.list_audio_track = {}
1✔
208
        self.list_subtitle_track = {}
1✔
209
        self.list_image_track = {}
1✔
210
        self.list_mp4_files = {}
1✔
211
        self.list_hls_files = {}
1✔
212
        self.list_mp3_files = {}
1✔
213
        self.list_m4a_files = {}
1✔
214
        self.list_thumbnail_files = {}
1✔
215
        self.list_overview_files = {}
1✔
216
        self.encoding_log = {}
1✔
217
        self.list_subtitle_files = {}
1✔
218
        self.output_dir = ""
1✔
219
        self.start = 0
1✔
220
        self.stop = 0
1✔
221
        self.error_encoding = False
1✔
222
        self.cutting_start = start or 0
1✔
223
        self.cutting_stop = stop or 0
1✔
224
        self.json_dressing = json_dressing
1✔
225
        self.dressing_input = dressing_input
1✔
226

227
    def is_video(self) -> bool:
1✔
228
        """Check if current encoding correspond to a video."""
229
        return len(self.list_video_track) > 0
1✔
230

231
    def get_subtime(self, clip_begin, clip_end) -> str:
        """Build the ``-ss``/``-to`` option string used to cut the input.

        Args:
            clip_begin: Value placed after ``-ss``.
            clip_end: Value placed after ``-to``.

        Returns:
            An empty string when both bounds equal 0, otherwise
            ``"-ss <begin> -to <end> "`` (with a trailing space).
        """
        if clip_begin == 0 and clip_end == 0:
            return ""
        return "-ss %s -to %s " % (str(clip_begin), str(clip_end))
236

237
    def get_video_data(self) -> None:
        """Probe the source file, then record its duration and its tracks.

        The probe command and its output are logged under "probe_cmd". The
        duration is read from ``info["format"]["duration"]``; when a cut is
        configured, the cut length is stored instead. Every stream reported
        by the probe is passed to ``add_stream``.
        """
        probe_cmd = FFPROBE_GET_INFO % {
            "ffprobe": FFPROBE_CMD,
            "select_streams": "",
            "source": '"' + self.video_file + '" ',
        }
        info, return_msg = get_info_from_video(probe_cmd)
        log_parts = [
            "--> get_info_video\n",
            probe_cmd + "\n",
            json.dumps(info, indent=2),
            " \n",
            return_msg + "\n",
        ]
        self.add_encoding_log("probe_cmd", probe_cmd, True, "".join(log_parts))

        try:
            probed_duration = int(float("%s" % info["format"]["duration"]))
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
            # Unreadable duration: log it and fall back to 0.
            probed_duration = 0
            self.add_encoding_log(
                "duration", "", True, "\nUnexpected error: {0}".format(err)
            )

        if self.cutting_start != 0 or self.cutting_stop != 0:
            self.duration = self.cutting_stop - self.cutting_start
        else:
            self.duration = probed_duration

        for stream in info.get("streams", []):
            self.add_stream(stream)
263

264
    def fix_duration(self, input_file) -> None:
        """Re-read the duration from ``input_file`` and store it.

        The probe output is logged under "fix_duration". If the duration
        cannot be parsed it is stored as 0; when a cut is configured, the
        cut length is stored instead.

        Args:
            input_file: Path of the file to probe.
        """
        # Use the configurable FFPROBE_CMD (as get_video_data does) rather
        # than a hard-coded "ffprobe" binary name.
        probe_cmd = (
            "%s -v quiet -show_entries format=duration -hide_banner "
            '-of default=noprint_wrappers=1:nokey=1 -print_format json -i "%s"'
            % (FFPROBE_CMD, input_file)
        )
        info, return_msg = get_info_from_video(probe_cmd)
        msg = "--> get_info_video\n"
        msg += json.dumps(info, indent=2)
        msg += " \n"
        msg += return_msg + "\n"
        duration = 0
        try:
            duration = int(float("%s" % info["format"]["duration"]))
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
            msg += "\nUnexpected error: {0}".format(err)
        # Record the command that was run together with its output.
        self.add_encoding_log("fix_duration", probe_cmd, True, msg)
        if self.cutting_start != 0 or self.cutting_stop != 0:
            duration = self.cutting_stop - self.cutting_start
        self.duration = duration
282

283
    def add_stream(self, stream):
1✔
284
        codec_type = stream.get("codec_type", "unknown")
1✔
285
        # https://ffmpeg.org/doxygen/3.2/group__lavu__misc.html#ga9a84bba4713dfced21a1a56163be1f48
286
        if codec_type == "audio":
1✔
287
            codec = stream.get("codec_name", "unknown")
1✔
288
            self.list_audio_track["%s" % stream.get("index")] = {
1✔
289
                "sample_rate": stream.get("sample_rate", 0),
290
                "channels": stream.get("channels", 0),
291
            }
292
        if codec_type == "video":
1✔
293
            codec = stream.get("codec_name", "unknown")
1✔
294
            if any(ext in codec.lower() for ext in image_codec):
1✔
295
                self.list_image_track["%s" % stream.get("index")] = {
×
296
                    "width": stream.get("width", 0),
297
                    "height": stream.get("height", 0),
298
                }
299
            else:
300
                self.list_video_track["%s" % stream.get("index")] = {
1✔
301
                    "width": stream.get("width", 0),
302
                    "height": stream.get("height", 0),
303
                }
304
        if codec_type == "subtitle":
1✔
305
            codec = stream.get("codec_name", "unknown")
×
306
            language = ""
×
307
            if stream.get("tags"):
×
308
                language = stream.get("tags").get("language", "")
×
309
            self.list_subtitle_track["%s" % stream.get("index")] = {"language": language}
×
310

311
    def get_output_dir(self) -> str:
1✔
312
        dirname = os.path.dirname(self.video_file)
1✔
313
        return os.path.join(dirname, "%04d" % int(self.id))
1✔
314

315
    def create_output_dir(self) -> None:
1✔
316
        output_dir = self.get_output_dir()
1✔
317
        if not os.path.exists(output_dir):
1✔
318
            os.makedirs(output_dir)
1✔
319
        self.output_dir = output_dir
1✔
320

321
    def get_mp4_command(self) -> str:
        """Build the ffmpeg command line that produces the mp4 renditions.

        Renditions whose "encode_mp4" flag is False are discarded. The first
        remaining rendition is always encoded; each following one is encoded
        only when the source height reaches that rendition's threshold.
        Every planned output is recorded in ``list_mp4_files``.

        Returns:
            The command string, or "" when no rendition has mp4 enabled.
        """
        command = "%s " % FFMPEG_CMD
        renditions = get_list_rendition()
        # Iterate over a snapshot of the keys because entries are removed.
        for rend in list(renditions):
            if renditions[rend]["encode_mp4"] is False:
                renditions.pop(rend)
        if len(renditions) == 0:
            return ""

        def encode_args(height, rendition, output_file):
            # One FFMPEG_MP4_ENCODE section for a single output file.
            return FFMPEG_MP4_ENCODE % {
                "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
                "map_audio": "-map 0:a:0" if len(self.list_audio_track) > 0 else "",
                "libx": FFMPEG_LIBX,
                "height": height,
                "preset": FFMPEG_PRESET,
                "profile": FFMPEG_PROFILE,
                "level": FFMPEG_LEVEL,
                "crf": FFMPEG_CRF,
                "maxrate": rendition["maxrate"],
                "bufsize": rendition["maxrate"],
                "ba": rendition["audio_bitrate"],
                "output": output_file,
            }

        first_rend, first_settings = renditions.popitem(last=False)
        command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        output_file = os.path.join(self.output_dir, "%sp.mp4" % first_rend)
        command += encode_args(first_rend, first_settings, output_file)
        self.list_mp4_files[first_rend] = output_file
        # Note: the same could be written with str.format, e.g.
        # FFMPEG_MP4_ENCODE.format(height=first_rend, [...])

        in_height = list(self.list_video_track.values())[0]["height"]
        for rend in renditions:
            rendition = renditions[rend]
            resolution_threshold = rend - rend * (
                rendition["encoding_resolution_threshold"] / 100
            )
            if not in_height >= resolution_threshold:
                continue
            output_file = os.path.join(self.output_dir, "%sp.mp4" % rend)
            command += encode_args(min(rend, in_height), rendition, output_file)
            self.list_mp4_files[rend] = output_file
        return command
381

382
    def get_hls_command(self) -> str:
        """Build the ffmpeg command line that produces the HLS renditions.

        The first rendition is always encoded; each following one is encoded
        only when the source height reaches that rendition's threshold.
        Every planned playlist is recorded in ``list_hls_files``.

        Returns:
            The complete command string.
        """
        parts = ["%s " % FFMPEG_CMD]
        renditions = get_list_rendition()
        parts.append(
            FFMPEG_INPUT
            % {
                "input": self.video_file,
                "nb_threads": FFMPEG_NB_THREADS,
            }
        )
        common_params = FFMPEG_HLS_COMMON_PARAMS % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "libx": FFMPEG_LIBX,
            "preset": FFMPEG_PRESET,
            "profile": FFMPEG_PROFILE,
            "level": FFMPEG_LEVEL,
            "crf": FFMPEG_CRF,
        }
        # NOTE(review): the common parameters are emitted once here and once
        # more in front of every rendition below; kept exactly as it was.
        parts.append(common_params)
        in_height = list(self.list_video_track.values())[0]["height"]
        for index, (rend, rendition) in enumerate(renditions.items()):
            resolution_threshold = rend - rend * (
                rendition["encoding_resolution_threshold"] / 100
            )
            selected = in_height >= resolution_threshold or index == 0
            if not selected:
                continue
            output_file = os.path.join(self.output_dir, "%sp.m3u8" % rend)
            parts.append(common_params)
            parts.append(
                FFMPEG_HLS_ENCODE_PARAMS
                % {
                    "height": min(rend, in_height),
                    "maxrate": rendition["maxrate"],
                    "bufsize": rendition["maxrate"],
                    "ba": rendition["audio_bitrate"],
                    "hls_time": FFMPEG_HLS_TIME,
                    "output": output_file,
                }
            )
            self.list_hls_files[rend] = output_file
        return "".join(parts)
416

417
    def get_dressing_file(self) -> str:
1✔
418
        """Create or replace the dressed video file."""
419
        dirname = os.path.dirname(self.video_file)
×
420
        filename, ext = os.path.splitext(os.path.basename(self.video_file))
×
421
        output_file = os.path.join(dirname, filename + "_dressing" + ext)
×
422
        return output_file
×
423

424
    def get_dressing_command(self) -> str:
        """
        Assemble the FFMPEG command that applies the dressing.

        The command is built from the source input, the extra dressing
        inputs, the silent-audio inputs for credits, the filter graph and
        the output file returned by ``get_dressing_file``.

        Returns:
            A string representing the complete FFMPEG command.
        """
        height = str(list(self.list_video_track.values())[0]["height"])
        parts = [
            f"{FFMPEG_CMD} ",
            FFMPEG_INPUT
            % {
                "input": self.video_file,
                "nb_threads": FFMPEG_NB_THREADS,
            },
            self.dressing_input,
            # Silent audio inputs for credits without an audio track.
            self.handle_dressing_credits(),
        ]

        # Only the filter list is used here; the params string is ignored.
        filter_chain, _ = self.build_dressing_filters(height)
        parts.append(
            FFMPEG_DRESSING_FILTER_COMPLEX % {"filter": ";".join(filter_chain)}
        )

        has_credits = self.json_dressing.get(
            "opening_credits"
        ) or self.json_dressing.get("ending_credits")
        if has_credits:
            parts.append(" -map '[v]' -map '[a]' ")

        parts.append(FFMPEG_DRESSING_OUTPUT % {"output": self.get_dressing_file()})
        return "".join(parts)
461

462
    def handle_dressing_credits(self) -> str:
1✔
463
        """
464
        Handle the addition of opening and ending credits to the command.
465

466
        Returns:
467
            A string with the FFMPEG commands for silent credits if required.
468
        """
469
        command = ""
×
470
        for credit_type in ["opening_credits", "ending_credits"]:
×
471
            if self.json_dressing.get(credit_type):
×
472
                has_audio_key = f"{credit_type}_video_hasaudio"
×
473
                duration_key = f"{credit_type}_video_duration"
×
474

475
                if not self.json_dressing.get(has_audio_key, True):
×
476
                    duration = self.json_dressing.get(duration_key)
×
477

478
                    try:
×
479
                        duration = int(duration) if duration and int(duration) > 0 else 1
×
480
                    except (ValueError, TypeError):
×
481
                        duration = 1
×
482

483
                    command += FFMPEG_DRESSING_SILENT % {"duration": duration}
×
484

485
        return command
×
486

487
    def build_dressing_filters(self, height: str):
        """
        Build the filter list and the concat parameter string for the dressing.

        Args:
            height: The height of the video for scaling purposes.

        Returns:
            A tuple containing the list of filters and the filter parameters.
        """
        with_opening = self.json_dressing.get("opening_credits")
        with_ending = self.json_dressing.get("ending_credits")

        # The source video is always scaled first, as input 0.
        filters = [
            FFMPEG_DRESSING_SCALE % {"number": "0", "height": height, "name": "vid"}
        ]
        params = "[vid][0:a]"
        order = 1

        # Optional watermark.
        if self.json_dressing.get("watermark"):
            filters.append(self.apply_dressing_watermark(height))
            params = "[video][0:a]"
            order += 1

        interval_silent = order
        concat_number = 1

        # Opening credits go in front of the main video.
        if with_opening:
            if with_ending:
                interval_silent += 1
            filters, params, interval_silent = self.add_dressing_credits(
                filters,
                params,
                height,
                "opening_credits",
                "debut",
                order,
                interval_silent,
            )
            order += 1
            concat_number += 1

        # Ending credits go after the main video.
        if with_ending:
            filters, params, interval_silent = self.add_dressing_credits(
                filters, params, height, "ending_credits", "fin", order, interval_silent
            )
            concat_number += 1

        # A concat filter is only needed when at least one credit was added.
        if with_opening or with_ending:
            filters.append(
                FFMPEG_DRESSING_CONCAT % {"params": params, "number": concat_number}
            )

        return filters, params
549

550
    def apply_dressing_watermark(self, height: str) -> str:
        """
        Build the watermark filter from the dressing description.

        The stored opacity is divided by 100. The output label is "[video]"
        when opening or ending credits are present, and empty otherwise.

        Args:
            height: The height of the video to position the watermark correctly.

        Returns:
            A string representing the FFMPEG command for applying the watermark.
        """
        has_credits = self.json_dressing.get(
            "opening_credits"
        ) or self.json_dressing.get("ending_credits")
        return FFMPEG_DRESSING_WATERMARK % {
            "opacity": self.json_dressing["opacity"] / 100.0,
            "position": get_dressing_position_value(
                self.json_dressing["position_orig"], height
            ),
            "name_out": "[video]" if has_credits else "",
        }
576

577
    def add_dressing_credits(
        self,
        filters: list,
        params: str,
        height: str,
        credit_type: str,
        name: str,
        order: int,
        interval_silent: int,
    ) -> tuple:
        """
        Add opening or ending credits by updating the filters and parameters.

        Args:
            filters (list): Existing FFmpeg filters; new filters are appended to it.
            params (str): Current concat parameter string.
            height (str): The height of the video used for scaling the credit.
            credit_type (str): Either 'opening_credits' or 'ending_credits'.
            name (str): Label given to the scaled credit in the filter graph.
            order (int): Input number of the credit video.
            interval_silent (int): Counter used to pick the silent audio input.

        Returns:
            tuple:
                - The list of filters with the credit filters added.
                - The parameter string with the credit prepended (opening) or appended (ending).
                - interval_silent, incremented if a silent audio track was used.
        """
        audio_out = f"{order}:a"

        # Default to True so a missing "*_video_hasaudio" key is treated the
        # same way as in handle_dressing_credits, which adds no silent input
        # in that case; otherwise this filter would reference a missing input.
        if not self.json_dressing.get(f"{credit_type}_video_hasaudio", True):
            audio_out = f"a{order}"
            filters.append(
                FFMPEG_DRESSING_AUDIO
                % {
                    "param_in": f"{interval_silent + 1}:a",
                    "param_out": audio_out,
                }
            )
            interval_silent = interval_silent + 1

        if credit_type == "opening_credits":
            params = f"[{name}][{audio_out}]{params}"
        else:
            params = f"{params}[{name}][{audio_out}]"

        filters.append(
            FFMPEG_DRESSING_SCALE % {"number": str(order), "height": height, "name": name}
        )

        return filters, params, interval_silent
628

629
    def encode_video_dressing(self) -> None:
        """Run the dressing command, log it, then switch the source to the dressed file.

        ``video_file`` is replaced by the dressed file path whatever the
        result of the command.
        """
        command = self.get_dressing_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("dressing_command", command, succeeded, output)
        # Later encoding steps read the dressed file instead of the original.
        self.video_file = self.get_dressing_file()
637

638
    def encode_video_part(self) -> None:
        """Encode the video part of a file.

        Runs the mp4 command, then the HLS command, and logs both. A failed
        mp4 command sets ``error_encoding``. When the duration is still 0
        after the mp4 step, it is re-read from the first mp4 output. The
        main livestream playlist is created only if the HLS command succeeds.
        """
        mp4_command = self.get_mp4_command()
        return_value, return_msg = launch_cmd(mp4_command)
        self.add_encoding_log("mp4_command", mp4_command, return_value, return_msg)
        if not return_value:
            self.error_encoding = True
        if self.duration == 0:
            # get_mp4_command() keys its first output by the first rendition
            # with "encode_mp4" enabled. get_first_item() applies the same
            # filter; the unfiltered first rendition may have no mp4 file.
            first_item = self.get_first_item()
            if first_item is not None and first_item[0] in self.list_mp4_files:
                self.fix_duration(self.list_mp4_files[first_item[0]])
        hls_command = self.get_hls_command()
        return_value, return_msg = launch_cmd(hls_command)
        if return_value:
            self.create_main_livestream()
        self.add_encoding_log("hls_command", hls_command, return_value, return_msg)
654

655
    def create_main_livestream(self) -> None:
        """Merge the per-rendition ``livestream<rend>.m3u8`` files into ``livestream.m3u8``.

        Each existing per-rendition file is read and then deleted. The file
        of the first rendition is copied whole; the others lose their first
        two lines. Doubled newlines are collapsed in the merged content.
        """
        output_dir = self.get_output_dir()
        chunks = []
        for index, rend in enumerate(get_list_rendition()):
            rend_livestream = os.path.join(output_dir, "livestream%s.m3u8" % rend)
            if not os.path.exists(rend_livestream):
                continue
            with open(rend_livestream, "r") as rend_file:
                data = rend_file.read()
            if index == 0:
                chunks.append(data)
            else:
                # Skip the first two lines — presumably the playlist header
                # already present in the first file; confirm.
                chunks.append("\n".join(data.split("\n")[2:]))
            os.remove(rend_livestream)
        livestream_content = "".join(chunks)
        # The context manager closes the file even if the write fails.
        with open(os.path.join(output_dir, "livestream.m3u8"), "w") as livestream_file:
            livestream_file.write(livestream_content.replace("\n\n", "\n"))
675

676
    def get_mp3_command(self) -> str:
        """Build the ffmpeg command producing the mp3 file and register its path.

        The output is named ``audio_<FFMPEG_AUDIO_BITRATE>.mp3`` and stored
        in ``list_mp3_files`` under the bitrate key.
        """
        output_file = os.path.join(self.output_dir, "audio_%s.mp3" % FFMPEG_AUDIO_BITRATE)
        input_part = FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        # The bitrate is not passed to the template here; it only names the file.
        encode_part = FFMPEG_MP3_ENCODE % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "output": output_file,
        }
        self.list_mp3_files[FFMPEG_AUDIO_BITRATE] = output_file
        return "%s " % FFMPEG_CMD + input_part + encode_part
690

691
    def get_m4a_command(self) -> str:
        """Build the ffmpeg command producing the m4a file and register its path.

        The output is named ``audio_<FFMPEG_AUDIO_BITRATE>.m4a`` and stored
        in ``list_m4a_files`` under the bitrate key.
        """
        output_file = os.path.join(self.output_dir, "audio_%s.m4a" % FFMPEG_AUDIO_BITRATE)
        input_part = FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        encode_part = FFMPEG_M4A_ENCODE % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "audio_bitrate": FFMPEG_AUDIO_BITRATE,
            "output": output_file,
        }
        self.list_m4a_files[FFMPEG_AUDIO_BITRATE] = output_file
        return "%s " % FFMPEG_CMD + input_part + encode_part
705

706
    def encode_audio_part(self) -> None:
        """Encode the audio: always mp3, plus m4a when there is no video track.

        When the duration is still 0 after the mp3 step, it is re-read from
        the first registered mp3 file.
        """
        command = self.get_mp3_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("mp3_command", command, succeeded, output)

        if self.duration == 0:
            first_key = list(self.list_mp3_files)[0]
            self.fix_duration(self.list_mp3_files[first_key])

        if self.is_video():
            return
        command = self.get_m4a_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("m4a_command", command, succeeded, output)
718

719
    def get_extract_thumbnail_command(self) -> str:
        """Build the ffmpeg command extracting every image track as a png.

        Each output is named ``thumbnail_<track index>.png`` and recorded in
        ``list_thumbnail_files`` under the track index.
        """
        parts = [
            "%s " % FFMPEG_CMD,
            FFMPEG_INPUT
            % {
                "input": self.video_file,
                "nb_threads": FFMPEG_NB_THREADS,
            },
        ]
        for track_index in self.list_image_track:
            output_file = os.path.join(
                self.output_dir, "thumbnail_%s.png" % track_index
            )
            parts.append(
                FFMPEG_EXTRACT_THUMBNAIL
                % {
                    "index": track_index,
                    "output": output_file,
                }
            )
            self.list_thumbnail_files[track_index] = output_file
        return "".join(parts)
733

734
    def get_create_thumbnail_command(self) -> str:
        """Build the ffmpeg command creating thumbnails from the first mp4 file.

        The input is the mp4 registered for the rendition returned by
        ``get_first_item``. The expected output paths are recorded in
        ``list_thumbnail_files`` under the keys "1" to FFMPEG_NB_THUMBNAIL.

        Returns:
            The complete command string.
        """
        thumbnail_command = "%s " % FFMPEG_CMD
        first_item = self.get_first_item()
        input_file = self.list_mp4_files[first_item[0]]
        thumbnail_command += FFMPEG_INPUT % {
            "input": input_file,
            "nb_threads": FFMPEG_NB_THUMBNAIL and FFMPEG_NB_THREADS or FFMPEG_NB_THREADS,
        }
        output_file = os.path.join(self.output_dir, "thumbnail")
        thumbnail_command += FFMPEG_CREATE_THUMBNAIL % {
            "duration": self.duration,
            "nb_thumbnail": FFMPEG_NB_THUMBNAIL,
            "output": output_file,
        }
        for number in range(1, FFMPEG_NB_THUMBNAIL + 1):
            # Zero-pad to four digits: identical to the former "_000<n>" for
            # 1-9, and still four digits from 10 upwards.
            # NOTE(review): assumes FFMPEG_CREATE_THUMBNAIL numbers its
            # outputs with a 4-digit counter — confirm.
            self.list_thumbnail_files[str(number)] = "%s_%04d.png" % (
                output_file,
                number,
            )
        return thumbnail_command
755

756
    def get_first_item(self):
        """Return the first rendition from the settings that is encoded to mp4.

        Renditions whose ``encode_mp4`` flag is ``False`` are removed from the
        mapping returned by ``get_list_rendition``; the first remaining entry is
        then popped and returned as a ``(key, rendition)`` pair, or ``None`` when
        nothing is left.
        """
        renditions = get_list_rendition()
        without_mp4 = [
            name for name in renditions if renditions[name]["encode_mp4"] is False
        ]
        for name in without_mp4:
            renditions.pop(name)
        if len(renditions) == 0:
            return None
        return renditions.popitem(last=False)
1✔
766

767
    def create_overview(self) -> None:
        """Create the overview image and the WebVTT file that indexes it.

        ffmpeg is run on the mp4 file of the first rendition to produce
        ``overview.png``; ``overview.vtt`` then receives one cue per tile, each
        pointing at a ``#xywh`` region of the image (tiles are addressed left to
        right, with y always 0). On success both paths are stored in
        ``self.list_overview_files``; otherwise a failed "create_overview" step
        is recorded in the encoding log.
        """

        def cue_time(seconds):
            """Format a number of seconds as ``HH:MM:SS.mmm``."""
            whole, fraction = format(float(seconds), ".3f").split(".")
            return time.strftime("%H:%M:%S", time.gmtime(int(whole))) + ".%s" % fraction

        first_item = self.get_first_item()
        # Tiles are 90 pixels high; the width follows the source aspect ratio.
        source_track = list(self.list_video_track.items())[0][1]
        in_height = source_track["height"]
        in_width = source_track["width"]
        image_height = 90
        coef = in_height / image_height
        image_width = int(in_width / coef)
        input_file = self.list_mp4_files[first_item[0]]
        nb_img = 100 if self.duration >= 100 else 10

        overviewimagefilename = os.path.join(self.output_dir, "overview.png")
        input_part = FFMPEG_INPUT % {
            "input": input_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        overview_part = FFMPEG_CREATE_OVERVIEW % {
            "duration": self.duration,
            "image_count": nb_img,
            "width": image_width,
            "height": image_height,
            "output": overviewimagefilename,
        }
        overview_image_command = FFMPEG_CMD + " " + input_part + overview_part
        return_value, output_message = launch_cmd(overview_image_command)
        if not (return_value and check_file(overviewimagefilename)):
            logger.error(f"FFmpeg failed with output: {output_message}")

        overviewfilename = os.path.join(self.output_dir, "overview.vtt")
        image_url = os.path.basename(overviewimagefilename)
        webvtt = WebVTT()
        for index in range(nb_img):
            # Each cue covers an equal slice of the total duration.
            start_time = cue_time(self.duration * index / nb_img)
            end_time = cue_time(self.duration * (index + 1) / nb_img)
            region = "%s#xywh=%d,%d,%d,%d" % (
                image_url,
                image_width * index,
                0,
                image_width,
                image_height,
            )
            webvtt.captions.append(Caption(start_time, end_time, region))
        webvtt.save(overviewfilename)

        if not (check_file(overviewfilename) and check_file(overviewimagefilename)):
            self.add_encoding_log("create_overview", "", False, "")
            return
        self.list_overview_files["0"] = overviewimagefilename
        self.list_overview_files["1"] = overviewfilename
×
827

828
    def encode_image_part(self) -> None:
        """Produce the thumbnails and, for long enough videos, the overview.

        Thumbnails are extracted from the image tracks when there are any;
        otherwise, for a video, they are generated from the encoded file. The
        command that was run is recorded in the encoding log.
        """
        if len(self.list_image_track) > 0:
            step = "extract_thumbnail_command"
            build_command = self.get_extract_thumbnail_command
        elif self.is_video():
            step = "create_thumbnail_command"
            build_command = self.get_create_thumbnail_command
        else:
            step = build_command = None

        if build_command is not None:
            command = build_command()
            succeeded, output = launch_cmd(command)
            self.add_encoding_log(step, command, succeeded, output)

        # No overview is made for videos shorter than 10 seconds
        # (10 seconds is kept inclusive so that the tests pass).
        if self.is_video() and self.duration >= 10:
            self.create_overview()
1✔
845

846
    def get_extract_subtitle_command(self) -> str:
        """Build the ffmpeg command that extracts each subtitle track to a VTT file.

        Side effect: ``self.list_subtitle_files`` gets one entry per subtitle
        track, mapping the track key to ``[language, output_path]`` where the
        path is ``<output_dir>/subtitle_<language>.vtt``.

        Returns:
            The complete command line as a single string.
        """
        fragments = ["%s " % FFMPEG_CMD]
        fragments.append(
            FFMPEG_INPUT
            % {"input": self.video_file, "nb_threads": FFMPEG_NB_THREADS}
        )
        for track in self.list_subtitle_track:
            language = self.list_subtitle_track[track]["language"]
            target = os.path.join(self.output_dir, "subtitle_%s.vtt" % language)
            fragments.append(
                FFMPEG_EXTRACT_SUBTITLE % {"index": track, "output": target}
            )
            self.list_subtitle_files[track] = [language, target]
        return "".join(fragments)
×
861

862
    def get_subtitle_part(self) -> None:
        """Extract the subtitle tracks, if any, and log the command that was run."""
        if not len(self.list_subtitle_track) > 0:
            return
        command = self.get_extract_subtitle_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("subtitle_command", command, succeeded, output)
869

870
    def export_to_json(self) -> None:
        """Write every instance attribute to ``info_video.json`` in the output dir.

        The file is a JSON object (2-space indent) holding a snapshot of
        ``self.__dict__``; all attribute values must therefore be JSON
        serializable, otherwise ``json.dump`` raises ``TypeError``.
        """
        data_to_dump = dict(self.__dict__)
        # Build the path like the rest of the class does, instead of
        # concatenating a hard-coded "/" separator.
        json_path = os.path.join(self.output_dir, "info_video.json")
        with open(json_path, "w", encoding="utf-8") as outfile:
            json.dump(data_to_dump, outfile, indent=2)
1✔
876

877
    def add_encoding_log(self, title, command, result, msg) -> None:
        """Record one encoding step in ``self.encoding_log`` under ``title``.

        The entry stores the command, its result and its message. The first time
        a step is recorded with ``result`` being exactly ``False``,
        ``self.error_encoding`` is switched to ``True``.
        """
        entry = dict(command=command, result=result, msg=msg)
        self.encoding_log[title] = entry
        step_failed = result is False
        if step_failed and self.error_encoding is False:
            self.error_encoding = True
×
882

883
    def start_encode(self) -> None:
        """Run the whole encoding sequence and export the result to JSON.

        Steps, in order: create the output directory, read the video data, apply
        the dressing when one is set, then encode the video, audio, image and
        subtitle parts that apply. ``self.start`` and ``self.stop`` hold the
        ``time.ctime()`` stamps taken before and after the sequence.
        """
        self.start = time.ctime()
        self.create_output_dir()
        self.get_video_data()
        if self.json_dressing is not None:
            self.encode_video_dressing()

        summary = "start_encode {id: %s, file: %s, duration: %s}" % (
            self.id,
            self.video_file,
            self.duration,
        )
        logger.info(summary)

        if self.is_video():
            logger.debug("* encode_video_part")
            self.encode_video_part()
        if self.list_audio_track:
            logger.debug("* encode_audio_part")
            self.encode_audio_part()

        # The image part always runs; it decides by itself what to produce.
        logger.debug("* encode_image_part")
        self.encode_image_part()

        if self.list_subtitle_track:
            logger.debug("* get_subtitle_part")
            self.get_subtitle_part()

        self.stop = time.ctime()
        self.export_to_json()
1✔
906

907

908
def fix_input(input) -> str:
    """Rename the file to encode so that its path has no accent and no space.

    Args:
        input: Path of the file to encode; a relative path is resolved against
            the current working directory. (The name shadows the builtin but is
            kept so existing callers are unaffected.)

    Returns:
        The sanitized path the file was renamed to, or an empty string when the
        file does not exist or is empty.
    """
    # Use the argument itself rather than the module-level ``args``, which only
    # exists when this file is run as a script.
    if os.path.isabs(input):
        path_file = input
    else:
        path_file = os.path.join(os.getcwd(), input)

    if not (os.access(path_file, os.F_OK) and os.stat(path_file).st_size > 0):
        return ""

    # Remove accents (combining marks left after NFD decomposition) and spaces.
    # NOTE(review): this is applied to the whole path, directories included, so
    # the rename fails if a parent directory name contains accents or spaces.
    filename = "".join(
        c
        for c in unicodedata.normalize("NFD", path_file)
        if unicodedata.category(c) != "Mn"
    )
    filename = filename.replace(" ", "_")
    if filename != path_file:
        os.rename(path_file, filename)
    logger.info("Encoding file {} \n".format(filename))
    return filename
×
930

931

932
"""
933
  remote encode???
934
"""
935
# Command-line entry point: parse the arguments, sanitize the input file name
# and run the encoding.
if __name__ == "__main__":
    # Only used by the disabled log line further down.
    start = "Start at: %s" % time.ctime()
    parser = argparse.ArgumentParser(description="Running encoding video.")
    parser.add_argument("--id", required=True, help="the ID of the video")
    parser.add_argument("--start", required=False, help="Start cut")
    parser.add_argument("--stop", required=False, help="Stop cut")
    parser.add_argument("--input", required=True, help="name of input file to encode")
    parser.add_argument("--dressing", required=False, help="Dressing for the video")

    args = parser.parse_args()
    logger.debug(args.start)
    # fix_input returns an empty string when the file is missing or empty.
    filename = fix_input(args.input)
    encoding_video = Encoding_video(
        args.id, filename, args.start, args.stop, args.dressing
    )
    # Left disabled: enabling the next line raised the error quoted below it.
    # encoding_video.encoding_log += start
    # AttributeError: 'NoneType' object has no attribute 'get'
    encoding_video.start_encode()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc