• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 11571148362

29 Oct 2024 09:39AM UTC coverage: 70.755%. First build
11571148362

Pull #1210

github

web-flow
[DONE] Use get_thumbnail to serve video thumbnail via caching system (#1221)

Prevents the video thumbnail URL from being publicly available, and serves it via the cache system
Pull Request #1210: [RELEASE/WIP] Esup-Pod 3.8.2

57 of 113 new or added lines in 21 files covered. (50.44%)

12010 of 16974 relevant lines covered (70.76%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.22
/pod/video_encode_transcript/transcript.py
1
"""Esup-Pod transcript video functions."""
2

3
from django.conf import settings
1✔
4
from django.core.files import File
1✔
5
from pod.completion.models import Track
1✔
6
from pod.main.tasks import task_start_transcript
1✔
7
from webvtt import Caption, WebVTT
1✔
8

9
from .utils import (
1✔
10
    send_email,
11
    send_email_transcript,
12
    change_encoding_step,
13
    add_encoding_log,
14
)
15
from ..video.models import Video
1✔
16
import importlib.util
1✔
17

18
if (
1✔
19
    importlib.util.find_spec("vosk") is not None
20
    or importlib.util.find_spec("stt") is not None
21
    or importlib.util.find_spec("whisper") is not None
22
):
23
    from .transcript_model import start_transcripting
×
24

25

26
from .encoding_utils import sec_to_timestamp
1✔
27

28
import os
1✔
29
import time
1✔
30

31
from tempfile import NamedTemporaryFile
1✔
32

33
import threading
1✔
34
import logging
1✔
35

36
# Module-level configuration, read once from the Django settings at import time.
DEBUG = getattr(settings, "DEBUG", False)

# USE_PODFILE selects which CustomFileModel implementation is imported.
__FILEPICKER__ = True if getattr(settings, "USE_PODFILE", False) else False
if __FILEPICKER__:
    from pod.podfile.models import CustomFileModel
else:
    from pod.main.models import CustomFileModel

EMAIL_ON_TRANSCRIPTING_COMPLETION = getattr(
    settings, "EMAIL_ON_TRANSCRIPTING_COMPLETION", True
)
TRANSCRIPTION_MODEL_PARAM = getattr(settings, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings, "USE_TRANSCRIPTION", False)
# NOTE: TRANSCRIPTION_TYPE is only bound when USE_TRANSCRIPTION is truthy,
# although functions of this module read it unconditionally.
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings, "TRANSCRIPTION_TYPE", "STT")
TRANSCRIPTION_NORMALIZE = getattr(settings, "TRANSCRIPTION_NORMALIZE", False)
CELERY_TO_ENCODE = getattr(settings, "CELERY_TO_ENCODE", False)

# The remote transcription task is only imported when remote encoding is on.
USE_REMOTE_ENCODING_TRANSCODING = getattr(
    settings, "USE_REMOTE_ENCODING_TRANSCODING", False
)
if USE_REMOTE_ENCODING_TRANSCODING:
    from .transcripting_tasks import start_transcripting_task

# Default value of `strict_accessibility` in improve_captions_accessibility().
CAPTIONS_STRICT_ACCESSIBILITY = getattr(
    settings, "CAPTIONS_STRICT_ACCESSIBILITY", False
)

log = logging.getLogger(__name__)
68

69
"""
70
TO TEST IN THE SHELL -->
71
from pod.video.transcript import *
72
stt_model = get_model("fr")
73
msg, webvtt, all_text = main_stt_transcript(
74
    "/test/audio_192k_pod.mp3", # file
75
    177, # file duration
76
    stt_model # model stt loaded
77
)
78
print(webvtt)
79
"""
80

81

82
# ##########################################################################
83
# TRANSCRIPT VIDEO: THREAD TO LAUNCH TRANSCRIPT
84
# ##########################################################################
85
def start_transcript(video_id, threaded=True):
    """
    Call to start transcript main function.

    Will launch transcript mode depending on configuration.

    Args:
        video_id: Identifier of the video; forwarded unchanged to the
            Celery task or to `main_threaded_transcript`.
        threaded (bool): If True (default), do not run in the calling
            thread: queue the `task_start_transcript` Celery task when
            `CELERY_TO_ENCODE` is set, otherwise start a daemon thread.
            If False, run `main_threaded_transcript` synchronously.
    """
    if not threaded:
        main_threaded_transcript(video_id)
        return

    if CELERY_TO_ENCODE:
        task_start_transcript.delay(video_id)
        return

    log.info("START TRANSCRIPT VIDEO %s", video_id)
    # `daemon=True` replaces Thread.setDaemon(), deprecated since Python 3.10.
    worker = threading.Thread(
        target=main_threaded_transcript, args=(video_id,), daemon=True
    )
    worker.start()
101

102

103
def main_threaded_transcript(video_to_encode_id):
    """
    Transcript main function.

    Will check all configuration and file and launch transcript.

    Steps:
        1. Set the encoding step to 5 and flag the video as being encoded.
        2. Check that `TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE]` has an
           entry for the video transcript language and that the video has
           an mp3 file; otherwise set the step to -1 and send an email.
        3. Either queue `start_transcripting_task` (remote mode) or run
           `start_transcripting` and save the resulting VTT.
        4. Add `msg` to the encoding log, whatever the branch taken.

    Args:
        video_to_encode_id: Value used to fetch the video with
            `Video.objects.get(id=...)`.
    """
    change_encoding_step(video_to_encode_id, 5, "transcripting audio")

    video_to_encode = Video.objects.get(id=video_to_encode_id)
    video_to_encode.encoding_in_progress = True
    video_to_encode.save()
    msg = ""
    lang = video_to_encode.transcript
    # check if TRANSCRIPTION_MODEL_PARAM [lang] exist
    if not TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE].get(lang):
        msg += "\n no stt model found for lang: %s." % lang
        msg += "Please add it in TRANSCRIPTION_MODEL_PARAM."
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)
    else:
        # Call get_video_mp3() only once instead of twice for the same lookup.
        video_mp3 = video_to_encode.get_video_mp3()
        mp3file = video_mp3.source_file if video_mp3 else None
        if mp3file is None:
            msg += "\n no mp3 file found for video: %s." % video_to_encode.id
            change_encoding_step(video_to_encode.id, -1, msg)
            send_email(msg, video_to_encode.id)
        else:
            mp3filepath = mp3file.path
            if USE_REMOTE_ENCODING_TRANSCODING:
                # Remote mode: only the task is queued here, `msg` stays empty.
                start_transcripting_task.delay(
                    video_to_encode.id, mp3filepath, video_to_encode.duration, lang
                )
            else:
                msg, webvtt = start_transcripting(
                    mp3filepath, video_to_encode.duration, lang
                )
                save_vtt_and_notify(video_to_encode, msg, webvtt)
    add_encoding_log(video_to_encode.id, msg)
144

145

146
def save_vtt_and_notify(video_to_encode, msg, webvtt):
    """
    Save the VTT file, close the encoding step and notify by mail.

    Args:
        video_to_encode: The video the transcript belongs to.
        msg (str): Log text collected so far; the `save_vtt` report is
            appended to it before it is added to the encoding log.
        webvtt: The captions, passed as is to `save_vtt`.
    """
    msg = msg + save_vtt(video_to_encode, webvtt)
    video_id = video_to_encode.id
    change_encoding_step(video_id, 0, "done")
    video_to_encode.encoding_in_progress = False
    video_to_encode.save()
    # Send the "transcription finished" email.
    if EMAIL_ON_TRANSCRIPTING_COMPLETION:
        send_email_transcript(video_to_encode)
    add_encoding_log(video_id, msg)
156

157

158
def save_vtt(video: Video, webvtt: WebVTT, lang_code: str = None) -> str:
    """
    Save webvtt file with the video.

    The captions are written to a temporary `.vtt` file, stored in a
    `CustomFileModel` and linked to the video through a `Track`.

    Args:
        video (Video): The video that receives the subtitle track.
        webvtt (WebVTT): The captions to save.
        lang_code (str): Language of the track; when empty or None,
            `video.transcript` is used instead.

    Returns:
        str: A log message describing the steps done, or an error line
        when `webvtt` has no caption.
    """
    msg = "\nSAVE TRANSCRIPT WEBVTT : %s" % time.ctime()
    lang = lang_code if lang_code else video.transcript
    # The context manager closes the temporary file on every exit path.
    with NamedTemporaryFile(suffix=".vtt") as temp_vtt_file:
        webvtt.save(temp_vtt_file.name)
        if not webvtt.captions:
            return msg + "\nERROR SUBTITLES Output size is 0"

        if TRANSCRIPTION_TYPE != "WHISPER":
            improve_captions_accessibility(webvtt)
        msg += "\nstore vtt file in bdd with CustomFileModel model file field"
        # Computed once so the record name and the file name always match.
        timestamp = time.strftime("%Y%m%d-%H%M%S")
        if __FILEPICKER__:
            video_dir = video.get_or_create_video_folder()
            subtitle_file, _ = CustomFileModel.objects.get_or_create(
                name="subtitle_%s_%s" % (lang, timestamp),
                folder=video_dir,
                created_by=video.owner,
            )
            # Remove the file of a record that already existed.
            if subtitle_file.file and os.path.isfile(subtitle_file.file.path):
                os.remove(subtitle_file.file.path)
        else:
            # get_or_create() without lookup would reuse the single existing
            # record, or raise MultipleObjectsReturned when several exist:
            # always create a new record instead.
            subtitle_file = CustomFileModel.objects.create()

        subtitle_file.file.save(
            "subtitle_%s_%s.vtt" % (lang, timestamp),
            File(temp_vtt_file),
        )
        msg += "\nstore vtt file in bdd with Track model src field"

        subtitle_btt, _ = Track.objects.get_or_create(video=video, lang=lang)
        subtitle_btt.src = subtitle_file
        subtitle_btt.lang = lang
        subtitle_btt.save()
    return msg
202

203

204
def remove_unnecessary_spaces(text: str) -> str:
    """
    Collapse every run of whitespace into a single space.

    Leading and trailing whitespace is dropped as well.

    Args:
        text (str): The string to clean.

    Returns:
        str: The words of `text` joined by one space.
    """
    words = text.split()
    return " ".join(words)
215

216

217
def improve_captions_accessibility(
    webvtt, strict_accessibility=CAPTIONS_STRICT_ACCESSIBILITY
):
    """
    Parse the vtt file in argument to render the caption conform to accessibility.

    - see `https://github.com/knarf18/Bonnes-pratiques-du-sous-titrage/blob/master/Liste%20de%20bonnes%20pratiques.md` # noqa: E501
    - 40 car maximum per line (CPL)
    - 2 lines max by caption

    The captions of `webvtt` are replaced in place, then `webvtt.save()`
    is called.

    Args:
        webvtt (:class:`webvtt.WebVTT`): The webvtt file content
        strict_accessibility (bool): If True, lines are limited to 40
            characters instead of 55
    """
    line_limit = 40 if strict_accessibility else 55
    new_captions = []
    for caption in webvtt.captions:
        sent = split_string(caption.text, line_limit, sep=" ")
        if len(sent) <= 2:
            # Short enough: keep the original timing, one line per chunk.
            new_cap = Caption()
            new_cap.start = caption.start
            new_cap.end = caption.end
            new_cap.text = "\n".join(sent)
            new_captions.append(new_cap)
            continue

        # Total number of words of the original caption.
        nb_tot_words = len(caption.text.split())
        # Two lines per new caption, rounded up.
        num_captions = (len(sent) + 1) // 2
        dur = caption.end_in_seconds - caption.start_in_seconds
        # Start from the beginning of the original caption (in seconds).
        start_time = caption.start_in_seconds
        for x in range(num_captions):
            new_cap = Caption()
            # NOTE(review): remove_unnecessary_spaces also replaces the "\n"
            # added by get_cap_text with a space - confirm this is intended.
            new_cap.text = remove_unnecessary_spaces(get_cap_text(sent, x))
            # Display duration proportional to the number of words.
            time_calc = dur * (len(new_cap.text.split()) / nb_tot_words)
            new_cap.start = sec_to_timestamp(start_time)
            new_cap.end = sec_to_timestamp(start_time + time_calc)
            start_time += time_calc
            new_captions.append(new_cap)

    # Replace the old captions in one pass (deleting index 0 repeatedly
    # shifts the whole list each time).
    webvtt.captions.clear()
    webvtt.captions.extend(new_captions)
    webvtt.save()
266

267

268
def get_cap_text(sent, x):
    """
    Build the text of caption number `x` from a list of lines.

    Caption `x` is made of the lines at index `2 * x` and, when it
    exists, `2 * x + 1`, separated by a line break.

    Args:
        sent (list): The list of lines
        x (int): The number of the caption to build

    Returns:
        str: One or two lines joined by a line break
    """
    first = x * 2
    second = first + 1
    text = sent[first]
    # The second line is optional: the last caption may hold a single line.
    if second < len(sent):
        text += "\n" + sent[second]
    return text
285

286

287
def pad(line, limit):
    """
    Right-pad a line with spaces up to the given size.

    A line already as long as `limit`, or longer, is returned unchanged.

    Args:
        line (str): A line of text
        limit (int): The size to reach

    Returns:
        str: The line followed by the missing spaces
    """
    return line.ljust(limit)
299

300

301
def split_string(text, limit, sep=" "):
    """
    Split text into lines of at most `limit` characters, without cutting words.

    Words are taken from `text.split()` and joined with `sep`. Each
    returned line is right-padded with spaces up to `limit`.

    Args:
        text (str): the text of the caption
        limit (int): size of line
        sep (str): separator inserted between the words of a line, default " "

    Returns:
        list: the padded lines; an empty list when `text` has no word

    Raises:
        ValueError: if a word is longer than `limit`
    """
    words = text.split()
    if not words:
        # Nothing to split: avoid calling max() on an empty sequence.
        return []
    if max(map(len, words)) > limit:
        raise ValueError("limit is too small")

    lines = []
    current = words[0]
    for word in words[1:]:
        if len(current) + len(sep) + len(word) > limit:
            # The word does not fit: close the line and start a new one.
            lines.append(current)
            current = word
        else:
            current += sep + word
    lines.append(current)
    # add space to the end of line
    return [line.ljust(limit) for line in lines]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc