• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 5322792349

pending completion
5322792349

push

github

web-flow
Merge pull request #863 from EsupPortail/develop

#3.3.0

- Import external video from url, youtube, peertube and BigBlueButton
- Change xapi actor to deal with Moodle
- Update template to BS5.3 and improve compliance for W3C
- Use redis to cache session and improve logging
- refactor of encoding/transcripting to move it in separate application
- Fixbug on categories, recorder, user liste, tags cloud

1484 of 1484 new or added lines in 51 files covered. (100.0%)

9011 of 12517 relevant lines covered (71.99%)

0.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.28
/pod/video_encode_transcript/Encoding_video_model.py
1
"""Model for video encoding."""
2
import os
1✔
3
import re
1✔
4
from django.conf import settings
1✔
5
from .models import EncodingVideo
1✔
6
from .models import EncodingAudio
1✔
7
from .models import VideoRendition
1✔
8
from .models import PlaylistVideo
1✔
9
from .models import EncodingLog
1✔
10
from pod.video.models import Video
1✔
11
from pod.completion.models import Track
1✔
12
from django.core.files import File
1✔
13
from .Encoding_video import (
1✔
14
    Encoding_video,
15
    FFMPEG_NB_THUMBNAIL,
16
    FFMPEG_CREATE_THUMBNAIL,
17
    FFMPEG_CMD,
18
    FFMPEG_INPUT,
19
    FFMPEG_NB_THREADS,
20
)
21
from pod.video.models import LANG_CHOICES
1✔
22
import json
1✔
23

24
from .encoding_utils import (
1✔
25
    launch_cmd,
26
    check_file,
27
)
28

29
ENCODING_CHOICES = getattr(
1✔
30
    settings,
31
    "ENCODING_CHOICES",
32
    (
33
        ("audio", "audio"),
34
        ("360p", "360p"),
35
        ("480p", "480p"),
36
        ("720p", "720p"),
37
        ("1080p", "1080p"),
38
        ("playlist", "playlist"),
39
    ),
40
)
41

42
__LANG_CHOICES_DICT__ = {
1✔
43
    key: value for key, value in LANG_CHOICES[0][1] + LANG_CHOICES[1][1]
44
}
45
DEFAULT_LANG_TRACK = getattr(settings, "DEFAULT_LANG_TRACK", "fr")
1✔
46

47
if getattr(settings, "USE_PODFILE", False):
1✔
48
    __FILEPICKER__ = True
1✔
49
    from pod.podfile.models import CustomImageModel
1✔
50
    from pod.podfile.models import UserFolder
1✔
51
    from pod.podfile.models import CustomFileModel
1✔
52
else:
53
    __FILEPICKER__ = False
×
54
    from pod.main.models import CustomImageModel
×
55
    from pod.main.models import CustomFileModel
×
56

57

58
class Encoding_video_model(Encoding_video):
1✔
59
    """Encoding video model."""
60

61
    def remove_old_data(self):
1✔
62
        """Remove data from previous encoding."""
63
        video_to_encode = Video.objects.get(id=self.id)
1✔
64
        video_to_encode.thumbnail = None
1✔
65
        if video_to_encode.overview:
1✔
66
            image_overview = os.path.join(
×
67
                os.path.dirname(video_to_encode.overview.path), "overview.png"
68
            )
69
            if os.path.isfile(image_overview):
×
70
                os.remove(image_overview)
×
71
            video_to_encode.overview.delete()
×
72
        video_to_encode.overview = None
1✔
73
        video_to_encode.save()
1✔
74

75
        encoding_log_msg = ""
1✔
76
        encoding_log_msg += self.remove_previous_encoding_video(video_to_encode)
1✔
77
        encoding_log_msg += self.remove_previous_encoding_audio(video_to_encode)
1✔
78
        encoding_log_msg += self.remove_previous_encoding_playlist(video_to_encode)
1✔
79
        encoding_log_msg += self.remove_previous_encoding_log(video_to_encode)
1✔
80
        self.add_encoding_log("remove_old_data", "", True, encoding_log_msg)
1✔
81

82
    def remove_previous_encoding_log(self, video_to_encode):
1✔
83
        """Remove previous logs."""
84
        msg = "\n"
1✔
85
        log_json = self.get_output_dir() + "/info_video.json"
1✔
86
        if os.path.exists(log_json):
1✔
87
            os.remove(log_json)
1✔
88
            msg += "\nDELETE PREVIOUS ENCODING LOG"
1✔
89
        else:
90
            msg += "Audio: Nothing to delete"
1✔
91
        return msg
1✔
92

93
    def remove_previous_encoding_objects(self, model_class, video_to_encode):
1✔
94
        """Remove previously encoded objects of the given model."""
95
        msg = "\n"
1✔
96
        object_type = model_class.__name__
1✔
97
        object_type = re.sub(r"([A-Z])", r" \1", object_type).upper()
1✔
98
        previous_encoding_video = model_class.objects.filter(video=video_to_encode)
1✔
99
        if len(previous_encoding_video) > 0:
1✔
100
            msg += "DELETE PREVIOUS{}".format(object_type)
×
101
            # previous_encoding.delete()
102
            for encoding in previous_encoding_video:
×
103
                encoding.delete()
×
104
        else:
105
            msg += "Video: Nothing to delete"
1✔
106
        return msg
1✔
107

108
    def remove_previous_encoding_video(self, video_to_encode):
1✔
109
        """Remove previously encoded video."""
110
        return self.remove_previous_encoding_objects(EncodingVideo, video_to_encode)
1✔
111

112
    def remove_previous_encoding_audio(self, video_to_encode):
1✔
113
        """Remove previously encoded audio."""
114
        return self.remove_previous_encoding_objects(EncodingAudio, video_to_encode)
1✔
115

116
    def remove_previous_encoding_playlist(self, video_to_encode):
1✔
117
        """Remove previously encoded playlist."""
118
        return self.remove_previous_encoding_objects(PlaylistVideo, video_to_encode)
1✔
119

120
    def get_true_path(self, original):
1✔
121
        """Get the true path by replacing the MEDIA_ROOT from the original path."""
122
        return original.replace(os.path.join(settings.MEDIA_ROOT, ""), "")
1✔
123

124
    def store_json_list_mp3_m4a_files(self, info_video, video_to_encode):
1✔
125
        """Store JSON list of MP3 and M4A files for encoding."""
126
        encoding_list = ["list_m4a_files", "list_mp3_files"]
1✔
127
        for encode_item in encoding_list:
1✔
128
            mp3_files = info_video[encode_item]
1✔
129
            for audio_file in mp3_files:
1✔
130
                if not check_file(mp3_files[audio_file]):
1✔
131
                    continue
×
132
                encoding, created = EncodingAudio.objects.get_or_create(
1✔
133
                    name="audio",
134
                    video=video_to_encode,
135
                    encoding_format=(
136
                        "audio/mp3" if (encode_item == "list_mp3_files") else "video/mp4"
137
                    ),
138
                    # need to double check path
139
                    source_file=self.get_true_path(mp3_files[audio_file]),
140
                )
141

142
    def store_json_list_mp4_hls_files(self, info_video, video_to_encode):
1✔
143
        mp4_files = info_video["list_mp4_files"]
1✔
144
        for video_file in mp4_files:
1✔
145
            if not check_file(mp4_files[video_file]):
1✔
146
                continue
×
147
            rendition = VideoRendition.objects.get(resolution__contains="x" + video_file)
1✔
148
            encod_name = video_file + "p"
1✔
149
            encoding, created = EncodingVideo.objects.get_or_create(
1✔
150
                name=encod_name,
151
                video=video_to_encode,
152
                rendition=rendition,
153
                encoding_format="video/mp4",
154
                source_file=self.get_true_path(mp4_files[video_file]),
155
            )
156

157
        hls_files = info_video["list_hls_files"]
1✔
158
        for video_file in hls_files:
1✔
159
            if not check_file(hls_files[video_file]):
1✔
160
                continue
×
161
            rendition = VideoRendition.objects.get(resolution__contains="x" + video_file)
1✔
162
            encod_name = video_file + "p"
1✔
163
            encoding, created = PlaylistVideo.objects.get_or_create(
1✔
164
                name=encod_name,
165
                video=video_to_encode,
166
                encoding_format="application/x-mpegURL",
167
                source_file=self.get_true_path(hls_files[video_file]),
168
            )
169
            ts_file = hls_files[video_file].replace(".m3u8", ".ts")
1✔
170
            if check_file(ts_file):
1✔
171
                encoding, created = EncodingVideo.objects.get_or_create(
1✔
172
                    name=encod_name,
173
                    video=video_to_encode,
174
                    rendition=rendition,
175
                    encoding_format="video/mp2t",
176
                    source_file=self.get_true_path(ts_file),
177
                )
178

179
        if check_file(os.path.join(self.get_output_dir(), "livestream.m3u8")):
1✔
180
            playlist_file = self.get_true_path(
1✔
181
                os.path.join(self.get_output_dir(), "livestream.m3u8")
182
            )
183
            encoding, created = PlaylistVideo.objects.get_or_create(
1✔
184
                name="playlist",
185
                video=video_to_encode,
186
                encoding_format="application/x-mpegURL",
187
                source_file=playlist_file,
188
            )
189

190
    def store_json_encoding_log(self, info_video, video_to_encode):
1✔
191
        # Need to modify start and stop
192
        log_to_text = ""
1✔
193
        # logs = info_video["encoding_log"]
194
        log_to_text = log_to_text + "Start : " + self.start
1✔
195
        """
196
        for log in logs:
197
            log_to_text = log_to_text + "[" + log + "]\n\n"
198
            logdetails = logs[log]
199
            for logcate in logdetails:
200
                log_to_text = (
201
                    log_to_text
202
                    + "- "
203
                    + logcate
204
                    + " : \n"
205
                    + str(logdetails[logcate])
206
                    + "\n"
207
                )
208
        """
209
        # add path to log file to easily open it
210
        log_to_text = log_to_text + "\nLog File : \n"
1✔
211
        log_to_text = log_to_text + self.get_output_dir() + "/info_video.json"
1✔
212
        log_to_text = log_to_text + "\nEnd : " + self.stop
1✔
213

214
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
1✔
215
        encoding_log.log = log_to_text
1✔
216
        encoding_log.logfile = self.get_true_path(
1✔
217
            self.get_output_dir() + "/info_video.json"
218
        )
219
        encoding_log.save()
1✔
220

221
    def store_json_list_subtitle_files(self, info_video, video_to_encode):
1✔
222
        list_subtitle_files = info_video["list_subtitle_files"]
1✔
223
        if __FILEPICKER__:
1✔
224
            videodir, created = UserFolder.objects.get_or_create(
1✔
225
                name="%s" % video_to_encode.slug,
226
                owner=video_to_encode.owner,
227
            )
228

229
        for sub in list_subtitle_files:
1✔
230
            if not check_file(list_subtitle_files[sub][1]):
×
231
                continue
×
232
            if __FILEPICKER__:
×
233
                podfile, created = CustomFileModel.objects.get_or_create(
×
234
                    file=self.get_true_path(list_subtitle_files[sub][1]),
235
                    name=list_subtitle_files[sub][1],
236
                    description="A subtitle file",
237
                    created_by=video_to_encode.owner,
238
                    folder=videodir,
239
                )
240
            else:
241
                podfile = CustomFileModel()
×
242
                podfile.file = self.get_true_path(list_subtitle_files[sub][1])
×
243

244
            print("subtitle lang: %s " % list_subtitle_files[sub][0])
×
245

246
            sub_lang = list_subtitle_files[sub][0]
×
247
            track_lang = (
×
248
                sub_lang[:2]
249
                if (__LANG_CHOICES_DICT__.get(sub_lang[:2]))
250
                else DEFAULT_LANG_TRACK
251
            )
252

253
            Track.objects.get_or_create(
×
254
                video=video_to_encode,
255
                kind="subtitles",
256
                lang=track_lang,
257
                src=podfile,
258
                enrich_ready=True,
259
            )
260

261
    def store_json_list_thumbnail_files(self, info_video):
1✔
262
        """store_json_list_thumbnail_files."""
263
        video = Video.objects.get(id=self.id)
1✔
264
        list_thumbnail_files = info_video["list_thumbnail_files"]
1✔
265
        thumbnail = CustomImageModel()
1✔
266
        if __FILEPICKER__:
1✔
267
            videodir, created = UserFolder.objects.get_or_create(
1✔
268
                name="%s" % video.slug,
269
                owner=video.owner,
270
            )
271
            thumbnail = CustomImageModel(folder=videodir, created_by=video.owner)
1✔
272
        for index, thumbnail_path in enumerate(list_thumbnail_files):
1✔
273
            if check_file(list_thumbnail_files[thumbnail_path]):
1✔
274
                thumbnail.file.save(
1✔
275
                    "%s_%s.png" % (video.slug, thumbnail_path),
276
                    File(open(list_thumbnail_files[thumbnail_path], "rb")),
277
                    save=True,
278
                )
279
                thumbnail.save()
1✔
280
                if index == 1 and thumbnail.id:
1✔
281
                    video.thumbnail = thumbnail
1✔
282
                    video.save()
1✔
283
                # rm temp location
284
                os.remove(list_thumbnail_files[thumbnail_path])
1✔
285
        return video
1✔
286

287
    def store_json_list_overview_files(self, info_video):
1✔
288
        list_overview_files = info_video["list_overview_files"]
1✔
289
        video = Video.objects.get(id=self.id)
1✔
290
        if len(list_overview_files) > 0:
1✔
291
            vtt_file = (
1✔
292
                list_overview_files["0"]
293
                if ".vtt" in list_overview_files["0"]
294
                else list_overview_files["1"]
295
            )
296
            video.overview = self.get_true_path(vtt_file)
1✔
297
            video.save()
1✔
298
        return video
1✔
299

300
    def store_json_info(self):
1✔
301
        """Open json file and store its data in current instance."""
302
        with open(self.get_output_dir() + "/info_video.json") as json_file:
1✔
303
            info_video = json.load(json_file)
1✔
304
            video_to_encode = Video.objects.get(id=self.id)
1✔
305
            video_to_encode.duration = info_video["duration"]
1✔
306
            video_to_encode.save()
1✔
307

308
            self.store_json_list_mp3_m4a_files(info_video, video_to_encode)
1✔
309
            self.store_json_list_mp4_hls_files(info_video, video_to_encode)
1✔
310
            self.store_json_encoding_log(info_video, video_to_encode)
1✔
311
            self.store_json_list_subtitle_files(info_video, video_to_encode)
1✔
312
            # update and create new video to be sur that thumbnail and overview be present
313
            self.store_json_list_thumbnail_files(info_video)
1✔
314
            video = self.store_json_list_overview_files(info_video)
1✔
315

316
            return video
1✔
317

318
    def get_create_thumbnail_command_from_video(self, video_to_encode):
1✔
319
        """Create command line to generate thumbnails from video."""
320
        thumbnail_command = "%s " % FFMPEG_CMD
×
321
        ev = EncodingVideo.objects.filter(
×
322
            video=video_to_encode, encoding_format="video/mp4"
323
        )
324
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
×
325
        if not created:
×
326
            encoding_log.log = ""
×
327
        encoding_log.log += "\n----------------------------------------"
×
328
        if ev.count() == 0:
×
329
            encoding_log.log += "\nget_create_thumbnail_command_from_video"
×
330
            encoding_log.log += "\nNO MP4 FILES FOUND!"
×
331
            return ""
×
332
        video_mp4 = sorted(ev, key=lambda m: m.height)[0]
×
333
        input_file = video_mp4.source_file.path
×
334
        thumbnail_command += FFMPEG_INPUT % {
×
335
            "input": input_file,
336
            "nb_threads": FFMPEG_NB_THREADS,
337
        }
338
        output_file = os.path.join(self.output_dir, "thumbnail")
×
339
        thumbnail_command += FFMPEG_CREATE_THUMBNAIL % {
×
340
            "duration": self.duration,
341
            "nb_thumbnail": FFMPEG_NB_THUMBNAIL,
342
            "output": output_file,
343
        }
344
        for nb in range(0, FFMPEG_NB_THUMBNAIL):
×
345
            num_thumb = str(nb + 1)
×
346
            self.list_thumbnail_files[num_thumb] = "%s_000%s.png" % (
×
347
                output_file,
348
                num_thumb,
349
            )
350
        encoding_log.log += "\n %s" % thumbnail_command
×
351
        encoding_log.save()
×
352
        return thumbnail_command
×
353

354
    def recreate_thumbnail(self):
1✔
355
        self.create_output_dir()
×
356
        self.get_video_data()
×
357
        info_video = {}
×
358
        for attribute, value in self.__dict__.items():
×
359
            info_video[attribute] = value
×
360
        video_to_encode = Video.objects.get(id=self.id)
×
361
        encoding_log, created = EncodingLog.objects.get_or_create(video=video_to_encode)
×
362
        if len(self.list_image_track) > 0:
×
363
            thumbnail_command = self.get_extract_thumbnail_command()
×
364
            return_value, return_msg = launch_cmd(thumbnail_command)
×
365
            if not created:
×
366
                encoding_log.log = ""
×
367
            encoding_log.log += "\n----------------------------------------"
×
368
            encoding_log.log += "\n extract_thumbnail_command"
×
369
            encoding_log.log += "\n %s" % thumbnail_command
×
370
            encoding_log.log += "\n %s" % return_value
×
371
            encoding_log.log += "\n %s" % return_msg
×
372
        elif self.is_video():
×
373
            thumbnail_command = self.get_create_thumbnail_command_from_video(
×
374
                video_to_encode
375
            )
376
            if thumbnail_command:
×
377
                return_value, return_msg = launch_cmd(thumbnail_command)
×
378
                encoding_log.log += "\n----------------------------------------"
×
379
                encoding_log.log += "\n create_thumbnail_command"
×
380
                encoding_log.log += "\n %s" % thumbnail_command
×
381
                encoding_log.log += "\n %s" % return_value
×
382
                encoding_log.log += "\n %s" % return_msg
×
383
        encoding_log.save()
×
384
        if len(self.list_thumbnail_files) > 0:
×
385
            info_video["list_thumbnail_files"] = self.list_thumbnail_files
×
386
            self.store_json_list_thumbnail_files(info_video, video_to_encode)
×
387

388
    def encode_video(self):
1✔
389
        """Start video encoding."""
390
        self.start_encode()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc