• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 22725050893

05 Mar 2026 03:28PM UTC coverage: 68.07% (+0.003%) from 68.067%
22725050893

Pull #1409

github

web-flow
Merge 7c29945ad into 3b7eb8722
Pull Request #1409: Fix Thumbnail Persistence and Completion Alert

1 of 24 new or added lines in 3 files covered. (4.17%)

8 existing lines in 4 files now uncovered.

12923 of 18985 relevant lines covered (68.07%)

0.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.11
/pod/video_encode_transcript/transcript.py
1
"""Esup-Pod transcript video functions."""
2

3
import importlib.util
1✔
4

5
from django.conf import settings
1✔
6
from django.core.files import File
1✔
7
from pod.completion.models import Track
1✔
8
from pod.main.tasks import task_start_transcript
1✔
9
from webvtt import Caption, WebVTT
1✔
10

11
from ..video.models import Video
1✔
12
from .utils import (
1✔
13
    add_encoding_log,
14
    change_encoding_step,
15
    send_email,
16
    send_email_transcript,
17
)
18

19
# Bind ``start_transcripting`` to the real implementation only when at least
# one of the optional engines (vosk or whisper) can be located.
if (
    importlib.util.find_spec("vosk") is None
    and importlib.util.find_spec("whisper") is None
):

    def start_transcripting(*args, **kwargs):
        """Fallback used when neither vosk nor whisper can be found.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("No transcription engine available.")

else:
    from .transcript_model import start_transcripting
28

29

30
import logging
1✔
31
import os
1✔
32
import threading
1✔
33
import time
1✔
34
from tempfile import NamedTemporaryFile
1✔
35

36
from .encoding_utils import sec_to_timestamp
1✔
37

38
# Select the CustomFileModel implementation matching the USE_PODFILE setting.
__FILEPICKER__ = bool(getattr(settings, "USE_PODFILE", False))
if __FILEPICKER__:
    from pod.podfile.models import CustomFileModel
else:
    from pod.main.models import CustomFileModel

# Send a mail once a transcript has been saved (see save_vtt_and_notify).
EMAIL_ON_TRANSCRIPTING_COMPLETION = getattr(
    settings, "EMAIL_ON_TRANSCRIPTING_COMPLETION", True
)
# Looked up as TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang] below.
TRANSCRIPTION_MODEL_PARAM = getattr(settings, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings, "USE_TRANSCRIPTION", False)
# The engine type is only resolved when transcription is enabled.
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings, "TRANSCRIPTION_TYPE", "WHISPER")
else:
    TRANSCRIPTION_TYPE = None
TRANSCRIPTION_NORMALIZE = getattr(settings, "TRANSCRIPTION_NORMALIZE", False)
# When enabled, start_transcript hands the job to a Celery task.
CELERY_TO_ENCODE = getattr(settings, "CELERY_TO_ENCODE", False)

USE_REMOTE_ENCODING_TRANSCODING = getattr(
    settings, "USE_REMOTE_ENCODING_TRANSCODING", False
)
if USE_REMOTE_ENCODING_TRANSCODING:
    # Only needed (and only imported) for the remote workflow.
    from .transcripting_tasks import start_transcripting_task

# Default for improve_captions_accessibility: 40 chars per line instead of 55.
CAPTIONS_STRICT_ACCESSIBILITY = getattr(
    settings,
    "CAPTIONS_STRICT_ACCESSIBILITY",
    False,
)
# When enabled, start_transcript delegates to the runner manager.
USE_RUNNER_MANAGER = getattr(settings, "USE_RUNNER_MANAGER", False)

log = logging.getLogger(__name__)
70

71

72
def resolve_transcription_language(video: Video, lang_code: str | None = None) -> str:
    """Pick the language to use for a transcript, trying several sources.

    The first non-empty value wins, in this order:

    1. The explicit ``lang_code`` argument.
    2. The ``transcript`` attribute of the video.
    3. The language of the most recent (highest id) subtitle track of the video.
    4. The ``main_lang`` attribute of the video.
    5. The ``DEFAULT_LANG_TRACK`` setting, or ``"fr"`` when it is not defined.

    Args:
        video (Video): The video being transcripted.
        lang_code (str | None): Language requested by the caller, if any.

    Returns:
        str: The resolved language code.
    """
    if lang_code:
        return lang_code

    video_transcript = getattr(video, "transcript", None)
    if video_transcript:
        return str(video_transcript)

    # The track table is only queried when the cheaper sources gave nothing.
    subtitle_langs = (
        Track.objects.filter(video=video, kind="subtitles")
        .exclude(lang__isnull=True)
        .exclude(lang="")
        .order_by("-id")
        .values_list("lang", flat=True)
    )
    latest_track_lang = subtitle_langs.first()
    if latest_track_lang:
        return str(latest_track_lang)

    return str(
        getattr(video, "main_lang", None)
        or getattr(settings, "DEFAULT_LANG_TRACK", "fr")
    )
103

104

105
def start_transcript(video_id, threaded=True) -> None:
    """
    Entry point used to launch the transcription of a video.

    The execution mode depends on the configuration:

    - with ``USE_RUNNER_MANAGER``, the runner manager handles the job;
    - otherwise, when ``threaded`` is false, the job runs in the caller;
    - otherwise it goes to Celery (``CELERY_TO_ENCODE``) or a daemon thread.

    Args:
        video_id: Identifier of the video to transcript.
        threaded (bool): Run the job asynchronously when True.
    """
    if USE_RUNNER_MANAGER:
        log.info("Start transcription, with runner manager, for id: %s" % video_id)
        # Imported here to prevent a circular import.
        from .runner_manager import transcript_video

        transcript_video(video_id)
        return

    log.info("Start transcription, without runner manager, for id: %s" % video_id)
    if not threaded:
        main_threaded_transcript(video_id)
        return

    if CELERY_TO_ENCODE:
        task_start_transcript.delay(video_id)
        return

    log.info("START TRANSCRIPT VIDEO %s" % video_id)
    worker = threading.Thread(
        target=main_threaded_transcript, args=[video_id], daemon=True
    )
    worker.start()
129

130

131
def main_threaded_transcript(video_to_encode_id) -> None:
    """
    Transcript main function.

    Flags the video as being encoded, checks that a transcription model is
    configured for the video language and that an mp3 file exists, then either
    queues the remote transcription task or runs the transcription locally and
    saves the result. Configuration or file problems are reported through the
    encoding step (-1) and by mail.

    Args:
        video_to_encode_id: Identifier of the video to transcript.
    """
    change_encoding_step(video_to_encode_id, 5, "transcripting audio")

    video_to_encode = Video.objects.get(id=video_to_encode_id)
    Video.objects.filter(id=video_to_encode_id).update(encoding_in_progress=True)
    video_to_encode.encoding_in_progress = True
    msg = ""
    lang = video_to_encode.transcript

    # TRANSCRIPTION_MODEL_PARAM defaults to False and may lack an entry for
    # the configured engine: look it up defensively so a configuration problem
    # is reported below instead of raising TypeError/KeyError.
    engine_models = (TRANSCRIPTION_MODEL_PARAM or {}).get(TRANSCRIPTION_TYPE) or {}
    if not engine_models.get(lang):
        msg += "\n no transcript model found for lang: %s." % lang
        msg += "Please add it in TRANSCRIPTION_MODEL_PARAM."
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)
    else:
        # Fetch the mp3 encoding once instead of calling the getter twice.
        video_mp3 = video_to_encode.get_video_mp3()
        mp3file = video_mp3.source_file if video_mp3 else None
        if mp3file is None:
            msg += "\n no mp3 file found for video: %s." % video_to_encode.id
            change_encoding_step(video_to_encode.id, -1, msg)
            send_email(msg, video_to_encode.id)
        else:
            mp3filepath = mp3file.path
            if USE_REMOTE_ENCODING_TRANSCODING:
                start_transcripting_task.delay(
                    video_to_encode.id, mp3filepath, video_to_encode.duration, lang
                )
            else:
                msg, webvtt = start_transcripting(
                    mp3filepath, video_to_encode.duration, lang
                )
                save_vtt_and_notify(video_to_encode, msg, webvtt)
    add_encoding_log(video_to_encode.id, msg)
172

173

174
def save_vtt_and_notify(video_to_encode, msg, webvtt) -> None:
    """Save the vtt file and notify by mail at the end.

    Equivalent to :func:`save_vtt_and_notify_with_lang` without an explicit
    language; delegating keeps the two entry points from drifting apart.

    Args:
        video_to_encode: The video the transcript belongs to.
        msg (str): Log text accumulated so far.
        webvtt: The generated captions.
    """
    save_vtt_and_notify_with_lang(video_to_encode, msg, webvtt)
184

185

186
def save_vtt_and_notify_with_lang(
    video_to_encode, msg, webvtt, lang_code: str = None
) -> None:
    """Save the vtt file for a given language and notify by mail at the end.

    Args:
        video_to_encode: The video the transcript belongs to.
        msg (str): Log text accumulated so far.
        webvtt: The generated captions.
        lang_code (str): Language forwarded to ``save_vtt``; may be None.
    """
    video_id = video_to_encode.id
    full_msg = msg + save_vtt(video_to_encode, webvtt, lang_code)
    change_encoding_step(video_id, 0, "done")

    # Clear the flag both in the database and on the in-memory instance.
    Video.objects.filter(id=video_id).update(encoding_in_progress=False)
    video_to_encode.encoding_in_progress = False

    # Send the "transcription finished" mail.
    if EMAIL_ON_TRANSCRIPTING_COMPLETION:
        send_email_transcript(video_to_encode)
    add_encoding_log(video_id, full_msg)
198

199

200
def save_vtt(video: Video, webvtt: WebVTT, lang_code: str | None = None) -> str:
    """Save the webvtt captions as a subtitle file attached to the video.

    The captions are written to a temporary ``.vtt`` file, stored in a
    ``CustomFileModel`` and linked to the video through a ``Track`` for the
    resolved language. Nothing is stored when there are no captions.

    Args:
        video (Video): The video the captions belong to.
        webvtt (WebVTT): The captions to save.
        lang_code (str | None): Explicit language; resolved with
            ``resolve_transcription_language`` when missing.

    Returns:
        str: Log text describing what was done.
    """
    msg = "\nSAVE TRANSCRIPT WEBVTT : %s" % time.ctime()
    lang = resolve_transcription_language(video, lang_code)

    # The context manager guarantees the temporary file is closed even when
    # one of the storage calls below raises.
    with NamedTemporaryFile(suffix=".vtt") as temp_vtt_file:
        webvtt.save(temp_vtt_file.name)
        if not webvtt.captions:
            return msg + "\nERROR SUBTITLES Output size is 0"

        if TRANSCRIPTION_TYPE != "WHISPER":
            improve_captions_accessibility(webvtt)
        msg += "\nstore vtt file in bdd with CustomFileModel model file field"

        # One timestamp, so the record name and the stored file name agree.
        subtitle_name = "subtitle_%s_%s" % (lang, time.strftime("%Y%m%d-%H%M%S"))
        if __FILEPICKER__:
            video_dir = video.get_or_create_video_folder()
            subtitle_file, _created = CustomFileModel.objects.get_or_create(
                name=subtitle_name,
                folder=video_dir,
                created_by=video.owner,
            )
            # Drop a file left on disk by a record with the same name.
            if subtitle_file.file and os.path.isfile(subtitle_file.file.path):
                os.remove(subtitle_file.file.path)
        else:
            subtitle_file, _created = CustomFileModel.objects.get_or_create()

        subtitle_file.file.save("%s.vtt" % subtitle_name, File(temp_vtt_file))

    msg += "\nstore vtt file in bdd with Track model src field"
    subtitle_track, _created = Track.objects.get_or_create(video=video, lang=lang)
    subtitle_track.src = subtitle_file
    subtitle_track.lang = lang
    subtitle_track.save()
    return msg
244

245

246
def remove_unnecessary_spaces(text: str) -> str:
    """
    Collapse every run of whitespace into a single space and trim both ends.

    Args:
        text (str): The string to clean.

    Returns:
        str: The words of ``text`` joined by single spaces.
    """
    tokens = text.split()
    return " ".join(tokens)
257

258

259
def improve_captions_accessibility(
    webvtt, strict_accessibility=CAPTIONS_STRICT_ACCESSIBILITY
) -> None:
    """
    Rewrite the captions of a vtt file so they follow accessibility rules.

    - see `https://github.com/knarf18/Bonnes-pratiques-du-sous-titrage/blob/master/Liste%20de%20bonnes%20pratiques.md`
    - 40 characters maximum per line (CPL) in strict mode, 55 otherwise
    - 2 lines maximum per caption

    Captions needing more than two lines are split into several captions
    whose display time is proportional to their number of words. The captions
    of ``webvtt`` are replaced in place and the file is saved again.

    Args:
        webvtt (:class:`webvtt.WebVTT`): The webvtt file content
        strict_accessibility (bool): If True, use the 40 characters limit

    """
    line_limit = 40 if strict_accessibility else 55
    rebuilt = []
    for caption in webvtt.captions:
        lines = split_string(caption.text, line_limit, sep=" ")
        if len(lines) <= 2:
            # Short enough: keep the timing, only re-wrap the text.
            short_cap = Caption()
            short_cap.start = caption.start
            short_cap.end = caption.end
            short_cap.text = "\n".join(lines)
            rebuilt.append(short_cap)
            continue

        total_words = len(caption.text.split())
        # Two lines per new caption, rounding up for an odd line count.
        chunk_count, leftover = divmod(len(lines), 2)
        if leftover:
            chunk_count += 1
        duration = caption.end_in_seconds - caption.start_in_seconds
        # Running start time, in seconds, of the next piece.
        cursor = caption.start_in_seconds
        for index in range(chunk_count):
            piece = Caption()
            piece.text = remove_unnecessary_spaces(get_cap_text(lines, index))
            # Display time in proportion to the number of words shown.
            share = duration * (len(piece.text.split()) / total_words)
            piece.start = sec_to_timestamp(cursor)
            piece.end = sec_to_timestamp(cursor + share)
            cursor = cursor + share
            rebuilt.append(piece)

    # Swap the old captions for the rebuilt ones, in place.
    del webvtt.captions[:]
    webvtt.captions.extend(rebuilt)
    webvtt.save()
308

309

310
def get_cap_text(sent, x):
    """
    Build the text of caption number ``x`` from a list of lines.

    Caption ``x`` is made of lines ``2 * x`` and ``2 * x + 1``; the second
    line is simply left out when it does not exist.

    Args:
        sent (list): The list of text lines
        x (int): The position of the caption to extract

    Returns:
        str: The one or two lines, joined by a newline

    Raises:
        IndexError: If line ``2 * x`` does not exist.
    """
    first_index = x * 2
    cap_text = sent[first_index]
    second_index = first_index + 1
    if second_index < len(sent):
        cap_text += "\n" + sent[second_index]
    return cap_text
327

328

329
def pad(line, limit):
    """
    Right-pad a line with spaces up to the given width.

    Lines already at least ``limit`` characters long are returned unchanged.

    Args:
        line (str): A line of text
        limit (int): The width to reach

    Returns:
        str: The line followed by the padding spaces
    """
    return line.ljust(limit)
341

342

343
def split_string(text, limit, sep=" "):
    """
    Wrap text on word boundaries into lines of at most ``limit`` characters.

    Args:
        text (str): the text of the caption
        limit (int): maximum size of a line
        sep (str): separator inserted between words of a line, default " "

    Returns:
        list: the lines, each right-padded with spaces to ``limit`` characters;
        an empty list when ``text`` contains no word

    Raises:
        ValueError: If a single word is longer than ``limit``.
    """
    words = text.split()
    if not words:
        # Empty or whitespace-only text: nothing to wrap. Without this guard
        # max() below fails with "max() arg is an empty sequence".
        return []
    if max(map(len, words)) > limit:
        raise ValueError("limit is too small")

    lines = []
    current = words[0]
    for word in words[1:]:
        if len(current) + len(sep) + len(word) > limit:
            # The word does not fit: close the line and start a new one.
            lines.append(current)
            current = word
        else:
            current += sep + word
    lines.append(current)
    # Pad every line with trailing spaces up to the limit.
    return [line.ljust(limit) for line in lines]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc