• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 17798713985

17 Sep 2025 01:10PM UTC coverage: 70.327%. First build
17798713985

Pull #1342

github

web-flow
Merge 66fd448d6 into a22fd1a86
Pull Request #1342: Fixes the `sec_to_timestamp` function to avoid generating invalid timestamps

1 of 9 new or added lines in 2 files covered. (11.11%)

12182 of 17322 relevant lines covered (70.33%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.56
/pod/video_encode_transcript/transcript.py
1
"""Esup-Pod transcript video functions."""
2

3
from django.conf import settings
1✔
4
from django.core.files import File
1✔
5
from pod.completion.models import Track
1✔
6
from pod.main.tasks import task_start_transcript
1✔
7
from webvtt import Caption, WebVTT
1✔
8

9
from .utils import (
1✔
10
    send_email,
11
    send_email_transcript,
12
    change_encoding_step,
13
    add_encoding_log,
14
)
15
from ..video.models import Video
1✔
16
import importlib.util
1✔
17

18
# Use the real transcription entry point when the "vosk" or "whisper" package
# can be located; otherwise expose a stub that raises as soon as it is called.
if any(
    importlib.util.find_spec(engine) is not None for engine in ("vosk", "whisper")
):
    from .transcript_model import start_transcripting
else:

    def start_transcripting(*args, **kwargs):
        """Raise NotImplementedError: neither vosk nor whisper was found."""
        raise NotImplementedError("No transcription engine available.")
26

27

28
from .encoding_utils import sec_to_timestamp
1✔
29

30
import os
1✔
31
import time
1✔
32

33
from tempfile import NamedTemporaryFile
1✔
34

35
import threading
1✔
36
import logging
1✔
37

38
# Select the CustomFileModel implementation: the podfile one when the
# USE_PODFILE setting is truthy, the pod.main one otherwise.
__FILEPICKER__ = bool(getattr(settings, "USE_PODFILE", False))
if __FILEPICKER__:
    from pod.podfile.models import CustomFileModel
else:
    from pod.main.models import CustomFileModel

# Transcription settings, each read from Django settings with a fallback.
EMAIL_ON_TRANSCRIPTING_COMPLETION = getattr(
    settings, "EMAIL_ON_TRANSCRIPTING_COMPLETION", True
)
TRANSCRIPTION_MODEL_PARAM = getattr(settings, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings, "USE_TRANSCRIPTION", False)
# The engine type is only resolved when transcription is enabled.
TRANSCRIPTION_TYPE = None
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings, "TRANSCRIPTION_TYPE", "WHISPER")
TRANSCRIPTION_NORMALIZE = getattr(settings, "TRANSCRIPTION_NORMALIZE", False)
CELERY_TO_ENCODE = getattr(settings, "CELERY_TO_ENCODE", False)

# The remote transcription task is imported only when remote mode is enabled.
USE_REMOTE_ENCODING_TRANSCODING = getattr(
    settings, "USE_REMOTE_ENCODING_TRANSCODING", False
)
if USE_REMOTE_ENCODING_TRANSCODING:
    from .transcripting_tasks import start_transcripting_task

# Selects the 40-character line limit (instead of 55) when captions are split.
CAPTIONS_STRICT_ACCESSIBILITY = getattr(
    settings, "CAPTIONS_STRICT_ACCESSIBILITY", False
)

log = logging.getLogger(__name__)
69

70

71
# ##########################################################################
72
# TRANSCRIPT VIDEO: THREAD TO LAUNCH TRANSCRIPT
73
# ##########################################################################
74
def start_transcript(video_id, threaded=True) -> None:
    """
    Entry point used to launch the transcription of a video.

    Args:
        video_id: Identifier of the video to transcript.
        threaded (bool): When False, run the transcription synchronously in
            the caller. When True, hand it to Celery if CELERY_TO_ENCODE is
            set, otherwise to a daemon thread.
    """
    if not threaded:
        main_threaded_transcript(video_id)
        return

    if CELERY_TO_ENCODE:
        task_start_transcript.delay(video_id)
        return

    log.info("START TRANSCRIPT VIDEO %s" % video_id)
    worker = threading.Thread(target=main_threaded_transcript, args=[video_id])
    # Daemon thread: it will not keep the interpreter alive on shutdown.
    worker.daemon = True
    worker.start()
90

91

92
def main_threaded_transcript(video_to_encode_id) -> None:
    """
    Transcript main function.

    Checks that a transcription model is configured for the video language
    and that an mp3 rendition exists, then launches the transcription, either
    as a remote task or locally followed by the storage of the WebVTT result.
    Any configuration problem is recorded as encoding step -1 and e-mailed.

    Args:
        video_to_encode_id: Identifier of the video to transcript.
    """
    change_encoding_step(video_to_encode_id, 5, "transcripting audio")

    video_to_encode = Video.objects.get(id=video_to_encode_id)
    video_to_encode.encoding_in_progress = True
    video_to_encode.save()
    msg = ""
    lang = video_to_encode.transcript

    # TRANSCRIPTION_MODEL_PARAM falls back to False and TRANSCRIPTION_TYPE to
    # None when unset; resolve the per-engine mapping defensively so a missing
    # configuration is reported below instead of raising TypeError/KeyError.
    engine_models = (TRANSCRIPTION_MODEL_PARAM or {}).get(TRANSCRIPTION_TYPE) or {}

    if not engine_models.get(lang):
        msg += "\n no transcript model found for lang: %s." % lang
        msg += "Please add it in TRANSCRIPTION_MODEL_PARAM."
        change_encoding_step(video_to_encode.id, -1, msg)
        send_email(msg, video_to_encode.id)
    else:
        # Look the mp3 rendition up once and reuse the result.
        mp3 = video_to_encode.get_video_mp3()
        mp3file = mp3.source_file if mp3 else None
        if mp3file is None:
            msg += "\n no mp3 file found for video: %s." % video_to_encode.id
            change_encoding_step(video_to_encode.id, -1, msg)
            send_email(msg, video_to_encode.id)
        else:
            mp3filepath = mp3file.path
            if USE_REMOTE_ENCODING_TRANSCODING:
                start_transcripting_task.delay(
                    video_to_encode.id, mp3filepath, video_to_encode.duration, lang
                )
            else:
                msg, webvtt = start_transcripting(
                    mp3filepath, video_to_encode.duration, lang
                )
                save_vtt_and_notify(video_to_encode, msg, webvtt)
    add_encoding_log(video_to_encode.id, msg)
133

134

135
def save_vtt_and_notify(video_to_encode, msg, webvtt) -> None:
    """Store the WebVTT for the video, mark the encoding as done and notify."""
    full_log = msg + save_vtt(video_to_encode, webvtt)
    change_encoding_step(video_to_encode.id, 0, "done")
    video_to_encode.encoding_in_progress = False
    video_to_encode.save()
    # Send the end-of-transcription e-mail when enabled.
    if EMAIL_ON_TRANSCRIPTING_COMPLETION:
        send_email_transcript(video_to_encode)
    add_encoding_log(video_to_encode.id, full_log)
145

146

147
def save_vtt(video: Video, webvtt: WebVTT, lang_code: str = None) -> str:
    """
    Save webvtt file with the video.

    The captions are written to a temporary ``.vtt`` file, stored in a
    CustomFileModel record, and attached to the video through a Track for
    the chosen language.

    Args:
        video (Video): The video the subtitles belong to.
        webvtt (WebVTT): The captions to store.
        lang_code (str): Language of the track; ``video.transcript`` is used
            when it is empty.

    Returns:
        str: Log text describing the steps performed (or the error when the
        captions are empty).
    """
    msg = "\nSAVE TRANSCRIPT WEBVTT : %s" % time.ctime()
    lang = lang_code if lang_code else video.transcript
    # The context manager closes the temporary file on every exit path.
    with NamedTemporaryFile(suffix=".vtt") as temp_vtt_file:
        webvtt.save(temp_vtt_file.name)
        if not webvtt.captions:
            return msg + "\nERROR SUBTITLES Output size is 0"

        if TRANSCRIPTION_TYPE != "WHISPER":
            improve_captions_accessibility(webvtt)
        msg += "\nstore vtt file in bdd with CustomFileModel model file field"
        # One timestamp so the record name and the file name always agree.
        stamp = time.strftime("%Y%m%d-%H%M%S")
        if __FILEPICKER__:
            video_dir = video.get_or_create_video_folder()
            subtitle_file, _ = CustomFileModel.objects.get_or_create(
                name="subtitle_%s_%s" % (lang, stamp),
                folder=video_dir,
                created_by=video.owner,
            )
            # Drop a previous file on disk before saving the new content.
            if subtitle_file.file and os.path.isfile(subtitle_file.file.path):
                os.remove(subtitle_file.file.path)
        else:
            # NOTE(review): get_or_create() without lookup arguments is an
            # unfiltered get; confirm this is intended once several
            # CustomFileModel rows exist.
            subtitle_file, _ = CustomFileModel.objects.get_or_create()

        subtitle_file.file.save(
            "subtitle_%s_%s.vtt" % (lang, stamp),
            File(temp_vtt_file),
        )
        msg += "\nstore vtt file in bdd with Track model src field"

        subtitle_btt, _ = Track.objects.get_or_create(video=video, lang=lang)
        subtitle_btt.src = subtitle_file
        subtitle_btt.lang = lang
        subtitle_btt.save()
    return msg
191

192

193
def remove_unnecessary_spaces(text: str) -> str:
    """
    Collapse each run of whitespace into one space and trim both ends.

    Args:
        text (str): The string to clean.

    Returns:
        str: The whitespace-separated tokens of ``text`` joined by single spaces.
    """
    tokens = text.split()
    return " ".join(tokens)
204

205

206
def improve_captions_accessibility(
    webvtt, strict_accessibility=CAPTIONS_STRICT_ACCESSIBILITY
) -> None:
    """
    Parse the vtt file in argument to render the caption conform to accessibility.

    - see `https://github.com/knarf18/Bonnes-pratiques-du-sous-titrage/blob/master/Liste%20de%20bonnes%20pratiques.md`
    - 40 car maximum per line (CPL) in strict mode, 55 otherwise
    - 2 lines max by caption

    Captions that need more than two lines are split into several captions;
    the original duration is shared between them in proportion to their
    number of words. The captions of ``webvtt`` are replaced in place and the
    file is saved again.

    Args:
        webvtt (:class:`webvtt.WebVTT`): The webvtt file content
        strict_accessibility (bool): If True, the caption will be more accessible

    """
    # Loop-invariant: the line width only depends on the accessibility mode.
    line_limit = 40 if strict_accessibility else 55
    new_captions = []
    for caption in webvtt.captions:
        sent = split_string(caption.text, line_limit, sep=" ")
        if len(sent) <= 2:
            # Already fits in one caption: keep its timing untouched.
            new_cap = Caption()
            new_cap.start = caption.start
            new_cap.end = caption.end
            new_cap.text = "\n".join(sent)
            new_captions.append(new_cap)
            continue

        total_words = len(caption.text.split())
        # Two lines per caption, rounded up.
        num_captions = (len(sent) + 1) // 2
        duration = caption.end_in_seconds - caption.start_in_seconds
        # Start from the beginning of the original caption, in seconds.
        start_time = caption.start_in_seconds
        for index in range(num_captions):
            new_cap = Caption()
            # NOTE(review): remove_unnecessary_spaces also collapses the line
            # break inserted by get_cap_text, so both lines end up on a single
            # line; confirm this is intended.
            new_cap.text = remove_unnecessary_spaces(get_cap_text(sent, index))
            # Display time proportional to the share of words in this part.
            share = duration * (len(new_cap.text.split()) / total_words)
            new_cap.start = sec_to_timestamp(start_time)
            new_cap.end = sec_to_timestamp(start_time + share)
            start_time = start_time + share
            new_captions.append(new_cap)

    # Swap the old captions for the new ones in a single in-place operation
    # (the previous one-by-one front deletion was quadratic).
    webvtt.captions[:] = new_captions
    webvtt.save()
255

256

257
def get_cap_text(sent, x):
    """
    Build the text of caption number ``x`` from a list of lines.

    Caption ``x`` is made of line ``2 * x`` and, when it exists, line
    ``2 * x + 1``, separated by a line break.

    Args:
        sent (list): The list of text
        x (int): The position to extract

    Returns:
        str: The extracted text
    """
    first_idx = x * 2
    second_idx = first_idx + 1
    lines = [sent[first_idx]]
    # The second line is optional (odd number of lines).
    if second_idx < len(sent):
        lines.append(sent[second_idx])
    return "\n".join(lines)
274

275

276
def pad(line, limit):
    """
    Right-pad a line with spaces up to the given width.

    A line already as long as (or longer than) ``limit`` is returned unchanged.

    Args:
        line (str): A line of text
        limit (int): The size of line

    Returns:
        str: the line with space at the end
    """
    return line.ljust(limit)
288

289

290
def split_string(text, limit, sep=" "):
    """
    Split text into lines of at most ``limit`` characters, breaking on words.

    Words are accumulated on a line, joined by ``sep``, until adding the next
    word would exceed ``limit``. Every returned line is right-padded with
    spaces to exactly ``limit`` characters.

    Args:
        text (str): the text of the caption
        limit (int): size of line
        sep (str): default " "

    Returns:
        list: the padded lines; empty when ``text`` contains no word

    Raises:
        ValueError: if a single word is longer than ``limit``
    """
    words = text.split()
    # Empty or whitespace-only text has no line to produce (max() would fail).
    if not words:
        return []
    if max(map(len, words)) > limit:
        raise ValueError("limit is too small")

    lines = []
    current = words[0]
    for word in words[1:]:
        if len(current) + len(sep) + len(word) > limit:
            # The word does not fit: close the current line, start a new one.
            lines.append(current)
            current = word
        else:
            current += sep + word
    lines.append(current)
    # Add spaces at the end of each line up to the limit.
    return [line.ljust(limit) for line in lines]
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc