/pod/video_encode_transcript/transcript_model.py

import numpy as np
import shlex
import subprocess
import json

import sys
import os
from timeit import default_timer as timer
import datetime as dt
from datetime import timedelta

from webvtt import WebVTT, Caption

try:
    from shlex import quote
except ImportError:
    from pipes import quote

import logging

try:
    from ..custom import settings_local
except ImportError:
    from .. import settings as settings_local

from .encoding_utils import sec_to_timestamp

DEBUG = getattr(settings_local, "DEBUG", False)

TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
    if TRANSCRIPTION_TYPE == "VOSK":
        from vosk import Model, KaldiRecognizer
    elif TRANSCRIPTION_TYPE == "STT":
        from stt import Model
    elif TRANSCRIPTION_TYPE == "WHISPER":
        import whisper

TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
)

TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
)  # 10 min
# Maximum phrase length, in seconds
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 2
)
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
)
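
# The lookups below assume TRANSCRIPTION_MODEL_PARAM is a nested dict keyed by
# transcription type, then language. A sketch of the expected shape (the paths
# and the "fr" language code are illustrative assumptions, not required values):
#
# TRANSCRIPTION_MODEL_PARAM = {
#     "VOSK": {"fr": {"model": "/path/to/vosk-model-fr"}},
#     "STT": {
#         "fr": {
#             "model": "/path/to/model.tflite",
#             "beam_width": 500,                  # optional
#             "scorer": "/path/to/kenlm.scorer",  # optional
#             "lm_alpha": 0.93,                   # optional, used with lm_beta
#             "lm_beta": 1.18,
#         }
#     },
#     "WHISPER": {"fr": {"model": "small", "download_root": "/path/to/whisper"}},
# }
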
log = logging.getLogger(__name__)


def get_model(lang):
    """Get the STT or Vosk model used to transcribe audio in the given language."""
    transcript_model = Model(TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"])
    if TRANSCRIPTION_TYPE == "STT":
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("beam_width"):
            transcript_model.setBeamWidth(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["beam_width"]
            )
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("scorer"):
            print(
                "Loading scorer from file {}".format(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
                ),
                file=sys.stderr,
            )
            scorer_load_start = timer()
            transcript_model.enableExternalScorer(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
            )
            scorer_load_end = timer() - scorer_load_start
            print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
            if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get(
                "lm_alpha"
            ) and TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("lm_beta"):
                transcript_model.setScorerAlphaBeta(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_alpha"],
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_beta"],
                )
    return transcript_model
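
# Typical usage, assuming an "fr" entry exists under the configured
# transcription type in TRANSCRIPTION_MODEL_PARAM (the language code is
# illustrative):
#     transcript_model = get_model("fr")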


def start_transcripting(mp3filepath, duration, lang):
    """
    Start direct transcription.

    Normalize the audio if enabled, load the model matching the language,
    then run the transcription.
    """
    if TRANSCRIPTION_NORMALIZE:
        mp3filepath = normalize_mp3(mp3filepath)
    if TRANSCRIPTION_TYPE == "WHISPER":
        msg, webvtt, all_text = main_whisper_transcript(mp3filepath, duration, lang)
    else:
        transcript_model = get_model(lang)
        msg, webvtt, all_text = start_main_transcript(
            mp3filepath, duration, transcript_model
        )
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    return msg, webvtt
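
# Typical entry point, with a hypothetical file path and a one-hour duration:
#     msg, webvtt = start_transcripting("/data/audio.mp3", 3600, "fr")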


def start_main_transcript(mp3filepath, duration, transcript_model):
    """Call the transcription function matching the configured software type."""
    if TRANSCRIPTION_TYPE == "STT":
        msg, webvtt, all_text = main_stt_transcript(
            mp3filepath, duration, transcript_model
        )
    elif TRANSCRIPTION_TYPE == "VOSK":
        msg, webvtt, all_text = main_vosk_transcript(
            mp3filepath, duration, transcript_model
        )
    return msg, webvtt, all_text


def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Extract a sub-audio segment and resample it to the desired sample rate."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}Hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )

    if TRANSCRIPTION_TYPE == "WHISPER":
        # Whisper expects float32 PCM normalized to [-1.0, 1.0]
        return np.frombuffer(output, np.int16).flatten().astype(np.float32) / 32768.0
    else:
        return np.frombuffer(output, np.int16)
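
# For illustration, convert_samplerate("/data/audio.mp3", 16000, 600, 602)
# (hypothetical arguments) runs roughly this SoX command:
#
#   sox /data/audio.mp3 --type raw --bits 16 --channels 1 --rate 16000 \
#       --encoding signed-integer --endian little --compression 0.0 \
#       --no-dither - trim 600 602
#
# i.e. it writes a headerless 16-bit mono PCM stream to stdout, trimmed to the
# requested window (trim START DURATION, both in seconds).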


def normalize_mp3(mp3filepath):
    """Normalize the audio file format and loudness level."""
    filename, file_extension = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(filename, "_norm", file_extension)
    normalize_cmd = "ffmpeg-normalize {} ".format(quote(mp3filepath))
    normalize_cmd += "-c:a libmp3lame -b:a 192k --normalization-type ebu "
    # normalize_cmd += \
    # '--loudness-range-target 7.0 --true-peak 0.0 --offset 0.0 '
    normalize_cmd += "--target-level {} -f -o {}".format(
        TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
        return mp3normfile
    except subprocess.CalledProcessError as e:
        log.error("ffmpeg-normalize returned non-zero status: {}".format(e.stderr))
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found: {}".format(e.strerror))
        return mp3filepath
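
# For illustration, normalize_mp3("/data/audio.mp3") (hypothetical path) builds:
#
#   ffmpeg-normalize /data/audio.mp3 -c:a libmp3lame -b:a 192k \
#       --normalization-type ebu --target-level -16.0 -f -o /data/audio_norm.mp3
#
# and returns "/data/audio_norm.mp3" on success, or the original path on failure.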


# #################################
# TRANSCRIPT VIDEO: MAIN FUNCTION
# #################################


def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Extract a resampled sub-audio segment for Vosk, as a streaming SoX process."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}Hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return output


def get_word_result_from_data(results, audio, rec):
    """Read the audio stream chunk by chunk and append recognizer results to `results`."""
    while True:
        data = audio.stdout.read(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    # Flush the recognizer buffer and collect the last partial result
    results.append(rec.FinalResult())
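
# rec.Result() / rec.FinalResult() return JSON strings. A sketch of the shape
# consumed by main_vosk_transcript below (values are illustrative; word timings
# are present because the caller enables rec.SetWords(True)):
#
# {
#   "result": [
#     {"conf": 0.99, "start": 0.58, "end": 0.97, "word": "bonjour"},
#     ...
#   ],
#   "text": "bonjour ..."
# }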


def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert words and their timings into WebVTT captions."""
    for index, word in enumerate(words):
        start_key = "start_time"
        word_duration = word.get("duration", 0)
        last_word = words[-1]
        last_word_duration = last_word.get("duration", 0)
        if TRANSCRIPTION_TYPE == "VOSK":
            start_key = "start"
            word_duration = word["end"] - word["start"]
            last_word_duration = words[-1]["end"] - words[-1]["start"]
        next_word = None
        blank_duration = 0
        if word != words[-1] and (index + 1) < len(words):
            next_word = words[index + 1]
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word: <class 'dict'>, e.g.
        # {'word': 'bonjour', 'start_time': 0.58, 'duration': 7.34}
        text_caption.append(word["word"])
        if not (
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            and (
                next_word is not None
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
            )
        ):
            # Create the caption
            if is_first_caption:
                # To review: merging the new line with the previous one...
                is_first_caption = False
                text_caption = get_text_caption(text_caption, last_word_added)

            stop_caption = word[start_key] + word_duration

            # Avoid overlapping with the previous caption
            change_previous_end_caption(webvtt, start_caption)

            caption = Caption(
                sec_to_timestamp(start_caption),
                sec_to_timestamp(stop_caption),
                " ".join(text_caption),
            )

            webvtt.captions.append(caption)
            # Reset everything for the next sentence
            start_caption = word[start_key]
            text_caption = []
            last_word_added = word["word"]
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # Add the last sentence of the video here
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
        caption = Caption(
            sec_to_timestamp(start_caption),
            sec_to_timestamp(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
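
# A short worked example of the split rule above, with the default settings
# (TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = 2, ..._BLANK_SPLIT_TIME = 0.5, both
# in seconds) and hypothetical timings: a caption opened at start_caption = 10.0
# is closed at the first word whose start time reaches 12.0 (max length
# exceeded), or earlier if the silence between two consecutive words reaches
# 0.5 s, or at the last word of the list.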


def main_vosk_transcript(norm_mp3_file, duration, transcript_model):
    """Vosk transcription."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transcript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        audio = convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        )
        msg += "\nRunning inference."
        results = []
        get_word_result_from_data(results, audio, rec)
        for res in results:
            result = json.loads(res)
            words = result.get("result")
            text = result.get("text")
            if not words:
                continue
            # Accumulate the full transcript text
            all_text += text + " "
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                sec_to_timestamp(start_caption),
                sec_to_timestamp(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
            """
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
            """
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text


def main_stt_transcript(norm_mp3_file, duration, transcript_model):
    """STT transcription."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transcript_model.sampleRate()
    webvtt = WebVTT()
    last_word_added = ""
    metadata = None
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        end_trim = (
            duration
            if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration
            else (
                start_trim
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            )
        )

        dur = (
            (TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            if (
                (
                    start_trim
                    + TRANSCRIPTION_AUDIO_SPLIT_TIME
                    + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
                )
                < duration
            )
            else (duration - start_trim)
        )

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transcript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence: %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text


def main_whisper_transcript(norm_mp3_file, duration, lang):
    """Whisper transcription."""
    msg = ""
    all_text = ""
    webvtt = WebVTT()
    inference_start = timer()
    desired_sample_rate = 16000
    msg += "\nInference start %0.3fs." % inference_start

    model = whisper.load_model(
        TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"],
        download_root=TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang][
            "download_root"
        ],
    )

    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        log.info("start_trim: " + str(start_trim))
        audio = convert_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        )
        transcription = model.transcribe(audio, language=lang)
        msg += "\nRunning inference."
        # Accumulate the full transcript text
        all_text += transcription["text"]
        for segment in transcription["segments"]:
            caption = Caption(
                sec_to_timestamp(segment["start"] + start_trim),
                sec_to_timestamp(segment["end"] + start_trim),
                segment["text"],
            )
            webvtt.captions.append(caption)

    inference_end = timer() - inference_start
    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
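
# model.transcribe() returns a dict; a sketch of the fields used above (values
# are illustrative):
#
# {
#   "text": " Bonjour et bienvenue ...",
#   "segments": [
#     {"start": 0.0, "end": 4.2, "text": " Bonjour et bienvenue", ...},
#     ...
#   ],
# }
#
# Segment times are relative to the audio chunk, hence the start_trim offset above.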


def change_previous_end_caption(webvtt, start_caption):
    """Shorten the previous caption's end time if it overlaps the new caption."""
    if len(webvtt.captions) > 0:
        prev_end = dt.datetime.strptime(webvtt.captions[-1].end, "%H:%M:%S.%f")
        td_prev_end = timedelta(
            hours=prev_end.hour,
            minutes=prev_end.minute,
            seconds=prev_end.second,
            microseconds=prev_end.microsecond,
        ).total_seconds()
        if td_prev_end > start_caption:
            webvtt.captions[-1].end = sec_to_timestamp(start_caption)
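
# Example: if the previous caption ends at "00:00:12.500" and the next caption
# starts at 11.8 s, the previous end is rewritten to sec_to_timestamp(11.8) so
# the two captions do not overlap.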


def get_text_caption(text_caption, last_word_added):
    """Get the text for a caption, dropping words up to the last word already written."""
    try:
        first_index = text_caption.index(last_word_added)
        return text_caption[first_index + 1 :]
    except ValueError:
        return text_caption
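
# Example: get_text_caption(["hello", "world", "again"], "world") returns
# ["again"]; if last_word_added does not occur in the list, it is returned
# unchanged.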


def words_from_candidate_transcript(metadata):
    """Get the list of words, with timings, from a candidate transcript."""
    word = ""
    word_list = []
    word_start_time = 0
    # Loop through each character
    for i, token in enumerate(metadata.tokens):
        # Append character to word if it's not a space
        if token.text != " ":
            if len(word) == 0:
                # Log the start time of the new word
                word_start_time = token.start_time

            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if token.text == " " or i == len(metadata.tokens) - 1:
            word_duration = token.start_time - word_start_time

            if word_duration < 0:
                word_duration = 0

            each_word = dict()
            each_word["word"] = word
            each_word["start_time"] = round(word_start_time, 4)
            each_word["duration"] = round(word_duration, 4)

            word_list.append(each_word)
            # Reset
            word = ""
            word_start_time = 0

    return word_list
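
# For illustration, a token stream with hypothetical timings like
#   ("b", 0.58) ("o", 0.61) ("n", 0.64) (" ", 0.70) ("j", 0.75) ...
# yields
#   [{"word": "bon", "start_time": 0.58, "duration": 0.12}, ...]
# where each duration runs from the word's first character to the following space.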