• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 13852636957

24 Jan 2025 03:58PM UTC coverage: 70.238% (-0.1%) from 70.375%
13852636957

push

github

Badatos
Merge branch 'main' into pod_V4

# Conflicts:
#	.env.dev-exemple
#	dockerfile-dev-with-volumes/README.adoc
#	pod/main/test_settings.py
#	pod/video/models.py
#	pod/video/utils.py
#	requirements.txt

93 of 150 new or added lines in 24 files covered. (62.0%)

12 existing lines in 7 files now uncovered.

11932 of 16988 relevant lines covered (70.24%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.48
/pod/video_encode_transcript/Encoding_video.py
1
"""Esup-Pod video encoding."""
2

3
import json
1✔
4
import os
1✔
5
import time
1✔
6
from webvtt import WebVTT, Caption
1✔
7
import argparse
1✔
8
import unicodedata
1✔
9

10
if __name__ == "__main__":
1✔
11
    from encoding_utils import (
×
12
        get_dressing_position_value,
13
        get_info_from_video,
14
        get_list_rendition,
15
        launch_cmd,
16
        check_file,
17
    )
18
    from encoding_settings import (
×
19
        FFMPEG_CMD,
20
        FFPROBE_CMD,
21
        FFPROBE_GET_INFO,
22
        FFMPEG_CRF,
23
        FFMPEG_PRESET,
24
        FFMPEG_PROFILE,
25
        FFMPEG_LEVEL,
26
        FFMPEG_HLS_TIME,
27
        FFMPEG_INPUT,
28
        FFMPEG_LIBX,
29
        FFMPEG_MP4_ENCODE,
30
        FFMPEG_HLS_COMMON_PARAMS,
31
        FFMPEG_HLS_ENCODE_PARAMS,
32
        FFMPEG_MP3_ENCODE,
33
        FFMPEG_M4A_ENCODE,
34
        FFMPEG_NB_THREADS,
35
        FFMPEG_AUDIO_BITRATE,
36
        FFMPEG_EXTRACT_THUMBNAIL,
37
        FFMPEG_NB_THUMBNAIL,
38
        FFMPEG_CREATE_THUMBNAIL,
39
        FFMPEG_CREATE_OVERVIEW,
40
        FFMPEG_EXTRACT_SUBTITLE,
41
        FFMPEG_DRESSING_INPUT,
42
        FFMPEG_DRESSING_OUTPUT,
43
        FFMPEG_DRESSING_WATERMARK,
44
        FFMPEG_DRESSING_FILTER_COMPLEX,
45
        FFMPEG_DRESSING_SCALE,
46
        FFMPEG_DRESSING_CONCAT,
47
    )
48
else:
49
    from .encoding_utils import (
1✔
50
        get_dressing_position_value,
51
        get_info_from_video,
52
        get_list_rendition,
53
        launch_cmd,
54
        check_file,
55
    )
56
    from .encoding_settings import (
1✔
57
        FFMPEG_CMD,
58
        FFPROBE_CMD,
59
        FFPROBE_GET_INFO,
60
        FFMPEG_CRF,
61
        FFMPEG_PRESET,
62
        FFMPEG_PROFILE,
63
        FFMPEG_LEVEL,
64
        FFMPEG_HLS_TIME,
65
        FFMPEG_INPUT,
66
        FFMPEG_LIBX,
67
        FFMPEG_MP4_ENCODE,
68
        FFMPEG_HLS_COMMON_PARAMS,
69
        FFMPEG_HLS_ENCODE_PARAMS,
70
        FFMPEG_MP3_ENCODE,
71
        FFMPEG_M4A_ENCODE,
72
        FFMPEG_NB_THREADS,
73
        FFMPEG_AUDIO_BITRATE,
74
        FFMPEG_EXTRACT_THUMBNAIL,
75
        FFMPEG_NB_THUMBNAIL,
76
        FFMPEG_CREATE_THUMBNAIL,
77
        FFMPEG_CREATE_OVERVIEW,
78
        FFMPEG_EXTRACT_SUBTITLE,
79
        FFMPEG_DRESSING_INPUT,
80
        FFMPEG_DRESSING_OUTPUT,
81
        FFMPEG_DRESSING_WATERMARK,
82
        FFMPEG_DRESSING_FILTER_COMPLEX,
83
        FFMPEG_DRESSING_SCALE,
84
        FFMPEG_DRESSING_CONCAT,
85
    )
86

87

88
__author__ = "Nicolas CAN <nicolas.can@univ-lille.fr>"
__license__ = "LGPL v3"

# Codec-name fragments used to tell a still image apart from a real video
# track: a "video" stream whose codec name contains one of them is stored as an
# image track.
image_codec = ["jpeg", "gif", "png", "bmp", "jpg"]

# Encoding workflow implemented by this module:
#  - get the video source
#  - get all tracks from the video source
#  - encode the tracks in HLS and mp4
#  - save the result

# When Django is importable, every encoding constant can be overridden from the
# project settings; otherwise the defaults imported from encoding_settings are
# kept as they are.
try:
    from django.conf import settings

    FFMPEG_CMD = getattr(settings, "FFMPEG_CMD", FFMPEG_CMD)
    FFPROBE_CMD = getattr(settings, "FFPROBE_CMD", FFPROBE_CMD)
    FFPROBE_GET_INFO = getattr(settings, "FFPROBE_GET_INFO", FFPROBE_GET_INFO)
    FFMPEG_CRF = getattr(settings, "FFMPEG_CRF", FFMPEG_CRF)
    FFMPEG_PRESET = getattr(settings, "FFMPEG_PRESET", FFMPEG_PRESET)
    FFMPEG_PROFILE = getattr(settings, "FFMPEG_PROFILE", FFMPEG_PROFILE)
    FFMPEG_LEVEL = getattr(settings, "FFMPEG_LEVEL", FFMPEG_LEVEL)
    FFMPEG_HLS_TIME = getattr(settings, "FFMPEG_HLS_TIME", FFMPEG_HLS_TIME)
    FFMPEG_INPUT = getattr(settings, "FFMPEG_INPUT", FFMPEG_INPUT)
    FFMPEG_LIBX = getattr(settings, "FFMPEG_LIBX", FFMPEG_LIBX)
    FFMPEG_MP4_ENCODE = getattr(settings, "FFMPEG_MP4_ENCODE", FFMPEG_MP4_ENCODE)
    FFMPEG_HLS_COMMON_PARAMS = getattr(
        settings, "FFMPEG_HLS_COMMON_PARAMS", FFMPEG_HLS_COMMON_PARAMS
    )
    FFMPEG_HLS_ENCODE_PARAMS = getattr(
        settings, "FFMPEG_HLS_ENCODE_PARAMS", FFMPEG_HLS_ENCODE_PARAMS
    )
    FFMPEG_MP3_ENCODE = getattr(settings, "FFMPEG_MP3_ENCODE", FFMPEG_MP3_ENCODE)
    FFMPEG_M4A_ENCODE = getattr(settings, "FFMPEG_M4A_ENCODE", FFMPEG_M4A_ENCODE)
    FFMPEG_NB_THREADS = getattr(settings, "FFMPEG_NB_THREADS", FFMPEG_NB_THREADS)
    FFMPEG_AUDIO_BITRATE = getattr(settings, "FFMPEG_AUDIO_BITRATE", FFMPEG_AUDIO_BITRATE)
    FFMPEG_EXTRACT_THUMBNAIL = getattr(
        settings, "FFMPEG_EXTRACT_THUMBNAIL", FFMPEG_EXTRACT_THUMBNAIL
    )
    FFMPEG_NB_THUMBNAIL = getattr(settings, "FFMPEG_NB_THUMBNAIL", FFMPEG_NB_THUMBNAIL)
    FFMPEG_CREATE_THUMBNAIL = getattr(
        settings, "FFMPEG_CREATE_THUMBNAIL", FFMPEG_CREATE_THUMBNAIL
    )
    # Overridable like every other command template used by this module.
    FFMPEG_CREATE_OVERVIEW = getattr(
        settings, "FFMPEG_CREATE_OVERVIEW", FFMPEG_CREATE_OVERVIEW
    )
    FFMPEG_EXTRACT_SUBTITLE = getattr(
        settings, "FFMPEG_EXTRACT_SUBTITLE", FFMPEG_EXTRACT_SUBTITLE
    )
    FFMPEG_DRESSING_INPUT = getattr(
        settings, "FFMPEG_DRESSING_INPUT", FFMPEG_DRESSING_INPUT
    )
    FFMPEG_DRESSING_OUTPUT = getattr(
        settings, "FFMPEG_DRESSING_OUTPUT", FFMPEG_DRESSING_OUTPUT
    )
    FFMPEG_DRESSING_WATERMARK = getattr(
        settings, "FFMPEG_DRESSING_WATERMARK", FFMPEG_DRESSING_WATERMARK
    )
    FFMPEG_DRESSING_FILTER_COMPLEX = getattr(
        settings, "FFMPEG_DRESSING_FILTER_COMPLEX", FFMPEG_DRESSING_FILTER_COMPLEX
    )
    FFMPEG_DRESSING_SCALE = getattr(
        settings, "FFMPEG_DRESSING_SCALE", FFMPEG_DRESSING_SCALE
    )
    FFMPEG_DRESSING_CONCAT = getattr(
        settings, "FFMPEG_DRESSING_CONCAT", FFMPEG_DRESSING_CONCAT
    )
except ImportError:  # pragma: no cover
    pass
154

155

156
class Encoding_video:
    """Encoding video object.

    Probes a source media file, then builds and runs the ffmpeg commands that
    produce its mp4/HLS renditions, audio files, thumbnails, overview and
    subtitles. Each step is recorded in ``encoding_log`` and the final state is
    dumped to ``info_video.json`` in the output directory.
    """

    # Class-level defaults. All of them are re-assigned per instance in
    # __init__, so the mutable dicts below are not shared between instances
    # built through the constructor.
    id = 0
    video_file = ""
    duration = 0
    list_video_track = {}
    list_audio_track = {}
    list_subtitle_track = {}
    list_image_track = {}
    list_mp4_files = {}
    list_hls_files = {}
    list_mp3_files = {}
    list_m4a_files = {}
    list_thumbnail_files = {}
    list_overview_files = {}
    list_subtitle_files = {}
    encoding_log = {}
    output_dir = ""
    start = 0
    stop = 0
    error_encoding = False
    cutting_start = 0
    cutting_stop = 0
    json_dressing = None
    dressing_input = ""

    def __init__(
        self, id=0, video_file="", start=0, stop=0, json_dressing=None, dressing_input=""
    ):
        """Initialize a new Encoding_video object.

        Args:
            id: identifier of the video; also used to name the output directory.
            video_file: path of the source file to encode.
            start: cut start; a falsy value means "no cut" and is stored as 0.
            stop: cut stop; a falsy value means "no cut" and is stored as 0.
            json_dressing: dressing parameters, indexed by key when the
                dressing command is built; None disables the dressing step.
            dressing_input: extra ffmpeg input arguments appended verbatim to
                the dressing command.
        """
        self.id = id
        self.video_file = video_file
        self.duration = 0
        self.list_video_track = {}
        self.list_audio_track = {}
        self.list_subtitle_track = {}
        self.list_image_track = {}
        self.list_mp4_files = {}
        self.list_hls_files = {}
        self.list_mp3_files = {}
        self.list_m4a_files = {}
        self.list_thumbnail_files = {}
        self.list_overview_files = {}
        self.encoding_log = {}
        self.list_subtitle_files = {}
        self.output_dir = ""
        self.start = 0
        self.stop = 0
        self.error_encoding = False
        self.cutting_start = start or 0
        self.cutting_stop = stop or 0
        self.json_dressing = json_dressing
        self.dressing_input = dressing_input

    def is_video(self) -> bool:
        """Check if current encoding correspond to a video."""
        return len(self.list_video_track) > 0

    def get_subtime(self, clip_begin, clip_end):
        """Return the ffmpeg ``-ss … -to … `` cut arguments.

        An empty string is returned when both bounds equal 0 (no cut).
        """
        if clip_begin == 0 and clip_end == 0:
            return ""
        return "-ss %s -to %s " % (str(clip_begin), str(clip_end))

    def get_video_data(self):
        """Probe the source file and record its duration and tracks."""
        msg = "--> get_info_video\n"
        probe_cmd = FFPROBE_GET_INFO % {
            "ffprobe": FFPROBE_CMD,
            "select_streams": "",
            "source": '"' + self.video_file + '" ',
        }
        msg += probe_cmd + "\n"
        duration = 0
        info, return_msg = get_info_from_video(probe_cmd)
        msg += json.dumps(info, indent=2)
        msg += " \n"
        msg += return_msg + "\n"
        self.add_encoding_log("probe_cmd", probe_cmd, True, msg)
        try:
            duration = int(float("%s" % info["format"]["duration"]))
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
            msg = "\nUnexpected error: {0}".format(err)
            self.add_encoding_log("duration", "", True, msg)
        # A requested cut overrides the probed duration.
        if self.cutting_start != 0 or self.cutting_stop != 0:
            duration = self.cutting_stop - self.cutting_start
        self.duration = duration
        # The except clause above already tolerates an "info" that is not a
        # dict, so the stream listing must tolerate it as well.
        streams = info.get("streams", []) if isinstance(info, dict) else []
        for stream in streams:
            self.add_stream(stream)

    def fix_duration(self, input_file):
        """Re-probe ``input_file`` and store its duration in ``self.duration``.

        Called by the encoding steps when the source probe gave a duration
        of 0. A requested cut still overrides the probed value.
        """
        msg = "--> get_info_video\n"
        # Use the configurable ffprobe binary, like get_video_data does.
        probe_cmd = (
            "%s -v quiet -show_entries format=duration -hide_banner "
            "-of default=noprint_wrappers=1:nokey=1 -print_format json "
            '-i "%s"' % (FFPROBE_CMD, input_file)
        )
        msg += probe_cmd + "\n"
        info, return_msg = get_info_from_video(probe_cmd)
        msg += json.dumps(info, indent=2)
        msg += " \n"
        msg += return_msg + "\n"
        duration = 0
        try:
            duration = int(float("%s" % info["format"]["duration"]))
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
            msg += "\nUnexpected error: {0}".format(err)
        self.add_encoding_log("fix_duration", probe_cmd, True, msg)
        if self.cutting_start != 0 or self.cutting_stop != 0:
            duration = self.cutting_stop - self.cutting_start
        self.duration = duration

    def add_stream(self, stream):
        """Store one probed stream in the track dict matching its codec type.

        Tracks are keyed by the stream index converted to a string. A "video"
        stream whose codec name contains an entry of ``image_codec`` is stored
        as an image track instead of a video track.
        """
        codec_type = stream.get("codec_type", "unknown")
        track_key = "%s" % stream.get("index")
        # https://ffmpeg.org/doxygen/3.2/group__lavu__misc.html#ga9a84bba4713dfced21a1a56163be1f48
        if codec_type == "audio":
            self.list_audio_track[track_key] = {
                "sample_rate": stream.get("sample_rate", 0),
                "channels": stream.get("channels", 0),
            }
        elif codec_type == "video":
            codec = stream.get("codec_name", "unknown")
            dimensions = {
                "width": stream.get("width", 0),
                "height": stream.get("height", 0),
            }
            if any(ext in codec.lower() for ext in image_codec):
                self.list_image_track[track_key] = dimensions
            else:
                self.list_video_track[track_key] = dimensions
        elif codec_type == "subtitle":
            language = ""
            if stream.get("tags"):
                language = stream.get("tags").get("language", "")
            self.list_subtitle_track[track_key] = {"language": language}

    def get_output_dir(self) -> str:
        """Return the output directory: the zero-padded id, next to the source file."""
        dirname = os.path.dirname(self.video_file)
        return os.path.join(dirname, "%04d" % int(self.id))

    def create_output_dir(self):
        """Create the output directory if needed and remember it in ``output_dir``."""
        output_dir = self.get_output_dir()
        # exist_ok avoids failing when the directory appears between a check
        # and the creation.
        os.makedirs(output_dir, exist_ok=True)
        self.output_dir = output_dir

    def _first_video_track(self):
        """Return the info dict of the first video track (IndexError if none)."""
        return list(self.list_video_track.values())[0]

    def _get_mp4_renditions(self):
        """Return the configured renditions minus those whose "encode_mp4" is False."""
        list_rendition = get_list_rendition()
        for rend in list_rendition.copy():
            if list_rendition[rend]["encode_mp4"] is False:
                list_rendition.pop(rend)
        return list_rendition

    def _get_first_mp4_file(self):
        """Return the mp4 file of the first mp4 rendition, or None if there is none."""
        first_item = self.get_first_item()
        if first_item is None:
            return None
        return self.list_mp4_files.get(first_item[0])

    def _get_mp4_encode_params(self, height, rendition, output_file) -> str:
        """Return the mp4 encoding arguments for one rendition."""
        return FFMPEG_MP4_ENCODE % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "map_audio": "-map 0:a:0" if len(self.list_audio_track) > 0 else "",
            "libx": FFMPEG_LIBX,
            "height": height,
            "preset": FFMPEG_PRESET,
            "profile": FFMPEG_PROFILE,
            "level": FFMPEG_LEVEL,
            "crf": FFMPEG_CRF,
            "maxrate": rendition["maxrate"],
            # The buffer size is deliberately fed with the rendition's maxrate.
            "bufsize": rendition["maxrate"],
            "ba": rendition["audio_bitrate"],
            "output": output_file,
        }

    def get_mp4_command(self) -> str:
        """Build the ffmpeg command encoding every applicable mp4 rendition.

        The first mp4 rendition is always encoded, at its own height. Each
        following one is encoded only when the source height reaches the
        rendition height lowered by its "encoding_resolution_threshold"
        percentage. Planned output files are recorded in ``list_mp4_files``.

        Returns:
            The command line, or "" when no rendition has mp4 encoding enabled.
        """
        list_rendition = self._get_mp4_renditions()
        if len(list_rendition) == 0:
            return ""
        mp4_command = "%s " % FFMPEG_CMD
        first_item = list_rendition.popitem(last=False)
        mp4_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        output_file = os.path.join(self.output_dir, "%sp.mp4" % first_item[0])
        mp4_command += self._get_mp4_encode_params(
            first_item[0], first_item[1], output_file
        )
        self.list_mp4_files[first_item[0]] = output_file
        # Note: the same template could also be filled with str.format, e.g.
        # FFMPEG_MP4_ENCODE.format(height=first_item[0], [...]).
        in_height = self._first_video_track()["height"]
        for rend in list_rendition:
            resolution_threshold = rend - rend * (
                list_rendition[rend]["encoding_resolution_threshold"] / 100
            )
            if in_height >= resolution_threshold:
                output_file = os.path.join(self.output_dir, "%sp.mp4" % rend)
                mp4_command += self._get_mp4_encode_params(
                    min(rend, in_height), list_rendition[rend], output_file
                )
                self.list_mp4_files[rend] = output_file
        return mp4_command

    def get_hls_command(self) -> str:
        """Build the ffmpeg command encoding every applicable HLS rendition.

        The first rendition is always encoded; the others follow the same
        resolution threshold rule as the mp4 renditions. Planned playlists are
        recorded in ``list_hls_files``.
        """
        hls_command = "%s " % FFMPEG_CMD
        list_rendition = get_list_rendition()
        hls_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        hls_common_params = FFMPEG_HLS_COMMON_PARAMS % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "libx": FFMPEG_LIBX,
            "preset": FFMPEG_PRESET,
            "profile": FFMPEG_PROFILE,
            "level": FFMPEG_LEVEL,
            "crf": FFMPEG_CRF,
        }
        # NOTE(review): the common parameters are emitted once here and again
        # before every rendition below, so the first rendition gets them
        # twice — confirm this is intended.
        hls_command += hls_common_params
        in_height = self._first_video_track()["height"]
        for index, rend in enumerate(list_rendition):
            resolution_threshold = rend - rend * (
                list_rendition[rend]["encoding_resolution_threshold"] / 100
            )
            if in_height >= resolution_threshold or index == 0:
                output_file = os.path.join(self.output_dir, "%sp.m3u8" % rend)
                hls_command += hls_common_params
                hls_command += FFMPEG_HLS_ENCODE_PARAMS % {
                    "height": min(rend, in_height),
                    "maxrate": list_rendition[rend]["maxrate"],
                    "bufsize": list_rendition[rend]["maxrate"],
                    "ba": list_rendition[rend]["audio_bitrate"],
                    "hls_time": FFMPEG_HLS_TIME,
                    "output": output_file,
                }
                self.list_hls_files[rend] = output_file
        return hls_command

    def get_dressing_file(self) -> str:
        """Return the path of the dressed video: ``<name>_dressing<ext>`` next to the source."""
        dirname = os.path.dirname(self.video_file)
        filename, ext = os.path.splitext(os.path.basename(self.video_file))
        return os.path.join(dirname, filename + "_dressing" + ext)

    def get_dressing_command(self) -> str:
        """Get the command based on the dressing object parameters.

        ``json_dressing`` is read by key: "watermark", "opening_credits",
        "ending_credits" and, when a watermark is set, "opacity" and
        "position_orig". Inputs are numbered in the order: source video (0),
        watermark (if any), opening credits (if any), ending credits (if any).
        """
        height = str(self._first_video_track()["height"])
        watermark = self.json_dressing["watermark"]
        opening_credits = self.json_dressing["opening_credits"]
        ending_credits = self.json_dressing["ending_credits"]
        has_credits = bool(opening_credits or ending_credits)

        order_opening_credits = 0
        dressing_command_params = "[vid][0:a]"
        number_concat = 1
        dressing_command = "%s " % FFMPEG_CMD
        dressing_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }

        dressing_command += self.dressing_input
        dressing_command_filter = [
            FFMPEG_DRESSING_SCALE
            % {
                "number": "0",
                "height": height,
                "name": "vid",
            }
        ]
        if watermark:
            dressing_command_params = "[video][0:a]"
            order_opening_credits = order_opening_credits + 1
            # The watermarked stream only needs a label when it is
            # concatenated with credits afterwards.
            name_out = "[video]" if has_credits else ""
            dressing_command_filter.append(
                FFMPEG_DRESSING_WATERMARK
                % {
                    "opacity": self.json_dressing["opacity"] / 100.0,
                    "position": get_dressing_position_value(
                        self.json_dressing["position_orig"], height
                    ),
                    "name_out": name_out,
                }
            )
        if opening_credits:
            order_opening_credits = order_opening_credits + 1
            number_concat = number_concat + 1
            dressing_command_params = (
                "[debut][" + str(order_opening_credits) + ":a]" + dressing_command_params
            )
            dressing_command_filter.append(
                FFMPEG_DRESSING_SCALE
                % {
                    "number": str(order_opening_credits),
                    "height": height,
                    "name": "debut",
                }
            )
        if ending_credits:
            number_concat = number_concat + 1
            dressing_command_params = (
                dressing_command_params
                + "[fin]["
                + str(order_opening_credits + 1)
                + ":a]"
            )
            dressing_command_filter.append(
                FFMPEG_DRESSING_SCALE
                % {
                    "number": str(order_opening_credits + 1),
                    "height": height,
                    "name": "fin",
                }
            )
        if has_credits:
            dressing_command_filter.append(
                FFMPEG_DRESSING_CONCAT
                % {
                    "params": dressing_command_params,
                    "number": number_concat,
                }
            )

        dressing_command += FFMPEG_DRESSING_FILTER_COMPLEX % {
            "filter": ";".join(dressing_command_filter),
        }

        if has_credits:
            dressing_command += " -map '[v]' -map '[a]' "

        dressing_command += FFMPEG_DRESSING_OUTPUT % {
            "output": self.get_dressing_file(),
        }
        return dressing_command

    def encode_video_dressing(self):
        """Encode the dressed video and make it the new source file."""
        dressing_command = self.get_dressing_command()
        return_value, return_msg = launch_cmd(dressing_command)
        self.add_encoding_log(
            "dressing_command", dressing_command, return_value, return_msg
        )
        self.video_file = self.get_dressing_file()

    def encode_video_part(self):
        """Encode the video part of a file (mp4 renditions, then HLS)."""
        mp4_command = self.get_mp4_command()
        return_value, return_msg = launch_cmd(mp4_command)
        self.add_encoding_log("mp4_command", mp4_command, return_value, return_msg)
        if not return_value:
            self.error_encoding = True
        if self.duration == 0:
            # Use the same "first mp4 rendition" rule as get_mp4_command so the
            # looked-up file is one that was actually planned.
            mp4_file = self._get_first_mp4_file()
            if mp4_file is not None:
                self.fix_duration(mp4_file)
        hls_command = self.get_hls_command()
        return_value, return_msg = launch_cmd(hls_command)
        if return_value:
            self.create_main_livestream()
        self.add_encoding_log("hls_command", hls_command, return_value, return_msg)

    def create_main_livestream(self):
        """Merge the per-rendition ``livestream<rend>.m3u8`` files into ``livestream.m3u8``.

        The first existing file is copied whole, the following ones without
        their first two lines. Each merged file is removed afterwards.
        """
        output_dir = self.get_output_dir()
        livestream_content = ""
        for index, rend in enumerate(get_list_rendition()):
            rend_livestream = os.path.join(output_dir, "livestream%s.m3u8" % rend)
            if not os.path.exists(rend_livestream):
                continue
            with open(rend_livestream, "r") as file:
                data = file.read()
            if index == 0:
                livestream_content += data
            else:
                livestream_content += "\n".join(data.split("\n")[2:])
            os.remove(rend_livestream)
        # The context manager closes the file even if the write fails.
        with open(os.path.join(output_dir, "livestream.m3u8"), "w") as livestream_file:
            livestream_file.write(livestream_content.replace("\n\n", "\n"))

    def get_mp3_command(self) -> str:
        """Build the ffmpeg command producing the mp3 file and record its path."""
        mp3_command = "%s " % FFMPEG_CMD
        mp3_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        output_file = os.path.join(self.output_dir, "audio_%s.mp3" % FFMPEG_AUDIO_BITRATE)
        mp3_command += FFMPEG_MP3_ENCODE % {
            # "audio_bitrate": AUDIO_BITRATE,
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "output": output_file,
        }
        self.list_mp3_files[FFMPEG_AUDIO_BITRATE] = output_file
        return mp3_command

    def get_m4a_command(self) -> str:
        """Build the ffmpeg command producing the m4a file and record its path."""
        m4a_command = "%s " % FFMPEG_CMD
        m4a_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        output_file = os.path.join(self.output_dir, "audio_%s.m4a" % FFMPEG_AUDIO_BITRATE)
        m4a_command += FFMPEG_M4A_ENCODE % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "audio_bitrate": FFMPEG_AUDIO_BITRATE,
            "output": output_file,
        }
        self.list_m4a_files[FFMPEG_AUDIO_BITRATE] = output_file
        return m4a_command

    def encode_audio_part(self):
        """Encode the audio part of a video.

        The mp3 is always produced; the m4a only when the source has no video
        track.
        """
        mp3_command = self.get_mp3_command()
        return_value, return_msg = launch_cmd(mp3_command)
        self.add_encoding_log("mp3_command", mp3_command, return_value, return_msg)
        if self.duration == 0:
            new_k = list(self.list_mp3_files)[0]
            self.fix_duration(self.list_mp3_files[new_k])
        if not self.is_video():
            m4a_command = self.get_m4a_command()
            return_value, return_msg = launch_cmd(m4a_command)
            self.add_encoding_log("m4a_command", m4a_command, return_value, return_msg)

    def get_extract_thumbnail_command(self) -> str:
        """Build the command extracting every image track as ``thumbnail_<index>.png``."""
        thumbnail_command = "%s " % FFMPEG_CMD
        thumbnail_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        for img in self.list_image_track:
            output_file = os.path.join(self.output_dir, "thumbnail_%s.png" % img)
            thumbnail_command += FFMPEG_EXTRACT_THUMBNAIL % {
                "index": img,
                "output": output_file,
            }
            self.list_thumbnail_files[img] = output_file
        return thumbnail_command

    def get_create_thumbnail_command(self) -> str:
        """Build the command generating FFMPEG_NB_THUMBNAIL thumbnails.

        The first mp4 rendition is used as input; when there is none, the
        source file is used instead.
        """
        thumbnail_command = "%s " % FFMPEG_CMD
        input_file = self._get_first_mp4_file() or self.video_file
        thumbnail_command += FFMPEG_INPUT % {
            "input": input_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        output_file = os.path.join(self.output_dir, "thumbnail")
        thumbnail_command += FFMPEG_CREATE_THUMBNAIL % {
            "duration": self.duration,
            "nb_thumbnail": FFMPEG_NB_THUMBNAIL,
            "output": output_file,
        }
        # NOTE(review): the "_000<n>" suffix presumably mirrors the numbering
        # pattern of FFMPEG_CREATE_THUMBNAIL and only fits single-digit
        # indexes — confirm against the template.
        for nb in range(0, FFMPEG_NB_THUMBNAIL):
            num_thumb = str(nb + 1)
            self.list_thumbnail_files[num_thumb] = "%s_000%s.png" % (
                output_file,
                num_thumb,
            )
        return thumbnail_command

    def get_first_item(self):
        """Get the first mp4 render from setting.

        Returns:
            The ``(rendition, parameters)`` pair of the first rendition with
            mp4 encoding enabled, or None when there is none.
        """
        list_rendition = self._get_mp4_renditions()
        if len(list_rendition) == 0:
            return None
        return list_rendition.popitem(last=False)

    @staticmethod
    def _format_vtt_time(seconds) -> str:
        """Format a number of seconds as ``HH:MM:SS.mmm``."""
        whole, fraction = format(float(seconds), ".3f").split(".")
        return time.strftime("%H:%M:%S", time.gmtime(int(whole))) + ".%s" % fraction

    def create_overview(self):
        """Create the overview sprite image and its WebVTT index.

        The sprite holds 100 frames for videos of 100 seconds or more, 10
        otherwise; each frame is 90 pixels high and keeps the source aspect
        ratio. Both files are recorded in ``list_overview_files`` on success;
        otherwise a failed "create_overview" step is logged.
        """
        track = self._first_video_track()
        in_height = track["height"]
        in_width = track["width"]
        image_height = 90
        coef = in_height / image_height
        image_width = int(in_width / coef)
        # Fall back on the source file when no mp4 rendition is available.
        input_file = self._get_first_mp4_file() or self.video_file
        nb_img = 100 if self.duration >= 100 else 10

        overviewimagefilename = os.path.join(self.output_dir, "overview.png")
        overview_image_command = (
            FFMPEG_CMD
            + " "
            + FFMPEG_INPUT
            % {
                "input": input_file,
                "nb_threads": FFMPEG_NB_THREADS,
            }
            + FFMPEG_CREATE_OVERVIEW
            % {
                "duration": self.duration,
                "image_count": nb_img,
                "width": image_width,
                "height": image_height,
                "output": overviewimagefilename,
            }
        )
        return_value, output_message = launch_cmd(overview_image_command)
        if not return_value or not check_file(overviewimagefilename):
            print(f"FFmpeg failed with output: {output_message}")

        overviewfilename = os.path.join(self.output_dir, "overview.vtt")
        image_url = os.path.basename(overviewimagefilename)
        webvtt = WebVTT()
        for i in range(0, nb_img):
            # Each cue covers 1/nb_img of the duration and points at the i-th
            # frame of the sprite through a media fragment.
            start_time = self._format_vtt_time(self.duration * i / nb_img)
            end_time = self._format_vtt_time(self.duration * (i + 1) / nb_img)
            caption = Caption(
                "%s" % start_time,
                "%s" % end_time,
                "%s#xywh=%d,%d,%d,%d"
                % (image_url, image_width * i, 0, image_width, image_height),
            )
            webvtt.captions.append(caption)
        webvtt.save(overviewfilename)
        if check_file(overviewfilename) and check_file(overviewimagefilename):
            self.list_overview_files["0"] = overviewimagefilename
            self.list_overview_files["1"] = overviewfilename
        else:
            # Keep the command and the ffmpeg output so the failure can be
            # diagnosed from the log.
            self.add_encoding_log(
                "create_overview", overview_image_command, False, output_message
            )

    def encode_image_part(self):
        """Extract or generate the thumbnails, then the overview when relevant."""
        if len(self.list_image_track) > 0:
            thumbnail_command = self.get_extract_thumbnail_command()
            return_value, return_msg = launch_cmd(thumbnail_command)
            self.add_encoding_log(
                "extract_thumbnail_command", thumbnail_command, return_value, return_msg
            )
        elif self.is_video():
            thumbnail_command = self.get_create_thumbnail_command()
            return_value, return_msg = launch_cmd(thumbnail_command)
            self.add_encoding_log(
                "create_thumbnail_command", thumbnail_command, return_value, return_msg
            )
        # No overview for videos shorter than 10 seconds (10 seconds is kept
        # inclusive so that the tests pass).
        if self.is_video() and self.duration >= 10:
            self.create_overview()

    def get_extract_subtitle_command(self) -> str:
        """Build the command extracting every subtitle track as ``subtitle_<lang>.vtt``."""
        subtitle_command = "%s " % FFMPEG_CMD
        subtitle_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        # NOTE(review): two tracks sharing a language (or lacking one) get the
        # same output file name — confirm whether that can happen in practice.
        for sub in self.list_subtitle_track:
            lang = self.list_subtitle_track[sub]["language"]
            output_file = os.path.join(self.output_dir, "subtitle_%s.vtt" % lang)
            subtitle_command += FFMPEG_EXTRACT_SUBTITLE % {
                "index": sub,
                "output": output_file,
            }
            self.list_subtitle_files[sub] = [lang, output_file]
        return subtitle_command

    def get_subtitle_part(self):
        """Extract the subtitle tracks, if any, and log the step."""
        if len(self.list_subtitle_track) > 0:
            subtitle_command = self.get_extract_subtitle_command()
            return_value, return_msg = launch_cmd(subtitle_command)
            self.add_encoding_log(
                "subtitle_command", subtitle_command, return_value, return_msg
            )

    def export_to_json(self):
        """Dump every instance attribute to ``info_video.json`` in the output directory."""
        data_to_dump = dict(self.__dict__)
        with open(os.path.join(self.output_dir, "info_video.json"), "w") as outfile:
            json.dump(data_to_dump, outfile, indent=2)

    def add_encoding_log(self, title, command, result, msg):
        """Add Encoding step to the encoding_log dict.

        A step whose result is exactly False also raises the
        ``error_encoding`` flag.
        """
        self.encoding_log[title] = {"command": command, "result": result, "msg": msg}
        if result is False and self.error_encoding is False:
            self.error_encoding = True

    def start_encode(self):
        """Run the whole encoding workflow and export the result to JSON."""
        self.start = time.ctime()
        self.create_output_dir()
        self.get_video_data()
        if self.json_dressing is not None:
            self.encode_video_dressing()
        print(self.id, self.video_file, self.duration)
        if self.is_video():
            self.encode_video_part()
        if len(self.list_audio_track) > 0:
            self.encode_audio_part()
        self.encode_image_part()
        if len(self.list_subtitle_track) > 0:
            self.get_subtitle_part()
        self.stop = time.ctime()
        self.export_to_json()
773

774

775
def fix_input(input) -> str:
    """Rename the input file to a sanitized name and return its new path.

    A relative path is resolved against the current working directory. When
    the file exists and is not empty, accents are stripped from its base
    name, spaces are replaced by underscores and the file is renamed
    accordingly. The directory part of the path is left untouched.

    Args:
        input (str): path of the file to encode, absolute or relative.

    Returns:
        str: path of the (possibly renamed) file, or an empty string when
        the file does not exist or is empty.
    """
    # Rely on the argument, not on the global `args`, which only exists when
    # this module is run as a script.
    if input.startswith("/"):
        path_file = input
    else:
        path_file = os.path.join(os.getcwd(), input)

    if not (os.access(path_file, os.F_OK) and os.stat(path_file).st_size > 0):
        return ""

    # Only the base name is sanitized: rewriting the directory part would
    # make the rename target a directory that does not exist.
    directory, basename = os.path.split(path_file)
    # Remove accents (combining marks left by the NFD decomposition)...
    basename = "".join(
        c
        for c in unicodedata.normalize("NFD", basename)
        if unicodedata.category(c) != "Mn"
    )
    # ... and spaces.
    basename = basename.replace(" ", "_")
    filename = os.path.join(directory, basename)

    if filename != path_file:
        os.rename(path_file, filename)
    print("Encoding file {} \n".format(filename))
    return filename
797

798

799
"""
800
  remote encode???
801
"""
802
if __name__ == "__main__":
    # Command line entry point: parse the options, sanitize the input file
    # name, then run the whole encoding.
    start = "Start at: %s" % time.ctime()

    parser = argparse.ArgumentParser(description="Running encoding video.")
    # (flag, required, help) for each option, in the order shown by --help.
    cli_options = (
        ("--id", True, "the ID of the video"),
        ("--start", False, "Start cut"),
        ("--stop", False, "Stop cut"),
        ("--input", True, "name of input file to encode"),
        ("--dressing", False, "Dressing for the video"),
    )
    for flag, is_required, help_text in cli_options:
        parser.add_argument(flag, required=is_required, help=help_text)

    args = parser.parse_args()
    print(args.start)
    filename = fix_input(args.input)
    encoding_video = Encoding_video(
        args.id, filename, args.start, args.stop, args.dressing
    )
    # Left disabled because it raises an error when uncommented:
    #     encoding_video.encoding_log += start
    #     AttributeError: 'NoneType' object has no attribute 'get'
    encoding_video.start_encode()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc