EsupPortail / Esup-Pod, build 9664421460
25 Jun 2024 02:31PM UTC. Coverage: 70.938% (first build).

Pull Request #1139: [WIP] Ptitloup/remote encoding fix
Merge 0834a594a into 46941e1c4 (github, web-flow)

3 of 16 new or added lines in 3 files covered (18.75%)
12031 of 16960 relevant lines covered (70.94%)
0.71 hits per line

Source file: /pod/video_encode_transcript/transcript_model.py (0.0% of this file's lines covered)
import numpy as np
import shlex
import subprocess
import json

import sys
import os
from timeit import default_timer as timer
import datetime as dt
from datetime import timedelta

from webvtt import WebVTT, Caption
from shlex import quote

import logging

try:
    from ..custom import settings_local
except ImportError:
    from .. import settings as settings_local

from .encoding_utils import sec_to_timestamp

DEBUG = getattr(settings_local, "DEBUG", False)

TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
    if TRANSCRIPTION_TYPE == "VOSK":
        from vosk import Model, KaldiRecognizer
    elif TRANSCRIPTION_TYPE == "STT":
        from stt import Model
    elif TRANSCRIPTION_TYPE == "WHISPER":
        import whisper

TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
)

TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
)  # 10 min
# time in sec for phrase length
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 2
)
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
)
log = logging.getLogger(__name__)
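
# TRANSCRIPTION_MODEL_PARAM is expected to map engine -> language -> options.
# A minimal sketch of that layout, inferred from the lookups in this module
# (paths and values below are hypothetical, not shipped defaults):
#
#     TRANSCRIPTION_MODEL_PARAM = {
#         "VOSK": {
#             "fr": {"model": "/path/to/vosk-model-fr"},
#         },
#         "STT": {
#             "fr": {
#                 "model": "/path/to/fr.tflite",
#                 "beam_width": 500,               # optional
#                 "scorer": "/path/to/fr.scorer",  # optional
#                 "lm_alpha": 0.93,                # optional, used with lm_beta
#                 "lm_beta": 1.18,
#             },
#         },
#         "WHISPER": {
#             "fr": {"model": "small", "download_root": "/path/to/whisper"},
#         },
#     }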


def get_model(lang):
    """Get the model used by STT or Vosk to transcribe audio."""
    transcript_model = Model(TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"])
    if TRANSCRIPTION_TYPE == "STT":
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("beam_width"):
            transcript_model.setBeamWidth(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["beam_width"]
            )
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("scorer"):
            print(
                "Loading scorer from file {}".format(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
                ),
                file=sys.stderr,
            )
            scorer_load_start = timer()
            transcript_model.enableExternalScorer(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
            )
            scorer_load_end = timer() - scorer_load_start
            print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
            if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get(
                "lm_alpha"
            ) and TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("lm_beta"):
                transcript_model.setScorerAlphaBeta(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_alpha"],
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_beta"],
                )
    return transcript_model
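
# Minimal usage sketch, assuming a "fr" entry exists in
# TRANSCRIPTION_MODEL_PARAM and the model files are installed:
#
#     model = get_model("fr")  # Vosk Model or STT Model, per TRANSCRIPTION_TYPE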


def start_transcripting(mp3filepath, duration, lang):
    """
    Start direct transcription.

    Normalize the audio if enabled, get the model for the language
    and start the transcription.
    """
    if TRANSCRIPTION_NORMALIZE:
        mp3filepath = normalize_mp3(mp3filepath)
    if TRANSCRIPTION_TYPE == "WHISPER":
        msg, webvtt, all_text = main_whisper_transcript(mp3filepath, duration, lang)
    else:
        transcript_model = get_model(lang)
        msg, webvtt, all_text = start_main_transcript(
            mp3filepath, duration, transcript_model
        )
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    return msg, webvtt
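
# Entry-point sketch (hypothetical path and duration in seconds):
#
#     msg, webvtt = start_transcripting("/data/audio.mp3", 3600, "fr")
#     webvtt.save("/data/audio.vtt")  # webvtt-py writes the captions to disk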


def start_main_transcript(mp3filepath, duration, transcript_model):
    """Call the transcription function matching the configured software type."""
    if TRANSCRIPTION_TYPE == "STT":
        msg, webvtt, all_text = main_stt_transcript(
            mp3filepath, duration, transcript_model
        )
    elif TRANSCRIPTION_TYPE == "VOSK":
        msg, webvtt, all_text = main_vosk_transcript(
            mp3filepath, duration, transcript_model
        )
    return msg, webvtt, all_text


def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Extract a sub-audio segment and convert it to the desired sample rate."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)

    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}Hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )

    if TRANSCRIPTION_TYPE == "WHISPER":
        # Whisper expects float32 PCM normalized to [-1.0, 1.0]
        return np.frombuffer(output, np.int16).flatten().astype(np.float32) / 32768.0
    else:
        return np.frombuffer(output, np.int16)
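
# For illustration, with audio_path="/data/audio.mp3",
# desired_sample_rate=16000, trim_start=0 and duration=600, the assembled
# command is:
#
#     sox /data/audio.mp3 --type raw --bits 16 --channels 1 --rate 16000 \
#         --encoding signed-integer --endian little --compression 0.0 \
#         --no-dither - trim 0 600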


def normalize_mp3(mp3filepath):
    """Normalize the audio to a consistent format and loudness level."""
    filename, file_extension = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(filename, "_norm", file_extension)
    normalize_cmd = "ffmpeg-normalize {} ".format(quote(mp3filepath))
    normalize_cmd += "-c:a libmp3lame -b:a 192k --normalization-type ebu "
    # normalize_cmd += \
    # '--loudness-range-target 7.0 --true-peak 0.0 --offset 0.0 '
    normalize_cmd += "--target-level {} -f -o {}".format(
        TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
        return mp3normfile
    except subprocess.CalledProcessError as e:
        log.error("ffmpeg-normalize returned non-zero status: {}".format(e.stderr))
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found: {}".format(e.strerror))
        return mp3filepath
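
# For illustration, normalizing "/data/audio.mp3" with the default target
# level (-16.0 LUFS) runs:
#
#     ffmpeg-normalize /data/audio.mp3 -c:a libmp3lame -b:a 192k \
#         --normalization-type ebu --target-level -16.0 -f -o /data/audio_norm.mp3
#
# and returns "/data/audio_norm.mp3" on success, or the original path on error.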


# #################################
# TRANSCRIPT VIDEO: MAIN FUNCTION
# #################################


def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Stream a sub-audio segment converted to the desired sample rate."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        # Popen streams the converted audio so it can be read in chunks;
        # unlike check_output, it does not itself raise CalledProcessError.
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)

    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}Hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return output


def get_word_result_from_data(results, audio, rec):
    """Read the audio stream in chunks and append recognition results to results."""
    while True:
        data = audio.stdout.read(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    # flush the recognizer so the last partial utterance is not lost
    results.append(rec.FinalResult())
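
# Each entry in `results` is a JSON string from the recognizer. A
# representative shape, matching the keys read in main_vosk_transcript
# (values are illustrative):
#
#     {"result": [{"conf": 0.99, "start": 0.58, "end": 0.92, "word": "bonjour"},
#                 {"conf": 0.98, "start": 1.05, "end": 1.40, "word": "monde"}],
#      "text": "bonjour monde"}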


def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert words and their timings into WebVTT captions."""
    for index, word in enumerate(words):
        start_key = "start_time"
        word_duration = word.get("duration", 0)
        last_word = words[-1]
        last_word_duration = last_word.get("duration", 0)
        if TRANSCRIPTION_TYPE == "VOSK":
            start_key = "start"
            word_duration = word["end"] - word["start"]
            last_word_duration = words[-1]["end"] - words[-1]["start"]
        next_word = None
        blank_duration = 0
        if word != words[-1] and (index + 1) < len(words):
            next_word = words[index + 1]
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word: <class 'dict'> {'word': 'bonjour', 'start_time': 0.58, 'duration': 7.34}
        text_caption.append(word["word"])
        if not (
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            and (
                next_word is not None
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
            )
        ):
            # create the caption
            if is_first_caption:
                # To review: merging the new line with the previous one...
                is_first_caption = False
                text_caption = get_text_caption(text_caption, last_word_added)

            stop_caption = word[start_key] + word_duration

            # avoid overlapping captions
            change_previous_end_caption(webvtt, start_caption)

            caption = Caption(
                sec_to_timestamp(start_caption),
                sec_to_timestamp(stop_caption),
                " ".join(text_caption),
            )

            webvtt.captions.append(caption)
            # reset everything for the next sentence
            start_caption = word[start_key]
            text_caption = []
            last_word_added = word["word"]
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # add the last sentence of the video here
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
        caption = Caption(
            sec_to_timestamp(start_caption),
            sec_to_timestamp(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
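
# Split rule illustration with the defaults above: a caption is closed once
# it spans TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH (2 s) from its start, or as
# soon as the silence before the next word reaches
# TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME (0.5 s), or at the last word.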


def main_vosk_transcript(norm_mp3_file, duration, transcript_model):
    """Vosk transcription."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transcript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        audio = convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        )
        msg += "\nRunning inference."
        results = []
        get_word_result_from_data(results, audio, rec)
        for res in results:
            words = json.loads(res).get("result")
            text = json.loads(res).get("text")
            if not words:
                continue
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                sec_to_timestamp(start_caption),
                sec_to_timestamp(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
            """
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
            """
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text


def main_stt_transcript(norm_mp3_file, duration, transcript_model):
    """STT transcription."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transcript_model.sampleRate()
    webvtt = WebVTT()
    last_word_added = ""
    metadata = None
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        end_trim = (
            duration
            if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration
            else (
                start_trim
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            )
        )

        dur = (
            (TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            if (
                (
                    start_trim
                    + TRANSCRIPTION_AUDIO_SPLIT_TIME
                    + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
                )
                < duration
            )
            else (duration - start_trim)
        )

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transcript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence: %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text


def main_whisper_transcript(norm_mp3_file, duration, lang):
    """Whisper transcription."""
    msg = ""
    all_text = ""
    webvtt = WebVTT()
    inference_start = timer()
    desired_sample_rate = 16000
    msg += "\nInference start %0.3fs." % inference_start

    model = whisper.load_model(
        TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"],
        download_root=TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang][
            "download_root"
        ],
    )
    '''
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        log.info("start_trim: " + str(start_trim))
        audio = convert_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        )
        transcription = model.transcribe(audio, language=lang)
        msg += "\nRunning inference."
        for segment in transcription["segments"]:
            caption = Caption(
                sec_to_timestamp(segment["start"] + start_trim),
                sec_to_timestamp(segment["end"] + start_trim),
                segment["text"],
            )
            webvtt.captions.append(caption)
    '''
    audio = convert_samplerate(norm_mp3_file, desired_sample_rate, 0, duration)
    transcription = model.transcribe(audio, language=lang)
    for segment in transcription["segments"]:
        caption = Caption(
            sec_to_timestamp(segment["start"]),
            sec_to_timestamp(segment["end"]),
            segment["text"],
        )
        webvtt.captions.append(caption)
    inference_end = timer() - inference_start
    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
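
# model.transcribe() returns a dict whose "segments" list drives the captions;
# a representative entry (values are illustrative):
#
#     {"start": 0.0, "end": 4.2, "text": " Bonjour et bienvenue"}
#
# Note: all_text is not populated in this branch and is returned empty.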


def change_previous_end_caption(webvtt, start_caption):
    """Shorten the previous caption's end time if it overlaps the next one."""
    if len(webvtt.captions) > 0:
        prev_end = dt.datetime.strptime(webvtt.captions[-1].end, "%H:%M:%S.%f")
        td_prev_end = timedelta(
            hours=prev_end.hour,
            minutes=prev_end.minute,
            seconds=prev_end.second,
            microseconds=prev_end.microsecond,
        ).total_seconds()
        if td_prev_end > start_caption:
            webvtt.captions[-1].end = sec_to_timestamp(start_caption)


def get_text_caption(text_caption, last_word_added):
    """Drop any words already emitted up to last_word_added from the caption text."""
    try:
        first_index = text_caption.index(last_word_added)
        return text_caption[first_index + 1 :]
    except ValueError:
        return text_caption
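
# Worked example: with text_caption=["bonjour", "tout", "le", "monde"] and
# last_word_added="bonjour", the words up to and including "bonjour" are
# dropped and ["tout", "le", "monde"] is returned; if last_word_added is not
# present, the list is returned unchanged.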


def words_from_candidate_transcript(metadata):
    """Build the word list from a candidate transcript's character tokens."""
    word = ""
    word_list = []
    word_start_time = 0
    # Loop through each character
    for i, token in enumerate(metadata.tokens):
        # Append character to word if it's not a space
        if token.text != " ":
            if len(word) == 0:
                # Log the start time of the new word
                word_start_time = token.start_time

            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if token.text == " " or i == len(metadata.tokens) - 1:
            word_duration = token.start_time - word_start_time

            if word_duration < 0:
                word_duration = 0

            each_word = dict()
            each_word["word"] = word
            each_word["start_time"] = round(word_start_time, 4)
            each_word["duration"] = round(word_duration, 4)

            word_list.append(each_word)
            # Reset
            word = ""
            word_start_time = 0

    return word_list
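
# Worked example: for tokens spelling "le chat" with start times
# [0.10, 0.15, 0.30 (space), 0.40, 0.52, 0.60, 0.71], the function returns:
#
#     [{"word": "le", "start_time": 0.1, "duration": 0.2},
#      {"word": "chat", "start_time": 0.4, "duration": 0.31}]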