• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 22574034458

02 Mar 2026 11:30AM UTC coverage: 68.109%. First build
22574034458

Pull #1406

github

web-flow
Merge 207dd0354 into ed8189ee5
Pull Request #1406: Enhance dashboard filtering and runner manager administration with i18n updates

70 of 145 new or added lines in 8 files covered. (48.28%)

12923 of 18974 relevant lines covered (68.11%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.58
/pod/video_encode_transcript/views.py
1
"""Views and helpers, useful only for Runner Manager callbacks and artifact imports in Esup-Pod.
2

3
This module handles the full post-processing flow for remote tasks:
4
- validate and authorize webhook notifications,
5
- download task result files from the Runner Manager,
6
- import artifacts back into Pod (encoding, studio, transcription).
7
"""
8

9
import json
1✔
10
import logging
1✔
11
import os
1✔
12
import random
1✔
13
import secrets
1✔
14
import shutil
1✔
15
import tempfile
1✔
16
import time
1✔
17
from hashlib import sha256
1✔
18
from typing import TypeAlias, TypedDict, cast
1✔
19

20
import requests
1✔
21
import webvtt
1✔
22
from django.conf import settings
1✔
23
from django.core.handlers.wsgi import WSGIRequest
1✔
24
from django.http import JsonResponse
1✔
25
from django.views.decorators.csrf import csrf_exempt
1✔
26

27
from pod.recorder.models import Recording
1✔
28
from pod.recorder.plugins.type_studio import save_basic_video
1✔
29
from pod.video.models import Video
1✔
30
from pod.video_encode_transcript.models import RunnerManager, Task
1✔
31
from pod.video_encode_transcript.runner_manager_utils import (
1✔
32
    store_after_remote_encoding_video,
33
    store_remote_encoding_log_recording,
34
)
35
from pod.video_encode_transcript.task_queue import refresh_pending_task_ranks
1✔
36
from pod.video_encode_transcript.transcript import save_vtt_and_notify
1✔
37
from pod.video_encode_transcript.utils import send_email_item
1✔
38

39
log = logging.getLogger(__name__)

DEBUG = getattr(settings, "DEBUG", True)

# Retry/back-off policy for downloading each file listed in a result manifest.
MANIFEST_MEMBER_DOWNLOAD_MAX_RETRIES = 5
MANIFEST_MEMBER_DOWNLOAD_BACKOFF_BASE_SECONDS = 0.5
MANIFEST_MEMBER_DOWNLOAD_BACKOFF_MAX_SECONDS = 8.0

# MEDIA_ROOT is required: refuse to import this module when it is unset/empty.
if not (media_root_setting := getattr(settings, "MEDIA_ROOT", None)):
    raise RuntimeError("MEDIA_ROOT is not configured in settings")
MEDIA_ROOT = str(media_root_setting)

# Sub-directory of MEDIA_ROOT that holds the per-owner video directories.
VIDEOS_DIR = getattr(settings, "VIDEOS_DIR", "videos")

# HTTP header mapping sent to the Runner Manager.
HeadersDict: TypeAlias = dict[str, str]
1✔
54

55

56
def _get_media_root() -> str:
    """Read MEDIA_ROOT from Django settings at call time.

    Unlike the module-level ``MEDIA_ROOT`` constant, the value reflects the
    settings as they are when this function runs, not at import time.

    Raises:
        RuntimeError: if MEDIA_ROOT is unset or empty.
    """
    configured_root = getattr(settings, "MEDIA_ROOT", None)
    if configured_root:
        return str(configured_root)
    raise RuntimeError("MEDIA_ROOT is not configured in settings")
1✔
62

63

64
def _get_videos_dir() -> str:
    """Read VIDEOS_DIR from Django settings at call time (default ``"videos"``)."""
    videos_dir = getattr(settings, "VIDEOS_DIR", "videos")
    return str(videos_dir)
1✔
67

68

69
def _format_video_directory(video_id: int | str) -> str:
1✔
70
    """Return normalized video directory name using at least 4 digits."""
71
    try:
1✔
72
        return f"{int(video_id):04d}"
1✔
NEW
73
    except (TypeError, ValueError):
×
NEW
74
        return str(video_id)
×
75

76

77
class NotifyTaskPayload(TypedDict, total=False):
    """JSON body posted by the Runner Manager to the notify endpoint.

    Every key is optional at the type level (``total=False``); the view code
    checks for the keys it needs before using them.
    """

    # Runner Manager identifier of the task, matched against ``Task.task_id``.
    task_id: str
    # Status reported for the task (e.g. "completed" or "failed").
    status: str
    # Output of the remote script, appended to the task's stored output.
    script_output: str
    # Optional error description, appended to the task's stored output.
    error_message: str
1✔
84

85

86
class ResultManifest(TypedDict, total=False):
    """JSON manifest returned by the Runner Manager task result endpoint."""

    # Result file entries: each is either a path string or a dict carrying
    # "file_path" (or "path") and an optional "sha256" digest.
    files: list[object]
1✔
90

91

92
class ManifestMemberIntegrityError(RuntimeError):
    """Signal that a downloaded manifest file failed its integrity check.

    Raised on a SHA-256 mismatch; the download loop treats it as retryable.
    """
94

95

96
def _build_result_url(manager_url: str, task_id: str) -> str:
1✔
97
    """Return runner manager result URL ending with trailing slash."""
98
    if not manager_url.endswith("/"):
×
99
        manager_url += "/"
×
100
    return manager_url + f"task/result/{task_id}"
×
101

102

103
def _build_result_file_url(manager_url: str, task_id: str, file_path: str) -> str:
1✔
104
    """Return runner manager file URL for a given task result file."""
105
    if not manager_url.endswith("/"):
×
106
        manager_url += "/"
×
107
    return manager_url + f"task/result/{task_id}/file/{file_path}"
×
108

109

110
def _build_headers(token: str) -> HeadersDict:
    """Return JSON request headers carrying the Runner Manager bearer token."""
    headers: HeadersDict = {}
    headers["Accept"] = "application/json"
    headers["Content-Type"] = "application/json"
    headers["Authorization"] = f"Bearer {token}"
    return headers
117

118

119
def _build_file_headers(token: str) -> HeadersDict:
    """Return headers for downloading raw result files (any media type accepted)."""
    return dict(Accept="*/*", Authorization=f"Bearer {token}")
125

126

127
def _extract_bearer_token(request: WSGIRequest) -> str | None:
    """Return the credential of a ``Bearer`` Authorization header, else None.

    The scheme is matched case-insensitively and the credential is stripped
    of surrounding whitespace. A missing header, another scheme or an empty
    credential yields None.
    """
    header_value = request.headers.get("Authorization", "")
    scheme, _separator, credential = header_value.partition(" ")
    credential = credential.strip()
    if scheme.lower() == "bearer" and credential:
        return credential
    return None
1✔
135

136

137
def _fetch_task_result(
    url: str, headers: HeadersDict, task_id: int | str
) -> requests.Response | None:
    """GET the task result endpoint and return the response only on HTTP 200.

    Network errors and any non-200 status are logged and reported as None.
    """
    try:
        result_response = requests.get(url, headers=headers, timeout=30)
        log.info(f"Fetched result for task {task_id}: {url}")
    except requests.RequestException as exc:
        log.error(f"Error reaching runner manager for task {task_id}: {exc}")
        return None

    if result_response.status_code == 200:
        return result_response

    log.error(
        f"Failed to download result for task {task_id}: "
        f"{result_response.status_code} {result_response.text}"
    )
    return None
×
153

154

155
def _finalize_task_import(
    task: Task, extracted_dir: str, extracted_vtt_path: str
) -> None:
    """Save where results were stored, then import them according to task type.

    Routing:
    - studio tasks (``task.type == "studio"``) and any task carrying a
      ``recording_id``: create a new Video from the downloaded base media,
      store the recording's remote encoding log and run post-encoding storage;
    - transcription tasks: attach the downloaded VTT file to the task's video;
    - any other task: run post-encoding storage for ``task.video``.

    On success the task status is set to ``"completed"`` and saved. Import
    errors are logged together with their traceback and are not re-raised.

    Args:
        task: Task whose result files were downloaded.
        extracted_dir: Directory the result files were written to.
        extracted_vtt_path: Downloaded VTT file path ("" when none).
    """
    # Only recorded when the Task model exposes a result_path attribute.
    if hasattr(task, "result_path"):
        task.result_path = extracted_dir
    task.save()

    try:
        if task.type == "studio" or getattr(task, "recording_id", None):
            # Studio tasks generate a new Video from the base media before importing artifacts.
            video_id = _create_video_from_studio_task(task)
            recording_id = task.recording_id
            if recording_id is None:
                raise RuntimeError(
                    "Studio task missing recording_id after video creation"
                )
            store_remote_encoding_log_recording(recording_id, video_id)
            store_after_remote_encoding_video(video_id)
        elif task.type == "transcription":
            _import_transcription_result(task, extracted_vtt_path)
        else:
            store_after_remote_encoding_video(task.video.id)
        task.status = "completed"
        task.save()
    except Exception as exc:  # noqa: BLE001
        # The broad catch keeps import failures from propagating to the
        # caller; log.exception also records the traceback, without which
        # the failing step of the import cannot be identified.
        log.exception(f"Error while importing result for task {task.id}: {exc}")
×
182

183

184
def _parse_notify_task_end_request(
    request: WSGIRequest,
) -> tuple[NotifyTaskPayload | None, str | None, JsonResponse | None]:
    """Validate the webhook request and return ``(payload, bearer_token, error)``.

    On success ``error`` is None and the first two items hold the decoded JSON
    object and the bearer token. Otherwise ``(None, None, response)`` is
    returned, where ``response`` is the error to send back:

    - 405 if the method is not POST;
    - 415 if the Content-Type does not contain ``application/json``
      (compared case-insensitively);
    - 401 if no usable Bearer token is present;
    - 400 if the body cannot be decoded as JSON or is not a JSON object.
    """
    if request.method != "POST":
        return (
            None,
            None,
            JsonResponse({"error": "Only POST requests are allowed."}, status=405),
        )

    # Media type names are case-insensitive (RFC 9110, section 8.3.1).
    content_type = (request.headers.get("Content-Type") or "").lower()
    if "application/json" not in content_type:
        return (
            None,
            None,
            JsonResponse(
                {"error": "Only application/json content type is allowed."}, status=415
            ),
        )

    bearer_token = _extract_bearer_token(request)
    if not bearer_token:
        return (
            None,
            None,
            JsonResponse({"error": "Missing or invalid Bearer token."}, status=401),
        )

    try:
        payload = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # json.loads() on bytes first decodes them (UTF-8/16/32); undecodable
        # bytes raise UnicodeDecodeError rather than JSONDecodeError. Both are
        # client errors and must not surface as a 500.
        return None, None, JsonResponse({"error": "Invalid request."}, status=400)

    if not isinstance(payload, dict):
        return None, None, JsonResponse({"error": "Invalid request."}, status=400)

    data = cast(NotifyTaskPayload, payload)
    return data, bearer_token, None
1✔
223

224

225
def _get_notify_task(
    data: NotifyTaskPayload,
) -> tuple[Task | None, JsonResponse | None]:
    """Look up the Task named by ``data["task_id"]``.

    Returns ``(task, None)`` when found (its runner manager is loaded in the
    same query), otherwise ``(None, error_response)``: 400 for a missing,
    empty or non-string task id, 404 for an unknown one.
    """
    requested_id = data.get("task_id")
    if not (isinstance(requested_id, str) and requested_id):
        return None, JsonResponse({"error": "No task id in the request."}, status=400)

    matching_task = (
        Task.objects.filter(task_id=requested_id)
        .select_related("runner_manager")
        .first()
    )
    if matching_task is None:
        return None, JsonResponse({"error": "Task not found."}, status=404)
    return matching_task, None
1✔
238

239

240
def _authorize_notify_task(task: Task, bearer_token: str) -> JsonResponse | None:
    """Check the webhook bearer token against the task's runner manager token.

    Returns:
        None when the token matches. Otherwise a JsonResponse with status 500
        when the task has no runner manager or token configured, or 403 when
        the token does not match.
    """
    runner_manager = task.runner_manager
    if not runner_manager or not runner_manager.token:
        log.error("Task %s has no runner manager token configured", task.task_id)
        return JsonResponse(
            {"error": "Runner manager token is not configured."},
            status=500,
        )

    # Compare UTF-8 bytes rather than str: secrets.compare_digest raises
    # TypeError for str arguments containing non-ASCII characters, and the
    # bearer token comes straight from an untrusted request header. The
    # comparison stays constant-time.
    provided_token = bearer_token.encode("utf-8")
    expected_token = str(runner_manager.token).encode("utf-8")
    if not secrets.compare_digest(provided_token, expected_token):
        return JsonResponse({"error": "Invalid Bearer token."}, status=403)

    return None
1✔
254

255

256
def _apply_notify_payload_to_task(task: Task, data: NotifyTaskPayload) -> None:
    """Store the reported status on the task and append any reported output.

    ``error_message`` (followed by a ``---`` separator line) and then
    ``script_output`` are appended to the existing ``task.script_output``.
    The task is saved and pending task ranks are refreshed afterwards.
    ``data`` must contain a ``"status"`` key.
    """
    task.status = str(data["status"])

    output_parts = [task.script_output or ""]
    reported_error = data.get("error_message")
    if reported_error is not None:
        output_parts.append(f"{reported_error}\n---\n")
    reported_output = data.get("script_output")
    if reported_output is not None:
        output_parts.append(str(reported_output))

    task.script_output = "".join(output_parts)
    task.save()
    refresh_pending_task_ranks()
1✔
271

272

273
@csrf_exempt
def notify_task_end(request: WSGIRequest) -> JsonResponse:
    """Handle the Runner Manager "task finished" webhook.

    The request and its bearer token are validated, the referenced task is
    loaded and the token is checked against the task's runner manager. The
    reported status/output is then stored. A ``"failed"`` status sends an
    email notification; a ``"completed"`` status downloads and imports the
    task results before the response is returned.
    """
    payload, token, failure = _parse_notify_task_end_request(request)
    if failure:
        return failure
    if payload is None or token is None:
        return JsonResponse({"error": "Invalid request."}, status=400)

    task, failure = _get_notify_task(payload)
    if failure:
        return failure
    if task is None:
        return JsonResponse({"error": "Task not found."}, status=404)

    failure = _authorize_notify_task(task, token)
    if failure:
        return failure

    if "status" not in payload:
        return JsonResponse(
            {"status": "Task has not yet been successfully achieved."},
            status=500,
        )

    _apply_notify_payload_to_task(task, payload)

    if task.status == "failed":
        send_email_item(f"Task {task.id} failed", "Task", task.task_id)
    if task.status == "completed":
        download_and_import_task_result(task)

    return JsonResponse({"status": "OK"}, status=200)
1✔
309

310

311
def _get_runner_manager_for_task(task: Task) -> RunnerManager | None:
    """Load the RunnerManager referenced by ``task.runner_manager_id``.

    Returns None, after logging an error, when it does not exist or the
    lookup fails for any other reason.
    """
    try:
        manager = RunnerManager.objects.get(id=task.runner_manager_id)
    except RunnerManager.DoesNotExist:
        log.error(f"Runner manager not found for task {task.id}")
        manager = None
    except Exception as exc:  # noqa: BLE001
        log.error(f"Error downloading result for task {task.id}: {exc}")
        manager = None
    return manager
×
321

322

323
def _get_task_result_manifest(
    task: Task, runner_manager: RunnerManager
) -> ResultManifest | None:
    """Download the task's result manifest and return it if it lists files.

    Returns None, after logging, when the task has no Runner Manager id, the
    request fails, the body is not a JSON object, or ``files`` is missing or
    empty.
    """
    if not task.task_id:
        log.error(f"Missing task_id for task {task.id}")
        return None

    response = _fetch_task_result(
        _build_result_url(runner_manager.url, task.task_id),
        _build_headers(runner_manager.token),
        task.id,
    )
    if not response:
        return None

    try:
        manifest_payload = response.json()
    except ValueError:
        manifest_payload = {}

    # A non-object body and an object without a non-empty "files" entry are
    # both reported as an invalid manifest.
    if not isinstance(manifest_payload, dict) or not manifest_payload.get("files"):
        log.error(f"Invalid manifest JSON for task {task.id}")
        return None

    return cast(ResultManifest, manifest_payload)
×
352

353

354
def download_and_import_task_result(task: Task) -> None:
    """Fetch a completed task's results from its Runner Manager and import them.

    The result manifest is requested first, and each listed file is downloaded
    into the task's destination directory. The artifacts are then imported into
    Pod according to the task type. When a helper reports failure (returns None
    or an empty directory), the process stops early.

    Args:
        task (Task): Task object
    """
    manager = _get_runner_manager_for_task(task)
    if not manager:
        return

    manifest = _get_task_result_manifest(task, manager)
    if manifest is None:
        return

    result_dir, vtt_path = _save_manifest_files(
        manifest, task, manager.url, manager.token
    )
    if not result_dir:
        log.error(f"Failed to import result for task {task.id}")
        return

    log.info(f"Successfully downloaded and extracted result for task {task.id}")
    _finalize_task_import(task, result_dir, vtt_path)
×
379

380

381
def _import_transcription_result(task: Task, extracted_vtt_path: str) -> None:
    """Attach a downloaded VTT transcript to the task's video.

    The VTT file is parsed with ``webvtt`` and passed to
    ``save_vtt_and_notify`` together with a success message; language
    handling is left to that helper.

    Args:
        task: Task of type "transcription" linked to a ``Video``.
        extracted_vtt_path: Path to the downloaded VTT file ("" when the
            result manifest provided none).

    Raises:
        RuntimeError: if the task has no video or no VTT file was downloaded.
    """
    if not getattr(task, "video", None):
        raise RuntimeError("Transcription task is not linked to a video")
    # An empty path means no .vtt file was downloaded; fail with an explicit
    # message instead of letting webvtt try to open "".
    if not extracted_vtt_path:
        raise RuntimeError(
            f"No VTT file found in transcription result for task {task.id}"
        )

    log.info(f"Importing VTT file {extracted_vtt_path} for video {task.video.id}")

    video_id = task.video.id
    video = Video.objects.get(id=video_id)
    msg = f"Transcription imported successfully: {extracted_vtt_path}"
    wvtt = webvtt.read(extracted_vtt_path)
    # Save VTT for Pod and notify the user.
    save_vtt_and_notify(video, msg, wvtt)
    log.info(f"Attached VTT transcript to video {video.id}")
×
407

408

409
def _get_destination_directory(task: Task, dest_base: str | None = None) -> str:
    """Return the directory where a task's result files are stored.

    Without ``dest_base``:
    - studio tasks use ``MEDIA_ROOT/tasks/<task.id>``;
    - every other task type uses the video directory
      ``MEDIA_ROOT/VIDEOS_DIR/<owner hashkey>/<zero-padded video id>``.
    Only the parent of that directory is created here.

    With ``dest_base``, ``<dest_base>/<task.id>`` is used and created.

    MEDIA_ROOT and VIDEOS_DIR are read from settings at call time, the same
    way ``_create_video_from_studio_task`` locates the studio task directory,
    so both always agree on where studio results live.

    Args:
        task (Task): Task instance used to name the destination directory.
        dest_base (str | None): parent directory where to extract.
    Returns:
        str: path of the destination directory.
    Raises:
        RuntimeError: if MEDIA_ROOT is not configured.
    """
    if dest_base is not None:
        dest_dir = os.path.join(dest_base, str(task.id))
        os.makedirs(dest_dir, exist_ok=True)
        return dest_dir

    media_root = _get_media_root()
    if task.type == "studio":
        dest_dir = os.path.join(media_root, "tasks", str(task.id))
    else:
        # Encoding and transcription results go to the video's own directory.
        video_dir = _format_video_directory(task.video.id)
        dest_dir = os.path.join(
            media_root,
            _get_videos_dir(),
            str(task.video.owner.owner.hashkey),
            video_dir,
        )
    log.info(f"Save result into {dest_dir}")
    os.makedirs(os.path.dirname(dest_dir), exist_ok=True)
    return dest_dir
1✔
444

445

446
def _is_safe_path(dest_dir: str, member: str) -> bool:
1✔
447
    """Check whether a manifest entry path is safe to write.
448

449
    Args:
450
        dest_dir (str): The destination directory.
451
        member (str): The manifest file path to validate.
452
    Returns:
453
        bool: True if the path is safe, False otherwise.
454
    """
455
    member_path = os.path.normpath(member)
×
456
    # Skip absolute paths and parent-traversal entries
457
    if member_path.startswith("..") or os.path.isabs(member_path):
×
458
        return False
×
459
    dest_path = os.path.normpath(os.path.join(dest_dir, member_path))
×
460
    normalized_dest = os.path.normpath(dest_dir)
×
461
    # Ensure extraction is within destination directory
462
    return dest_path.startswith(normalized_dest + os.sep) or dest_path == normalized_dest
×
463

464

465
def _should_extract_transcription_member(member: str) -> bool:
1✔
466
    """Return True when member must be extracted for transcription tasks."""
467
    if member.endswith("/"):
×
468
        return False
×
469
    base_l = os.path.basename(member).lower()
×
470
    return base_l.endswith(".vtt") or base_l.endswith(".json")
×
471

472

473
def _should_download_manifest_member(task: Task, dest_dir: str, file_path: str) -> bool:
    """Decide whether one manifest entry must be downloaded into ``dest_dir``.

    Empty paths are skipped, transcription tasks only keep ``.vtt``/``.json``
    files, and paths escaping ``dest_dir`` are skipped with a warning.
    """
    if not file_path:
        return False
    is_transcription = task.type == "transcription"
    if is_transcription and not _should_extract_transcription_member(file_path):
        return False
    if _is_safe_path(dest_dir, file_path):
        return True
    log.warning(f"Ignored suspicious manifest file path: {file_path}")
    return False
×
488

489

490
def _download_manifest_member(
    url: str,
    headers: HeadersDict,
    file_path: str,
    task: Task,
) -> requests.Response | None:
    """Open a streamed download of one manifest file.

    Returns:
        The open streaming response on HTTP 200 (the caller must close it),
        or None when the request fails or answers with another status. All
        failures are logged.
    """
    try:
        response = requests.get(url, headers=headers, timeout=60, stream=True)
    except requests.RequestException as exc:
        log.error(f"Error downloading file {file_path} for task {task.id}: {exc}")
        return None

    if response.status_code == 200:
        return response

    # Reading the body of a streamed response can itself fail (e.g. the
    # connection drops mid-body). Guard it so that case is reported as a
    # normal None result instead of escaping the caller's retry loop, and
    # always close the response so its connection is released to the pool.
    try:
        error_body = response.text
    except requests.RequestException as exc:
        error_body = f"<unreadable response body: {exc}>"
    finally:
        response.close()

    log.error(
        "Failed to download file %s for task %s: %s %s",
        file_path,
        task.id,
        response.status_code,
        error_body,
    )
    return None
×
514

515

516
def _store_manifest_member(
    response: requests.Response,
    dest_dir: str,
    file_path: str,
    expected_sha256: str | None = None,
) -> str:
    """Write a downloaded manifest file under ``dest_dir`` and return its path.

    Entries ending with ``/`` only create the directory. A file is first
    streamed into a temporary file next to its destination. It is then checked
    against ``expected_sha256`` when one is given, and finally moved into place
    with ``os.replace``. A failed or corrupt download therefore never
    overwrites the destination, and the temporary file is removed.

    ``tempfile.mkstemp`` creates the temporary file with mode 0600, and
    ``os.replace`` keeps that mode on the final file. When Django's
    ``FILE_UPLOAD_PERMISSIONS`` setting is defined, that mode is applied
    before the move, matching how Django stores uploaded media.

    Raises:
        ManifestMemberIntegrityError: on checksum mismatch.
        requests.RequestException: if streaming the body fails.
        OSError: on filesystem errors.
    """
    dest_path = os.path.normpath(os.path.join(dest_dir, file_path))
    parent = os.path.dirname(dest_path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    if file_path.endswith("/"):
        os.makedirs(dest_path, exist_ok=True)
        return dest_path

    temp_path = _create_manifest_temp_path(parent or dest_dir)
    try:
        actual_sha256 = _stream_manifest_member_to_tempfile(response, temp_path)
        _validate_manifest_member_checksum(file_path, expected_sha256, actual_sha256)
        file_mode = getattr(settings, "FILE_UPLOAD_PERMISSIONS", None)
        if file_mode is not None:
            os.chmod(temp_path, file_mode)
        os.replace(temp_path, dest_path)
    except Exception:
        _remove_file_if_exists(temp_path)
        raise

    return dest_path
1✔
542

543

544
def _create_manifest_temp_path(temp_dir: str) -> str:
1✔
545
    """Create and return an empty temporary file path for atomic writes."""
546
    temp_file_descriptor, temp_path = tempfile.mkstemp(
1✔
547
        prefix=".manifest_",
548
        suffix=".part",
549
        dir=temp_dir,
550
    )
551
    os.close(temp_file_descriptor)
1✔
552
    return temp_path
1✔
553

554

555
def _stream_manifest_member_to_tempfile(
    response: requests.Response, temp_path: str
) -> str:
    """Write the response body to ``temp_path`` in 1 MiB chunks.

    Empty chunks are skipped. Returns the hex SHA-256 digest of the bytes
    written.
    """
    digest = sha256()
    with open(temp_path, "wb") as output_file:
        for block in filter(None, response.iter_content(chunk_size=1024 * 1024)):
            output_file.write(block)
            digest.update(block)
    return digest.hexdigest()
1✔
567

568

569
def _validate_manifest_member_checksum(
1✔
570
    file_path: str,
571
    expected_sha256: str | None,
572
    actual_sha256: str,
573
) -> None:
574
    """Raise on checksum mismatch when manifest provides an expected hash."""
575
    if expected_sha256 is None:
1✔
576
        return
1✔
577
    if actual_sha256 == expected_sha256:
1✔
578
        return
1✔
579
    raise ManifestMemberIntegrityError(
1✔
580
        f"Checksum mismatch for {file_path}: expected {expected_sha256}"
581
    )
582

583

584
def _remove_file_if_exists(path: str) -> None:
1✔
585
    """Delete file if present, ignoring absence and cleanup race conditions."""
586
    try:
1✔
587
        os.remove(path)
1✔
588
    except FileNotFoundError:
×
589
        pass
×
590
    except OSError:
×
591
        pass
×
592

593

594
def _parse_manifest_member_entry(
    task: Task, manifest_entry: object
) -> tuple[str | None, str | None]:
    """Turn one manifest entry into ``(file_path, sha256_or_None)``.

    Accepted forms:
    - a plain string, used as the file path (no checksum);
    - a dict with a non-empty string ``"file_path"`` (or, as a fallback,
      ``"path"``) and an optional ``"sha256"``.

    The checksum is stripped and lower-cased. A non-string value, or one that
    is not exactly 64 hex digits, is ignored with a warning. Unusable entries
    yield ``(None, None)``.
    """
    if isinstance(manifest_entry, str):
        return manifest_entry, None

    if not isinstance(manifest_entry, dict):
        log.warning(
            "Ignored manifest entry with unsupported format for task %s: %r",
            task.id,
            manifest_entry,
        )
        return None, None

    file_path = manifest_entry.get("file_path")
    if not (isinstance(file_path, str) and file_path):
        file_path = manifest_entry.get("path")
        if not (isinstance(file_path, str) and file_path):
            log.warning(
                "Ignored manifest entry without file path for task %s: %r",
                task.id,
                manifest_entry,
            )
            return None, None

    raw_checksum = manifest_entry.get("sha256")
    if raw_checksum is None:
        return file_path, None

    if not isinstance(raw_checksum, str):
        log.warning(
            "Ignored non-string sha256 for file %s in task %s",
            file_path,
            task.id,
        )
        return file_path, None

    checksum = raw_checksum.strip().lower()
    is_hex_digest = len(checksum) == 64 and set(checksum) <= set("0123456789abcdef")
    if not is_hex_digest:
        log.warning(
            "Ignored invalid sha256 for file %s in task %s",
            file_path,
            task.id,
        )
        return file_path, None

    return file_path, checksum
×
646

647

648
def _compute_manifest_retry_delay(attempt: int) -> float:
    """Return a random back-off delay, in seconds, after failed try ``attempt``.

    Uses "full jitter": a uniform value in ``[0, cap]``. ``cap`` starts at
    MANIFEST_MEMBER_DOWNLOAD_BACKOFF_BASE_SECONDS for attempt 1, doubles with
    each attempt and is bounded by MANIFEST_MEMBER_DOWNLOAD_BACKOFF_MAX_SECONDS.
    """
    uncapped = MANIFEST_MEMBER_DOWNLOAD_BACKOFF_BASE_SECONDS * 2 ** (attempt - 1)
    cap = min(MANIFEST_MEMBER_DOWNLOAD_BACKOFF_MAX_SECONDS, uncapped)
    return random.uniform(0.0, cap)
×
655

656

657
def _download_and_store_manifest_member(
    url: str,
    headers: HeadersDict,
    file_path: str,
    task: Task,
    dest_dir: str,
    expected_sha256: str | None = None,
    max_attempts: int = MANIFEST_MEMBER_DOWNLOAD_MAX_RETRIES,
) -> str | None:
    """Download one manifest file into ``dest_dir``, retrying transient failures.

    Up to ``max_attempts`` tries are made, with a jittered exponential back-off
    between them. A try is retried when:
    - the request fails or returns a non-200 status;
    - streaming the body raises a requests error;
    - the checksum does not match.
    A local write error (OSError) aborts immediately.

    Returns:
        The stored file path, or None when all attempts failed or writing failed.
    """
    for attempt in range(1, max_attempts + 1):
        is_last_attempt = attempt == max_attempts
        response = _download_manifest_member(url, headers, file_path, task)

        if response is None:
            if is_last_attempt:
                return None
            delay = _compute_manifest_retry_delay(attempt)
            log.warning(
                "Retrying file %s for task %s after failed request (%s/%s), next try in %.2fs",
                file_path,
                task.id,
                attempt,
                max_attempts,
                delay,
            )
            time.sleep(delay)
            continue

        try:
            return _store_manifest_member(
                response,
                dest_dir,
                file_path,
                expected_sha256=expected_sha256,
            )
        # requests.RequestException derives from OSError, so this clause must
        # stay ahead of the OSError handler below.
        except (requests.RequestException, ManifestMemberIntegrityError) as exc:
            if is_last_attempt:
                log.error(
                    "Failed to fully download file %s for task %s after %s attempts: %s",
                    file_path,
                    task.id,
                    max_attempts,
                    exc,
                )
                return None
            delay = _compute_manifest_retry_delay(attempt)
            log.warning(
                "Retrying file %s for task %s after streamed transfer/integrity error (%s/%s), next try in %.2fs: %s",
                file_path,
                task.id,
                attempt,
                max_attempts,
                delay,
                exc,
            )
            time.sleep(delay)
        except OSError as exc:
            log.error(f"Failed to write file {file_path} for task {task.id}: {exc}")
            return None
        finally:
            response.close()
    return None
×
718

719

720
def _save_manifest_files(
    manifest: ResultManifest,
    task: Task,
    manager_url: str,
    token: str,
    dest_base: str | None = None,
) -> tuple[str | None, str]:
    """Download every eligible file listed in ``manifest`` into the task directory.

    Entries are parsed, filtered (see ``_should_download_manifest_member``)
    and downloaded one by one. The first file that cannot be stored aborts
    the whole operation.

    Args:
        manifest: Manifest JSON with at least a "files" list.
        task: Task instance used to name the destination directory.
        manager_url: Runner manager base URL.
        token: Runner manager token for Authorization header.
        dest_base: Optional parent directory where to store.

    Returns:
        ``(dest_dir, vtt_path)`` on success, where ``vtt_path`` is the last
        ``.vtt`` file stored for a transcription task, or ``""``.
        ``(None, "")`` on failure.
    """
    files = manifest.get("files")
    if not (isinstance(files, list) and files):
        log.error(f"Manifest missing files for task {task.id}")
        return None, ""

    if not task.task_id:
        log.error(f"Missing task_id for task {task.id}")
        return None, ""

    dest_dir = _get_destination_directory(task, dest_base)
    vtt_path = ""
    headers = _build_file_headers(token)
    collect_vtt = task.type == "transcription"

    for manifest_entry in files:
        file_path, expected_sha256 = _parse_manifest_member_entry(task, manifest_entry)
        if file_path is None or not _should_download_manifest_member(
            task, dest_dir, file_path
        ):
            continue

        stored_path = _download_and_store_manifest_member(
            _build_result_file_url(manager_url, task.task_id, file_path),
            headers,
            file_path,
            task,
            dest_dir,
            expected_sha256=expected_sha256,
        )
        if stored_path is None:
            return None, ""

        if collect_vtt and stored_path.lower().endswith(".vtt"):
            vtt_path = stored_path

    log.info(f"Downloaded manifest files for task {task.id} into {dest_dir}")
    return dest_dir, vtt_path
×
777

778

779
def _create_video_from_studio_task(task: Task, extracted_dir: str | None = None) -> int:
    """Create a Pod video from a studio task and relocate its artifacts.

    Workflow:
    - Validate that the task references a recording (``task.recording_id``).
    - Expect ``studio_base.mp4`` in ``extracted_dir``, or by default under
      ``MEDIA_ROOT/tasks/<task.id>`` (MEDIA_ROOT read at call time).
    - Create a new Video via ``save_basic_video(recording, src_file)``.
    - Move the task directory into the new video's directory
      (see ``_move_task_directory_to_video``).
    - Return the created video's id.

    Args:
        task: The ``Task`` instance (must contain ``recording_id``).
        extracted_dir: Optional source directory containing extracted studio files.

    Returns:
        int: The created video id.

    Raises:
        RuntimeError: If the recording is missing or the expected base file is
            not present. The original ``DoesNotExist`` is chained as the cause.
    """
    if not getattr(task, "recording_id", None):
        raise RuntimeError("Studio task missing recording_id")

    src_dir = extracted_dir or os.path.join(_get_media_root(), "tasks", str(task.id))
    src_file = os.path.join(src_dir, "studio_base.mp4")
    # os.path.isfile() is False for missing paths, so it also covers existence.
    if not os.path.isfile(src_file):
        raise RuntimeError(f"Source studio file not found: {src_file}")

    # Keep the try limited to the lookup so "Recording not found" is only
    # reported for a missing recording.
    try:
        recording = Recording.objects.get(id=task.recording_id)
    except Recording.DoesNotExist as exc:
        raise RuntimeError(f"Recording not found: {task.recording_id}") from exc

    # Create a new Pod video from the base file.
    video = save_basic_video(recording, src_file)

    # Move the rest of the task artifacts into the new video directory.
    _move_task_directory_to_video(task, video.id, src_dir=src_dir, recording=recording)

    return video.id
1✔
821

822

823
def _get_user_hashkey_from_recording(recording: Recording) -> str:
    """Look up the hashkey of the owner linked to a recording's user.

    The value is read through ``recording.user.owner.hashkey`` and returned
    as a string.

    Args:
        recording: Recording whose user's owner hashkey is requested.

    Returns:
        str: The owner's hashkey, converted with ``str()``.

    Raises:
        RuntimeError: If any step of the attribute lookup (or the string
            conversion) fails; the original exception is chained as cause.
    """
    try:
        owner = recording.user.owner
        hashkey = str(owner.hashkey)
    except Exception as exc:
        raise RuntimeError("Unable to resolve recording owner's hashkey") from exc
    return hashkey
829

830

831
def _merge_or_move_directory(src_dir: str, dest_dir: str) -> None:
1✔
832
    """Move src_dir to dest_dir; merge contents if destination already exists."""
833
    # Ensure parent of dest_dir exists
834
    os.makedirs(os.path.dirname(dest_dir), exist_ok=True)
1✔
835

836
    # If destination exists, merge contents; else move the whole directory
837
    if os.path.exists(dest_dir):
1✔
838
        if not os.path.isdir(dest_dir):
1✔
839
            raise RuntimeError(
×
840
                f"Destination path exists and is not a directory: {dest_dir}"
841
            )
842
        for entry in os.listdir(src_dir):
1✔
843
            src_path = os.path.join(src_dir, entry)
1✔
844
            target_path = os.path.join(dest_dir, entry)
1✔
845
            # If target exists, remove it before move to avoid errors
846
            if os.path.exists(target_path):
1✔
847
                if os.path.isdir(target_path):
1✔
848
                    shutil.rmtree(target_path)
1✔
849
                else:
850
                    os.remove(target_path)
1✔
851
            shutil.move(src_path, dest_dir)
1✔
852
        # Cleanup empty src_dir (best effort)
853
        try:
1✔
854
            os.rmdir(src_dir)
1✔
855
        except OSError:
×
856
            pass
×
857
    else:
858
        shutil.move(src_dir, dest_dir)
1✔
859

860

861
def _move_task_directory_to_video(
    task: Task,
    video_id: int,
    src_dir: str | None = None,
    recording: Recording | None = None,
) -> None:
    """Move the task extraction directory into the final video directory.

    Source: ``src_dir`` when given, else MEDIA_ROOT/tasks/<task.id>.
    Destination: MEDIA_ROOT/VIDEOS_DIR/<hashkey>/<%04d video_id>, where
    <hashkey> is the hashkey of the recording's owner. The move itself is
    delegated to ``_merge_or_move_directory``.

    Args:
        task: The `Task` instance (must contain `id` and `recording_id`).
        video_id: The destination video id used to build the target path.
        src_dir: Optional explicit source directory for extracted studio files.
        recording: Optional recording object to avoid loading it twice.

    Raises:
        RuntimeError: When the task lacks an id or recording_id, the source
            directory does not exist, the recording cannot be found, or the
            owner's hashkey cannot be resolved.
    """
    if not getattr(task, "id", None):
        raise RuntimeError("Task missing id")

    if not getattr(task, "recording_id", None):
        raise RuntimeError("Task missing recording_id")

    source_dir = src_dir or os.path.join(_get_media_root(), "tasks", str(task.id))
    # isdir() is False for missing paths too, so one check covers both cases.
    if not os.path.isdir(source_dir):
        raise RuntimeError(f"Source directory not found: {source_dir}")

    if recording is None:
        try:
            recording = Recording.objects.get(id=task.recording_id)
        except Recording.DoesNotExist as exc:
            # Chain the lookup failure so the traceback shows the real cause.
            raise RuntimeError(f"Recording not found: {task.recording_id}") from exc

    user_hashkey = _get_user_hashkey_from_recording(recording)
    video_dir = _format_video_directory(video_id)
    dest_dir = os.path.join(
        _get_media_root(), _get_videos_dir(), user_hashkey, video_dir
    )

    _merge_or_move_directory(source_dir, dest_dir)

    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    log.info("Moved task directory from %s to %s", source_dir, dest_dir)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc