• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 26805325804

02 Jun 2026 07:32AM UTC coverage: 69.176%. First build
26805325804

Pull #1454

github

web-flow
Merge 284ffcac1 into 05026e2ce
Pull Request #1454: Security Hardening and Priority-User Support for Encoding/Transcript workflows

406 of 582 new or added lines in 14 files covered. (69.76%)

13618 of 19686 relevant lines covered (69.18%)

0.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.45
/pod/video_encode_transcript/rest_views.py
1
import json
1✔
2
import logging
1✔
3
import os
1✔
4

5
import webvtt
1✔
6
from django.conf import settings
1✔
7
from django.core.exceptions import SuspiciousFileOperation, SuspiciousOperation
1✔
8
from django.shortcuts import get_object_or_404
1✔
9
from django.utils._os import safe_join
1✔
10
from django.utils.translation import gettext_lazy as _
1✔
11
from django.views.decorators.csrf import csrf_exempt
1✔
12
from rest_framework import serializers, viewsets
1✔
13
from rest_framework.decorators import action, api_view
1✔
14
from rest_framework.response import Response
1✔
15

16
from pod.recorder.models import Recording
1✔
17
from pod.video.models import Video
1✔
18
from pod.video.rest_views import VideoSerializer
1✔
19

20
from .models import EncodingAudio, EncodingVideo, PlaylistVideo, VideoRendition
1✔
21

22
USE_TRANSCRIPTION = getattr(settings, "USE_TRANSCRIPTION", False)
1✔
23
if USE_TRANSCRIPTION:
1✔
24
    from pod.video_encode_transcript.transcript import start_transcript
1✔
25

26
MEDIA_ROOT = getattr(settings, "MEDIA_ROOT", "")
1✔
27

28
DEBUG = getattr(settings, "DEBUG", True)
1✔
29
logger = logging.getLogger(__name__)
1✔
30
if DEBUG:
1✔
31
    logger.setLevel(logging.DEBUG)
1✔
32

33

34
def _safe_temp_media_path(filename: str) -> str:
1✔
35
    """Resolve a temporary media file path and keep it under MEDIA_ROOT/temp."""
NEW
36
    media_temp_dir = os.path.realpath(os.path.join(MEDIA_ROOT, "temp"))
×
NEW
37
    try:
×
NEW
38
        filepath = os.path.realpath(safe_join(media_temp_dir, filename))
×
NEW
39
        if os.path.commonpath([filepath, media_temp_dir]) != media_temp_dir:
×
NEW
40
            raise SuspiciousOperation(_("Unsafe temporary path."))
×
NEW
41
    except (SuspiciousFileOperation, ValueError) as exc:
×
NEW
42
        raise SuspiciousOperation(_("Unsafe temporary path.")) from exc
×
NEW
43
    return filepath
×
44

45

46
class VideoRenditionSerializer(serializers.HyperlinkedModelSerializer):
1✔
47
    class Meta:
1✔
48
        model = VideoRendition
1✔
49
        fields = (
1✔
50
            "id",
51
            "url",
52
            "resolution",
53
            "video_bitrate",
54
            "audio_bitrate",
55
            "encode_mp4",
56
            "sites",
57
        )
58

59

60
class EncodingVideoSerializer(serializers.HyperlinkedModelSerializer):
1✔
61
    """Serializer for the EncodingVideo model."""
62

63
    class Meta:
1✔
64
        model = EncodingVideo
1✔
65
        fields = (
1✔
66
            "id",
67
            "url",
68
            "name",
69
            "video",
70
            "rendition",
71
            "encoding_format",
72
            "source_file",
73
            "sites_all",
74
        )
75

76

77
class EncodingAudioSerializer(serializers.HyperlinkedModelSerializer):
1✔
78
    """Serializer for the EncodingAudio model."""
79

80
    class Meta:
1✔
81
        model = EncodingAudio
1✔
82
        fields = (
1✔
83
            "id",
84
            "url",
85
            "name",
86
            "video",
87
            "encoding_format",
88
            "source_file",
89
            "sites_all",
90
        )
91

92

93
class EncodingVideoViewSet(viewsets.ModelViewSet):
1✔
94
    """Viewset for EncodingVideo model."""
95

96
    queryset = EncodingVideo.objects.all()
1✔
97
    serializer_class = EncodingVideoSerializer
1✔
98
    filterset_fields = [
1✔
99
        "video",
100
    ]
101

102
    @action(detail=False, methods=["get"])
1✔
103
    def video_encodedfiles(self, request):
1✔
104
        """Retrieve encoded video files."""
105
        encoded_videos = EncodingVideoViewSet.filter_encoded_medias(
×
106
            self.queryset, request
107
        )
108
        encoded_videos = sorted(encoded_videos, key=lambda x: x.height)
×
109
        serializer = EncodingVideoSerializer(
×
110
            encoded_videos, many=True, context={"request": request}
111
        )
112
        return Response(serializer.data)
×
113

114
    @staticmethod
1✔
115
    def filter_encoded_medias(queryset, request):
1✔
116
        """Filter encoded media files."""
117
        encoded_audios = queryset
×
118
        if request.GET.get("video"):
×
119
            encoded_audios = encoded_audios.filter(video__id=request.GET.get("video"))
×
120
        if request.GET.get("extension"):
×
121
            encoded_audios = encoded_audios.filter(
×
122
                source_file__iendswith=request.GET.get("extension")
123
            )
124
        return encoded_audios
×
125

126

127
class EncodingAudioViewSet(viewsets.ModelViewSet):
1✔
128
    """Viewset for EncodingAudio model."""
129

130
    queryset = EncodingAudio.objects.all()
1✔
131
    serializer_class = EncodingAudioSerializer
1✔
132
    filterset_fields = [
1✔
133
        "video",
134
    ]
135

136
    @action(detail=False, methods=["get"])
1✔
137
    def audio_encodedfiles(self, request):
1✔
138
        """Retrieve encoded audio files."""
139
        encoded_audios = EncodingVideoViewSet.filter_encoded_medias(
×
140
            self.queryset, request
141
        )
142
        serializer = EncodingAudioSerializer(
×
143
            encoded_audios, many=True, context={"request": request}
144
        )
145
        return Response(serializer.data)
×
146

147

148
class VideoRenditionViewSet(viewsets.ModelViewSet):
1✔
149
    queryset = VideoRendition.objects.all()
1✔
150
    serializer_class = VideoRenditionSerializer
1✔
151

152

153
class PlaylistVideoSerializer(serializers.HyperlinkedModelSerializer):
1✔
154
    class Meta:
1✔
155
        model = PlaylistVideo
1✔
156
        fields = (
1✔
157
            "id",
158
            "url",
159
            "name",
160
            "video",
161
            "encoding_format",
162
            "source_file",
163
            "sites_all",
164
        )
165

166

167
class PlaylistVideoViewSet(viewsets.ModelViewSet):
1✔
168
    queryset = PlaylistVideo.objects.all()
1✔
169
    serializer_class = PlaylistVideoSerializer
1✔
170

171

172
@api_view(["GET"])
1✔
173
def launch_encode_view(request):
1✔
174
    """View API for launching video encoding."""
175
    video = get_object_or_404(Video, slug=request.GET.get("slug"))
×
176
    if (
×
177
        video is not None
178
        and (
179
            not hasattr(video, "launch_encode") or getattr(video, "launch_encode") is True
180
        )
181
        and video.encoding_in_progress is False
182
    ):
183
        video.launch_encode = True
×
184
        video.save()
×
185
    return Response(VideoSerializer(instance=video, context={"request": request}).data)
×
186

187

188
@api_view(["GET"])
1✔
189
def launch_transcript_view(request):
1✔
190
    """View API for launching transcript."""
191
    video = get_object_or_404(Video, slug=request.GET.get("slug"))
×
192
    if video is not None and video.get_video_mp3():
×
193
        start_transcript(video.id, threaded=True)
×
194
    return Response(VideoSerializer(instance=video, context={"request": request}).data)
×
195

196

197
@csrf_exempt
1✔
198
@api_view(["POST"])
1✔
199
def store_remote_encoded_video(request):
1✔
200
    """View API for storing remote encoded videos."""
201
    from .encode import end_of_encoding, store_encoding_info
×
202
    from .Encoding_video_model import Encoding_video_model
×
203

204
    video_id = request.GET.get("id", 0)
×
205
    logger.info("Start importing encoding data for video: %s" % video_id)
×
206
    video = get_object_or_404(Video, id=video_id)
×
207
    # start_store_remote_encoding_video(video_id)
208
    # check if video is encoding!!!
209
    data = json.loads(request.body.decode("utf-8"))
×
210
    if video.encoding_in_progress is False:
×
NEW
211
        raise SuspiciousOperation(_("Video not being encoded."))
×
212
    if str(video_id) != str(data["video_id"]):
×
213
        raise SuspiciousOperation(
×
214
            _("Different video id: %(expected)s - %(received)s")
215
            % {"expected": video_id, "received": data["video_id"]}
216
        )
217
    encoding_video = Encoding_video_model(
×
218
        video_id, data["video_path"], data["cut_start"], data["cut_end"]
219
    )
220
    encoding_video.start = data["start"]
×
221
    encoding_video.stop = data["stop"]
×
222
    final_video = store_encoding_info(video_id, encoding_video)
×
223
    end_of_encoding(final_video)
×
224
    return Response(VideoSerializer(instance=video, context={"request": request}).data)
×
225

226

227
@csrf_exempt
1✔
228
@api_view(["POST"])
1✔
229
def store_remote_encoded_video_studio(request):
1✔
230
    from .encode import store_encoding_studio_info
×
231

232
    recording_id = request.GET.get("recording_id", 0)
×
233
    get_object_or_404(Recording, id=recording_id)
×
234
    data = json.loads(request.body.decode("utf-8"))
×
235
    store_encoding_studio_info(recording_id, data["video_output"], data["msg"])
×
236
    return Response("ok")
×
237

238

239
@csrf_exempt
1✔
240
@api_view(["POST"])
1✔
241
def store_remote_transcripted_video(request):
1✔
242
    """View API for storing remote transcripted videos."""
243
    from .transcript import save_vtt_and_notify
×
244

245
    video_id = request.GET.get("id", 0)
×
246
    video = get_object_or_404(Video, id=video_id)
×
247
    # check if video is encoding!!!
248
    data = json.loads(request.body.decode("utf-8"))
×
249
    if str(video_id) != str(data["video_id"]):
×
250
        raise SuspiciousOperation(
×
251
            _("Different video id: %(expected)s - %(received)s")
252
            % {"expected": video_id, "received": data["video_id"]}
253
        )
254
    logger.info("Start importing transcription data for video: %s" % video_id)
×
255
    filename = os.path.basename(data["temp_vtt_file"])
×
NEW
256
    filepath = _safe_temp_media_path(filename)
×
NEW
257
    media_temp_dir = os.path.realpath(os.path.join(MEDIA_ROOT, "temp"))
×
NEW
258
    if os.path.commonpath([filepath, media_temp_dir]) != media_temp_dir:
×
NEW
259
        raise SuspiciousOperation(_("Unsafe temporary path."))
×
260
    new_vtt = webvtt.read(filepath)
×
261
    save_vtt_and_notify(video, data["msg"], new_vtt)
×
262
    os.remove(filepath)
×
263
    return Response(VideoSerializer(instance=video, context={"request": request}).data)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc