• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 6377635546

02 Oct 2023 08:21AM UTC coverage: 70.396% (-1.6%) from 71.99%
6377635546

push

github

web-flow
Merge pull request #900 from EsupPortail/develop

[DONE] #3.4.0

1509 of 1509 new or added lines in 58 files covered. (100.0%)

9288 of 13194 relevant lines covered (70.4%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pod/video_encode_transcript/transcript_model.py
1
import numpy as np
×
2
import shlex
×
3
import subprocess
×
4
import json
×
5

6
import sys
×
7
import os
×
8
from timeit import default_timer as timer
×
9
import datetime as dt
×
10
from datetime import timedelta
×
11

12
from webvtt import WebVTT, Caption
×
13

14
# ``shlex.quote`` is the supported shell-escaping helper; ``pipes.quote`` is
# only kept as a fallback for very old interpreters.
try:
    from shlex import quote
except ImportError:
    from pipes import quote
×
18

19
import logging
×
20

21
try:
×
22
    from ..custom import settings_local
×
23
except ImportError:
×
24
    from .. import settings as settings_local
×
25

26
# Every option below is read from ``settings_local`` and falls back to the
# default given as the last ``getattr`` argument.
DEBUG = getattr(settings_local, "DEBUG", False)

TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
# The speech-to-text backend is imported only when transcription is enabled;
# TRANSCRIPTION_TYPE, Model and KaldiRecognizer stay undefined otherwise.
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
    if TRANSCRIPTION_TYPE == "VOSK":
        from vosk import Model, KaldiRecognizer
    elif TRANSCRIPTION_TYPE == "STT":
        from stt import Model

TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
)

# Length, in seconds, of each audio chunk sent to the recognizer.
TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
)  # 10min
# time in sec for phrase length
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 3
)
# Silence between two words, in seconds, above which a caption is closed.
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
)
log = logging.getLogger(__name__)
×
53

54

55
def get_model(lang):
    """Get model for STT or Vosk software to transcript audio.

    The model path and the optional tuning values are read from
    ``TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]``.

    Args:
        lang: key selecting the language entry in the model parameters.

    Returns:
        The ``Model`` instance. For the "STT" type it is additionally
        configured with the beam width, the external scorer and the scorer
        alpha/beta when those keys are present and truthy.

    Raises:
        KeyError: if the type, the language or the "model" key is missing
            from ``TRANSCRIPTION_MODEL_PARAM``.
    """
    params = TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]
    transript_model = Model(params["model"])
    if TRANSCRIPTION_TYPE != "STT":
        # Only the STT backend takes the extra tuning below.
        return transript_model

    beam_width = params.get("beam_width")
    if beam_width:
        transript_model.setBeamWidth(beam_width)

    scorer = params.get("scorer")
    if scorer:
        print("Loading scorer from files {}".format(scorer), file=sys.stderr)
        scorer_load_start = timer()
        transript_model.enableExternalScorer(scorer)
        scorer_load_end = timer() - scorer_load_start
        print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
        # Alpha/beta are applied only when a scorer is loaded and both are set.
        lm_alpha = params.get("lm_alpha")
        lm_beta = params.get("lm_beta")
        if lm_alpha and lm_beta:
            transript_model.setScorerAlphaBeta(lm_alpha, lm_beta)
    return transript_model
×
84

85

86
def start_transcripting(mp3filepath, duration, lang):
    """Normalize the audio if set, load the model for ``lang`` and transcribe.

    Args:
        mp3filepath: path of the audio file to transcribe.
        duration: audio duration, forwarded to the transcription routine.
        lang: language key used to select the model.

    Returns:
        A ``(msg, webvtt)`` tuple: the accumulated log message and the
        captions object produced by ``start_main_transcript``.
    """
    if TRANSCRIPTION_NORMALIZE:
        # normalize_mp3 returns the original path when normalization fails.
        mp3filepath = normalize_mp3(mp3filepath)
    transript_model = get_model(lang)
    msg, webvtt, all_text = start_main_transcript(mp3filepath, duration, transript_model)
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    # The full text is only used for the debug output above.
    return msg, webvtt
×
100

101

102
def start_main_transcript(mp3filepath, duration, transript_model):
    """Call the transcription routine matching ``TRANSCRIPTION_TYPE``.

    Args:
        mp3filepath: path of the audio file to transcribe.
        duration: audio duration, forwarded unchanged.
        transript_model: model instance, forwarded unchanged.

    Returns:
        The ``(msg, webvtt, all_text)`` tuple returned by the selected
        routine.

    Raises:
        ValueError: if ``TRANSCRIPTION_TYPE`` is neither "STT" nor "VOSK".
    """
    if TRANSCRIPTION_TYPE == "STT":
        return main_stt_transcript(mp3filepath, duration, transript_model)
    if TRANSCRIPTION_TYPE == "VOSK":
        return main_vosk_transcript(mp3filepath, duration, transript_model)
    # Fail with an explicit message instead of reaching the return statement
    # with unbound result variables.
    raise ValueError(
        "Unsupported TRANSCRIPTION_TYPE: {!r}".format(TRANSCRIPTION_TYPE)
    )
×
113

114

115
def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Extract a slice of the audio with SoX and resample it.

    SoX is asked for raw, mono, 16-bit signed little-endian samples at
    ``desired_sample_rate``, written to its standard output.

    Args:
        audio_path: path of the source audio file.
        desired_sample_rate: output sample rate passed to ``--rate``.
        trim_start: first argument of the SoX ``trim`` effect.
        duration: second argument of the SoX ``trim`` effect.

    Returns:
        A NumPy ``int16`` array built from the bytes SoX produced.

    Raises:
        RuntimeError: if SoX exits with a non-zero status.
        OSError: if the SoX executable cannot be started.
    """
    # The arguments are passed as a list, so no shell quoting is involved.
    sox_cmd = [
        "sox",
        str(audio_path),
        "--type", "raw",
        "--bits", "16",
        "--channels", "1",
        "--rate", str(desired_sample_rate),
        "--encoding", "signed-integer",
        "--endian", "little",
        "--compression", "0.0",
        "--no-dither",
        "-",
        "trim", str(trim_start), str(duration),
    ]

    try:
        output = subprocess.check_output(sox_cmd, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            "SoX returned non-zero status: {}".format(e.stderr)
        ) from e
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        ) from e

    return np.frombuffer(output, np.int16)
×
137

138

139
def normalize_mp3(mp3filepath):
    """Normalize the loudness of an audio file with ``ffmpeg-normalize``.

    The output is written next to the input, with ``_norm`` inserted before
    the file extension, and is overwritten if it already exists (``-f``).

    Args:
        mp3filepath: path of the audio file to normalize.

    Returns:
        The path of the normalized file on success, or ``mp3filepath``
        unchanged when the command fails or cannot be started (the error is
        logged in both cases).
    """
    filename, file_extension = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(filename, "_norm", file_extension)
    normalize_cmd = "".join(
        [
            "ffmpeg-normalize {} ".format(quote(mp3filepath)),
            "-c:a libmp3lame -b:a 192k --normalization-type ebu ",
            "--target-level {} -f -o {}".format(
                TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
            ),
        ]
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # Best effort: fall back to the original file.
        log.error("ffmpeg-normalize returned non-zero status: %s", e.stderr)
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found %s", e.strerror)
        return mp3filepath
    else:
        return mp3normfile
×
161

162

163
# #################################
164
# TRANSCRIPT VIDEO : MAIN FUNCTION
165
# #################################
166

167

168
def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Start SoX to stream a resampled slice of the audio.

    SoX is asked for raw, mono, 16-bit signed little-endian samples at
    ``desired_sample_rate``, written to its standard output.

    Args:
        audio_path: path of the source audio file.
        desired_sample_rate: output sample rate passed to ``--rate``.
        trim_start: first argument of the SoX ``trim`` effect.
        duration: second argument of the SoX ``trim`` effect.

    Returns:
        The running ``subprocess.Popen`` object; the samples are read from
        its ``stdout`` pipe. The caller is responsible for closing the pipe
        and waiting for the process.

    Raises:
        OSError: if the SoX executable cannot be started.
    """
    # The arguments are passed as a list, so no shell quoting is involved.
    sox_cmd = [
        "sox",
        str(audio_path),
        "--type", "raw",
        "--bits", "16",
        "--channels", "1",
        "--rate", str(desired_sample_rate),
        "--encoding", "signed-integer",
        "--endian", "little",
        "--compression", "0.0",
        "--no-dither",
        "-",
        "trim", str(trim_start), str(duration),
    ]

    # Popen only reports start-up failures; the exit status of SoX is not
    # checked here because the process is still running when this returns.
    try:
        output = subprocess.Popen(sox_cmd, stdout=subprocess.PIPE)
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        ) from e
    return output
×
189

190

191
def get_word_result_from_data(results, audio, rec):
    """Feed the audio stream to the recognizer and collect its results.

    Args:
        results: list extended in place with the recognizer results.
        audio: object whose ``stdout`` is read in 4000-byte chunks until it
            returns no data.
        rec: recognizer exposing ``AcceptWaveform(data)`` and ``Result()``.

    Returns:
        None; ``results`` is mutated.
    """
    while True:
        data = audio.stdout.read(4000)
        if not data:
            break
        # A truthy return value means a result is ready to be collected.
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    # Collect whatever the recognizer still holds once the stream is over.
    # NOTE(review): Vosk examples call FinalResult() at end of stream -
    # confirm whether Result() is intended here.
    results.append(rec.Result())
×
200

201

202
def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert word and time to webvtt captions.

    Words are accumulated into a caption until either the caption reaches
    ``TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH``, the pause before the next word
    reaches ``TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME``, or there is no
    next word.

    Args:
        words: list of word dicts. Each has a "word" key plus either
            "start"/"end" (VOSK) or "start_time"/"duration" (other types).
        start_trim: offset of the current audio chunk.
        duration: total duration of the audio.
        is_first_caption: when true, the first caption built is passed
            through ``get_text_caption`` to drop words up to
            ``last_word_added``.
        text_caption: list of words already pending for the next caption.
        start_caption: start time of the caption being built.
        last_word_added: last word of the previously emitted caption.
        all_text: text accumulated so far; each word is appended to it.
        webvtt: object whose ``captions`` list receives the new captions.

    Returns:
        A ``(all_text, webvtt)`` tuple.
    """
    if not words:
        # The final-caption step below needs at least one word.
        return all_text, webvtt

    # Loop-invariant values: key naming depends on the backend, and the last
    # word is only needed for the closing caption.
    is_vosk = TRANSCRIPTION_TYPE == "VOSK"
    start_key = "start" if is_vosk else "start_time"
    last_word = words[-1]
    if is_vosk:
        last_word_duration = last_word["end"] - last_word["start"]
    else:
        last_word_duration = last_word.get("duration", 0)

    for index, word in enumerate(words):
        if is_vosk:
            word_duration = word["end"] - word["start"]
        else:
            word_duration = word.get("duration", 0)

        next_word = None
        blank_duration = 0
        if word != last_word and (index + 1) < len(words):
            next_word = words[index + 1]
            # Silence between the end of this word and the start of the next.
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word : <class 'dict'> {'word': 'bonjour', 'start ':
        # 0.58, 'duration': 7.34}
        text_caption.append(word["word"])

        sentence_continues = (
            (word[start_key] - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            and next_word is not None
            and blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME
        )
        if sentence_continues:
            continue

        # Create the caption.
        if is_first_caption:
            # To be reviewed: merging of the new line with the previous one.
            is_first_caption = False
            text_caption = get_text_caption(text_caption, last_word_added)

        stop_caption = word[start_key] + word_duration

        # Avoid overlapping with the previous caption.
        change_previous_end_caption(webvtt, start_caption)

        caption = Caption(
            format_time_caption(start_caption),
            format_time_caption(stop_caption),
            " ".join(text_caption),
        )

        webvtt.captions.append(caption)
        # Reset everything for the next sentence.
        start_caption = word[start_key]
        text_caption = []
        last_word_added = word["word"]

    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # Last chunk: add the final sentence of the video here.
        stop_caption = start_trim + last_word[start_key] + last_word_duration
        caption = Caption(
            format_time_caption(start_caption),
            format_time_caption(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
×
274

275

276
def main_vosk_transcript(norm_mp3_file, duration, transript_model):
    """Vosk transcription.

    The audio is processed in chunks of ``TRANSCRIPTION_AUDIO_SPLIT_TIME``
    seconds; each recognizer result that contains words becomes one caption
    spanning from its first word's start to its last word's end.

    Args:
        norm_mp3_file: path of the audio file to transcribe.
        duration: audio duration; used as the ``range`` stop value, so it
            must be an integer.
        transript_model: model passed to ``KaldiRecognizer``.

    Returns:
        A ``(msg, webvtt, all_text)`` tuple: the log message, the captions
        object and the concatenated text of all captions.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        results = []
        # The context manager closes the pipe and waits for SoX to exit, so
        # no child process is left behind after each chunk.
        with convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        ) as audio:
            msg += "\nRunning inference."
            get_word_result_from_data(results, audio, rec)
        for res in results:
            parsed = json.loads(res)
            words = parsed.get("result")
            if not words:
                continue
            text = parsed.get("text")
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
            if text:
                all_text += text + " "
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
×
330

331

332
def main_stt_transcript(norm_mp3_file, duration, transript_model):
    """STT transcription.

    The audio is processed in chunks of ``TRANSCRIPTION_AUDIO_SPLIT_TIME``
    seconds. Every chunk except the last is extended by
    ``TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH`` seconds, presumably so that a
    sentence crossing a chunk boundary is not cut.

    Args:
        norm_mp3_file: path of the audio file to transcribe.
        duration: audio duration; used as the ``range`` stop value, so it
            must be an integer.
        transript_model: model exposing ``sampleRate()`` and
            ``sttWithMetadata(audio)``.

    Returns:
        A ``(msg, webvtt, all_text)`` tuple: the log message, the captions
        object and the concatenated recognized words.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transript_model.sampleRate()
    webvtt = WebVTT()
    # NOTE(review): never updated between chunks, since words_to_vtt does not
    # return its own last_word_added - confirm this is intended.
    last_word_added = ""
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        # end_trim is only reported in the log message below.
        if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
            end_trim = duration
        else:
            end_trim = (
                start_trim
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            )

        extended_end = (
            start_trim
            + TRANSCRIPTION_AUDIO_SPLIT_TIME
            + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
        )
        if extended_end < duration:
            dur = TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
        else:
            # Last chunk: take whatever is left.
            dur = duration - start_trim

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence : %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            if not words:
                # A transcript without tokens has no first word to anchor
                # the caption start on.
                continue
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
×
394

395

396
def change_previous_end_caption(webvtt, start_caption):
    """Shorten the last caption so it does not overlap the next one.

    Args:
        webvtt: object with a ``captions`` list; the ``end`` attribute of its
            last item is a "HH:MM:SS.fff" string.
        start_caption: start time, in seconds, of the caption about to be
            added.

    Returns:
        None; the last caption is modified in place when it ends after
        ``start_caption``.
    """
    if not webvtt.captions:
        return

    previous = webvtt.captions[-1]
    prev_end = dt.datetime.strptime(previous.end, "%H:%M:%S.%f")
    # Convert the parsed clock time back to a number of seconds.
    td_prev_end = timedelta(
        hours=prev_end.hour,
        minutes=prev_end.minute,
        seconds=prev_end.second,
        microseconds=prev_end.microsecond,
    ).total_seconds()
    if td_prev_end > start_caption:
        previous.end = format_time_caption(start_caption)
×
408

409

410
def format_time_caption(time_caption):
    """Format time for webvtt caption.

    Args:
        time_caption: a number of seconds, or anything ``float()`` accepts.

    Returns:
        The time as an "HH:MM:SS.fff" string (milliseconds are truncated,
        not rounded).
    """
    # An explicit epoch replaces ``datetime.utcfromtimestamp(0)``, which is
    # deprecated since Python 3.12 and yields the same naive datetime.
    epoch = dt.datetime(1970, 1, 1)
    moment = epoch + timedelta(seconds=float(time_caption))
    # %f gives microseconds (6 digits); drop the last 3 to keep milliseconds.
    return moment.strftime("%H:%M:%S.%f")[:-3]
415

416

417
def get_text_caption(text_caption, last_word_added):
    """Get the text for a caption.

    Args:
        text_caption: list of words for the caption.
        last_word_added: word to look for in ``text_caption``.

    Returns:
        The words following the first occurrence of ``last_word_added``, or
        ``text_caption`` itself when that word is not present.
    """
    try:
        first_index = text_caption.index(last_word_added)
    except ValueError:
        return text_caption
    else:
        return text_caption[first_index + 1 :]
×
424

425

426
def words_from_candidate_transcript(metadata):
    """Get words list from transcription.

    Args:
        metadata: object with a ``tokens`` sequence; each token has a
            ``text`` (one character) and a ``start_time``.

    Returns:
        A list of dicts with the keys "word", "start_time" and "duration".
        Start time and duration are rounded to 4 decimals. A word ends at a
        space token or at the last token.
    """
    word = ""
    word_list = []
    word_start_time = 0
    last_index = len(metadata.tokens) - 1
    # Loop through each character
    for i, token in enumerate(metadata.tokens):
        is_space = token.text == " "
        # Append character to word if it's not a space
        if not is_space:
            if not word:
                # Log the start time of the new word
                word_start_time = token.start_time
            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if is_space or i == last_index:
            # The duration runs up to the start of the boundary token and is
            # clamped so it can never be negative.
            word_duration = max(token.start_time - word_start_time, 0)

            word_list.append(
                {
                    "word": word,
                    "start_time": round(word_start_time, 4),
                    "duration": round(word_duration, 4),
                }
            )
            # Reset
            word = ""
            word_start_time = 0

    return word_list
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc