• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 21912953474

11 Feb 2026 04:12PM UTC coverage: 70.24% (-0.04%) from 70.284%
21912953474

Pull #1402

github

Badatos
Remove Python 3.9 from supported versions.
Pull Request #1402: Bump pillow from 10.3.0 to 12.1.1

12363 of 17601 relevant lines covered (70.24%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pod/video_encode_transcript/transcript_model.py
1
import numpy as np
×
2
import shlex
×
3
import subprocess
×
4
import json
×
5

6
import os
×
7
from timeit import default_timer as timer
×
8
import datetime as dt
×
9
from datetime import timedelta
×
10
import webvtt
×
11
from webvtt import WebVTT, Caption
×
12
from shlex import quote
×
13

14
import logging
×
15

16
try:
×
17
    from ..custom import settings_local
×
18
except ImportError:
×
19
    from .. import settings as settings_local
×
20

21
from .encoding_utils import sec_to_timestamp
×
22

23
DEBUG = getattr(settings_local, "DEBUG", False)
×
24

25
TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
×
26
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
×
27
if USE_TRANSCRIPTION:
×
28
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "WHISPER")
×
29
    if TRANSCRIPTION_TYPE == "VOSK":
×
30
        from vosk import Model, KaldiRecognizer
×
31
    elif TRANSCRIPTION_TYPE == "WHISPER":
×
32
        import whisper
×
33
        from whisper.utils import get_writer
×
34

35
TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
×
36
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
×
37
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
38
)
39

40
TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
×
41
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
42
)  # 10min
43
# time in sec for phrase length
44
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
×
45
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 2
46
)
47
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
×
48
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
49
)
50
log = logging.getLogger(__name__)
×
51

52

53
def get_model(lang):
    """Load the speech-recognition model configured for *lang*.

    Looks up the model path in ``TRANSCRIPTION_MODEL_PARAM`` for the active
    ``TRANSCRIPTION_TYPE``.  NOTE(review): ``Model`` here is the class
    imported from vosk, so this loader is only meaningful on the Vosk code
    path — confirm it is never reached for Whisper.
    """
    model_path = TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"]
    return Model(model_path)
57

58

59
def start_transcripting(mp3filepath, duration, lang):
    """
    Start direct transcription.

    Normalize the audio if set, get the model according to the lang and start transcript.

    Args:
        mp3filepath: path of the mp3 file to transcribe.
        duration: duration of the audio, in seconds.
        lang: language code used to select the model.

    Returns:
        Tuple ``(msg, webvtt)``: a log message and the generated WebVTT object.
    """
    if TRANSCRIPTION_NORMALIZE:
        # Loudness-normalize first; the transcription runs on the new file.
        mp3filepath = normalize_mp3(mp3filepath)

    if TRANSCRIPTION_TYPE == "WHISPER":
        msg, webvtt, all_text = main_whisper_transcript(mp3filepath, duration, lang)
    else:
        # Non-Whisper engines need an explicit model object.
        msg, webvtt, all_text = start_main_transcript(
            mp3filepath, duration, get_model(lang)
        )

    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    return msg, webvtt
80

81

82
def start_main_transcript(mp3filepath, duration, transript_model):
    """Call transcription depending software type.

    Args:
        mp3filepath: path of the mp3 file to transcribe.
        duration: duration of the audio, in seconds.
        transript_model: model object returned by ``get_model()``.

    Returns:
        Tuple ``(msg, webvtt, all_text)``.

    Raises:
        ValueError: if ``TRANSCRIPTION_TYPE`` is not handled by this
            dispatcher (Whisper is dispatched earlier, in
            ``start_transcripting``).
    """
    if TRANSCRIPTION_TYPE == "VOSK":
        msg, webvtt, all_text = main_vosk_transcript(
            mp3filepath, duration, transript_model
        )
    else:
        # Bug fix: the original fell through with msg/webvtt/all_text unbound
        # and crashed on the return with an opaque UnboundLocalError.
        # Fail explicitly with an actionable message instead.
        raise ValueError(
            "Unsupported TRANSCRIPTION_TYPE for start_main_transcript: %r"
            % TRANSCRIPTION_TYPE
        )
    return msg, webvtt, all_text
89

90

91
def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Convert audio to subaudio and add good sample rate.

    Runs SoX to extract ``duration`` seconds starting at ``trim_start`` as
    raw mono 16-bit little-endian PCM at ``desired_sample_rate``.

    Returns:
        For Whisper, a float32 numpy array scaled to [-1, 1); otherwise the
        raw int16 numpy samples.

    Raises:
        RuntimeError: if SoX exits with a non-zero status.
        OSError: if the SoX binary is not installed.
    """
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        raw_pcm = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )

    samples = np.frombuffer(raw_pcm, np.int16)
    if TRANSCRIPTION_TYPE == "WHISPER":
        # Whisper expects float32 PCM normalized by the int16 range.
        return samples.flatten().astype(np.float32) / 32768.0
    return samples
116

117

118
def normalize_mp3(mp3filepath):
    """Normalize the audio to good format and sound level.

    Runs ``ffmpeg-normalize`` (EBU mode) to write a ``*_norm`` copy of the
    file at ``TRANSCRIPTION_NORMALIZE_TARGET_LEVEL``.

    Returns:
        The path of the normalized file, or the original ``mp3filepath``
        unchanged when the tool fails or is not installed (best-effort:
        errors are logged, never raised).
    """
    base, ext = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(base, "_norm", ext)

    normalize_cmd = "ffmpeg-normalize {} ".format(quote(mp3filepath))
    normalize_cmd += "-c:a libmp3lame -b:a 192k --normalization-type ebu "
    normalize_cmd += "--target-level {} -f -o {}".format(
        TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
    )
    if DEBUG:
        print(normalize_cmd)

    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        log.error("ffmpeg-normalize returned non-zero status: {}".format(e.stderr))
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found {}".format(e.strerror))
        return mp3filepath
    return mp3normfile
140

141

142
# #################################
143
# TRANSCRIPT VIDEO: MAIN FUNCTION
144
# #################################
145

146

147
def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Convert audio to the good sample rate.

    Starts a SoX subprocess that streams ``duration`` seconds of the file,
    beginning at ``trim_start``, as raw mono 16-bit little-endian PCM at
    ``desired_sample_rate``.

    Returns:
        The running ``subprocess.Popen`` object; the caller reads the PCM
        stream from its ``stdout`` (see ``get_word_result_from_data``).

    Raises:
        OSError: if the SoX binary is not installed.
    """
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    # Bug fix: the original also caught subprocess.CalledProcessError here,
    # but Popen() never raises it (only check_* / wait-based helpers do),
    # so that handler was dead code and has been removed.
    try:
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return output
168

169

170
def get_word_result_from_data(results, audio, rec):
    """Feed the audio stream to the recognizer and collect its results.

    Reads raw PCM from ``audio.stdout`` in 4000-byte chunks; every time
    ``rec.AcceptWaveform`` reports a completed utterance, its result is
    appended to ``results``.  One final result is appended after the stream
    is exhausted.  Mutates ``results`` in place; returns ``None``.

    NOTE(review): Vosk's usual way to flush the tail of the stream is
    ``rec.FinalResult()`` — confirm that the trailing ``rec.Result()`` call
    is intentional.
    """
    while True:
        chunk = audio.stdout.read(4000)
        if not chunk:
            break
        if rec.AcceptWaveform(chunk):
            results.append(rec.Result())
    # Flush whatever the recognizer still holds for the end of the stream.
    results.append(rec.Result())
179

180

181
def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert word and time to webvtt captions.

    Groups consecutive words into captions: a caption is closed when it
    spans at least TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH seconds, or when
    the silence before the next word reaches
    TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME seconds, or at the last
    word of the chunk.

    Args:
        words: list of word dicts; keys are "word" plus either
            "start_time"/"duration" (old STT shape) or "start"/"end"
            (Vosk shape, selected by TRANSCRIPTION_TYPE).
        start_trim: offset in seconds of this audio chunk in the full file.
        duration: total duration of the audio, in seconds.
        is_first_caption: True for the first caption of this chunk (used to
            drop words already emitted at the end of the previous chunk).
        text_caption: words accumulated so far for the caption in progress.
        start_caption: start time in seconds of the caption in progress.
        last_word_added: last word written by the previous chunk, used by
            get_text_caption() to de-duplicate across chunk boundaries.
        all_text: running plain-text transcript; this chunk's words are
            appended to it.
        webvtt: WebVTT object new captions are appended to.

    Returns:
        Tuple (all_text, webvtt) with this chunk's text and captions added.
    """
    # Function retained because it could be used with the Vosk model
    # (initially used with the old STT model).
    for index, word in enumerate(words):
        # Default to the old STT dict shape; switched below for Vosk.
        start_key = "start_time"
        word_duration = word.get("duration", 0)
        last_word = words[-1]
        last_word_duration = last_word.get("duration", 0)
        if TRANSCRIPTION_TYPE == "VOSK":
            start_key = "start"
            word_duration = word["end"] - word["start"]
            last_word_duration = words[-1]["end"] - words[-1]["start"]
        next_word = None
        blank_duration = 0
        if word != words[-1] and (index + 1) < len(words):
            # Silence between the end of this word and the start of the next.
            next_word = words[index + 1]
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word: <class 'dict'> {'word': 'bonjour', 'start ':
        # 0.58, 'duration': 7.34}
        text_caption.append(word["word"])
        if not (
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            and (
                next_word is not None
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
            )
        ):
            # Close the caption in progress and emit it.
            if is_first_caption:
                # To review: merging of the new line with
                # the previous one...
                is_first_caption = False
                text_caption = get_text_caption(text_caption, last_word_added)

            stop_caption = word[start_key] + word_duration

            # Avoid overlapping the previous caption's end time.
            change_previous_end_caption(webvtt, start_caption)

            caption = Caption(
                sec_to_timestamp(start_caption),
                sec_to_timestamp(stop_caption),
                " ".join(text_caption),
            )

            webvtt.captions.append(caption)
            # Reset everything for the next sentence.
            start_caption = word[start_key]
            text_caption = []
            last_word_added = word["word"]
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # This is the final chunk: append the last sentence of the video.
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
        caption = Caption(
            sec_to_timestamp(start_caption),
            sec_to_timestamp(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
255

256

257
def main_vosk_transcript(norm_mp3_file, duration, transript_model):
    """Vosk transcription.

    Streams the audio through SoX in TRANSCRIPTION_AUDIO_SPLIT_TIME windows,
    feeds each window to a KaldiRecognizer and turns every recognizer result
    into one WebVTT caption.

    Returns:
        Tuple ``(msg, webvtt, all_text)``.  NOTE(review): ``all_text`` is
        never filled on this path and is always returned empty — confirm
        callers do not rely on it.
    """
    desired_sample_rate = 16000
    inference_start = timer()
    msg = "\nInference start %0.3fs." % inference_start

    rec = KaldiRecognizer(transript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        # Process the audio in fixed-size windows to bound memory usage.
        audio = convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,
        )
        msg += "\nRunning inference."
        results = []
        get_word_result_from_data(results, audio, rec)
        for res in results:
            parsed = json.loads(res)
            words = parsed.get("result")
            if not words:
                # Recognizer produced no timed words for this segment.
                continue
            # One caption per recognizer result, spanning first to last word.
            caption = Caption(
                sec_to_timestamp(words[0]["start"]),
                sec_to_timestamp(words[-1]["end"]),
                parsed.get("text"),
            )
            webvtt.captions.append(caption)

    msg += "\nInference took %0.3fs." % (timer() - inference_start)
    return msg, webvtt, all_text
311

312

313
def main_whisper_transcript(norm_mp3_file, duration, lang):
    """Whisper transcription.

    Loads the Whisper model configured for *lang*, transcribes the audio
    with word timestamps, writes the result as a ``.vtt`` next to the mp3
    and reads it back as a WebVTT object.

    Returns:
        Tuple ``(msg, wvtt, all_text)``; ``all_text`` is always empty on
        this path (the text lives in the VTT file).
    """
    desired_sample_rate = 16000
    all_text = ""
    inference_start = timer()
    msg = "\nInference start %0.3fs." % inference_start

    lang_params = TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]
    model = whisper.load_model(
        lang_params["model"],
        download_root=lang_params["download_root"],
    )
    audio = convert_samplerate(norm_mp3_file, desired_sample_rate, 0, duration)
    transcription = model.transcribe(
        audio, language=lang, initial_prompt="prompt", word_timestamps=True
    )

    # Write the VTT alongside the source audio, then load it back.
    dirname = os.path.dirname(norm_mp3_file)
    filename = os.path.basename(norm_mp3_file).replace(".mp3", ".vtt")
    vtt_writer = get_writer("vtt", dirname)
    vtt_writer(
        transcription,
        filename,
        {"highlight_words": False, "max_line_count": 2, "max_line_width": 40},
    )
    wvtt = webvtt.read(os.path.join(dirname, filename))

    msg += "\nInference took %0.3fs." % (timer() - inference_start)
    return msg, wvtt, all_text
340

341

342
def change_previous_end_caption(webvtt, start_caption):
    """Clamp the previous caption's end so it never overlaps the next one.

    If the last caption in *webvtt* ends after ``start_caption`` (seconds),
    its end timestamp is rewritten to ``start_caption``.  No-op when there
    are no captions yet or when there is no overlap.
    """
    if not webvtt.captions:
        return
    # Parse the "HH:MM:SS.mmm" end timestamp back into seconds.
    parsed = dt.datetime.strptime(webvtt.captions[-1].end, "%H:%M:%S.%f")
    previous_end_seconds = timedelta(
        hours=parsed.hour,
        minutes=parsed.minute,
        seconds=parsed.second,
        microseconds=parsed.microsecond,
    ).total_seconds()
    if previous_end_seconds > start_caption:
        webvtt.captions[-1].end = sec_to_timestamp(start_caption)
354

355

356
def get_text_caption(text_caption, last_word_added):
    """Return the caption words, de-duplicated against the previous chunk.

    Drops everything up to and including the first occurrence of
    ``last_word_added``; if that word is absent, the list is returned
    unchanged.
    """
    if last_word_added in text_caption:
        boundary = text_caption.index(last_word_added)
        return text_caption[boundary + 1:]
    return text_caption
363

364

365
def words_from_candidate_transcript(metadata):
    """Get words list from transcription.

    Walks ``metadata.tokens`` (objects with ``text`` — single characters —
    and ``start_time``) and assembles them into words, splitting on space
    tokens and at the end of the token stream.

    Returns:
        List of dicts with keys "word", "start_time" and "duration"
        (both rounded to 4 decimals; duration clamped to >= 0).
    """
    word_list = []
    current_word = ""
    current_start = 0
    last_index = len(metadata.tokens) - 1

    for position, token in enumerate(metadata.tokens):
        if token.text != " ":
            if not current_word:
                # First character of a new word: remember where it starts.
                current_start = token.start_time
            current_word += token.text
        # A word ends on a space token or on the very last token.
        if token.text == " " or position == last_index:
            # The boundary token's start time marks the end of the word;
            # clamp so a timestamp glitch never yields a negative duration.
            word_duration = max(token.start_time - current_start, 0)
            word_list.append(
                {
                    "word": current_word,
                    "start_time": round(current_start, 4),
                    "duration": round(word_duration, 4),
                }
            )
            current_word = ""
            current_start = 0

    return word_list
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc