# EsupPortail / Esup-Pod
# /pod/video_encode_transcript/transcript_model.py

import numpy as np
import shlex
import subprocess
import json

import sys
import os
from timeit import default_timer as timer
import datetime as dt
from datetime import timedelta

from webvtt import WebVTT, Caption

try:
    from shlex import quote
except ImportError:
    from pipes import quote

import logging

try:
    from ..custom import settings_local
except ImportError:
    from .. import settings as settings_local

DEBUG = getattr(settings_local, "DEBUG", False)

TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
    if TRANSCRIPTION_TYPE == "VOSK":
        from vosk import Model, KaldiRecognizer
    elif TRANSCRIPTION_TYPE == "STT":
        from stt import Model
    elif TRANSCRIPTION_TYPE == "WHISPER":
        import whisper

TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
)

# Audio is transcribed in slices of this many seconds (default: 10 min).
TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
)
# Maximum phrase length, in seconds.
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 2
)
# A silence longer than this (in seconds) splits the phrase.
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
)
log = logging.getLogger(__name__)
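
# A minimal sketch of the TRANSCRIPTION_MODEL_PARAM layout this module expects,
# inferred from the lookups below; the paths and language key are hypothetical:
# TRANSCRIPTION_MODEL_PARAM = {
#     "VOSK": {"fr": {"model": "/path/to/vosk-model-fr"}},
#     "STT": {"fr": {"model": "/path/to/model.tflite", "scorer": "/path/to/kenlm.scorer"}},
#     "WHISPER": {"fr": {"model": "small", "download_root": "/path/to/whisper-cache"}},
# }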


def get_model(lang):
    """Get the model used by STT or Vosk to transcribe audio."""
    transcript_model = Model(TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"])
    if TRANSCRIPTION_TYPE == "STT":
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("beam_width"):
            transcript_model.setBeamWidth(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["beam_width"]
            )
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("scorer"):
            print(
                "Loading scorer from file {}".format(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
                ),
                file=sys.stderr,
            )
            scorer_load_start = timer()
            transcript_model.enableExternalScorer(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
            )
            scorer_load_end = timer() - scorer_load_start
            print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
            if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get(
                "lm_alpha"
            ) and TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("lm_beta"):
                transcript_model.setScorerAlphaBeta(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_alpha"],
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_beta"],
                )
    return transcript_model
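
# Note: `Model` here is whichever class was imported above (vosk.Model or
# stt.Model, depending on TRANSCRIPTION_TYPE). A hypothetical call:
#     model = get_model("fr")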


def start_transcripting(mp3filepath, duration, lang):
    """
    Start direct transcription.

    Normalize the audio if requested, load the model for the given language
    and run the transcription.
    """
    if TRANSCRIPTION_NORMALIZE:
        mp3filepath = normalize_mp3(mp3filepath)
    if TRANSCRIPTION_TYPE == "WHISPER":
        msg, webvtt, all_text = main_whisper_transcript(mp3filepath, lang)
    else:
        transcript_model = get_model(lang)
        msg, webvtt, all_text = start_main_transcript(
            mp3filepath, duration, transcript_model
        )
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    return msg, webvtt
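
# A typical call (hypothetical values): pass the path of the extracted MP3,
# its duration in seconds and the language code:
#     msg, webvtt = start_transcripting("/tmp/video.mp3", 3600, "fr")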


def start_main_transcript(mp3filepath, duration, transcript_model):
    """Dispatch the transcription to the configured software."""
    if TRANSCRIPTION_TYPE == "STT":
        msg, webvtt, all_text = main_stt_transcript(
            mp3filepath, duration, transcript_model
        )
    elif TRANSCRIPTION_TYPE == "VOSK":
        msg, webvtt, all_text = main_vosk_transcript(
            mp3filepath, duration, transcript_model
        )
    else:
        # Avoid an UnboundLocalError on the return below for unexpected types.
        raise ValueError("Unsupported TRANSCRIPTION_TYPE: %s" % TRANSCRIPTION_TYPE)
    return msg, webvtt, all_text


def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Extract a sub-clip of the audio and resample it to the desired rate."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)

    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}Hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )

    return np.frombuffer(output, np.int16)
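
# For illustration (hypothetical arguments), convert_samplerate("a.mp3", 16000, 0, 600)
# builds roughly this command and returns the raw samples as an int16 array:
#     sox a.mp3 --type raw --bits 16 --channels 1 --rate 16000 \
#         --encoding signed-integer --endian little --compression 0.0 \
#         --no-dither - trim 0 600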


def normalize_mp3(mp3filepath):
    """Normalize the audio format and loudness level (EBU R128)."""
    filename, file_extension = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(filename, "_norm", file_extension)
    normalize_cmd = "ffmpeg-normalize {} ".format(quote(mp3filepath))
    normalize_cmd += "-c:a libmp3lame -b:a 192k --normalization-type ebu "
    # normalize_cmd += \
    # '--loudness-range-target 7.0 --true-peak 0.0 --offset 0.0 '
    normalize_cmd += "--target-level {} -f -o {}".format(
        TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
        return mp3normfile
    except subprocess.CalledProcessError as e:
        log.error("ffmpeg-normalize returned non-zero status: {}".format(e.stderr))
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found: {}".format(e.strerror))
        return mp3filepath
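
# With the defaults above, normalize_mp3("/tmp/video.mp3") (hypothetical path)
# runs roughly:
#     ffmpeg-normalize /tmp/video.mp3 -c:a libmp3lame -b:a 192k \
#         --normalization-type ebu --target-level -16.0 -f -o /tmp/video_norm.mp3
# and falls back to the original file if the tool fails or is missing.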


# #################################
# TRANSCRIPT VIDEO: MAIN FUNCTION
# #################################


def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Stream a sub-clip of the audio, resampled to the desired rate."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)

    except subprocess.CalledProcessError as e:
        # NB: Popen itself never raises CalledProcessError; the OSError branch
        # below is the realistic failure mode here.
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}Hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return output
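
# Unlike convert_samplerate(), this returns the running sox process itself, so
# the recognizer can consume audio from its stdout in small chunks (see
# get_word_result_from_data below) instead of buffering the whole clip in memory.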


def get_word_result_from_data(results, audio, rec):
    """Read the audio stream in chunks and append recognition results to `results`."""
    while True:
        data = audio.stdout.read(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    # Flush whatever is still buffered in the recognizer.
    results.append(rec.FinalResult())


def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert words and their timings into WebVTT captions."""
    for index, word in enumerate(words):
        start_key = "start_time"
        word_duration = word.get("duration", 0)
        last_word = words[-1]
        last_word_duration = last_word.get("duration", 0)
        if TRANSCRIPTION_TYPE == "VOSK":
            start_key = "start"
            word_duration = word["end"] - word["start"]
            last_word_duration = words[-1]["end"] - words[-1]["start"]
        next_word = None
        blank_duration = 0
        if word != words[-1] and (index + 1) < len(words):
            next_word = words[index + 1]
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word: <class 'dict'> {'word': 'bonjour', 'start_time': 0.58, 'duration': 7.34}
        text_caption.append(word["word"])
        # Close the caption once the sentence reaches the maximum length, a long
        # enough blank follows, or this is the last word.
        if not (
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            and (
                next_word is not None
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
            )
        ):
            # create the caption
            if is_first_caption:
                # To review: merging the new line with the previous one...
                is_first_caption = False
                text_caption = get_text_caption(text_caption, last_word_added)

            stop_caption = word[start_key] + word_duration

            # avoid overlapping with the previous caption
            change_previous_end_caption(webvtt, start_caption)

            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                " ".join(text_caption),
            )

            webvtt.captions.append(caption)
            # reset everything for the next sentence
            start_caption = word[start_key]
            text_caption = []
            last_word_added = word["word"]
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # add the last sentence of the video here
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
        caption = Caption(
            format_time_caption(start_caption),
            format_time_caption(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
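
# Word shapes handled above (values hypothetical):
#     STT:  {"word": "bonjour", "start_time": 0.58, "duration": 0.34}
#     VOSK: {"word": "bonjour", "start": 0.58, "end": 0.92}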


def main_vosk_transcript(norm_mp3_file, duration, transcript_model):
    """Transcribe the audio file with Vosk."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transcript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        audio = convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,
        )
        msg += "\nRunning inference."
        results = []
        get_word_result_from_data(results, audio, rec)
        for res in results:
            words = json.loads(res).get("result")
            text = json.loads(res).get("text")
            if not words:
                continue
            all_text += text + " "
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
            # Word-level caption splitting via words_to_vtt() is currently disabled:
            """
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
            """
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text


def main_stt_transcript(norm_mp3_file, duration, transcript_model):
    """Transcribe the audio file with STT."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transcript_model.sampleRate()
    webvtt = WebVTT()
    last_word_added = ""
    metadata = None
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        # Each slice overlaps the next by TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
        # seconds, so a sentence spanning a cut is not truncated.
        end_trim = (
            duration
            if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration
            else (
                start_trim
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            )
        )

        dur = (
            (TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            if (
                (
                    start_trim
                    + TRANSCRIPTION_AUDIO_SPLIT_TIME
                    + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
                )
                < duration
            )
            else (duration - start_trim)
        )

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transcript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence: %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            if not words:
                continue
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
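
# With the default settings (600 s slices, 2 s max sentence length), the slices
# read above are 0-602 s, 600-1202 s, 1200-1802 s, and so on; only the final
# slice is shortened to end exactly at `duration`.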


def main_whisper_transcript(norm_mp3_file, lang):
    """Transcribe the audio file with Whisper."""
    msg = ""
    all_text = ""
    webvtt = WebVTT()
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start

    model = whisper.load_model(
        TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"],
        download_root=TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang][
            "download_root"
        ],
    )

    transcription = model.transcribe(norm_mp3_file, language=lang)
    msg += "\nRunning inference."

    for segment in transcription["segments"]:
        all_text += segment["text"]
        caption = Caption(
            format_time_caption(segment["start"]),
            format_time_caption(segment["end"]),
            segment["text"],
        )
        webvtt.captions.append(caption)

    inference_end = timer() - inference_start
    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
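
# Note: whisper.load_model() accepts either a model name (e.g. "small") or a
# checkpoint path, and download_root sets where the weights are cached; the
# segment dicts used above ("start", "end", "text") are part of Whisper's
# standard result format.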


def change_previous_end_caption(webvtt, start_caption):
    """Truncate the previous caption's end time if it overlaps `start_caption`."""
    if len(webvtt.captions) > 0:
        prev_end = dt.datetime.strptime(webvtt.captions[-1].end, "%H:%M:%S.%f")
        td_prev_end = timedelta(
            hours=prev_end.hour,
            minutes=prev_end.minute,
            seconds=prev_end.second,
            microseconds=prev_end.microsecond,
        ).total_seconds()
        if td_prev_end > start_caption:
            webvtt.captions[-1].end = format_time_caption(start_caption)

449

450
def format_time_caption(time_caption):
×
451
    """Format time for webvtt caption."""
452
    return (
×
453
        dt.datetime.utcfromtimestamp(0) + timedelta(seconds=float(time_caption))
454
    ).strftime("%H:%M:%S.%f")[:-3]
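
# For example: format_time_caption(3725.5) returns "01:02:05.500".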


def get_text_caption(text_caption, last_word_added):
    """Get the text for a caption, dropping words up to the last word already added."""
    try:
        first_index = text_caption.index(last_word_added)
        return text_caption[first_index + 1:]
    except ValueError:
        return text_caption


def words_from_candidate_transcript(metadata):
    """Build a word list (word, start_time, duration) from an STT transcript."""
    word = ""
    word_list = []
    word_start_time = 0
    # Loop through each character
    for i, token in enumerate(metadata.tokens):
        # Append character to word if it's not a space
        if token.text != " ":
            if len(word) == 0:
                # Log the start time of the new word
                word_start_time = token.start_time

            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if token.text == " " or i == len(metadata.tokens) - 1:
            word_duration = token.start_time - word_start_time

            if word_duration < 0:
                word_duration = 0

            each_word = dict()
            each_word["word"] = word
            each_word["start_time"] = round(word_start_time, 4)
            each_word["duration"] = round(word_duration, 4)

            word_list.append(each_word)
            # Reset
            word = ""
            word_start_time = 0

    return word_list
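
# A worked example (hypothetical tokens as (text, start_time) pairs):
#     ("h", 0.1), ("i", 0.2), (" ", 0.3), ("y", 0.5), ("o", 0.6)
# yields:
#     [{"word": "hi", "start_time": 0.1, "duration": 0.2},
#      {"word": "yo", "start_time": 0.5, "duration": 0.1}]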