• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 22574034458

02 Mar 2026 11:30AM UTC coverage: 68.109%. First build
22574034458

Pull #1406

github

web-flow
Merge 207dd0354 into ed8189ee5
Pull Request #1406: Enhance dashboard filtering and runner manager administration with i18n updates

70 of 145 new or added lines in 8 files covered. (48.28%)

12923 of 18974 relevant lines covered (68.11%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.89
/pod/video_encode_transcript/runner_manager_utils.py
1
"""Utilities for storing and importing remote encoding artifacts in Esup-Pod.
2

3
This module orchestrates post-encoding persistence for videos and recordings:
4
- updates encoding logs and processing state,
5
- imports generated files (video/audio/playlist/thumbnail),
6
- clears stale artifacts from previous runs.
7
"""
8

9
import json
1✔
10
import logging
1✔
11
import os
1✔
12
import re
1✔
13
import time
1✔
14
from typing import Any, TypedDict, cast
1✔
15

16
from django.conf import settings
1✔
17
from django.core.files import File
1✔
18
from webpush.models import PushInformation
1✔
19

20
from pod.recorder.models import Recording
1✔
21
from pod.video.models import Video
1✔
22

23
from .models import (
1✔
24
    EncodingAudio,
25
    EncodingLog,
26
    EncodingVideo,
27
    PlaylistVideo,
28
    VideoRendition,
29
)
30
from .utils import (
1✔
31
    add_encoding_log,
32
    change_encoding_step,
33
    check_file,
34
    create_outputdir,
35
    send_email,
36
    send_email_encoding,
37
    send_notification_encoding,
38
)
39

40
# Select the image model used for thumbnails. FILEPICKER records which one is
# active so that _save_thumbnail_for_video knows whether to pass folder/owner.
if getattr(settings, "USE_PODFILE", False):
    FILEPICKER = True
    from pod.podfile.models import CustomImageModel
else:
    FILEPICKER = False
    from pod.main.models import CustomImageModel

log = logging.getLogger(__name__)

# NOTE(review): DEBUG is not read by any function visible in this module.
DEBUG = getattr(settings, "DEBUG", True)

# Gates the push notification sent once encoding is finished.
USE_NOTIFICATIONS = getattr(settings, "USE_NOTIFICATIONS", True)

USE_TRANSCRIPTION = getattr(settings, "USE_TRANSCRIPTION", False)

# `transcript` and TRANSCRIPT_VIDEO only exist when transcription is enabled;
# any code using them must check USE_TRANSCRIPTION first.
if USE_TRANSCRIPTION:
    from . import transcript

    # Name of the function looked up on the `transcript` module.
    TRANSCRIPT_VIDEO = getattr(settings, "TRANSCRIPT_VIDEO", "start_transcript")

# Gates the e-mail sent once encoding is finished.
EMAIL_ON_ENCODING_COMPLETION = getattr(settings, "EMAIL_ON_ENCODING_COMPLETION", True)

# (value, label) pairs; get_encoding_choice_from_filename matches on the first
# three characters of each value.
ENCODING_CHOICES = getattr(
    settings,
    "ENCODING_CHOICES",
    (
        ("audio", "audio"),
        ("360p", "360p"),
        ("480p", "480p"),
        ("720p", "720p"),
        ("1080p", "1080p"),
        ("playlist", "playlist"),
    ),
)
74

75

76
class EncodedAudioInfo(TypedDict):
    """Audio entry produced by the remote encoder."""

    # Format tag; import_remote_audio handles "audio/mp3" and "video/mp4".
    encoding_format: str
    # Output file name; only its stem is used to locate the .mp3/.m4a file.
    filename: str
81

82

83
class EncodedVideoInfo(TypedDict):
    """Video entry produced by the remote encoder."""

    # Format tag; import_remote_video handles "video/mp2t" and "video/mp4".
    encoding_format: str
    # Output file name; its stem locates the .mp4 or .m3u8/.ts files.
    filename: str
    # Matched against VideoRendition.resolution when the entry is imported.
    rendition: str
89

90

91
class EncodedThumbnailInfo(TypedDict):
    """Thumbnail entry produced by the remote encoder."""

    # File name relative to the encoding output directory.
    filename: str
95

96

97
class RemoteEncodingInfo(TypedDict, total=False):
    """Top-level JSON payload written by the remote encoder.

    Declared with ``total=False``: every key below is optional as far as the
    type is concerned, so readers must not assume a key is present.
    """

    # Copied to Video.duration after encoding.
    duration: float
    # Stream flags: each one gates the import of the matching encode_* entry.
    has_stream_video: bool
    has_stream_audio: bool
    has_stream_thumbnail: bool
    encode_video: list[EncodedVideoInfo]
    # Audio and thumbnail payloads may be a single entry or a list of entries.
    encode_audio: EncodedAudioInfo | list[EncodedAudioInfo]
    encode_thumbnail: EncodedThumbnailInfo | list[EncodedThumbnailInfo]
107

108

109
def store_before_remote_encoding_recording(
    recording_id: int, execute_url: str, data: dict[str, Any]
) -> None:
    """Append the remote-encoding start trace to a recording's comment.

    Args:
        recording_id: Primary key of the recording being sent for encoding.
        execute_url: Remote endpoint the job is submitted to (logged only).
        data: Payload sent with the job (logged only).
    """
    recording = Recording.objects.get(id=recording_id)
    trace_lines = (
        f"\nStart at: {time.ctime()}",
        f"\nprocess manager remote encode: {execute_url} with data {data}",
    )
    recording.comment += "".join(trace_lines)
    recording.save()
118

119

120
def store_remote_encoding_log_recording(recording_id: int, video_id: int) -> None:
    """Copy a recording's comment into the encoding log of a video.

    The encoding log is created when the video has none yet; its previous
    content is replaced.

    Args:
        recording_id: Primary key of the recording whose comment is copied.
        video_id: Primary key of the video that receives the log.
    """
    source_recording = Recording.objects.get(id=recording_id)
    target_video = Video.objects.get(id=video_id)
    encoding_log, _ = EncodingLog.objects.get_or_create(video=target_video)
    encoding_log.log = str(source_recording.comment)
    encoding_log.save()
129

130

131
def store_before_remote_encoding_video(
    video_id: int, execute_url: str, data: dict[str, Any]
) -> None:
    """Initialize video state and logs before remote encoding starts.

    Flags the video as being encoded and resets its encoding log. When the
    source file is usable, artifacts of previous runs are removed, the output
    directory is prepared and ``encoding.log`` is started afresh. Otherwise
    the failure is logged, recorded as step ``-1`` and e-mailed.

    Args:
        video_id: Primary key of the video about to be encoded.
        execute_url: Remote endpoint the job is submitted to (logged only).
        data: Payload sent with the job (logged only).
    """
    start = "Start at: %s" % time.ctime()
    video_to_encode = Video.objects.get(id=video_id)
    video_to_encode.encoding_in_progress = True
    video_to_encode.save()
    change_encoding_step(video_id, 0, "start")

    # Replace (not append to) whatever log a previous run left behind.
    encoding_log, _ = EncodingLog.objects.get_or_create(video=video_to_encode)
    encoding_log.log = start
    encoding_log.save()

    if not check_file(video_to_encode.video.path):
        msg = "Wrong file or path: " + "\n%s" % video_to_encode.video.path
        add_encoding_log(video_id, msg)
        change_encoding_step(video_id, -1, msg)
        send_email(msg, video_id)
        return

    change_encoding_step(video_id, 1, "remove old data")
    remove_msg = remove_old_data(video_id)
    add_encoding_log(video_id, "remove old data: %s" % remove_msg)

    change_encoding_step(video_id, 2, "create output dir")
    output_dir = create_outputdir(video_id, video_to_encode.video.path)
    add_encoding_log(video_id, "output_dir: %s" % output_dir)

    # Mode "w" truncates, so one open both resets the file and writes the
    # start line (previously: truncate with one open, append with another).
    with open(output_dir + "/encoding.log", "w") as f:
        f.write("%s\n" % start)

    change_encoding_step(video_id, 3, "process manager remote encode")
    add_encoding_log(
        video_id,
        "process manager remote encode: %s with data %s" % (execute_url, data),
    )
170

171

172
def store_after_remote_encoding_video(video_id: int) -> None:
    """Import remote artifacts and finalize encoding state for a video.

    Reads ``info_video.json`` from the video's output directory, imports the
    video and audio artifacts it describes, marks the encoding as finished,
    then sends the optional notification / e-mail and starts the optional
    transcription.

    Args:
        video_id: Primary key of the video whose remote encoding has ended.

    Raises:
        OSError: If ``info_video.json`` cannot be opened.
        json.JSONDecodeError: If ``info_video.json`` is not valid JSON.
    """
    msg = ""
    video_to_encode = Video.objects.get(id=video_id)
    output_dir = create_outputdir(video_id, video_to_encode.video.path)

    with open(output_dir + "/info_video.json", encoding="utf-8") as json_file:
        info_video = cast(RemoteEncodingInfo, json.load(json_file))

    # RemoteEncodingInfo is total=False: a payload without "duration" is
    # allowed by the type, so keep the current duration instead of failing.
    if "duration" in info_video:
        video_to_encode.duration = info_video["duration"]
    else:
        msg += "\n- no duration in json "
    video_to_encode.encoding_in_progress = True
    video_to_encode.save()

    msg += remote_video_part(video_to_encode, info_video, output_dir)
    msg += remote_audio_part(video_to_encode, info_video, output_dir)

    # Work on a freshly loaded instance for the final state changes.
    video_encoding = Video.objects.get(id=video_id)

    if not info_video.get("has_stream_video", False):
        video_encoding.is_video = False
        video_encoding.save()

    add_encoding_log(video_id, msg)
    change_encoding_step(video_id, 0, "done")

    video_encoding.encoding_in_progress = False
    video_encoding.save()

    add_encoding_log(video_id, "End: %s" % time.ctime())
    with open(output_dir + "/encoding.log", "a") as f:
        f.write("\n\nEnd: %s" % time.ctime())

    if (
        USE_NOTIFICATIONS
        and video_encoding.owner.owner.accepts_notifications
        and PushInformation.objects.filter(user=video_encoding.owner).exists()
    ):
        send_notification_encoding(video_encoding)

    if EMAIL_ON_ENCODING_COMPLETION:
        send_email_encoding(video_encoding)

    # `transcript` / TRANSCRIPT_VIDEO are only defined when USE_TRANSCRIPTION
    # is set, hence the flag is tested first.
    if USE_TRANSCRIPTION and video_encoding.transcript not in ["", "0", "1"]:
        start_transcript_video = getattr(transcript, TRANSCRIPT_VIDEO)
        log.info("Start transcript video %s", start_transcript_video)
        start_transcript_video(video_id, False)

    log.info("ALL is DONE")
224

225

226
def remote_audio_part(
    video_to_encode: Video, info_video: RemoteEncodingInfo, output_dir: str
) -> str:
    """Import audio (and optional thumbnail) artifacts from remote outputs.

    All payload keys are read with ``.get`` because ``RemoteEncodingInfo`` is
    declared ``total=False``: a missing flag counts as ``False`` instead of
    raising ``KeyError``.

    Args:
        video_to_encode: Video the artifacts belong to.
        info_video: Parsed remote encoder payload.
        output_dir: Directory holding the generated files.

    Returns:
        Log text describing what was imported or what went wrong; empty when
        the payload announces no audio.
    """
    msg = ""
    has_audio = info_video.get("has_stream_audio", False)
    audio_entries = info_video.get("encode_audio")

    if has_audio and audio_entries:
        msg += import_remote_audio(audio_entries, output_dir, video_to_encode)
        # Avoid importing the same thumbnail twice when both audio and video
        # are present: remote_video_part imports it in that case.
        video_present = info_video.get("has_stream_video", False) and info_video.get(
            "encode_video"
        )
        thumbnail_entries = info_video.get("encode_thumbnail")
        if (
            info_video.get("has_stream_thumbnail", False)
            and thumbnail_entries
            and not video_present
        ):
            msg += import_remote_thumbnail(
                thumbnail_entries, output_dir, video_to_encode
            )
    elif has_audio or audio_entries:
        # Flag and entries disagree: report it as an encoding failure.
        msg += "\n- has stream audio but not info audio in json "
        add_encoding_log(video_to_encode.id, msg)
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)
    return msg
253

254

255
def remote_video_part(
    video_to_encode: Video, info_video: RemoteEncodingInfo, output_dir: str
) -> str:
    """Import video artifacts and attach optional overview/thumbnail files.

    All payload keys are read with ``.get`` because ``RemoteEncodingInfo`` is
    declared ``total=False``: a missing flag counts as ``False`` instead of
    raising ``KeyError``.

    Args:
        video_to_encode: Video the artifacts belong to.
        info_video: Parsed remote encoder payload.
        output_dir: Directory holding the generated files.

    Returns:
        Log text describing what was imported or what went wrong; empty when
        the payload announces no video.
    """
    msg = ""
    has_video = info_video.get("has_stream_video", False)
    video_entries = info_video.get("encode_video")

    if has_video and video_entries:
        msg += import_remote_video(video_entries, output_dir, video_to_encode)
        video_id = video_to_encode.id

        # If the remote pipeline generated overview thumbnails metadata, attach it.
        overview_vtt = os.path.join(output_dir, "overview.vtt")
        if check_file(overview_vtt):
            # Best effort: a failure here is logged and must not abort the import.
            try:
                video_to_encode.overview = overview_vtt.replace(
                    os.path.join(settings.MEDIA_ROOT, ""), ""
                )
                video_to_encode.save()
                msg += "\n- existing overview:\n%s" % overview_vtt
                add_encoding_log(
                    video_id, "attach existing overview: %s" % overview_vtt
                )
            except Exception as err:
                err_msg = f"Error attaching existing overview: {err}"
                add_encoding_log(video_id, err_msg)
        else:
            add_encoding_log(video_id, "No existing overview file found (overview.vtt)")

        thumbnail_entries = info_video.get("encode_thumbnail")
        if info_video.get("has_stream_thumbnail", False) and thumbnail_entries:
            msg += import_remote_thumbnail(
                thumbnail_entries, output_dir, video_to_encode
            )
        else:
            add_encoding_log(
                video_id, "No thumbnail info in json; skip thumbnail attach"
            )
    elif has_video or video_entries:
        # Flag and entries disagree: report it as an encoding failure.
        msg += "\n- has stream video but not info video "
        add_encoding_log(video_to_encode.id, msg)
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)

    return msg
298

299

300
def _get_ordered_thumbnail_entries(
1✔
301
    info_encode_thumbnail: EncodedThumbnailInfo | list[EncodedThumbnailInfo],
302
) -> list[EncodedThumbnailInfo]:
303
    """Return thumbnail entries with the preferred (middle) candidate first."""
304
    thumbnail_entries: list[EncodedThumbnailInfo]
NEW
305
    if isinstance(info_encode_thumbnail, list):
×
NEW
306
        thumbnail_entries = info_encode_thumbnail
×
307
    else:
NEW
308
        thumbnail_entries = [info_encode_thumbnail]
×
309

NEW
310
    if not thumbnail_entries:
×
NEW
311
        return []
×
312

313
    # Try to normalize list order using the trailing index in filenames (e.g. xx_0.png).
NEW
314
    indexed_entries: list[tuple[int | None, int, EncodedThumbnailInfo]] = []
×
NEW
315
    for position, thumbnail_data in enumerate(thumbnail_entries):
×
NEW
316
        filename = thumbnail_data.get("filename", "")
×
NEW
317
        match = re.search(r"_(\d+)(?=\.[^.]+$)", os.path.basename(filename))
×
NEW
318
        index = int(match.group(1)) if match else None
×
NEW
319
        indexed_entries.append((index, position, thumbnail_data))
×
320

NEW
321
    has_numeric_indexes = any(index is not None for index, _, _ in indexed_entries)
×
NEW
322
    if has_numeric_indexes:
×
NEW
323
        thumbnail_entries = [
×
324
            thumbnail_data
325
            for _, _, thumbnail_data in sorted(
326
                indexed_entries,
327
                key=lambda item: (
328
                    item[0] is None,
329
                    item[0] if item[0] is not None else item[1],
330
                ),
331
            )
332
        ]
333

NEW
334
    preferred_index = len(thumbnail_entries) // 2
×
NEW
335
    ordered_entries = [thumbnail_entries[preferred_index]]
×
NEW
336
    ordered_entries.extend(
×
337
        thumbnail_data
338
        for pos, thumbnail_data in enumerate(thumbnail_entries)
339
        if pos != preferred_index
340
    )
NEW
341
    return ordered_entries
×
342

343

344
def _save_thumbnail_for_video(
    video_to_encode: Video,
    thumbnailfilename: str,
    thumbnail_name: str,
) -> CustomImageModel:
    """Persist one thumbnail file and return the stored image object.

    Args:
        video_to_encode: Video the thumbnail is generated for.
        thumbnailfilename: Path of the image file to read.
        thumbnail_name: Name given to the stored file.

    Returns:
        The saved image model instance.
    """
    model_kwargs = {}
    if FILEPICKER:
        # With the file picker the image lives in the video's folder and
        # records the video owner as its creator.
        model_kwargs = {
            "folder": video_to_encode.get_or_create_video_folder(),
            "created_by": video_to_encode.owner,
        }
    thumbnail = CustomImageModel(**model_kwargs)

    with open(thumbnailfilename, "rb") as image_source:
        thumbnail.file.save(thumbnail_name, File(image_source), save=True)
    thumbnail.save()
    return thumbnail
363

364

365
def import_remote_thumbnail(
    info_encode_thumbnail: EncodedThumbnailInfo | list[EncodedThumbnailInfo],
    output_dir: str,
    video_to_encode: Video,
) -> str:
    """Import all generated thumbnails and associate one preferred thumbnail to the video.

    Every thumbnail whose file passes ``check_file`` is stored; the first one
    stored, following the preferred order, becomes the video thumbnail. When
    nothing could be stored the failure is logged, recorded as step ``-1``
    and e-mailed.

    Args:
        info_encode_thumbnail: A single thumbnail entry or a list of entries.
        output_dir: Directory holding the generated files.
        video_to_encode: Video receiving the thumbnail.

    Returns:
        Log text listing the imported files, or the error text.
    """

    def report_failure(text: str) -> str:
        """Send the failure to the encoding log, the step tracker and by e-mail."""
        add_encoding_log(video_to_encode.id, text)
        change_encoding_step(video_to_encode.id, -1, text)
        send_email(text, video_to_encode.id)
        return text

    candidates = _get_ordered_thumbnail_entries(info_encode_thumbnail)
    if not candidates:
        return report_failure("\nERROR THUMBNAILS missing data ")

    looked_up_paths: list[str] = []
    stored_thumbnails: list[CustomImageModel] = []
    imported_lines: list[str] = []
    for candidate in candidates:
        name = candidate.get("filename")
        if not name:
            continue
        path = os.path.join(output_dir, name)
        looked_up_paths.append(path)
        if check_file(path):
            stored_thumbnails.append(
                _save_thumbnail_for_video(video_to_encode, path, name)
            )
            imported_lines.append("\n- thumbnailfilename:\n%s" % path)

    msg = "".join(imported_lines)
    if stored_thumbnails:
        # Candidates arrive preferred-first, so the first stored one wins.
        video_to_encode.thumbnail = stored_thumbnails[0]
        video_to_encode.save()
        return msg

    missing_files = ", ".join(looked_up_paths) or "missing data"
    return report_failure(
        msg + "\nERROR THUMBNAILS %s " % missing_files + "Wrong file or path"
    )
413

414

415
def import_remote_audio(
    info_encode_audio: EncodedAudioInfo | list[EncodedAudioInfo],
    output_dir: str,
    video_to_encode: Video,
) -> str:
    """Persist generated audio tracks (mp3/m4a) for a video.

    Entries with an ``encoding_format`` other than ``audio/mp3`` or
    ``video/mp4`` are ignored. A track whose file fails ``check_file`` is
    logged, recorded as step ``-1`` and e-mailed; the remaining tracks are
    still processed.

    Args:
        info_encode_audio: A single audio entry or a list of entries.
        output_dir: Directory holding the generated files.
        video_to_encode: Video the tracks belong to.

    Returns:
        Log text describing every handled track.
    """
    # encoding_format -> (file extension on disk, label used in log messages)
    audio_targets = {
        "audio/mp3": ("mp3", "encode_video_mp3"),
        "video/mp4": ("m4a", "encode_video_m4a"),
    }

    msg = ""
    if isinstance(info_encode_audio, dict):
        info_encode_audio = [info_encode_audio]

    for encode_audio in info_encode_audio:
        encoding_format = encode_audio["encoding_format"]
        if encoding_format not in audio_targets:
            continue
        extension, label = audio_targets[encoding_format]

        stem = os.path.splitext(encode_audio["filename"])[0]
        audiofilename = os.path.join(output_dir, "%s.%s" % (stem, extension))
        if check_file(audiofilename):
            encoding, _ = EncodingAudio.objects.get_or_create(
                name="audio",
                video=video_to_encode,
                encoding_format=encoding_format,
            )
            # Stored relative to MEDIA_ROOT.
            encoding.source_file = audiofilename.replace(
                os.path.join(settings.MEDIA_ROOT, ""), ""
            )
            encoding.save()
            msg += "\n- %s:\n%s" % (label, audiofilename)
        else:
            msg += "\n- %s Wrong file or path " % label
            msg += audiofilename + " "
            add_encoding_log(video_to_encode.id, msg)
            change_encoding_step(video_to_encode.id, -1, msg)
            send_email(msg, video_to_encode.id)
    return msg
466

467

468
def import_remote_video(
    info_encode_video: list[EncodedVideoInfo],
    output_dir: str,
    video_to_encode: Video,
) -> str:
    """Persist generated video tracks and build the HLS master playlist.

    ``video/mp2t`` entries are imported as HLS renditions and ``video/mp4``
    entries as MP4 files; other formats are ignored. The master playlist is
    only written and registered when at least one HLS rendition was actually
    imported, so a master playlist without any variant is never published.

    Args:
        info_encode_video: Video entries from the remote encoder payload.
        output_dir: Directory holding the generated files.
        video_to_encode: Video the tracks belong to.

    Returns:
        Log text describing every handled track and the playlist.
    """
    msg = ""
    master_playlist = ""
    for encod_video in info_encode_video:
        encoding_format = encod_video["encoding_format"]
        if encoding_format == "video/mp2t":
            import_msg, playlist_fragment = import_m3u8(
                encod_video, output_dir, video_to_encode
            )
            msg += import_msg
            master_playlist += playlist_fragment
        elif encoding_format == "video/mp4":
            msg += import_mp4(encod_video, output_dir, video_to_encode)

    if not master_playlist:
        # No HLS rendition was imported; any failure has already been
        # reported by import_m3u8.
        return msg

    # Aggregate all rendition playlists into a single HLS master playlist.
    playlist_master_file = output_dir + "/playlist.m3u8"
    with open(playlist_master_file, "w", encoding="utf-8") as f:
        f.write("#EXTM3U\n#EXT-X-VERSION:3\n" + master_playlist)

    if check_file(playlist_master_file):
        playlist, _ = PlaylistVideo.objects.get_or_create(
            name="playlist",
            video=video_to_encode,
            encoding_format="application/x-mpegURL",
        )
        # Stored relative to MEDIA_ROOT.
        playlist.source_file = (
            output_dir.replace(os.path.join(settings.MEDIA_ROOT, ""), "")
            + "/playlist.m3u8"
        )
        playlist.save()

        msg += "\n- Playlist:\n%s" % playlist_master_file
    else:
        error_msg = (
            "save_playlist_master Wrong file or path: "
            + "\n%s" % playlist_master_file
        )
        add_encoding_log(video_to_encode.id, error_msg)
        change_encoding_step(video_to_encode.id, -1, error_msg)
        send_email(error_msg, video_to_encode.id)
        # Keep the messages of the tracks imported before the failure.
        msg += "\n" + error_msg
    return msg
518

519

520
def import_mp4(
    encod_video: EncodedVideoInfo, output_dir: str, video_to_encode: Video
) -> str:
    """Persist a single MP4 rendition into EncodingVideo.

    Args:
        encod_video: Entry describing the rendition.
        output_dir: Directory holding the generated files.
        video_to_encode: Video the rendition belongs to.

    Returns:
        Log text naming the imported file, or the error text when the file
        fails ``check_file`` (also logged, recorded as step ``-1`` and
        e-mailed).
    """
    stem = os.path.splitext(encod_video["filename"])[0]
    mp4_path = os.path.join(output_dir, "%s.mp4" % stem)

    if not check_file(mp4_path):
        failure = "save_mp4_file Wrong file or path: " + "\n%s " % (mp4_path)
        add_encoding_log(video_to_encode.id, failure)
        change_encoding_step(video_to_encode.id, -1, failure)
        send_email(failure, video_to_encode.id)
        return failure

    rendition = VideoRendition.objects.get(resolution=encod_video["rendition"])
    encoding, _ = EncodingVideo.objects.get_or_create(
        name=get_encoding_choice_from_filename(stem),
        video=video_to_encode,
        rendition=rendition,
        encoding_format="video/mp4",
    )
    # Stored relative to MEDIA_ROOT.
    media_prefix = os.path.join(settings.MEDIA_ROOT, "")
    encoding.source_file = mp4_path.replace(media_prefix, "")
    encoding.save()
    return "\n- videofilenameMp4:\n%s" % mp4_path
545

546

547
def import_m3u8(
    encod_video: EncodedVideoInfo, output_dir: str, video_to_encode: Video
) -> tuple[str, str]:
    """Persist one HLS rendition and return its master playlist fragment.

    Args:
        encod_video: Entry describing the rendition.
        output_dir: Directory holding the generated files.
        video_to_encode: Video the rendition belongs to.

    Returns:
        ``(log text, master playlist fragment)``. On failure the log text is
        the error message and the fragment is empty; the error is also
        logged, recorded as step ``-1`` and e-mailed.
    """

    def report_failure(text: str) -> tuple[str, str]:
        """Report the failure and build the (error, empty fragment) result."""
        add_encoding_log(video_to_encode.id, text)
        change_encoding_step(video_to_encode.id, -1, text)
        send_email(text, video_to_encode.id)
        return text, ""

    stem = os.path.splitext(encod_video["filename"])[0]
    m3u8_path = os.path.join(output_dir, "%s.m3u8" % stem)
    ts_path = os.path.join(output_dir, "%s.ts" % stem)

    rendition = VideoRendition.objects.get(resolution=encod_video["rendition"])

    # The bitrate is expected as "<number>k"; BANDWIDTH is that number * 1000.
    bitrate_match = re.search(r"(\d+)k", rendition.video_bitrate, re.I)
    if bitrate_match is None:
        return report_failure(
            "Invalid rendition bitrate format: %s" % rendition.video_bitrate
        )
    bandwidth = int(bitrate_match.group(1)) * 1000

    if not (check_file(m3u8_path) and check_file(ts_path)):
        return report_failure(
            "save_playlist_file Wrong file or path: "
            + "\n%s and %s" % (m3u8_path, ts_path)
        )

    choice_name = get_encoding_choice_from_filename(stem)
    # Both files are stored relative to MEDIA_ROOT.
    media_prefix = os.path.join(settings.MEDIA_ROOT, "")

    encoding, _ = EncodingVideo.objects.get_or_create(
        name=choice_name,
        video=video_to_encode,
        rendition=rendition,
        encoding_format="video/mp2t",
    )
    encoding.source_file = ts_path.replace(media_prefix, "")
    encoding.save()

    playlist, _ = PlaylistVideo.objects.get_or_create(
        name=choice_name,
        video=video_to_encode,
        encoding_format="application/x-mpegURL",
    )
    playlist.source_file = m3u8_path.replace(media_prefix, "")
    playlist.save()

    msg = "\n- videofilenameM3u8:\n%s" % m3u8_path
    msg += "\n- videofilenameTS:\n%s" % ts_path
    master_playlist = "#EXT-X-STREAM-INF:BANDWIDTH=%s," % bandwidth
    master_playlist += "RESOLUTION=%s\n%s\n" % (
        rendition.resolution,
        encod_video["filename"],
    )
    return msg, master_playlist
609

610

611
def get_encoding_choice_from_filename(filename: str) -> str:
    """Map filename prefix to the configured encoding choice name.

    The first three characters of ``filename`` are compared with the first
    three characters of each ``ENCODING_CHOICES`` value; when several values
    share a prefix the last one wins.

    Returns:
        The matching choice value, or ``"360p"`` when nothing matches.
    """
    prefix_to_choice: dict[str, str] = {
        choice[0][:3]: choice[0] for choice in ENCODING_CHOICES
    }
    return prefix_to_choice.get(filename[:3], "360p")
617

618

619
def remove_old_data(video_id: int) -> str:
    """Clear thumbnail, overview and previous encodings of a video.

    Args:
        video_id: Primary key of the video to clean up.

    Returns:
        Concatenated log text of the video, audio and playlist cleanups.
    """
    video_to_encode = Video.objects.get(id=video_id)
    video_to_encode.thumbnail = None

    if video_to_encode.overview:
        # overview.png sits next to the overview file; remove it when present.
        overview_dir = os.path.dirname(video_to_encode.overview.path)
        image_overview = os.path.join(overview_dir, "overview.png")
        if os.path.isfile(image_overview):
            os.remove(image_overview)
        video_to_encode.overview.delete()
    video_to_encode.overview = None
    video_to_encode.save()

    cleanup_steps = (
        remove_previous_encoding_video,
        remove_previous_encoding_audio,
        remove_previous_encoding_playlist,
    )
    return "".join(step(video_to_encode) for step in cleanup_steps)
638

639

640
def remove_previous_encoding_video(video_to_encode: Video) -> str:
    """Delete every EncodingVideo attached to the video.

    Returns:
        Log text telling whether anything was deleted.
    """
    stale_encodings = EncodingVideo.objects.filter(video=video_to_encode)
    if not stale_encodings:
        return "\nVideo: Nothing to delete"

    # Delete row by row so each instance's own delete() is called.
    for stale in stale_encodings:
        stale.delete()
    return "\n\nDELETE PREVIOUS ENCODING VIDEO"
651

652

653
def remove_previous_encoding_audio(video_to_encode: Video) -> str:
    """Delete every EncodingAudio attached to the video.

    Returns:
        Log text telling whether anything was deleted.
    """
    stale_encodings = EncodingAudio.objects.filter(video=video_to_encode)
    if not stale_encodings:
        return "\nAudio: Nothing to delete"

    # Delete row by row so each instance's own delete() is called.
    for stale in stale_encodings:
        stale.delete()
    return "\n\nDELETE PREVIOUS ENCODING AUDIO"
664

665

666
def remove_previous_encoding_playlist(video_to_encode: Video) -> str:
    """Delete every PlaylistVideo attached to the video.

    Returns:
        Log text telling whether anything was deleted.
    """
    stale_playlists = PlaylistVideo.objects.filter(video=video_to_encode)
    if not stale_playlists:
        return "\nPlaylist: Nothing to delete"

    # Delete row by row so each instance's own delete() is called.
    for stale in stale_playlists:
        stale.delete()
    return "\nDELETE PREVIOUS PLAYLIST M3U8"
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc