• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 8156742544

05 Mar 2024 01:03PM UTC coverage: 69.889% (-0.3%) from 70.212%
8156742544

push

github

web-flow
Merge pull request #1054 from EsupPortail/develop

[DONE] Develop #3.5.1

281 of 580 new or added lines in 23 files covered. (48.45%)

19 existing lines in 11 files now uncovered.

9911 of 14181 relevant lines covered (69.89%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.28
/pod/video_encode_transcript/Encoding_video.py
1
"""Esup-Pod video encoding."""
2

3
import json
1✔
4
import os
1✔
5
import time
1✔
6
from webvtt import WebVTT, Caption
1✔
7
import argparse
1✔
8
import unicodedata
1✔
9

10
if __name__ == "__main__":
1✔
11
    from encoding_utils import (
×
12
        get_dressing_position_value,
13
        get_info_from_video,
14
        get_list_rendition,
15
        launch_cmd,
16
        check_file,
17
    )
18
    from encoding_settings import (
×
19
        FFMPEG_CMD,
20
        FFPROBE_CMD,
21
        FFPROBE_GET_INFO,
22
        FFMPEG_CRF,
23
        FFMPEG_PRESET,
24
        FFMPEG_PROFILE,
25
        FFMPEG_LEVEL,
26
        FFMPEG_HLS_TIME,
27
        FFMPEG_INPUT,
28
        FFMPEG_LIBX,
29
        FFMPEG_MP4_ENCODE,
30
        FFMPEG_HLS_COMMON_PARAMS,
31
        FFMPEG_HLS_ENCODE_PARAMS,
32
        FFMPEG_MP3_ENCODE,
33
        FFMPEG_M4A_ENCODE,
34
        FFMPEG_NB_THREADS,
35
        FFMPEG_AUDIO_BITRATE,
36
        FFMPEG_EXTRACT_THUMBNAIL,
37
        FFMPEG_NB_THUMBNAIL,
38
        FFMPEG_CREATE_THUMBNAIL,
39
        FFMPEG_EXTRACT_SUBTITLE,
40
        FFMPEG_DRESSING_INPUT,
41
        FFMPEG_DRESSING_OUTPUT,
42
        FFMPEG_DRESSING_WATERMARK,
43
        FFMPEG_DRESSING_FILTER_COMPLEX,
44
        FFMPEG_DRESSING_SCALE,
45
        FFMPEG_DRESSING_CONCAT,
46
    )
47
else:
48
    from .encoding_utils import (
1✔
49
        get_dressing_position_value,
50
        get_info_from_video,
51
        get_list_rendition,
52
        launch_cmd,
53
        check_file,
54
    )
55
    from .encoding_settings import (
1✔
56
        FFMPEG_CMD,
57
        FFPROBE_CMD,
58
        FFPROBE_GET_INFO,
59
        FFMPEG_CRF,
60
        FFMPEG_PRESET,
61
        FFMPEG_PROFILE,
62
        FFMPEG_LEVEL,
63
        FFMPEG_HLS_TIME,
64
        FFMPEG_INPUT,
65
        FFMPEG_LIBX,
66
        FFMPEG_MP4_ENCODE,
67
        FFMPEG_HLS_COMMON_PARAMS,
68
        FFMPEG_HLS_ENCODE_PARAMS,
69
        FFMPEG_MP3_ENCODE,
70
        FFMPEG_M4A_ENCODE,
71
        FFMPEG_NB_THREADS,
72
        FFMPEG_AUDIO_BITRATE,
73
        FFMPEG_EXTRACT_THUMBNAIL,
74
        FFMPEG_NB_THUMBNAIL,
75
        FFMPEG_CREATE_THUMBNAIL,
76
        FFMPEG_EXTRACT_SUBTITLE,
77
        FFMPEG_DRESSING_INPUT,
78
        FFMPEG_DRESSING_OUTPUT,
79
        FFMPEG_DRESSING_WATERMARK,
80
        FFMPEG_DRESSING_FILTER_COMPLEX,
81
        FFMPEG_DRESSING_SCALE,
82
        FFMPEG_DRESSING_CONCAT,
83
    )
84

85

86
__author__ = "Nicolas CAN <nicolas.can@univ-lille.fr>"
1✔
87
__license__ = "LGPL v3"
1✔
88

89
image_codec = ["jpeg", "gif", "png", "bmp", "jpg"]
1✔
90

91
"""
92
 - Get video source
93
 - get all tracks from video source
94
 - encode tracks in HLS and mp4
95
 - save it
96
"""
97

98
try:
    from django.conf import settings

    # Every encoding constant can be overridden by a Django setting carrying
    # the same name; the value imported from encoding_settings is the fallback.
    for _setting_name in (
        "FFMPEG_CMD",
        "FFPROBE_CMD",
        "FFPROBE_GET_INFO",
        "FFMPEG_CRF",
        "FFMPEG_PRESET",
        "FFMPEG_PROFILE",
        "FFMPEG_LEVEL",
        "FFMPEG_HLS_TIME",
        "FFMPEG_INPUT",
        "FFMPEG_LIBX",
        "FFMPEG_MP4_ENCODE",
        "FFMPEG_HLS_COMMON_PARAMS",
        "FFMPEG_HLS_ENCODE_PARAMS",
        "FFMPEG_MP3_ENCODE",
        "FFMPEG_M4A_ENCODE",
        "FFMPEG_NB_THREADS",
        "FFMPEG_AUDIO_BITRATE",
        "FFMPEG_EXTRACT_THUMBNAIL",
        "FFMPEG_NB_THUMBNAIL",
        "FFMPEG_CREATE_THUMBNAIL",
        "FFMPEG_EXTRACT_SUBTITLE",
        "FFMPEG_DRESSING_INPUT",
        "FFMPEG_DRESSING_OUTPUT",
        "FFMPEG_DRESSING_WATERMARK",
        "FFMPEG_DRESSING_FILTER_COMPLEX",
        "FFMPEG_DRESSING_SCALE",
        "FFMPEG_DRESSING_CONCAT",
    ):
        globals()[_setting_name] = getattr(
            settings, _setting_name, globals()[_setting_name]
        )
except ImportError:  # pragma: no cover
    # Django is not installed: keep the values imported above.
    pass
152

153

154
class Encoding_video:
1✔
155
    """Encoding video object."""
156

157
    id = 0
1✔
158
    video_file = ""
1✔
159
    duration = 0
1✔
160
    list_video_track = {}
1✔
161
    list_audio_track = {}
1✔
162
    list_subtitle_track = {}
1✔
163
    list_image_track = {}
1✔
164
    list_mp4_files = {}
1✔
165
    list_hls_files = {}
1✔
166
    list_mp3_files = {}
1✔
167
    list_m4a_files = {}
1✔
168
    list_thumbnail_files = {}
1✔
169
    list_overview_files = {}
1✔
170
    list_subtitle_files = {}
1✔
171
    encoding_log = {}
1✔
172
    output_dir = ""
1✔
173
    start = 0
1✔
174
    stop = 0
1✔
175
    error_encoding = False
1✔
176
    cutting_start = 0
1✔
177
    cutting_stop = 0
1✔
178
    json_dressing = None
1✔
179
    dressing_input = ""
1✔
180

181
    def __init__(
1✔
182
        self, id=0, video_file="", start=0, stop=0, json_dressing=None, dressing_input=""
183
    ):
184
        """Initialize a new Encoding_video object."""
185
        self.id = id
1✔
186
        self.video_file = video_file
1✔
187
        self.duration = 0
1✔
188
        self.list_video_track = {}
1✔
189
        self.list_audio_track = {}
1✔
190
        self.list_subtitle_track = {}
1✔
191
        self.list_image_track = {}
1✔
192
        self.list_mp4_files = {}
1✔
193
        self.list_hls_files = {}
1✔
194
        self.list_mp3_files = {}
1✔
195
        self.list_m4a_files = {}
1✔
196
        self.list_thumbnail_files = {}
1✔
197
        self.list_overview_files = {}
1✔
198
        self.encoding_log = {}
1✔
199
        self.list_subtitle_files = {}
1✔
200
        self.output_dir = ""
1✔
201
        self.start = 0
1✔
202
        self.stop = 0
1✔
203
        self.error_encoding = False
1✔
204
        self.cutting_start = start or 0
1✔
205
        self.cutting_stop = stop or 0
1✔
206
        self.json_dressing = json_dressing
1✔
207
        self.dressing_input = dressing_input
1✔
208

209
    def is_video(self) -> bool:
1✔
210
        """Check if current encoding correspond to a video."""
211
        return len(self.list_video_track) > 0
1✔
212

213
    def get_subtime(self, clip_begin, clip_end):
1✔
214
        subtime = ""
1✔
215
        if clip_begin != 0 or clip_end != 0:
1✔
216
            subtime += "-ss %s " % str(clip_begin) + "-to %s " % str(clip_end)
×
217
        return subtime
1✔
218

219
    def get_video_data(self):
        """Probe the source file and record its duration and tracks on this object.

        The probe command and its output are stored in the encoding log. When a
        cut is configured (cutting_start or cutting_stop different from 0), the
        duration is the cut length instead of the probed value.
        """
        probe_cmd = FFPROBE_GET_INFO % {
            "ffprobe": FFPROBE_CMD,
            "select_streams": "",
            "source": '"' + self.video_file + '" ',
        }
        info, return_msg = get_info_from_video(probe_cmd)
        log_parts = [
            "--> get_info_video\n",
            probe_cmd + "\n",
            json.dumps(info, indent=2),
            " \n",
            return_msg + "\n",
        ]
        self.add_encoding_log("probe_cmd", probe_cmd, True, "".join(log_parts))

        try:
            probed_duration = int(float("%s" % info["format"]["duration"]))
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
            # The duration stays at 0; the failure is only recorded in the log.
            probed_duration = 0
            self.add_encoding_log(
                "duration", "", True, "\nUnexpected error: {0}".format(err)
            )

        no_cut = self.cutting_start == 0 and self.cutting_stop == 0
        if no_cut:
            self.duration = probed_duration
        else:
            self.duration = self.cutting_stop - self.cutting_start

        for stream in info.get("streams", []):
            self.add_stream(stream)
1✔
245

246
    def fix_duration(self, input_file):
        """Set self.duration from the duration probed on `input_file`.

        The probe uses the configurable FFPROBE_CMD binary, like
        get_video_data does. The probe output (and any parsing error) is stored
        in the encoding log under "fix_duration". When a cut is configured
        (cutting_start or cutting_stop different from 0), the duration is the
        cut length instead of the probed value.

        Args:
            input_file: path of the file whose duration is probed.
        """
        probe_cmd = (
            "%s -v quiet -show_entries format=duration -hide_banner "
            '-of default=noprint_wrappers=1:nokey=1 -print_format json -i "%s"'
        ) % (FFPROBE_CMD, input_file)
        info, return_msg = get_info_from_video(probe_cmd)
        msg = "--> get_info_video\n"
        msg += json.dumps(info, indent=2)
        msg += " \n"
        msg += return_msg + "\n"
        duration = 0
        try:
            duration = int(float("%s" % info["format"]["duration"]))
        except (RuntimeError, KeyError, AttributeError, ValueError, TypeError) as err:
            # Keep a duration of 0 and record why the probe result was unusable.
            msg += "\nUnexpected error: {0}".format(err)
        self.add_encoding_log("fix_duration", "", True, msg)
        if self.cutting_start != 0 or self.cutting_stop != 0:
            duration = self.cutting_stop - self.cutting_start
        self.duration = duration
×
266

267
    def add_stream(self, stream):
1✔
268
        codec_type = stream.get("codec_type", "unknown")
1✔
269
        # https://ffmpeg.org/doxygen/3.2/group__lavu__misc.html#ga9a84bba4713dfced21a1a56163be1f48
270
        if codec_type == "audio":
1✔
271
            codec = stream.get("codec_name", "unknown")
1✔
272
            self.list_audio_track["%s" % stream.get("index")] = {
1✔
273
                "sample_rate": stream.get("sample_rate", 0),
274
                "channels": stream.get("channels", 0),
275
            }
276
        if codec_type == "video":
1✔
277
            codec = stream.get("codec_name", "unknown")
1✔
278
            if any(ext in codec.lower() for ext in image_codec):
1✔
279
                self.list_image_track["%s" % stream.get("index")] = {
×
280
                    "width": stream.get("width", 0),
281
                    "height": stream.get("height", 0),
282
                }
283
            else:
284
                self.list_video_track["%s" % stream.get("index")] = {
1✔
285
                    "width": stream.get("width", 0),
286
                    "height": stream.get("height", 0),
287
                }
288
        if codec_type == "subtitle":
1✔
289
            codec = stream.get("codec_name", "unknown")
×
290
            language = ""
×
291
            if stream.get("tags"):
×
292
                language = stream.get("tags").get("language", "")
×
293
            self.list_subtitle_track["%s" % stream.get("index")] = {"language": language}
×
294

295
    def get_output_dir(self) -> str:
1✔
296
        dirname = os.path.dirname(self.video_file)
1✔
297
        return os.path.join(dirname, "%04d" % int(self.id))
1✔
298

299
    def create_output_dir(self):
1✔
300
        output_dir = self.get_output_dir()
1✔
301
        if not os.path.exists(output_dir):
1✔
302
            os.makedirs(output_dir)
1✔
303
        self.output_dir = output_dir
1✔
304

305
    def get_mp4_command(self) -> str:
        """Build the ffmpeg command producing the mp4 files.

        Renditions whose "encode_mp4" flag is False are ignored. The first
        remaining rendition is always encoded, at its own height. Each other
        rendition is encoded only when the source height reaches the rendition
        height lowered by its "encoding_resolution_threshold" percentage, and
        never above the source height. Every planned output is registered in
        self.list_mp4_files.

        Returns:
            The command line, or "" when no rendition has mp4 encoding enabled.
        """
        renditions = get_list_rendition()
        disabled = [key for key, params in renditions.items() if params["encode_mp4"] is False]
        for key in disabled:
            renditions.pop(key)
        if not renditions:
            return ""

        def encode_args(rend_key, rend_params, height):
            """Format the mp4 encoding arguments of one rendition and register its output."""
            output_file = os.path.join(self.output_dir, "%sp.mp4" % rend_key)
            args = FFMPEG_MP4_ENCODE % {
                "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
                "map_audio": "-map 0:a:0" if len(self.list_audio_track) > 0 else "",
                "libx": FFMPEG_LIBX,
                "height": height,
                "preset": FFMPEG_PRESET,
                "profile": FFMPEG_PROFILE,
                "level": FFMPEG_LEVEL,
                "crf": FFMPEG_CRF,
                "maxrate": rend_params["maxrate"],
                "bufsize": rend_params["maxrate"],
                "ba": rend_params["audio_bitrate"],
                "output": output_file,
            }
            self.list_mp4_files[rend_key] = output_file
            return args

        first_key, first_params = renditions.popitem(last=False)
        # Note: the same could be written with str.format, e.g.
        # FFMPEG_MP4_ENCODE.format(height=first_item[0], [...])
        parts = [
            "%s " % FFMPEG_CMD,
            FFMPEG_INPUT % {
                "input": self.video_file,
                "nb_threads": FFMPEG_NB_THREADS,
            },
            encode_args(first_key, first_params, first_key),
        ]

        in_height = list(self.list_video_track.items())[0][1]["height"]
        for rend, rend_params in renditions.items():
            resolution_threshold = rend - rend * (
                rend_params["encoding_resolution_threshold"] / 100
            )
            if in_height >= resolution_threshold:
                parts.append(encode_args(rend, rend_params, min(rend, in_height)))
        return "".join(parts)
1✔
365

366
    def get_hls_command(self) -> str:
        """Build the ffmpeg command producing the HLS playlists.

        The first rendition is always encoded. Each other rendition is encoded
        only when the source height reaches the rendition height lowered by its
        "encoding_resolution_threshold" percentage. The output height never
        exceeds the source height. Every planned playlist is registered in
        self.list_hls_files.
        """
        command_start = "%s " % FFMPEG_CMD
        renditions = get_list_rendition()
        input_args = FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        common_args = FFMPEG_HLS_COMMON_PARAMS % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "libx": FFMPEG_LIBX,
            "preset": FFMPEG_PRESET,
            "profile": FFMPEG_PROFILE,
            "level": FFMPEG_LEVEL,
            "crf": FFMPEG_CRF,
        }
        # The common parameters appear once here and once more per output below.
        parts = [command_start, input_args, common_args]

        in_height = list(self.list_video_track.items())[0][1]["height"]
        for index, (rend, rend_params) in enumerate(renditions.items()):
            resolution_threshold = rend - rend * (
                rend_params["encoding_resolution_threshold"] / 100
            )
            if not (in_height >= resolution_threshold or index == 0):
                continue
            playlist = os.path.join(self.output_dir, "%sp.m3u8" % rend)
            parts.append(common_args)
            parts.append(
                FFMPEG_HLS_ENCODE_PARAMS
                % {
                    "height": min(rend, in_height),
                    "maxrate": rend_params["maxrate"],
                    "bufsize": rend_params["maxrate"],
                    "ba": rend_params["audio_bitrate"],
                    "hls_time": FFMPEG_HLS_TIME,
                    "output": playlist,
                }
            )
            self.list_hls_files[rend] = playlist
        return "".join(parts)
1✔
400

401
    def get_dressing_file(self) -> str:
1✔
402
        """Create or replace the dressed video file."""
403
        dirname = os.path.dirname(self.video_file)
×
404
        filename, ext = os.path.splitext(os.path.basename(self.video_file))
×
405
        output_file = os.path.join(dirname, filename + "_dressing" + ext)
×
406
        return output_file
×
407

408
    def get_dressing_command(self) -> str:
        """Get the command based on the dressing object parameters.

        Reads the "watermark", "opening_credits", "ending_credits", "opacity"
        and "position" entries of self.json_dressing. The filter chain always
        scales the source video; it then adds, when enabled, the watermark
        overlay, the scaled opening credits ("debut"), the scaled ending
        credits ("fin") and, when any credits are present, the final concat.

        Returns:
            The complete dressing command, ending with the output arguments for
            the file given by get_dressing_file().
        """
        dressing = self.json_dressing
        height = str(list(self.list_video_track.items())[0][1]["height"])
        with_credits = bool(dressing["opening_credits"] or dressing["ending_credits"])

        # Counter numbering the extra inputs referenced by the filters (0 is the
        # source video) - presumably matching the input order in
        # self.dressing_input; confirm against the caller.
        input_index = 0
        concat_params = "[vid][0:a]"
        concat_count = 1
        filters = [
            FFMPEG_DRESSING_SCALE % {"number": "0", "height": height, "name": "vid"}
        ]

        if dressing["watermark"]:
            concat_params = "[video][0:a]"
            input_index += 1
            filters.append(
                FFMPEG_DRESSING_WATERMARK
                % {
                    # json_dressing is indexed by key everywhere else in this
                    # method; attribute access (.opacity) fails on a dict.
                    "opacity": dressing["opacity"] / 100.0,
                    "position": get_dressing_position_value(
                        dressing["position"], height
                    ),
                    # The overlay output is only named when the concat reuses it.
                    "name_out": "[video]" if with_credits else "",
                }
            )

        if dressing["opening_credits"]:
            input_index += 1
            concat_count += 1
            # Opening credits are played before the main video.
            concat_params = "[debut][%s:a]" % input_index + concat_params
            filters.append(
                FFMPEG_DRESSING_SCALE
                % {"number": str(input_index), "height": height, "name": "debut"}
            )

        if dressing["ending_credits"]:
            concat_count += 1
            ending_index = input_index + 1
            # Ending credits are played after the main video.
            concat_params += "[fin][%s:a]" % ending_index
            filters.append(
                FFMPEG_DRESSING_SCALE
                % {"number": str(ending_index), "height": height, "name": "fin"}
            )

        if with_credits:
            filters.append(
                FFMPEG_DRESSING_CONCAT
                % {"params": concat_params, "number": concat_count}
            )

        dressing_command = "%s " % FFMPEG_CMD
        dressing_command += FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        dressing_command += self.dressing_input
        dressing_command += FFMPEG_DRESSING_FILTER_COMPLEX % {
            "filter": ";".join(filters),
        }
        if with_credits:
            dressing_command += " -map '[v]' -map '[a]' "
        dressing_command += FFMPEG_DRESSING_OUTPUT % {
            "output": self.get_dressing_file(),
        }
        return dressing_command
×
500

501
    def encode_video_dressing(self):
        """Run the dressing command, log it, and make the dressed file the new source."""
        command = self.get_dressing_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("dressing_command", command, succeeded, output)
        # NOTE(review): the source is switched even when the command failed -
        # confirm this is intended.
        self.video_file = self.get_dressing_file()
×
509

510
    def encode_video_part(self):
        """Encode the video part of a file.

        Runs the mp4 command then the HLS command, logging both. A failed mp4
        command sets self.error_encoding. When the duration is still 0 after
        the mp4 encoding, it is probed again on the first mp4 output. The main
        livestream playlist is created only when the HLS command succeeds.
        """
        mp4_command = self.get_mp4_command()
        return_value, return_msg = launch_cmd(mp4_command)
        self.add_encoding_log("mp4_command", mp4_command, return_value, return_msg)
        if not return_value:
            self.error_encoding = True
        if self.duration == 0:
            # get_first_item applies the same "encode_mp4" filter as
            # get_mp4_command, so its key is the one registered in
            # list_mp4_files; the unfiltered first rendition may not be there.
            first_item = self.get_first_item()
            first_mp4 = self.list_mp4_files.get(first_item[0]) if first_item else None
            if first_mp4:
                self.fix_duration(first_mp4)
        hls_command = self.get_hls_command()
        return_value, return_msg = launch_cmd(hls_command)
        if return_value:
            self.create_main_livestream()
        self.add_encoding_log("hls_command", hls_command, return_value, return_msg)
1✔
526

527
    def create_main_livestream(self):
        """Merge the per-rendition livestream playlists into livestream.m3u8.

        For each rendition, the file "livestream<rendition>.m3u8" of the output
        directory is read when it exists, then removed. The first rendition's
        file is taken whole; the first two lines of the other files are
        skipped. Blank lines (doubled newlines) are collapsed before writing.
        """
        output_dir = self.get_output_dir()
        livestream_content = ""
        for index, rend in enumerate(get_list_rendition()):
            rend_livestream = os.path.join(output_dir, "livestream%s.m3u8" % rend)
            if not os.path.exists(rend_livestream):
                continue
            with open(rend_livestream, "r") as file:
                data = file.read()
            if index == 0:
                livestream_content += data
            else:
                livestream_content += "\n".join(data.split("\n")[2:])
            os.remove(rend_livestream)
        # The context manager closes the file even if the write fails.
        with open(os.path.join(output_dir, "livestream.m3u8"), "w") as livestream_file:
            livestream_file.write(livestream_content.replace("\n\n", "\n"))
1✔
547

548
    def get_mp3_command(self) -> str:
        """Build the ffmpeg command producing the mp3 file and register its output."""
        command_start = "%s " % FFMPEG_CMD
        input_args = FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        mp3_file = os.path.join(self.output_dir, "audio_%s.mp3" % FFMPEG_AUDIO_BITRATE)
        # No "audio_bitrate" entry is passed to the mp3 template.
        encode_args = FFMPEG_MP3_ENCODE % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "output": mp3_file,
        }
        self.list_mp3_files[FFMPEG_AUDIO_BITRATE] = mp3_file
        return command_start + input_args + encode_args
1✔
562

563
    def get_m4a_command(self) -> str:
        """Build the ffmpeg command producing the m4a file and register its output."""
        command_start = "%s " % FFMPEG_CMD
        input_args = FFMPEG_INPUT % {
            "input": self.video_file,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        m4a_file = os.path.join(self.output_dir, "audio_%s.m4a" % FFMPEG_AUDIO_BITRATE)
        encode_args = FFMPEG_M4A_ENCODE % {
            "cut": self.get_subtime(self.cutting_start, self.cutting_stop),
            "audio_bitrate": FFMPEG_AUDIO_BITRATE,
            "output": m4a_file,
        }
        self.list_m4a_files[FFMPEG_AUDIO_BITRATE] = m4a_file
        return command_start + input_args + encode_args
1✔
577

578
    def encode_audio_part(self):
        """Encode the audio part of a video.

        The mp3 file is always produced. The m4a file is produced only when the
        source has no video track. When the duration is still 0 after the mp3
        encoding, it is probed again on the first registered mp3 file.
        """
        command = self.get_mp3_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("mp3_command", command, succeeded, output)

        if self.duration == 0:
            first_key = list(self.list_mp3_files)[0]
            self.fix_duration(self.list_mp3_files[first_key])

        if self.is_video():
            return
        command = self.get_m4a_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("m4a_command", command, succeeded, output)
1✔
590

591
    def get_extract_thumbnail_command(self) -> str:
        """Build the ffmpeg command extracting each image track as a png thumbnail.

        Every planned output is registered in self.list_thumbnail_files under
        the index of its image track.
        """
        parts = [
            "%s " % FFMPEG_CMD,
            FFMPEG_INPUT % {
                "input": self.video_file,
                "nb_threads": FFMPEG_NB_THREADS,
            },
        ]
        for track_index in self.list_image_track:
            png_file = os.path.join(self.output_dir, "thumbnail_%s.png" % track_index)
            parts.append(
                FFMPEG_EXTRACT_THUMBNAIL % {"index": track_index, "output": png_file}
            )
            self.list_thumbnail_files[track_index] = png_file
        return "".join(parts)
×
605

606
    def get_create_thumbnail_command(self) -> str:
        """Build the ffmpeg command creating thumbnails from the first mp4 rendition.

        FFMPEG_NB_THUMBNAIL files are registered in self.list_thumbnail_files,
        under the keys "1", "2", ...
        """
        first_rendition = self.get_first_item()
        source_mp4 = self.list_mp4_files[first_rendition[0]]
        output_prefix = os.path.join(self.output_dir, "thumbnail")
        command = "".join(
            [
                "%s " % FFMPEG_CMD,
                FFMPEG_INPUT % {
                    "input": source_mp4,
                    "nb_threads": FFMPEG_NB_THREADS,
                },
                FFMPEG_CREATE_THUMBNAIL % {
                    "duration": self.duration,
                    "nb_thumbnail": FFMPEG_NB_THUMBNAIL,
                    "output": output_prefix,
                },
            ]
        )
        # NOTE(review): the fixed "_000" prefix gives five digits from the 10th
        # thumbnail on - confirm against the pattern in FFMPEG_CREATE_THUMBNAIL.
        for num_thumb in map(str, range(1, FFMPEG_NB_THUMBNAIL + 1)):
            self.list_thumbnail_files[num_thumb] = "%s_000%s.png" % (
                output_prefix,
                num_thumb,
            )
        return command
1✔
627

628
    def get_first_item(self):
        """Get the first rendition that has mp4 encoding enabled.

        Returns:
            The (key, parameters) pair of that rendition, or None when every
            rendition has "encode_mp4" set to False.
        """
        renditions = get_list_rendition()
        disabled = [key for key, params in renditions.items() if params["encode_mp4"] is False]
        for key in disabled:
            renditions.pop(key)
        return renditions.popitem(last=False) if renditions else None
1✔
638

639
    def create_overview(self):
        """Create overview.png and its overview.vtt index in the output directory.

        Thumbnails are taken from the first mp4 rendition with ffmpegthumbnailer
        at regular percentages of the video. The first one becomes overview.png
        and each following one is merged into it with montage. One WebVTT
        caption per thumbnail points at its area in the image. Both files are
        registered in self.list_overview_files when they pass check_file;
        otherwise a failed "create_overview" entry is logged.
        """
        first_item = self.get_first_item()
        first_track = list(self.list_video_track.items())[0][1]
        in_height = first_track["height"]
        in_width = first_track["width"]
        # Thumbnails are 75 px high instead of 90 because of a montage overflow.
        image_height = 75
        image_width = int(in_width / (in_height / image_height))
        input_file = self.list_mp4_files[first_item[0]]
        nb_img = 100
        # Only 10 thumbnails are made when the video lasts less than 100 seconds.
        step = 10 if self.duration < 100 else 1
        image_url = "overview.png"
        overviewimagefilename = os.path.join(self.output_dir, image_url)
        overviewfilename = os.path.join(self.output_dir, "overview.vtt")

        def vtt_timestamp(percent):
            """Format the position at `percent` of the duration as HH:MM:SS.mmm."""
            whole, millis = format(float(self.duration * percent / 100), ".3f").split(".")
            return "%s.%s" % (time.strftime("%H:%M:%S", time.gmtime(int(whole))), millis)

        webvtt = WebVTT()
        for i in range(0, nb_img, step):
            # The first pass writes overview.png directly.
            if i > 0:
                output_file = os.path.join(self.output_dir, "thumbnail_%s.png" % i)
            else:
                output_file = overviewimagefilename
            cmd_ffmpegthumbnailer = (
                'ffmpegthumbnailer -t "%(stamp)s" '
                '-s "%(image_width)s" -i %(source)s -c png '
                "-o %(output_file)s "
            ) % {
                "stamp": "%s%%" % i,
                "source": input_file,
                "output_file": output_file,
                "image_width": image_width,
            }
            return_value, return_msg = launch_cmd(cmd_ffmpegthumbnailer)
            if return_value and check_file(output_file) and i > 0:
                cmd_montage = (
                    "montage -geometry +0+0 %(overviewimagefilename)s \
                    %(output_file)s  %(overviewimagefilename)s"
                    % {
                        "overviewimagefilename": overviewimagefilename,
                        "output_file": output_file,
                    }
                )
                return_value, return_msg = launch_cmd(cmd_montage)
                if not return_value:
                    print("cmd_montage_%s" % i, cmd_montage, return_value, return_msg)
                # The intermediate thumbnail is no longer needed once merged.
                os.remove(output_file)
            start_time = vtt_timestamp(i)
            end_time = vtt_timestamp(i + step)
            webvtt.captions.append(
                Caption(
                    "%s" % start_time,
                    "%s" % end_time,
                    "%s#xywh=%d,%d,%d,%d"
                    % (image_url, image_width * (i / step), 0, image_width, image_height),
                )
            )
        webvtt.save(overviewfilename)

        if check_file(overviewfilename) and check_file(overviewimagefilename):
            self.list_overview_files["0"] = overviewimagefilename
            self.list_overview_files["1"] = overviewfilename
        else:
            self.add_encoding_log("create_overview", "", False, "")
×
716

717
    def encode_image_part(self):
        """Launch the thumbnail command, then build the overview when relevant.

        When ``list_image_track`` is not empty, the command returned by
        ``get_extract_thumbnail_command`` is launched; otherwise, if the input
        is a video, the one returned by ``get_create_thumbnail_command`` is.
        The command and its outcome are recorded with ``add_encoding_log``.
        """
        has_image_track = len(self.list_image_track) > 0
        if has_image_track or self.is_video():
            if has_image_track:
                log_title = "extract_thumbnail_command"
                command = self.get_extract_thumbnail_command()
            else:
                log_title = "create_thumbnail_command"
                command = self.get_create_thumbnail_command()
            succeeded, output = launch_cmd(command)
            self.add_encoding_log(log_title, command, succeeded, output)

        # No overview for videos shorter than 10 seconds
        # (exactly 10 seconds stays included so that the tests keep passing).
        if self.is_video() and self.duration >= 10:
            self.create_overview()
1✔
734

735
    def get_extract_subtitle_command(self) -> str:
        """Build the command line extracting every subtitle track.

        For each key of ``list_subtitle_track`` the output file is
        ``subtitle_<language>.vtt`` inside ``output_dir``, and the pair
        ``[language, output_file]`` is stored in ``list_subtitle_files``
        under the same key.

        Returns:
            The complete command as a single string.
        """
        pieces = [
            "%s " % FFMPEG_CMD,
            FFMPEG_INPUT
            % {"input": self.video_file, "nb_threads": FFMPEG_NB_THREADS},
        ]
        for track_key in self.list_subtitle_track:
            language = self.list_subtitle_track[track_key]["language"]
            vtt_path = os.path.join(self.output_dir, "subtitle_%s.vtt" % language)
            pieces.append(
                FFMPEG_EXTRACT_SUBTITLE % {"index": track_key, "output": vtt_path}
            )
            self.list_subtitle_files[track_key] = [language, vtt_path]
        return "".join(pieces)
×
750

751
    def get_subtitle_part(self):
        """Launch the subtitle extraction command and log its outcome.

        Does nothing when ``list_subtitle_track`` is empty.
        """
        if len(self.list_subtitle_track) <= 0:
            return
        command = self.get_extract_subtitle_command()
        succeeded, output = launch_cmd(command)
        self.add_encoding_log("subtitle_command", command, succeeded, output)
758

759
    def export_to_json(self):
        """Write every instance attribute to ``info_video.json`` in ``output_dir``.

        The attributes must be JSON-serializable, since they are passed to
        ``json.dump`` as-is.
        """
        # Shallow copy of the instance namespace (attribute name -> value).
        snapshot = dict(vars(self))
        with open(self.output_dir + "/info_video.json", "w") as json_file:
            json.dump(snapshot, json_file, indent=2)
1✔
765

766
    def add_encoding_log(self, title, command, result, msg):
        """Record one encoding step in ``encoding_log`` and track failures.

        The entry is stored under ``title``. ``error_encoding`` is switched to
        ``True`` when ``result`` is exactly ``False`` and no error was flagged
        before.
        """
        entry = {"command": command, "result": result, "msg": msg}
        self.encoding_log[title] = entry
        step_failed = result is False
        if step_failed and self.error_encoding is False:
            self.error_encoding = True
1✔
771

772
    def start_encode(self):
        """Run the whole encoding pipeline, then export the instance to JSON.

        ``start`` and ``stop`` receive ``time.ctime()`` before and after the
        encoding steps.
        """
        self.start = time.ctime()
        self.create_output_dir()
        self.get_video_data()

        dressing_requested = self.json_dressing is not None
        if dressing_requested:
            self.encode_video_dressing()

        print(self.id, self.video_file, self.duration)

        # Video, audio and subtitle parts are conditional;
        # the image part always runs.
        if self.is_video():
            self.encode_video_part()
        audio_track_count = len(self.list_audio_track)
        if audio_track_count > 0:
            self.encode_audio_part()
        self.encode_image_part()
        subtitle_track_count = len(self.list_subtitle_track)
        if subtitle_track_count > 0:
            self.get_subtitle_part()

        self.stop = time.ctime()
        self.export_to_json()
1✔
788

789

790
def fix_input(input) -> str:
    """Rename the input file to a path free of accents and spaces.

    The path is made absolute (relative paths are resolved against the
    current working directory). If the file exists and is not empty, combining
    marks are stripped from the NFD-normalized path, spaces become
    underscores, and the file is renamed accordingly.

    NOTE(review): the whole path is normalized, not only the file name, so a
    parent directory containing accents or spaces makes ``os.rename`` target a
    different directory — confirm this is intended.

    Args:
        input: path of the file to encode, absolute or relative.

    Returns:
        The normalized path of the renamed file, or an empty string when the
        file is missing or empty.
    """
    # Rely on the ``input`` argument rather than the module-level ``args``,
    # which only exists when this file is executed as a script.
    if input.startswith("/"):
        path_file = input
    else:
        path_file = os.path.join(os.getcwd(), input)

    if not (os.access(path_file, os.F_OK) and os.stat(path_file).st_size > 0):
        return ""

    # Remove accents ("Mn" = nonspacing marks left by NFD decomposition)
    # and replace spaces.
    filename = "".join(
        c
        for c in unicodedata.normalize("NFD", path_file)
        if unicodedata.category(c) != "Mn"
    )
    filename = filename.replace(" ", "_")
    os.rename(path_file, filename)
    print("Encoding file {} \n".format(filename))
    return filename
×
812

813

814
"""
815
  remote encode ???
816
"""
817
if __name__ == "__main__":
    start = "Start at: %s" % time.ctime()
    parser = argparse.ArgumentParser(description="Running encoding video.")
    # (flag, required, help text), declared in the order shown in the usage.
    for option, mandatory, description in (
        ("--id", True, "the ID of the video"),
        ("--start", False, "Start cut"),
        ("--stop", False, "Stop cut"),
        ("--input", True, "name of input file to encode"),
        ("--dressing", False, "Dressing for the video"),
    ):
        parser.add_argument(option, required=mandatory, help=description)

    args = parser.parse_args()
    print(args.start)
    filename = fix_input(args.input)
    encoding_video = Encoding_video(
        args.id, filename, args.start, args.stop, args.dressing
    )
    # ``start`` is not added to the log: per the original note, doing
    # ``encoding_video.encoding_log += start`` fails with
    # AttributeError: 'NoneType' object has no attribute 'get'
    encoding_video.start_encode()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc