• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 13964928276

20 Mar 2025 08:26AM UTC coverage: 70.106%. First build
13964928276

Pull #1280

github

web-flow
Merge a06fb2670 into 749de494a
Pull Request #1280: Replace credit_videofile by credit_video_dir

83 of 105 new or added lines in 12 files covered. (79.05%)

11984 of 17094 relevant lines covered (70.11%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.67
/pod/video_encode_transcript/Encoding_video_model.py
1
"""Model for video encoding."""
2

3
import logging
1✔
4
import os
1✔
5
import re
1✔
6
from django.conf import settings
1✔
7
from .models import EncodingVideo
1✔
8
from .models import EncodingAudio
1✔
9
from .models import VideoRendition
1✔
10
from .models import PlaylistVideo
1✔
11
from .models import EncodingLog
1✔
12
from pod.video.models import Video
1✔
13
from pod.completion.models import Track
1✔
14
from django.core.files import File
1✔
15
from .Encoding_video import (
1✔
16
    Encoding_video,
17
    FFMPEG_NB_THUMBNAIL,
18
    FFMPEG_CREATE_THUMBNAIL,
19
    FFMPEG_CMD,
20
    FFMPEG_INPUT,
21
    FFMPEG_NB_THREADS,
22
)
23
from pod.video.models import LANG_CHOICES
1✔
24
import json
1✔
25
import time
1✔
26
from .encoding_utils import (
1✔
27
    launch_cmd,
28
    check_file,
29
)
30

31
# Mirror Django's DEBUG flag (defaults to True when the setting is absent) and
# make this module's logger verbose while it is on.
DEBUG = getattr(settings, "DEBUG", True)
logger = logging.getLogger(__name__)
if DEBUG:
    logger.setLevel(logging.DEBUG)

# Encoding names used when settings.ENCODING_CHOICES is not defined; every
# entry is a (value, label) pair where both members are identical.
ENCODING_CHOICES = getattr(
    settings,
    "ENCODING_CHOICES",
    tuple(
        (choice, choice)
        for choice in ("audio", "360p", "480p", "720p", "1080p", "playlist")
    ),
)

# Flat {code: label} lookup built from the first two LANG_CHOICES groups.
__LANG_CHOICES_DICT__ = dict(LANG_CHOICES[0][1] + LANG_CHOICES[1][1])
# Language assigned to a subtitle track whose own language code is not known.
DEFAULT_LANG_TRACK = getattr(settings, "DEFAULT_LANG_TRACK", "fr")

# The file/image models come from pod.podfile when USE_PODFILE is enabled,
# and from pod.main otherwise.
__FILEPICKER__ = bool(getattr(settings, "USE_PODFILE", False))
if __FILEPICKER__:
    from pod.podfile.models import CustomImageModel, CustomFileModel
else:
    from pod.main.models import CustomImageModel, CustomFileModel
×
62

63

64
class Encoding_video_model(Encoding_video):
1✔
65
    """Encoding video model."""
66

67
    def remove_old_data(self) -> None:
        """Clear everything a previous encoding attached to this video.

        The thumbnail and overview are detached from the video (the overview
        file and an ``overview.png`` sitting next to it are deleted), then the
        previous video, audio, playlist and log data are removed. The messages
        returned by those steps are recorded with ``add_encoding_log``.
        """
        video = Video.objects.get(id=self.id)
        video.thumbnail = None
        if video.overview:
            # The overview file has a companion image in the same directory.
            overview_dir = os.path.dirname(video.overview.path)
            overview_image = os.path.join(overview_dir, "overview.png")
            if os.path.isfile(overview_image):
                os.remove(overview_image)
            video.overview.delete()
        video.overview = None
        video.save()

        # Run the removal steps in order and concatenate their reports.
        removal_steps = (
            self.remove_previous_encoding_video,
            self.remove_previous_encoding_audio,
            self.remove_previous_encoding_playlist,
            self.remove_previous_encoding_log,
        )
        report = "".join(step(video) for step in removal_steps)
        self.add_encoding_log("remove_old_data", "", True, report)
1✔
87

88
    def remove_previous_encoding_log(self, video_to_encode):
        """Delete the ``info_video.json`` file left by a previous encoding.

        Args:
            video_to_encode: not used here; accepted so the signature matches
                the other ``remove_previous_encoding_*`` methods.

        Returns:
            str: a message, starting with a newline, telling whether a log
            file was deleted or there was nothing to delete.
        """
        log_json = os.path.join(self.get_output_dir(), "info_video.json")
        if os.path.exists(log_json):
            os.remove(log_json)
            return "\n\nDELETE PREVIOUS ENCODING LOG"
        # This branch is about the log file, so label it "Log" (it used to be
        # reported as "Audio").
        return "\nLog: Nothing to delete"
1✔
98

99
    def remove_previous_encoding_objects(self, model_class, video_to_encode) -> str:
        """Delete every ``model_class`` object attached to ``video_to_encode``.

        Args:
            model_class: model whose ``objects.filter(video=...)`` result is
                deleted.
            video_to_encode: the video whose previous objects are removed.

        Returns:
            str: a message, starting with a newline, naming the model and
            telling whether objects were deleted or none existed.
        """
        # "EncodingVideo" -> " ENCODING VIDEO" (note the leading space).
        object_type = re.sub(r"([A-Z])", r" \1", model_class.__name__).upper()
        previous_encodings = model_class.objects.filter(video=video_to_encode)
        if not previous_encodings:
            # Name the model actually inspected instead of always "Video".
            return "\n%s: Nothing to delete" % object_type.strip()
        # Objects are deleted one by one so each instance's own delete() runs.
        for encoding in previous_encodings:
            encoding.delete()
        return "\nDELETE PREVIOUS%s" % object_type
1✔
113

114
    def remove_previous_encoding_video(self, video_to_encode) -> str:
        """Delete the EncodingVideo objects of the video; return the report."""
        report = self.remove_previous_encoding_objects(
            EncodingVideo,
            video_to_encode,
        )
        return report
1✔
117

118
    def remove_previous_encoding_audio(self, video_to_encode) -> str:
        """Delete the EncodingAudio objects of the video; return the report."""
        report = self.remove_previous_encoding_objects(
            EncodingAudio,
            video_to_encode,
        )
        return report
1✔
121

122
    def remove_previous_encoding_playlist(self, video_to_encode) -> str:
        """Delete the PlaylistVideo objects of the video; return the report."""
        report = self.remove_previous_encoding_objects(
            PlaylistVideo,
            video_to_encode,
        )
        return report
1✔
125

126
    def get_true_path(self, original):
        """Return ``original`` with the MEDIA_ROOT prefix stripped out.

        The text removed is ``settings.MEDIA_ROOT`` followed by a path
        separator; every occurrence found in ``original`` is dropped.
        """
        # Joining with "" appends the trailing separator to MEDIA_ROOT.
        media_prefix = os.path.join(settings.MEDIA_ROOT, "")
        return original.replace(media_prefix, "")
1✔
129

130
    def store_json_list_mp3_m4a_files(self, info_video, video_to_encode) -> None:
        """Create an EncodingAudio entry for each M4A and MP3 file listed.

        Reads ``info_video["list_m4a_files"]`` then
        ``info_video["list_mp3_files"]``; files rejected by ``check_file`` are
        skipped.
        """
        # (key in info_video, encoding_format stored for its files)
        audio_lists = (
            ("list_m4a_files", "video/mp4"),
            ("list_mp3_files", "audio/mp3"),
        )
        for list_name, encoding_format in audio_lists:
            audio_files = info_video[list_name]
            for file_key in audio_files:
                audio_path = audio_files[file_key]
                if check_file(audio_path):
                    EncodingAudio.objects.get_or_create(
                        name="audio",
                        video=video_to_encode,
                        encoding_format=encoding_format,
                        # need to double check path
                        source_file=self.get_true_path(audio_path),
                    )
147

148
    def store_json_list_mp4_hls_files(self, info_video, video_to_encode) -> None:
        """Create the database entries for the MP4 and HLS files listed.

        For each usable file of ``info_video["list_mp4_files"]`` an
        EncodingVideo is created; for each usable file of
        ``info_video["list_hls_files"]`` a PlaylistVideo is created, plus an
        EncodingVideo for the matching ``.ts`` file when it exists. Finally a
        "playlist" PlaylistVideo is created if ``livestream.m3u8`` is present
        in the output directory. Keys of both lists are combined with "x" to
        look up the VideoRendition and with "p" to name the encoding.
        """
        mp4_files = info_video["list_mp4_files"]
        for size in mp4_files:
            mp4_path = mp4_files[size]
            if check_file(mp4_path):
                mp4_rendition = VideoRendition.objects.get(
                    resolution__contains="x" + size
                )
                EncodingVideo.objects.get_or_create(
                    name=size + "p",
                    video=video_to_encode,
                    rendition=mp4_rendition,
                    encoding_format="video/mp4",
                    source_file=self.get_true_path(mp4_path),
                )

        hls_files = info_video["list_hls_files"]
        for size in hls_files:
            m3u8_path = hls_files[size]
            if not check_file(m3u8_path):
                continue
            hls_rendition = VideoRendition.objects.get(
                resolution__contains="x" + size
            )
            hls_name = size + "p"
            PlaylistVideo.objects.get_or_create(
                name=hls_name,
                video=video_to_encode,
                encoding_format="application/x-mpegURL",
                source_file=self.get_true_path(m3u8_path),
            )
            # The transport stream sits next to its playlist, same base name.
            ts_path = m3u8_path.replace(".m3u8", ".ts")
            if check_file(ts_path):
                EncodingVideo.objects.get_or_create(
                    name=hls_name,
                    video=video_to_encode,
                    rendition=hls_rendition,
                    encoding_format="video/mp2t",
                    source_file=self.get_true_path(ts_path),
                )

        livestream_path = os.path.join(self.get_output_dir(), "livestream.m3u8")
        if check_file(livestream_path):
            PlaylistVideo.objects.get_or_create(
                name="playlist",
                video=video_to_encode,
                encoding_format="application/x-mpegURL",
                source_file=self.get_true_path(livestream_path),
            )
195

196
    def store_json_encoding_log(self, info_video, video_to_encode) -> None:
        """Save a short encoding summary in the video's EncodingLog.

        The stored text holds the start time, the path of
        ``info_video.json`` (so it can be opened easily) and the end time.
        ``info_video`` is not read by this method.
        """
        # Need to modify start and stop
        start_text = "Start: %s " % str(self.start)
        log_file = self.get_output_dir() + "/info_video.json"
        end_text = "\nEnd: %s" % str(self.stop)

        encoding_log, _created = EncodingLog.objects.get_or_create(
            video=video_to_encode
        )
        encoding_log.log = start_text + "\nLog File: \n" + log_file + end_text
        encoding_log.logfile = self.get_true_path(log_file)
        encoding_log.save()
1✔
226

227
    def store_json_list_subtitle_files(self, info_video, video_to_encode) -> None:
        """Create a subtitle Track for each usable subtitle file listed.

        Each value of ``info_video["list_subtitle_files"]`` is indexed as
        ``[0]`` for the language and ``[1]`` for the file path. Files rejected
        by ``check_file`` are skipped. The track language is the first two
        characters of the subtitle language when that code is a known
        language, otherwise ``DEFAULT_LANG_TRACK``.
        """
        list_subtitle_files = info_video["list_subtitle_files"]
        if __FILEPICKER__:
            videodir = video_to_encode.get_or_create_video_folder()

        for sub in list_subtitle_files:
            subtitle = list_subtitle_files[sub]
            subtitle_path = subtitle[1]
            if not check_file(subtitle_path):
                continue
            if __FILEPICKER__:
                podfile, _created = CustomFileModel.objects.get_or_create(
                    file=self.get_true_path(subtitle_path),
                    name=subtitle_path,
                    description="A subtitle file",
                    created_by=video_to_encode.owner,
                    folder=videodir,
                )
            else:
                # NOTE(review): this instance is never saved before being
                # handed to Track.objects.get_or_create -- confirm whether a
                # podfile.save() is needed here.
                podfile = CustomFileModel()
                podfile.file = self.get_true_path(subtitle_path)

            sub_lang = subtitle[0]
            # Pass the value as a logging argument so formatting only happens
            # when the debug record is actually emitted.
            logger.debug("subtitle lang: %s ", sub_lang)

            lang_code = sub_lang[:2]
            if __LANG_CHOICES_DICT__.get(lang_code):
                track_lang = lang_code
            else:
                track_lang = DEFAULT_LANG_TRACK

            Track.objects.get_or_create(
                video=video_to_encode,
                kind="subtitles",
                lang=track_lang,
                src=podfile,
                enrich_ready=True,
            )
263

264
    def store_json_list_thumbnail_files(self, info_video) -> Video:
        """Store the generated thumbnails and attach one to the video.

        Every usable file of ``info_video["list_thumbnail_files"]`` is saved
        as ``<video slug>_<key>.png`` and its temporary source file is then
        removed. The thumbnail at enumeration index 1 becomes the video's
        thumbnail.

        Returns:
            Video: the video instance loaded (and possibly updated) here.
        """
        video = Video.objects.get(id=self.id)
        list_thumbnail_files = info_video["list_thumbnail_files"]
        thumbnail = CustomImageModel()
        if __FILEPICKER__:
            videodir = video.get_or_create_video_folder()
            thumbnail = CustomImageModel(folder=videodir, created_by=video.owner)
        # NOTE(review): one CustomImageModel instance is reused for every
        # file, so each save re-points it at the newest file -- confirm that
        # this is intended.
        for index, thumbnail_path in enumerate(list_thumbnail_files):
            temp_file = list_thumbnail_files[thumbnail_path]
            if not check_file(temp_file):
                continue
            # Open the source in a context manager so the handle is closed
            # once its content has been saved.
            with open(temp_file, "rb") as image_file:
                thumbnail.file.save(
                    "%s_%s.png" % (video.slug, thumbnail_path),
                    File(image_file),
                    save=True,
                )
            thumbnail.save()
            if index == 1 and thumbnail.id:
                video.thumbnail = thumbnail
                video.save()
            # rm temp location
            os.remove(temp_file)
        return video
1✔
286

287
    def store_json_list_overview_files(self, info_video) -> Video:
        """Attach the overview VTT file to the video, when one is listed.

        ``info_video["list_overview_files"]`` is read under keys "0" and "1":
        entry "0" is used when it contains ".vtt", otherwise entry "1".

        Returns:
            Video: the video instance loaded (and possibly updated) here.
        """
        overview_files = info_video["list_overview_files"]
        video = Video.objects.get(id=self.id)
        if len(overview_files) == 0:
            return video

        first_entry = overview_files["0"]
        if ".vtt" in first_entry:
            vtt_file = first_entry
        else:
            vtt_file = overview_files["1"]
        video.overview = self.get_true_path(vtt_file)
        video.save()
        return video
1✔
299

300
    def wait_for_file(self, filepath) -> None:
        """Wait until ``filepath`` exists, for at most about 40 seconds.

        The path is checked once per second. Nothing is raised when the time
        is up: a warning is logged and the caller has to handle a file that
        is still missing.
        """
        time_to_wait = 40  # seconds
        logger.info("wait_for_file: %s", filepath)
        for _ in range(time_to_wait):
            if os.path.exists(filepath):
                return
            time.sleep(1)
        if not os.path.exists(filepath):
            # Report the timeout through the logger rather than stdout.
            logger.warning(
                "wait_for_file: %s still missing after %s seconds",
                filepath,
                time_to_wait,
            )
×
310

311
    def store_json_info(self) -> Video:
        """Load ``info_video.json`` and store its content in the database.

        If the file is not available yet, ``wait_for_file`` is given a chance
        to see it appear. The video duration is saved first, then the audio,
        video/playlist, log, subtitle, thumbnail and overview data.

        Returns:
            Video: the instance returned by ``store_json_list_overview_files``.
        """
        info_path = os.path.join(self.get_output_dir(), "info_video.json")
        if not check_file(info_path):
            self.wait_for_file(info_path)

        with open(info_path, "r") as json_file:
            info_video = json.load(json_file)

        video_to_encode = Video.objects.get(id=self.id)
        video_to_encode.duration = info_video["duration"]
        video_to_encode.save()

        for store in (
            self.store_json_list_mp3_m4a_files,
            self.store_json_list_mp4_hls_files,
            self.store_json_encoding_log,
            self.store_json_list_subtitle_files,
        ):
            store(info_video, video_to_encode)

        # The last two steps reload the video themselves, to be sure that the
        # thumbnail and the overview are present on the returned instance.
        self.store_json_list_thumbnail_files(info_video)
        return self.store_json_list_overview_files(info_video)
1✔
332

333
    def get_create_thumbnail_command_from_video(self, video_to_encode):
        """Build the ffmpeg command that creates thumbnails from an MP4 file.

        The input is the "video/mp4" EncodingVideo of ``video_to_encode`` with
        the smallest ``height``. The expected output files are registered in
        ``self.list_thumbnail_files`` under the keys "1", "2", ... and the
        command is written to the video's EncodingLog.

        Returns:
            str: the command line, or "" when the video has no MP4 encoding.
        """
        # Evaluate the queryset once; it is tested and then searched.
        mp4_encodings = list(
            EncodingVideo.objects.filter(
                video=video_to_encode, encoding_format="video/mp4"
            )
        )
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
        if not created:
            encoding_log.log = ""
        encoding_log.log += "\n----------------------------------------"
        if not mp4_encodings:
            encoding_log.log += "\nget_create_thumbnail_command_from_video"
            encoding_log.log += "\nNO MP4 FILES FOUND!"
            # Persist the failure message before leaving.
            encoding_log.save()
            return ""

        smallest_mp4 = min(mp4_encodings, key=lambda m: m.height)
        output_file = os.path.join(self.output_dir, "thumbnail")
        thumbnail_command = "%s " % FFMPEG_CMD
        thumbnail_command += FFMPEG_INPUT % {
            "input": smallest_mp4.source_file.path,
            "nb_threads": FFMPEG_NB_THREADS,
        }
        thumbnail_command += FFMPEG_CREATE_THUMBNAIL % {
            "duration": self.duration,
            "nb_thumbnail": FFMPEG_NB_THUMBNAIL,
            "output": output_file,
        }
        # Register the files the command is expected to write:
        # "<output>_0001.png", "<output>_0002.png", ...
        for nb in range(1, FFMPEG_NB_THUMBNAIL + 1):
            num_thumb = str(nb)
            self.list_thumbnail_files[num_thumb] = "%s_000%s.png" % (
                output_file,
                num_thumb,
            )
        encoding_log.log += "\n %s" % thumbnail_command
        encoding_log.save()
        return thumbnail_command
×
368

369
    def recreate_thumbnail(self) -> None:
        """Generate the thumbnails of the video again and store them.

        When the media has image tracks the thumbnail is extracted with
        ``get_extract_thumbnail_command``; otherwise, for a video, it is
        created with ``get_create_thumbnail_command_from_video``. The command
        and its result are appended to the video's EncodingLog, and the files
        listed in ``self.list_thumbnail_files`` are then stored.
        """
        self.create_output_dir()
        self.get_video_data()
        # Snapshot of the instance attributes, used as the info_video mapping.
        info_video = dict(self.__dict__)
        video_to_encode = Video.objects.get(id=self.id)
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
        separator = "\n----------------------------------------"
        if len(self.list_image_track) > 0:
            thumbnail_command = self.get_extract_thumbnail_command()
            return_value, return_msg = launch_cmd(thumbnail_command)
            if not created:
                encoding_log.log = ""
            encoding_log.log += separator
            encoding_log.log += "\n extract_thumbnail_command"
            encoding_log.log += "\n %s" % thumbnail_command
            encoding_log.log += "\n %s" % return_value
            encoding_log.log += "\n %s" % return_msg
        elif self.is_video():
            thumbnail_command = self.get_create_thumbnail_command_from_video(
                video_to_encode
            )
            # The call above writes to the same EncodingLog row through its
            # own instance; reload ours so the save below keeps those entries
            # instead of overwriting them with stale data.
            encoding_log.refresh_from_db()
            if thumbnail_command:
                return_value, return_msg = launch_cmd(thumbnail_command)
                encoding_log.log += separator
                encoding_log.log += "\n create_thumbnail_command"
                encoding_log.log += "\n %s" % thumbnail_command
                encoding_log.log += "\n %s" % return_value
                encoding_log.log += "\n %s" % return_msg
        encoding_log.save()
        if len(self.list_thumbnail_files) > 0:
            info_video["list_thumbnail_files"] = self.list_thumbnail_files
            self.store_json_list_thumbnail_files(info_video)
×
402

403
    def encode_video(self) -> None:
1✔
404
        """Start video encoding."""
405
        self.start_encode()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc