• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 26805325804

02 Jun 2026 07:32AM UTC coverage: 69.176%. First build
26805325804

Pull #1454

github

web-flow
Merge 284ffcac1 into 05026e2ce
Pull Request #1454: Security Hardening and Priority-User Support for Encoding/Transcript workflows

406 of 582 new or added lines in 14 files covered. (69.76%)

13618 of 19686 relevant lines covered (69.18%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.23
/pod/video_encode_transcript/Encoding_video_model.py
1
"""Model for video encoding."""
2

3
import json
1✔
4
import logging
1✔
5
import os
1✔
6
import re
1✔
7
import time
1✔
8

9
from django.conf import settings
1✔
10
from django.core.files import File
1✔
11
from django.utils.translation import gettext_lazy as _
1✔
12

13
from pod.completion.models import Track
1✔
14
from pod.video.models import LANG_CHOICES, Video
1✔
15

16
from .encoding_utils import check_file, launch_cmd
1✔
17
from .Encoding_video import (
1✔
18
    FFMPEG_CMD,
19
    FFMPEG_CREATE_THUMBNAIL,
20
    FFMPEG_INPUT,
21
    FFMPEG_NB_THREADS,
22
    FFMPEG_NB_THUMBNAIL,
23
    Encoding_video,
24
)
25
from .models import (
1✔
26
    EncodingAudio,
27
    EncodingLog,
28
    EncodingVideo,
29
    PlaylistVideo,
30
    VideoRendition,
31
)
32

33
DEBUG = getattr(settings, "DEBUG", True)
1✔
34
logger = logging.getLogger(__name__)
1✔
35
if DEBUG:
1✔
36
    logger.setLevel(logging.DEBUG)
1✔
37

38
ENCODING_CHOICES = getattr(
1✔
39
    settings,
40
    "ENCODING_CHOICES",
41
    (
42
        ("audio", "audio"),
43
        ("360p", "360p"),
44
        ("480p", "480p"),
45
        ("720p", "720p"),
46
        ("1080p", "1080p"),
47
        ("playlist", "playlist"),
48
    ),
49
)
50

51
__LANG_CHOICES_DICT__ = {
1✔
52
    key: value for key, value in LANG_CHOICES[0][1] + LANG_CHOICES[1][1]
53
}
54
DEFAULT_LANG_TRACK = getattr(settings, "DEFAULT_LANG_TRACK", "fr")
1✔
55

56
if getattr(settings, "USE_PODFILE", False):
1✔
57
    __FILEPICKER__ = True
1✔
58
    from pod.podfile.models import CustomFileModel, CustomImageModel
1✔
59
else:
60
    __FILEPICKER__ = False
×
61
    from pod.main.models import CustomFileModel, CustomImageModel
×
62

63

64
class Encoding_video_model(Encoding_video):
1✔
65
    """Encoding video model."""
66

67
    def _safe_output_path(self, filepath: str) -> str:
1✔
68
        """Return a normalized path constrained to the current encoding output dir."""
69
        output_dir = os.path.realpath(self.get_output_dir())
1✔
70
        candidate_path = os.path.realpath(filepath)
1✔
71
        try:
1✔
72
            is_in_output_dir = (
1✔
73
                os.path.commonpath([candidate_path, output_dir]) == output_dir
74
            )
NEW
75
        except ValueError as err:
×
NEW
76
            raise ValueError(_("Unsafe path for encoding output: %s") % filepath) from err
×
77

78
        if not is_in_output_dir:
1✔
79
            raise ValueError(_("Unsafe path for encoding output: %s") % filepath)
1✔
80
        return candidate_path
1✔
81

82
    def remove_old_data(self) -> None:
1✔
83
        """Remove data from previous encoding."""
84
        video_to_encode = Video.objects.get(id=self.id)
1✔
85
        video_to_encode.thumbnail = None
1✔
86
        if video_to_encode.overview:
1✔
87
            image_overview = os.path.join(
×
88
                os.path.dirname(video_to_encode.overview.path), "overview.png"
89
            )
90
            if os.path.isfile(image_overview):
×
91
                os.remove(image_overview)
×
92
            video_to_encode.overview.delete()
×
93
        video_to_encode.overview = None
1✔
94
        video_to_encode.save()
1✔
95

96
        encoding_log_msg = ""
1✔
97
        encoding_log_msg += self.remove_previous_encoding_video(video_to_encode)
1✔
98
        encoding_log_msg += self.remove_previous_encoding_audio(video_to_encode)
1✔
99
        encoding_log_msg += self.remove_previous_encoding_playlist(video_to_encode)
1✔
100
        encoding_log_msg += self.remove_previous_encoding_log(video_to_encode)
1✔
101
        self.add_encoding_log("remove_old_data", "", True, encoding_log_msg)
1✔
102

103
    def remove_previous_encoding_log(self, video_to_encode):
1✔
104
        """Remove previous logs."""
105
        msg = "\n"
1✔
106
        log_json = self.get_output_dir() + "/info_video.json"
1✔
107
        if os.path.exists(log_json):
1✔
108
            os.remove(log_json)
1✔
109
            msg += "\nDELETE PREVIOUS ENCODING LOG"
1✔
110
        else:
111
            msg += "Audio: Nothing to delete"
1✔
112
        return msg
1✔
113

114
    def remove_previous_encoding_objects(self, model_class, video_to_encode) -> str:
1✔
115
        """Remove previously encoded objects of the given model."""
116
        msg = "\n"
1✔
117
        object_type = model_class.__name__
1✔
118
        object_type = re.sub(r"([A-Z])", r" \1", object_type).upper()
1✔
119
        previous_encoding_video = model_class.objects.filter(video=video_to_encode)
1✔
120
        if len(previous_encoding_video) > 0:
1✔
121
            msg += "DELETE PREVIOUS{}".format(object_type)
1✔
122
            # previous_encoding.delete()
123
            for encoding in previous_encoding_video:
1✔
124
                encoding.delete()
1✔
125
        else:
126
            msg += "Video: Nothing to delete"
1✔
127
        return msg
1✔
128

129
    def remove_previous_encoding_video(self, video_to_encode) -> str:
1✔
130
        """Remove previously encoded video."""
131
        return self.remove_previous_encoding_objects(EncodingVideo, video_to_encode)
1✔
132

133
    def remove_previous_encoding_audio(self, video_to_encode) -> str:
1✔
134
        """Remove previously encoded audio."""
135
        return self.remove_previous_encoding_objects(EncodingAudio, video_to_encode)
1✔
136

137
    def remove_previous_encoding_playlist(self, video_to_encode) -> str:
1✔
138
        """Remove previously encoded playlist."""
139
        return self.remove_previous_encoding_objects(PlaylistVideo, video_to_encode)
1✔
140

141
    def get_true_path(self, original):
1✔
142
        """Get the true path by replacing the MEDIA_ROOT from the original path."""
143
        return original.replace(os.path.join(settings.MEDIA_ROOT, ""), "")
1✔
144

145
    def store_json_list_mp3_m4a_files(self, info_video, video_to_encode) -> None:
1✔
146
        """Store JSON list of MP3 and M4A files for encoding."""
147
        encoding_list = ["list_m4a_files", "list_mp3_files"]
1✔
148
        for encode_item in encoding_list:
1✔
149
            mp3_files = info_video[encode_item]
1✔
150
            for audio_file in mp3_files:
1✔
151
                if not check_file(mp3_files[audio_file]):
1✔
152
                    continue
×
153
                encoding, created = EncodingAudio.objects.get_or_create(
1✔
154
                    name="audio",
155
                    video=video_to_encode,
156
                    encoding_format=(
157
                        "audio/mp3" if (encode_item == "list_mp3_files") else "video/mp4"
158
                    ),
159
                    # need to double check path
160
                    source_file=self.get_true_path(mp3_files[audio_file]),
161
                )
162

163
    def store_json_list_mp4_hls_files(self, info_video, video_to_encode) -> None:
1✔
164
        mp4_files = info_video["list_mp4_files"]
1✔
165
        for video_file in mp4_files:
1✔
166
            if not check_file(mp4_files[video_file]):
1✔
167
                continue
×
168
            rendition = VideoRendition.objects.get(resolution__contains="x" + video_file)
1✔
169
            encod_name = video_file + "p"
1✔
170
            encoding, created = EncodingVideo.objects.get_or_create(
1✔
171
                name=encod_name,
172
                video=video_to_encode,
173
                rendition=rendition,
174
                encoding_format="video/mp4",
175
                source_file=self.get_true_path(mp4_files[video_file]),
176
            )
177

178
        hls_files = info_video["list_hls_files"]
1✔
179
        for video_file in hls_files:
1✔
180
            if not check_file(hls_files[video_file]):
1✔
181
                continue
×
182
            rendition = VideoRendition.objects.get(resolution__contains="x" + video_file)
1✔
183
            encod_name = video_file + "p"
1✔
184
            encoding, created = PlaylistVideo.objects.get_or_create(
1✔
185
                name=encod_name,
186
                video=video_to_encode,
187
                encoding_format="application/x-mpegURL",
188
                source_file=self.get_true_path(hls_files[video_file]),
189
            )
190
            ts_file = hls_files[video_file].replace(".m3u8", ".ts")
1✔
191
            if check_file(ts_file):
1✔
192
                encoding, created = EncodingVideo.objects.get_or_create(
1✔
193
                    name=encod_name,
194
                    video=video_to_encode,
195
                    rendition=rendition,
196
                    encoding_format="video/mp2t",
197
                    source_file=self.get_true_path(ts_file),
198
                )
199

200
        if check_file(os.path.join(self.get_output_dir(), "livestream.m3u8")):
1✔
201
            playlist_file = self.get_true_path(
1✔
202
                os.path.join(self.get_output_dir(), "livestream.m3u8")
203
            )
204
            encoding, created = PlaylistVideo.objects.get_or_create(
1✔
205
                name="playlist",
206
                video=video_to_encode,
207
                encoding_format="application/x-mpegURL",
208
                source_file=playlist_file,
209
            )
210

211
    def store_json_encoding_log(self, info_video, video_to_encode) -> None:
1✔
212
        # Need to modify start and stop
213
        log_to_text = ""
1✔
214
        # logs = info_video["encoding_log"]
215
        log_to_text += "Start: %s " % str(self.start)
1✔
216
        """
1✔
217
        for log in logs:
218
            log_to_text = log_to_text + "[" + log + "]\n\n"
219
            logdetails = logs[log]
220
            for logcate in logdetails:
221
                log_to_text = (
222
                    log_to_text
223
                    + "- "
224
                    + logcate
225
                    + " : \n"
226
                    + str(logdetails[logcate])
227
                    + "\n"
228
                )
229
        """
230
        # add path to log file to easily open it
231
        log_to_text += "\nLog File: \n"
1✔
232
        log_to_text += self.get_output_dir() + "/info_video.json"
1✔
233
        log_to_text += "\nEnd: %s" % str(self.stop)
1✔
234

235
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
1✔
236
        encoding_log.log = log_to_text
1✔
237
        encoding_log.logfile = self.get_true_path(
1✔
238
            self.get_output_dir() + "/info_video.json"
239
        )
240
        encoding_log.save()
1✔
241

242
    def store_json_list_subtitle_files(self, info_video, video_to_encode) -> None:
1✔
243
        list_subtitle_files = info_video["list_subtitle_files"]
1✔
244
        if __FILEPICKER__:
1✔
245
            videodir = video_to_encode.get_or_create_video_folder()
1✔
246

247
        for sub in list_subtitle_files:
1✔
248
            if not check_file(list_subtitle_files[sub][1]):
×
249
                continue
×
250
            if __FILEPICKER__:
×
251
                podfile, created = CustomFileModel.objects.get_or_create(
×
252
                    file=self.get_true_path(list_subtitle_files[sub][1]),
253
                    name=list_subtitle_files[sub][1],
254
                    description="A subtitle file",
255
                    created_by=video_to_encode.owner,
256
                    folder=videodir,
257
                )
258
            else:
259
                podfile = CustomFileModel()
×
260
                podfile.file = self.get_true_path(list_subtitle_files[sub][1])
×
261

262
            logger.debug("subtitle lang: %s " % list_subtitle_files[sub][0])
×
263

264
            sub_lang = list_subtitle_files[sub][0]
×
265
            track_lang = (
×
266
                sub_lang[:2]
267
                if (__LANG_CHOICES_DICT__.get(sub_lang[:2]))
268
                else DEFAULT_LANG_TRACK
269
            )
270

271
            Track.objects.get_or_create(
×
272
                video=video_to_encode,
273
                kind="subtitles",
274
                lang=track_lang,
275
                src=podfile,
276
                enrich_ready=True,
277
            )
278

279
    def store_json_list_thumbnail_files(self, info_video) -> Video:
1✔
280
        """store_json_list_thumbnail_files."""
281
        video = Video.objects.get(id=self.id)
1✔
282
        list_thumbnail_files = info_video["list_thumbnail_files"]
1✔
283
        thumbnail = CustomImageModel()
1✔
284
        if __FILEPICKER__:
1✔
285
            videodir = video.get_or_create_video_folder()
1✔
286
            thumbnail = CustomImageModel(folder=videodir, created_by=video.owner)
1✔
287
        for index, thumbnail_path in enumerate(list_thumbnail_files):
1✔
288
            if check_file(list_thumbnail_files[thumbnail_path]):
1✔
289
                thumbnail.file.save(
1✔
290
                    "%s_%s.png" % (video.slug, thumbnail_path),
291
                    File(open(list_thumbnail_files[thumbnail_path], "rb")),
292
                    save=True,
293
                )
294
                thumbnail.save()
1✔
295
                if index == 1 and thumbnail.id:
1✔
296
                    video.thumbnail = thumbnail
1✔
297
                    video.save()
1✔
298
                # rm temp location
299
                os.remove(list_thumbnail_files[thumbnail_path])
1✔
300
        return video
1✔
301

302
    def store_json_list_overview_files(self, info_video) -> Video:
1✔
303
        list_overview_files = info_video["list_overview_files"]
1✔
304
        video = Video.objects.get(id=self.id)
1✔
305
        if len(list_overview_files) > 0:
1✔
306
            vtt_file = (
1✔
307
                list_overview_files["0"]
308
                if ".vtt" in list_overview_files["0"]
309
                else list_overview_files["1"]
310
            )
311
            video.overview = self.get_true_path(vtt_file)
1✔
312
            video.save()
1✔
313
        return video
1✔
314

315
    def wait_for_file(self, filepath) -> None:
1✔
NEW
316
        safe_filepath = self._safe_output_path(filepath)
×
NEW
317
        output_dir = os.path.realpath(self.get_output_dir())
×
NEW
318
        if os.path.commonpath([safe_filepath, output_dir]) != output_dir:
×
NEW
319
            raise ValueError(_("Unsafe path for encoding output: %s") % filepath)
×
320
        time_to_wait = 40
×
321
        time_counter = 0
×
NEW
322
        logger.info("wait_for_file: %s" % safe_filepath)
×
NEW
323
        while not os.path.exists(safe_filepath):
×
324
            time.sleep(1)
×
325
            time_counter += 1
×
326
            print(".", end="")
×
327
            if time_counter > time_to_wait:
×
328
                break
×
329

330
    def store_json_info(self) -> Video:
1✔
331
        """Open json file and store its data in current instance."""
332
        infovideojsonfilepath = self._safe_output_path(
1✔
333
            os.path.join(self.get_output_dir(), "info_video.json")
334
        )
335
        output_dir = os.path.realpath(self.get_output_dir())
1✔
336
        if os.path.commonpath([infovideojsonfilepath, output_dir]) != output_dir:
1✔
NEW
337
            raise ValueError(
×
338
                _("Unsafe path for encoding output: %s") % infovideojsonfilepath
339
            )
340
        if not check_file(infovideojsonfilepath):
1✔
341
            self.wait_for_file(infovideojsonfilepath)
×
342

343
        with open(infovideojsonfilepath, "r") as json_file:
1✔
344
            info_video = json.load(json_file)
1✔
345
            video_to_encode = Video.objects.get(id=self.id)
1✔
346
            video_to_encode.duration = info_video["duration"]
1✔
347
            video_to_encode.save()
1✔
348

349
            self.store_json_list_mp3_m4a_files(info_video, video_to_encode)
1✔
350
            self.store_json_list_mp4_hls_files(info_video, video_to_encode)
1✔
351
            self.store_json_encoding_log(info_video, video_to_encode)
1✔
352
            self.store_json_list_subtitle_files(info_video, video_to_encode)
1✔
353
            # update and create new video to be sur that thumbnail and overview be present
354
            self.store_json_list_thumbnail_files(info_video)
1✔
355
            video = self.store_json_list_overview_files(info_video)
1✔
356

357
            return video
1✔
358

359
    def get_create_thumbnail_command_from_video(self, video_to_encode):
1✔
360
        """Create command line to generate thumbnails from video."""
361
        thumbnail_command = "%s " % FFMPEG_CMD
×
362
        ev = EncodingVideo.objects.filter(
×
363
            video=video_to_encode, encoding_format="video/mp4"
364
        )
365
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
×
366
        if not created:
×
367
            encoding_log.log = ""
×
368
        encoding_log.log += "\n----------------------------------------"
×
369
        if ev.count() == 0:
×
370
            encoding_log.log += "\nget_create_thumbnail_command_from_video"
×
371
            encoding_log.log += "\nNO MP4 FILES FOUND!"
×
372
            return ""
×
373
        video_mp4 = sorted(ev, key=lambda m: m.height)[0]
×
374
        input_file = video_mp4.source_file.path
×
375
        thumbnail_command += FFMPEG_INPUT % {
×
376
            "input": input_file,
377
            "nb_threads": FFMPEG_NB_THREADS,
378
        }
379
        output_file = os.path.join(self.output_dir, "thumbnail")
×
380
        thumbnail_command += FFMPEG_CREATE_THUMBNAIL % {
×
381
            "duration": self.duration,
382
            "nb_thumbnail": FFMPEG_NB_THUMBNAIL,
383
            "output": output_file,
384
        }
385
        for nb in range(0, FFMPEG_NB_THUMBNAIL):
×
386
            num_thumb = str(nb + 1)
×
387
            self.list_thumbnail_files[num_thumb] = "%s_000%s.png" % (
×
388
                output_file,
389
                num_thumb,
390
            )
391
        encoding_log.log += "\n %s" % thumbnail_command
×
392
        encoding_log.save()
×
393
        return thumbnail_command
×
394

395
    def recreate_thumbnail(self) -> None:
1✔
396
        self.create_output_dir()
×
397
        self.get_video_data()
×
398
        info_video = {}
×
399
        for attribute, value in self.__dict__.items():
×
400
            info_video[attribute] = value
×
401
        video_to_encode = Video.objects.get(id=self.id)
×
402
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
×
403
        if len(self.list_image_track) > 0:
×
404
            thumbnail_command = self.get_extract_thumbnail_command()
×
405
            return_value, return_msg = launch_cmd(thumbnail_command)
×
406
            if not created:
×
407
                encoding_log.log = ""
×
408
            encoding_log.log += "\n----------------------------------------"
×
409
            encoding_log.log += "\n extract_thumbnail_command"
×
410
            encoding_log.log += "\n %s" % thumbnail_command
×
411
            encoding_log.log += "\n %s" % return_value
×
412
            encoding_log.log += "\n %s" % return_msg
×
413
        elif self.is_video():
×
414
            thumbnail_command = self.get_create_thumbnail_command_from_video(
×
415
                video_to_encode
416
            )
417
            if thumbnail_command:
×
418
                return_value, return_msg = launch_cmd(thumbnail_command)
×
419
                encoding_log.log += "\n----------------------------------------"
×
420
                encoding_log.log += "\n create_thumbnail_command"
×
421
                encoding_log.log += "\n %s" % thumbnail_command
×
422
                encoding_log.log += "\n %s" % return_value
×
423
                encoding_log.log += "\n %s" % return_msg
×
424
        encoding_log.save()
×
425
        if len(self.list_thumbnail_files) > 0:
×
426
            info_video["list_thumbnail_files"] = self.list_thumbnail_files
×
427
            self.store_json_list_thumbnail_files(info_video)
×
428

429
    def encode_video(self) -> None:
1✔
430
        """Start video encoding."""
431
        self.start_encode()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc