• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 22725050893

05 Mar 2026 03:28PM UTC coverage: 68.07% (+0.003%) from 68.067%
22725050893

Pull #1409

github

web-flow
Merge 7c29945ad into 3b7eb8722
Pull Request #1409: Fix Thumbnail Persistence and Completion Alert

1 of 24 new or added lines in 3 files covered. (4.17%)

8 existing lines in 4 files now uncovered.

12923 of 18985 relevant lines covered (68.07%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.34
/pod/video_encode_transcript/runner_manager_utils.py
1
"""Utilities for storing and importing remote encoding artifacts in Esup-Pod.
2

3
This module orchestrates post-encoding persistence for videos and recordings:
4
- updates encoding logs and processing state,
5
- imports generated files (video/audio/playlist/thumbnail),
6
- clears stale artifacts from previous runs.
7
"""
8

9
import json
1✔
10
import logging
1✔
11
import os
1✔
12
import re
1✔
13
import time
1✔
14
from typing import Any, TypedDict, cast
1✔
15

16
from django.conf import settings
1✔
17
from django.core.files import File
1✔
18
from webpush.models import PushInformation
1✔
19

20
from pod.recorder.models import Recording
1✔
21
from pod.video.models import Video
1✔
22

23
from .models import (
1✔
24
    EncodingAudio,
25
    EncodingLog,
26
    EncodingVideo,
27
    PlaylistVideo,
28
    VideoRendition,
29
)
30
from .utils import (
1✔
31
    add_encoding_log,
32
    change_encoding_step,
33
    check_file,
34
    create_outputdir,
35
    send_email,
36
    send_email_encoding,
37
    send_notification_encoding,
38
)
39

40
# The image model used for thumbnails depends on whether the podfile app
# (file picker) is enabled in the Django settings.
FILEPICKER = bool(getattr(settings, "USE_PODFILE", False))
if FILEPICKER:
    from pod.podfile.models import CustomImageModel
else:
    from pod.main.models import CustomImageModel

log = logging.getLogger(__name__)

# Feature flags, each read from the Django settings with a fallback value.
DEBUG = getattr(settings, "DEBUG", True)
USE_NOTIFICATIONS = getattr(settings, "USE_NOTIFICATIONS", True)
USE_TRANSCRIPTION = getattr(settings, "USE_TRANSCRIPTION", False)
EMAIL_ON_ENCODING_COMPLETION = getattr(settings, "EMAIL_ON_ENCODING_COMPLETION", True)

# The transcript module is only imported when transcription is enabled;
# TRANSCRIPT_VIDEO names the function looked up on it after encoding.
if USE_TRANSCRIPTION:
    from . import transcript

    TRANSCRIPT_VIDEO = getattr(settings, "TRANSCRIPT_VIDEO", "start_transcript")

# Default choices are (name, name) pairs; only the first item of each pair is
# read in this module (see get_encoding_choice_from_filename).
ENCODING_CHOICES = getattr(
    settings,
    "ENCODING_CHOICES",
    tuple(
        (choice_name, choice_name)
        for choice_name in ("audio", "360p", "480p", "720p", "1080p", "playlist")
    ),
)
74

75

76
class EncodedAudioInfo(TypedDict):
    """Audio entry produced by the remote encoder.

    Consumed by ``import_remote_audio``.
    """

    # Format of the entry; import_remote_audio handles "audio/mp3" and
    # "video/mp4" and ignores any other value.
    encoding_format: str
    # File name reported by the encoder; only its stem (extension stripped)
    # is used to build the ".mp3" / ".m4a" path inside the output directory.
    filename: str
81

82

83
class EncodedVideoInfo(TypedDict):
    """Video entry produced by the remote encoder.

    Consumed by ``import_remote_video``, ``import_mp4`` and ``import_m3u8``.
    """

    # Format of the entry; import_remote_video dispatches on "video/mp2t"
    # (HLS rendition) and "video/mp4".
    encoding_format: str
    # File name reported by the encoder; its stem is used to build the
    # ".mp4", ".m3u8" and ".ts" paths, and the raw value is written into the
    # HLS master playlist.
    filename: str
    # Matched against VideoRendition.resolution to find the rendition row.
    rendition: str
89

90

91
class EncodedThumbnailInfo(TypedDict):
    """Thumbnail entry produced by the remote encoder.

    Consumed by ``_get_ordered_thumbnail_entries`` and
    ``import_remote_thumbnail``.
    """

    # File name joined to the output directory to locate the image; a numeric
    # suffix before the extension (e.g. "thumb_2.png" -> 2) is used to order
    # the candidates.
    filename: str
95

96

97
class RemoteEncodingInfo(TypedDict, total=False):
    """Top-level JSON payload written by the remote encoder.

    Loaded from ``info_video.json`` in the video output directory. Declared
    with ``total=False``, so every key is optional for type checkers.
    """

    # Copied to Video.duration once the payload is loaded.
    duration: float
    # When falsy, the video row is flagged with is_video=False.
    has_stream_video: bool
    # Gates the import of the audio entries.
    has_stream_audio: bool
    # Gates the import of the thumbnail entries.
    has_stream_thumbnail: bool
    # Video renditions to import (MP4 and/or HLS).
    encode_video: list[EncodedVideoInfo]
    # Audio tracks to import; a single entry or a list of entries.
    encode_audio: EncodedAudioInfo | list[EncodedAudioInfo]
    # Thumbnail candidates to import; a single entry or a list of entries.
    encode_thumbnail: EncodedThumbnailInfo | list[EncodedThumbnailInfo]
107

108

109
def store_before_remote_encoding_recording(
    recording_id: int, execute_url: str, data: dict[str, Any]
) -> None:
    """Append the remote-encoding start trace to a recording's comment.

    Args:
        recording_id: Primary key of the ``Recording`` to update.
        execute_url: URL of the remote process manager being called.
        data: Payload sent with the request; rendered with ``%s``.

    Raises:
        Recording.DoesNotExist: If no recording has this id.
    """
    recording = Recording.objects.get(id=recording_id)
    trace_lines = (
        "\nStart at: %s" % time.ctime(),
        "\nprocess manager remote encode: %s with data %s" % (execute_url, data),
    )
    # The trace is appended so earlier comment content is kept.
    recording.comment += "".join(trace_lines)
    recording.save()
118

119

120
def store_remote_encoding_log_recording(recording_id: int, video_id: int) -> None:
    """Copy a recording's comment into the encoding log of a video.

    The ``EncodingLog`` row of the video is created when missing, and its
    ``log`` field is overwritten (not appended to) with the comment.

    Args:
        recording_id: Primary key of the source ``Recording``.
        video_id: Primary key of the ``Video`` whose log is written.
    """
    source_recording = Recording.objects.get(id=recording_id)
    target_video = Video.objects.get(id=video_id)
    log_entry, _ = EncodingLog.objects.get_or_create(video=target_video)
    log_entry.log = "%s" % source_recording.comment
    log_entry.save()
129

130

131
def store_before_remote_encoding_video(
    video_id: int, execute_url: str, data: dict[str, Any]
) -> None:
    """Initialize video state and logs before remote encoding starts.

    Flags the video as being encoded and resets its ``EncodingLog``. When the
    source file is valid, it also clears artifacts from previous runs,
    prepares the output directory with a fresh ``encoding.log``, and records
    the remote call. When the source file is invalid, the error is logged,
    the encoding step is set to -1 and an email is sent.

    Args:
        video_id: Primary key of the ``Video`` about to be encoded.
        execute_url: URL of the remote process manager being called.
        data: Payload sent with the request; rendered with ``%s``.
    """
    start_line = "Start at: %s" % time.ctime()

    video = Video.objects.get(id=video_id)
    video.encoding_in_progress = True
    video.save()
    change_encoding_step(video_id, 0, "start")

    log_entry, _ = EncodingLog.objects.get_or_create(video=video)
    log_entry.log = "%s" % start_line
    log_entry.save()

    # Guard clause: nothing can be prepared without a valid source file.
    if not check_file(video.video.path):
        error_msg = "Wrong file or path: " + "\n%s" % video.video.path
        add_encoding_log(video_id, error_msg)
        change_encoding_step(video_id, -1, error_msg)
        send_email(error_msg, video_id)
        return

    change_encoding_step(video_id, 1, "remove old data")
    removal_report = remove_old_data(video_id)
    add_encoding_log(video_id, "remove old data: %s" % removal_report)

    change_encoding_step(video_id, 2, "create output dir")
    output_dir = create_outputdir(video_id, video.video.path)
    add_encoding_log(video_id, "output_dir: %s" % output_dir)

    # Opening in "w" mode discards any log left by a previous run.
    with open(output_dir + "/encoding.log", "w") as encoding_log_file:
        encoding_log_file.write("%s\n" % start_line)

    change_encoding_step(video_id, 3, "process manager remote encode")
    add_encoding_log(
        video_id,
        "process manager remote encode: %s with data %s" % (execute_url, data),
    )
170

171

172
def store_after_remote_encoding_video(video_id: int) -> None:
    """Import remote artifacts and finalize encoding state for a video.

    Reads ``info_video.json`` from the video output directory, imports the
    video and audio artifacts it describes, logs the outcome, clears the
    ``encoding_in_progress`` flag, then sends the configured notifications
    and optionally starts the transcription.

    Args:
        video_id: Primary key of the ``Video`` that was encoded remotely.

    Raises:
        KeyError: If the payload has no ``duration`` key.
    """
    video_to_encode = Video.objects.get(id=video_id)
    output_dir = create_outputdir(video_id, video_to_encode.video.path)

    with open(output_dir + "/info_video.json", encoding="utf-8") as json_file:
        info_video = cast(RemoteEncodingInfo, json.load(json_file))

    # Targeted UPDATE: only the required columns are written so unrelated
    # values (e.g. the thumbnail assigned later in the workflow) are not
    # overwritten. The in-memory instance is kept in sync by hand.
    duration = info_video["duration"]
    Video.objects.filter(id=video_id).update(
        duration=duration,
        encoding_in_progress=True,
    )
    video_to_encode.duration = duration
    video_to_encode.encoding_in_progress = True

    report = remote_video_part(video_to_encode, info_video, output_dir)
    report += remote_audio_part(video_to_encode, info_video, output_dir)

    # The database is only queried when the payload announced a thumbnail.
    thumbnail_announced = info_video.get("has_stream_thumbnail") and info_video.get(
        "encode_thumbnail"
    )
    if (
        thumbnail_announced
        and Video.objects.filter(id=video_id, thumbnail__isnull=True).exists()
    ):
        warning_msg = (
            "WARNING thumbnail still missing after import_remote_thumbnail execution"
        )
        add_encoding_log(video_id, warning_msg)
        report += "\n- %s" % warning_msg

    if not info_video["has_stream_video"]:
        Video.objects.filter(id=video_id).update(is_video=False)

    add_encoding_log(video_id, report)
    change_encoding_step(video_id, 0, "done")

    Video.objects.filter(id=video_id).update(encoding_in_progress=False)
    # Reload the row so the notifications below see the persisted state.
    encoded_video = Video.objects.get(id=video_id)

    add_encoding_log(video_id, "End: %s" % time.ctime())
    with open(output_dir + "/encoding.log", "a") as encoding_log_file:
        encoding_log_file.write("\n\nEnd: %s" % time.ctime())

    if (
        USE_NOTIFICATIONS
        and encoded_video.owner.owner.accepts_notifications
        and PushInformation.objects.filter(user=encoded_video.owner).exists()
    ):
        send_notification_encoding(encoded_video)

    if EMAIL_ON_ENCODING_COMPLETION:
        send_email_encoding(encoded_video)

    if USE_TRANSCRIPTION and encoded_video.transcript not in ["", "0", "1"]:
        start_transcript_video = getattr(transcript, TRANSCRIPT_VIDEO)
        log.info("Start transcript video %s", start_transcript_video)
        start_transcript_video(video_id, False)

    log.info("ALL is DONE")
237

238

239
def remote_audio_part(
    video_to_encode: Video, info_video: RemoteEncodingInfo, output_dir: str
) -> str:
    """Import audio (and optional thumbnail) artifacts from remote outputs.

    Every payload key is read with ``.get`` because ``RemoteEncodingInfo`` is
    declared ``total=False``: a payload that omits ``has_stream_audio`` or
    ``has_stream_thumbnail`` is treated as not having that stream instead of
    raising ``KeyError`` in the middle of the import.

    Args:
        video_to_encode: Video receiving the imported artifacts.
        info_video: Payload loaded from ``info_video.json``.
        output_dir: Directory holding the files written by the encoder.

    Returns:
        The log text accumulated while importing; empty when the payload
        describes no audio.
    """
    msg = ""
    has_stream_audio = info_video.get("has_stream_audio", False)
    encoded_audio = info_video.get("encode_audio")

    if has_stream_audio and encoded_audio:
        msg += import_remote_audio(encoded_audio, output_dir, video_to_encode)

        # Avoid importing the same thumbnail twice when both audio and video
        # are present: the thumbnail is only imported here when no video
        # rendition is described by the payload.
        encoded_thumbnail = info_video.get("encode_thumbnail")
        video_is_described = info_video.get(
            "has_stream_video", False
        ) and info_video.get("encode_video")
        if (
            info_video.get("has_stream_thumbnail", False)
            and encoded_thumbnail
            and not video_is_described
        ):
            msg += import_remote_thumbnail(
                encoded_thumbnail, output_dir, video_to_encode
            )
    elif has_stream_audio or encoded_audio:
        # The flag and the entries disagree: report it as an encoding error.
        msg += "\n- has stream audio but not info audio in json "
        add_encoding_log(video_to_encode.id, msg)
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)
    return msg
266

267

268
def remote_video_part(
    video_to_encode: Video, info_video: RemoteEncodingInfo, output_dir: str
) -> str:
    """Import video artifacts and attach optional overview/thumbnail files.

    Every payload key is read with ``.get`` because ``RemoteEncodingInfo`` is
    declared ``total=False``: a payload that omits ``has_stream_video`` or
    ``has_stream_thumbnail`` is treated as not having that stream instead of
    raising ``KeyError`` in the middle of the import.

    Args:
        video_to_encode: Video receiving the imported artifacts.
        info_video: Payload loaded from ``info_video.json``.
        output_dir: Directory holding the files written by the encoder.

    Returns:
        The log text accumulated while importing; empty when the payload
        describes no video.
    """
    msg = ""
    has_stream_video = info_video.get("has_stream_video", False)
    encoded_video = info_video.get("encode_video")

    if has_stream_video and encoded_video:
        msg += import_remote_video(encoded_video, output_dir, video_to_encode)
        video_id = video_to_encode.id

        # If the remote pipeline generated overview thumbnails metadata,
        # attach it (stored relative to MEDIA_ROOT).
        overview_vtt = os.path.join(output_dir, "overview.vtt")
        if check_file(overview_vtt):
            try:
                video_to_encode.overview = overview_vtt.replace(
                    os.path.join(settings.MEDIA_ROOT, ""), ""
                )
                video_to_encode.save()
                msg += "\n- existing overview:\n%s" % overview_vtt
                add_encoding_log(video_id, "attach existing overview: %s" % overview_vtt)
            except Exception as err:
                # Best effort: a failed overview attach is logged and must
                # not abort the import of the remaining artifacts.
                err_msg = f"Error attaching existing overview: {err}"
                add_encoding_log(video_id, err_msg)
        else:
            add_encoding_log(video_id, "No existing overview file found (overview.vtt)")

        encoded_thumbnail = info_video.get("encode_thumbnail")
        if info_video.get("has_stream_thumbnail", False) and encoded_thumbnail:
            msg += import_remote_thumbnail(
                encoded_thumbnail, output_dir, video_to_encode
            )
        else:
            add_encoding_log(video_id, "No thumbnail info in json; skip thumbnail attach")
    elif has_stream_video or encoded_video:
        # The flag and the entries disagree: report it as an encoding error.
        msg += "\n- has stream video but not info video "
        add_encoding_log(video_to_encode.id, msg)
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)

    return msg
307

308

309
def _get_ordered_thumbnail_entries(
1✔
310
    info_encode_thumbnail: EncodedThumbnailInfo | list[EncodedThumbnailInfo],
311
) -> list[EncodedThumbnailInfo]:
312
    """Return thumbnail entries with the preferred (middle) candidate first."""
313
    # Accept both a single thumbnail payload and a list from different callers.
314
    thumbnail_entries: list[EncodedThumbnailInfo]
315
    if isinstance(info_encode_thumbnail, list):
×
316
        thumbnail_entries = info_encode_thumbnail
×
317
    else:
318
        thumbnail_entries = [info_encode_thumbnail]
×
319

320
    if not thumbnail_entries:
×
321
        return []
×
322

323
    # Build sortable tuples: (numeric filename suffix, original position, payload).
324
    # Example matched suffix: "thumb_2.png" -> 2.
325
    indexed_entries: list[tuple[int | None, int, EncodedThumbnailInfo]] = []
×
326
    for position, thumbnail_data in enumerate(thumbnail_entries):
×
327
        filename = thumbnail_data.get("filename", "")
×
328
        match = re.search(r"_(\d+)(?=\.[^.]+$)", os.path.basename(filename))
×
329
        index = int(match.group(1)) if match else None
×
330
        indexed_entries.append((index, position, thumbnail_data))
×
331

332
    has_numeric_indexes = any(entry[0] is not None for entry in indexed_entries)
×
333
    if has_numeric_indexes:
×
334
        # Keep numerically indexed files first (ordered by index), then fallback entries
335
        # without index in their original input order.
336
        thumbnail_entries = [
×
337
            entry[2]
338
            for entry in sorted(
339
                indexed_entries,
340
                key=lambda item: (
341
                    item[0] is None,
342
                    item[0] if item[0] is not None else item[1],
343
                ),
344
            )
345
        ]
346

347
    # Move the middle candidate to the first position as the preferred thumbnail.
348
    preferred_index = len(thumbnail_entries) // 2
×
349
    ordered_entries = [thumbnail_entries[preferred_index]]
×
350
    ordered_entries.extend(
×
351
        thumbnail_data
352
        for pos, thumbnail_data in enumerate(thumbnail_entries)
353
        if pos != preferred_index
354
    )
355
    return ordered_entries
×
356

357

358
def _save_thumbnail_for_video(
    video_to_encode: Video,
    thumbnailfilename: str,
    thumbnail_name: str,
) -> CustomImageModel:
    """Persist one thumbnail file and return the stored image object.

    Args:
        video_to_encode: Video the thumbnail belongs to; with the file picker
            enabled it provides the target folder and the creator.
        thumbnailfilename: Path of the image file to read.
        thumbnail_name: Name under which the file content is saved.

    Returns:
        The saved ``CustomImageModel`` instance.
    """
    model_kwargs = {}
    if FILEPICKER:
        # The podfile image model is attached to the video folder and owner.
        model_kwargs = {
            "folder": video_to_encode.get_or_create_video_folder(),
            "created_by": video_to_encode.owner,
        }
    image = CustomImageModel(**model_kwargs)

    with open(thumbnailfilename, "rb") as image_file:
        image.file.save(thumbnail_name, File(image_file), save=True)
    image.save()
    return image
377

378

379
def import_remote_thumbnail(
    info_encode_thumbnail: EncodedThumbnailInfo | list[EncodedThumbnailInfo],
    output_dir: str,
    video_to_encode: Video,
) -> str:
    """Import all generated thumbnails and associate one preferred thumbnail to the video.

    Every candidate file found in ``output_dir`` is stored; the first stored
    one (candidates are visited preferred-first) becomes the video thumbnail.
    When no candidate can be stored, the error is logged, the encoding step
    is set to -1 and an email is sent.

    Args:
        info_encode_thumbnail: A single thumbnail entry or a list of entries.
        output_dir: Directory holding the files written by the encoder.
        video_to_encode: Video receiving the thumbnail.

    Returns:
        The log text describing what was imported or what went wrong.
    """
    candidates = _get_ordered_thumbnail_entries(info_encode_thumbnail)
    if not candidates:
        error_msg = "\nERROR THUMBNAILS missing data "
        add_encoding_log(video_to_encode.id, error_msg)
        change_encoding_step(video_to_encode.id, -1, error_msg)
        send_email(error_msg, video_to_encode.id)
        return error_msg

    report = ""
    looked_up_paths: list[str] = []
    stored_images: list[CustomImageModel] = []
    for candidate in candidates:
        candidate_name = candidate.get("filename")
        if not candidate_name:
            continue

        candidate_path = os.path.join(output_dir, candidate_name)
        looked_up_paths.append(candidate_path)
        if not check_file(candidate_path):
            continue

        stored_images.append(
            _save_thumbnail_for_video(video_to_encode, candidate_path, candidate_name)
        )
        report += "\n- thumbnailfilename:\n%s" % candidate_path

    if not stored_images:
        # Nothing usable on disk: list what was looked up, or say no name was given.
        missing_files = ", ".join(looked_up_paths) or "missing data"
        report += "\nERROR THUMBNAILS %s " % missing_files
        report += "Wrong file or path"
        add_encoding_log(video_to_encode.id, report)
        change_encoding_step(video_to_encode.id, -1, report)
        send_email(report, video_to_encode.id)
        return report

    selected_thumbnail = stored_images[0]
    # Persist only the thumbnail relation to avoid clobbering other fields.
    Video.objects.filter(id=video_to_encode.id).update(thumbnail=selected_thumbnail)
    video_to_encode.thumbnail = selected_thumbnail
    report += "\n- selected_thumbnail_id:\n%s" % selected_thumbnail.id

    # Read the relation back from the database to check what was persisted.
    persisted_thumbnail_id = (
        Video.objects.filter(id=video_to_encode.id)
        .values_list("thumbnail_id", flat=True)
        .first()
    )
    report += "\n- persisted_thumbnail_id:\n%s" % persisted_thumbnail_id
    add_encoding_log(
        video_to_encode.id,
        "- selected_thumbnail_id:\n%s\n- persisted_thumbnail_id:\n%s"
        % (selected_thumbnail.id, persisted_thumbnail_id),
    )

    if persisted_thumbnail_id is None:
        # The relation did not stick: write the first stored image id directly.
        fallback_thumbnail = stored_images[0].id
        Video.objects.filter(id=video_to_encode.id).update(
            thumbnail_id=fallback_thumbnail
        )
        report += "\n- thumbnail_fallback_id:\n%s" % fallback_thumbnail
        add_encoding_log(
            video_to_encode.id,
            "- thumbnail_fallback_id:\n%s" % fallback_thumbnail,
        )
    return report
452

453

454
def import_remote_audio(
    info_encode_audio: EncodedAudioInfo | list[EncodedAudioInfo],
    output_dir: str,
    video_to_encode: Video,
) -> str:
    """Persist generated audio tracks (mp3/m4a) for a video.

    Entries with format ``audio/mp3`` map to ``<stem>.mp3`` and entries with
    format ``video/mp4`` map to ``<stem>.m4a``; other formats are ignored.
    A missing file is logged, sets the encoding step to -1 and sends an
    email, then the remaining entries are still processed.

    Args:
        info_encode_audio: A single audio entry or a list of entries.
        output_dir: Directory holding the files written by the encoder.
        video_to_encode: Video receiving the audio tracks.

    Returns:
        The log text accumulated over all entries.
    """
    # (encoding format, file name pattern, success line, failure line)
    audio_kinds = (
        (
            "audio/mp3",
            "%s.mp3",
            "\n- encode_video_mp3:\n%s",
            "\n- encode_video_mp3 Wrong file or path ",
        ),
        (
            "video/mp4",
            "%s.m4a",
            "\n- encode_video_m4a:\n%s",
            "\n- encode_video_m4a Wrong file or path ",
        ),
    )
    if isinstance(info_encode_audio, dict):
        info_encode_audio = [info_encode_audio]

    msg = ""
    for audio_entry in info_encode_audio:
        for encoding_format, name_pattern, success_line, failure_line in audio_kinds:
            if audio_entry["encoding_format"] != encoding_format:
                continue

            stem = os.path.splitext(audio_entry["filename"])[0]
            audio_path = os.path.join(output_dir, name_pattern % stem)
            if not check_file(audio_path):
                # The whole accumulated text is reported, as it is the log so far.
                msg += failure_line
                msg += audio_path + " "
                add_encoding_log(video_to_encode.id, msg)
                change_encoding_step(video_to_encode.id, -1, msg)
                send_email(msg, video_to_encode.id)
                continue

            audio_track, _ = EncodingAudio.objects.get_or_create(
                name="audio",
                video=video_to_encode,
                encoding_format=encoding_format,
            )
            # The stored path is relative to MEDIA_ROOT.
            audio_track.source_file = audio_path.replace(
                os.path.join(settings.MEDIA_ROOT, ""), ""
            )
            audio_track.save()
            msg += success_line % audio_path
    return msg
505

506

507
def import_remote_video(
    info_encode_video: list[EncodedVideoInfo],
    output_dir: str,
    video_to_encode: Video,
) -> str:
    """Persist generated video tracks and build the HLS master playlist.

    ``video/mp2t`` entries are imported as HLS renditions and ``video/mp4``
    entries as MP4 renditions; other formats are ignored. When at least one
    HLS entry was seen, ``playlist.m3u8`` is written in ``output_dir`` and
    registered as the video playlist.

    Args:
        info_encode_video: Video entries from the encoder payload.
        output_dir: Directory holding the files written by the encoder.
        video_to_encode: Video receiving the renditions.

    Returns:
        The log text accumulated over all entries; replaced by the error
        text when the master playlist file cannot be validated.
    """
    msg = ""
    playlist_fragments: list[str] = []
    video_has_playlist = False

    for video_entry in info_encode_video:
        encoding_format = video_entry["encoding_format"]
        if encoding_format == "video/mp2t":
            video_has_playlist = True
            import_msg, playlist_fragment = import_m3u8(
                video_entry, output_dir, video_to_encode
            )
            msg += import_msg
            playlist_fragments.append(playlist_fragment)
        elif encoding_format == "video/mp4":
            msg += import_mp4(video_entry, output_dir, video_to_encode)

    if not video_has_playlist:
        return msg

    # Aggregate all rendition playlists into a single HLS master playlist.
    playlist_master_file = output_dir + "/playlist.m3u8"
    with open(playlist_master_file, "w") as master_file:
        master_file.write("#EXTM3U\n#EXT-X-VERSION:3\n" + "".join(playlist_fragments))

    if not check_file(playlist_master_file):
        # The error text replaces what was accumulated so far.
        msg = (
            "save_playlist_master Wrong file or path: "
            + "\n%s" % playlist_master_file
        )
        add_encoding_log(video_to_encode.id, msg)
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)
        return msg

    master_playlist, _ = PlaylistVideo.objects.get_or_create(
        name="playlist",
        video=video_to_encode,
        encoding_format="application/x-mpegURL",
    )
    # The stored path is relative to MEDIA_ROOT.
    master_playlist.source_file = (
        output_dir.replace(os.path.join(settings.MEDIA_ROOT, ""), "")
        + "/playlist.m3u8"
    )
    master_playlist.save()

    msg += "\n- Playlist:\n%s" % playlist_master_file
    return msg
557

558

559
def import_mp4(
    encod_video: EncodedVideoInfo, output_dir: str, video_to_encode: Video
) -> str:
    """Persist a single MP4 rendition into EncodingVideo.

    Args:
        encod_video: Video entry from the encoder payload.
        output_dir: Directory holding the files written by the encoder.
        video_to_encode: Video receiving the rendition.

    Returns:
        A log line naming the imported file, or the error text when the file
        is missing (the error is also logged, sets the encoding step to -1
        and is emailed).
    """
    stem = os.path.splitext(encod_video["filename"])[0]
    mp4_path = os.path.join(output_dir, "%s.mp4" % stem)

    if not check_file(mp4_path):
        error_msg = "save_mp4_file Wrong file or path: " + "\n%s " % (mp4_path)
        add_encoding_log(video_to_encode.id, error_msg)
        change_encoding_step(video_to_encode.id, -1, error_msg)
        send_email(error_msg, video_to_encode.id)
        return error_msg

    rendition = VideoRendition.objects.get(resolution=encod_video["rendition"])
    mp4_encoding, _ = EncodingVideo.objects.get_or_create(
        name=get_encoding_choice_from_filename(stem),
        video=video_to_encode,
        rendition=rendition,
        encoding_format="video/mp4",
    )
    # The stored path is relative to MEDIA_ROOT.
    mp4_encoding.source_file = mp4_path.replace(
        os.path.join(settings.MEDIA_ROOT, ""), ""
    )
    mp4_encoding.save()
    return "\n- videofilenameMp4:\n%s" % mp4_path
584

585

586
def import_m3u8(
    encod_video: EncodedVideoInfo, output_dir: str, video_to_encode: Video
) -> tuple[str, str]:
    """Persist one HLS rendition and return its master playlist fragment.

    Args:
        encod_video: Video entry from the encoder payload.
        output_dir: Directory holding the files written by the encoder.
        video_to_encode: Video receiving the rendition.

    Returns:
        A ``(log text, master playlist fragment)`` pair. On error (rendition
        bitrate that cannot be parsed, or missing ``.m3u8``/``.ts`` file) the
        log text is the error message and the fragment is empty.
    """
    stem = os.path.splitext(encod_video["filename"])[0]
    m3u8_path = os.path.join(output_dir, "%s.m3u8" % stem)
    ts_path = os.path.join(output_dir, "%s.ts" % stem)

    def report_error(error_msg: str) -> tuple[str, str]:
        """Log, flag and email the error; return it with an empty fragment."""
        add_encoding_log(video_to_encode.id, error_msg)
        change_encoding_step(video_to_encode.id, -1, error_msg)
        send_email(error_msg, video_to_encode.id)
        return error_msg, ""

    rendition = VideoRendition.objects.get(resolution=encod_video["rendition"])

    # The bandwidth is derived from the leading "<digits>k" of the bitrate.
    bitrate_match = re.search(r"(\d+)k", rendition.video_bitrate, re.I)
    if bitrate_match is None:
        return report_error(
            "Invalid rendition bitrate format: %s" % rendition.video_bitrate
        )
    bandwidth = int(bitrate_match.group(1)) * 1000

    if not (check_file(m3u8_path) and check_file(ts_path)):
        return report_error(
            "save_playlist_file Wrong file or path: "
            + "\n%s and %s" % (m3u8_path, ts_path)
        )

    choice_name = get_encoding_choice_from_filename(stem)
    media_prefix = os.path.join(settings.MEDIA_ROOT, "")

    ts_encoding, _ = EncodingVideo.objects.get_or_create(
        name=choice_name,
        video=video_to_encode,
        rendition=rendition,
        encoding_format="video/mp2t",
    )
    ts_encoding.source_file = ts_path.replace(media_prefix, "")
    ts_encoding.save()

    rendition_playlist, _ = PlaylistVideo.objects.get_or_create(
        name=choice_name,
        video=video_to_encode,
        encoding_format="application/x-mpegURL",
    )
    rendition_playlist.source_file = m3u8_path.replace(media_prefix, "")
    rendition_playlist.save()

    msg = "\n- videofilenameM3u8:\n%s" % m3u8_path
    msg += "\n- videofilenameTS:\n%s" % ts_path

    master_playlist = "#EXT-X-STREAM-INF:BANDWIDTH=%s," % bandwidth
    master_playlist += "RESOLUTION=%s\n%s\n" % (
        rendition.resolution,
        encod_video["filename"],
    )
    return msg, master_playlist
648

649

650
def get_encoding_choice_from_filename(filename: str) -> str:
    """Map filename prefix to the configured encoding choice name.

    The first three characters of ``filename`` are matched against the first
    three characters of each ``ENCODING_CHOICES`` name.

    Args:
        filename: File name (or stem) produced by the encoder.

    Returns:
        The matching choice name, or ``"360p"`` when no choice matches.
    """
    # On a prefix collision the last configured choice wins.
    choice_by_prefix: dict[str, str] = {
        choice[0][:3]: choice[0] for choice in ENCODING_CHOICES
    }
    return choice_by_prefix.get(filename[:3], "360p")
656

657

658
def remove_old_data(video_id: int) -> str:
    """Clear artifacts of previous encoding runs for a video.

    Resets the thumbnail relation, deletes the overview file (and the
    ``overview.png`` stored next to it, when present), then deletes the
    previous video, audio and playlist encodings.

    Args:
        video_id: Primary key of the ``Video`` to clean.

    Returns:
        The concatenated log text of the three removal steps.
    """
    video = Video.objects.get(id=video_id)
    video.thumbnail = None

    if video.overview:
        overview_image = os.path.join(
            os.path.dirname(video.overview.path), "overview.png"
        )
        if os.path.isfile(overview_image):
            os.remove(overview_image)
        video.overview.delete()
    video.overview = None
    video.save()

    removal_steps = (
        remove_previous_encoding_video,
        remove_previous_encoding_audio,
        remove_previous_encoding_playlist,
    )
    return "".join(remove_step(video) for remove_step in removal_steps)
677

678

679
def remove_previous_encoding_video(video_to_encode: Video) -> str:
    """Delete the EncodingVideo rows of a video and return a log line."""
    stale_encodings = EncodingVideo.objects.filter(video=video_to_encode)
    if not stale_encodings:
        return "\n" + "Video: Nothing to delete"

    # Rows are deleted one by one, through the model's delete().
    for stale_encoding in stale_encodings:
        stale_encoding.delete()
    return "\n" + "\nDELETE PREVIOUS ENCODING VIDEO"
690

691

692
def remove_previous_encoding_audio(video_to_encode: Video) -> str:
    """Delete the EncodingAudio rows of a video and return a log line."""
    stale_encodings = EncodingAudio.objects.filter(video=video_to_encode)
    if not stale_encodings:
        return "\n" + "Audio: Nothing to delete"

    # Rows are deleted one by one, through the model's delete().
    for stale_encoding in stale_encodings:
        stale_encoding.delete()
    return "\n" + "\nDELETE PREVIOUS ENCODING AUDIO"
703

704

705
def remove_previous_encoding_playlist(video_to_encode: Video) -> str:
    """Delete the PlaylistVideo rows of a video and return a log line."""
    stale_playlists = PlaylistVideo.objects.filter(video=video_to_encode)
    if not stale_playlists:
        return "\n" + "Playlist: Nothing to delete"

    # Rows are deleted one by one, through the model's delete().
    for stale_playlist in stale_playlists:
        stale_playlist.delete()
    return "\n" + "DELETE PREVIOUS PLAYLIST M3U8"
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc