• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 6611338684

23 Oct 2023 09:27AM UTC coverage: 70.317% (+0.01%) from 70.305%
6611338684

push

github

web-flow
Merge pull request #971 from EsupPortail/develop

[DONE] Develop #3.4.1

49 of 49 new or added lines in 10 files covered. (100.0%)

9298 of 13223 relevant lines covered (70.32%)

0.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pod/video_encode_transcript/transcript_model.py
1
import numpy as np
×
2
import shlex
×
3
import subprocess
×
4
import json
×
5

6
import sys
×
7
import os
×
8
from timeit import default_timer as timer
×
9
import datetime as dt
×
10
from datetime import timedelta
×
11

12
from webvtt import WebVTT, Caption
×
13

14
try:
    # BUGFIX: was ``from shhlex import quote`` — a typo (inherited from the
    # upstream DeepSpeech example client), so this always fell through to the
    # deprecated ``pipes`` module (removed in Python 3.13).  ``shlex.quote``
    # exists since Python 3.3; the fallback is kept for safety only.
    from shlex import quote
except ImportError:
    from pipes import quote
18

19
import logging
×
20

21
try:
×
22
    from ..custom import settings_local
×
23
except ImportError:
×
24
    from .. import settings as settings_local
×
25

26
DEBUG = getattr(settings_local, "DEBUG", False)
×
27

28
TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
×
29
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
×
30
if USE_TRANSCRIPTION:
×
31
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
×
32
    if TRANSCRIPTION_TYPE == "VOSK":
×
33
        from vosk import Model, KaldiRecognizer
×
34
    elif TRANSCRIPTION_TYPE == "STT":
×
35
        from stt import Model
×
36

37
TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
×
38
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
×
39
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
40
)
41

42
TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
×
43
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
44
)  # 10min
45
# time in sec for phrase length
46
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
×
47
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 2
48
)
49
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
×
50
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
51
)
52
log = logging.getLogger(__name__)
×
53

54

55
def get_model(lang):
    """Get model for STT or Vosk software to transcript audio."""
    # All per-language settings live under TRANSCRIPTION_MODEL_PARAM[type][lang];
    # bind the sub-dict once instead of repeating the double lookup.
    lang_conf = TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]
    transript_model = Model(lang_conf["model"])
    if TRANSCRIPTION_TYPE == "STT":
        beam_width = lang_conf.get("beam_width")
        if beam_width:
            transript_model.setBeamWidth(beam_width)
        scorer = lang_conf.get("scorer")
        if scorer:
            print(
                "Loading scorer from files {}".format(scorer),
                file=sys.stderr,
            )
            scorer_load_start = timer()
            transript_model.enableExternalScorer(scorer)
            scorer_load_end = timer() - scorer_load_start
            print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
            # Alpha/beta tuning is applied only when both values are present.
            if lang_conf.get("lm_alpha") and lang_conf.get("lm_beta"):
                transript_model.setScorerAlphaBeta(
                    lang_conf["lm_alpha"],
                    lang_conf["lm_beta"],
                )
    return transript_model
84

85

86
def start_transcripting(mp3filepath, duration, lang):
    """
    Start direct transcription.

    Normalize the audio if set, get the model according to the lang and start transcript.
    """
    # Optional loudness normalization; normalize_mp3 returns the original
    # path on failure, so audio_path is always usable.
    audio_path = normalize_mp3(mp3filepath) if TRANSCRIPTION_NORMALIZE else mp3filepath
    transript_model = get_model(lang)
    msg, webvtt, all_text = start_main_transcript(audio_path, duration, transript_model)
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    return msg, webvtt
102

103

104
def start_main_transcript(mp3filepath, duration, transript_model):
    """Call transcription depending software type.

    Args:
        mp3filepath: path of the audio file to transcribe.
        duration: audio duration in seconds.
        transript_model: loaded model as returned by ``get_model()``.

    Returns:
        tuple: (log message, WebVTT captions, full transcribed text).

    Raises:
        ValueError: if ``TRANSCRIPTION_TYPE`` is neither "STT" nor "VOSK".
            (Previously this fell through and crashed with an
            ``UnboundLocalError`` on the return statement.)
    """
    if TRANSCRIPTION_TYPE == "STT":
        msg, webvtt, all_text = main_stt_transcript(
            mp3filepath, duration, transript_model
        )
    elif TRANSCRIPTION_TYPE == "VOSK":
        msg, webvtt, all_text = main_vosk_transcript(
            mp3filepath, duration, transript_model
        )
    else:
        raise ValueError(
            "Unsupported TRANSCRIPTION_TYPE: %s" % TRANSCRIPTION_TYPE
        )
    return msg, webvtt, all_text
115

116

117
def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Convert audio to subaudio and add good sample rate.

    Runs SoX to extract ``duration`` seconds starting at ``trim_start`` as
    raw 16-bit mono PCM at ``desired_sample_rate`` and returns the samples
    as a numpy int16 array.
    """
    sox_cmd = (
        f"sox {quote(audio_path)} --type raw --bits 16 --channels 1 "
        f"--rate {desired_sample_rate} "
        "--encoding signed-integer --endian little --compression 0.0 "
        f"--no-dither - trim {trim_start} {duration}"
    )

    try:
        raw_audio = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        raise RuntimeError("SoX returned non-zero status: {}".format(err.stderr))
    except OSError as err:
        raise OSError(
            err.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, err.strerror
            ),
        )

    return np.frombuffer(raw_audio, np.int16)
139

140

141
def normalize_mp3(mp3filepath):
    """Normalize the audio to good format and sound level.

    Writes a ``*_norm`` copy of the file via ffmpeg-normalize and returns its
    path; on any failure the original path is returned so transcription can
    continue on the unnormalized audio.
    """
    base, ext = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(base, "_norm", ext)
    normalize_cmd = (
        "ffmpeg-normalize {} ".format(quote(mp3filepath))
        + "-c:a libmp3lame -b:a 192k --normalization-type ebu "
        # normalize_cmd += \
        # '--loudness-range-target 7.0 --true-peak 0.0 --offset 0.0 '
        + "--target-level {} -f -o {}".format(
            TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
        )
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        log.error("ffmpeg-normalize returned non-zero status: {}".format(err.stderr))
        return mp3filepath
    except OSError as err:
        log.error("ffmpeg-normalize not found {}".format(err.strerror))
        return mp3filepath
    return mp3normfile
163

164

165
# #################################
166
# TRANSCRIPT VIDEO: MAIN FUNCTION
167
# #################################
168

169

170
def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Convert audio to the good sample rate.

    Starts a SoX process streaming ``duration`` seconds of ``audio_path``
    (from ``trim_start``) as raw 16-bit mono PCM at ``desired_sample_rate``
    and returns the ``Popen`` object, whose stdout is read incrementally by
    ``get_word_result_from_data()``.

    Raises:
        OSError: if the ``sox`` executable is not installed.
    """
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)
    # NOTE: the original also caught subprocess.CalledProcessError here, but
    # Popen() never raises it (only check_output/run(check=True) do), so that
    # branch was unreachable and has been removed.
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return output
191

192

193
def get_word_result_from_data(results, audio, rec):
    """Feed the audio stream to the recognizer and collect its results.

    Reads 4000-byte chunks from ``audio.stdout`` (a SoX PCM pipe) and passes
    them to the Vosk recognizer; each time the recognizer completes an
    utterance, its JSON result string is appended to ``results`` (mutated
    in place; nothing is returned).

    Args:
        results: list to append JSON result strings to.
        audio: process object whose ``stdout`` yields raw PCM bytes.
        rec: a ``vosk.KaldiRecognizer``.
    """
    while True:
        data = audio.stdout.read(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    # BUGFIX: flush with FinalResult(), which returns the words still
    # buffered in the recognizer. The original called Result() here, which
    # silently drops the tail of the last utterance.
    results.append(rec.FinalResult())
202

203

204
def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert timed words into WebVTT captions.

    Walks the ``words`` list (dicts produced by the STT or Vosk path) and
    groups them into captions, starting a new caption whenever the current
    one exceeds TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH seconds or a silence
    of at least TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME seconds occurs.
    Captions are appended to ``webvtt`` in place; the updated running text
    and the same ``webvtt`` object are returned as a tuple.

    NOTE(review): ``text_caption``/``start_caption``/``last_word_added`` are
    the caller's in-progress caption state; the exact statement order below
    (append word, test split, emit, reset) is load-bearing — do not reorder.
    """
    for index, word in enumerate(words):
        # STT words look like {'word': ..., 'start_time': ..., 'duration': ...};
        # Vosk words use 'start'/'end' instead, handled just below.
        start_key = "start_time"
        word_duration = word.get("duration", 0)
        last_word = words[-1]
        last_word_duration = last_word.get("duration", 0)
        if TRANSCRIPTION_TYPE == "VOSK":
            start_key = "start"
            word_duration = word["end"] - word["start"]
            last_word_duration = words[-1]["end"] - words[-1]["start"]
        next_word = None
        blank_duration = 0
        if word != words[-1] and (index + 1) < len(words):
            next_word = words[index + 1]
            # Silence between the end of this word and the start of the next.
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word: <class 'dict'> {'word': 'bonjour', 'start ':
        # 0.58, 'duration': 7.34}
        text_caption.append(word["word"])
        # Close the caption when it is long enough OR a long blank follows
        # OR this is the last word (next_word is None).
        if not (
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            and (
                next_word is not None
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
            )
        ):
            # create the caption here
            if is_first_caption:
                # TODO(review): merging of the new line with the previous
                # one — drop words already emitted in the previous call.
                is_first_caption = False
                text_caption = get_text_caption(text_caption, last_word_added)

            stop_caption = word[start_key] + word_duration

            # avoid overlap with the previous caption's end time
            change_previous_end_caption(webvtt, start_caption)

            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                " ".join(text_caption),
            )

            webvtt.captions.append(caption)
            # reset everything for the next sentence
            start_caption = word[start_key]
            text_caption = []
            last_word_added = word["word"]
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # this is the last audio chunk: flush the video's final sentence
        # (only here is start_trim added — presumably because earlier
        # timestamps are already stream-absolute; TODO confirm)
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
        caption = Caption(
            format_time_caption(start_caption),
            format_time_caption(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
276

277

278
def main_vosk_transcript(norm_mp3_file, duration, transript_model):
    """Vosk transcription.

    Streams the audio through SoX in TRANSCRIPTION_AUDIO_SPLIT_TIME-second
    chunks, feeds them to a single KaldiRecognizer and appends one WebVTT
    caption per recognized utterance.

    Args:
        norm_mp3_file: path of the (possibly normalized) audio file.
        duration: audio duration in seconds.
        transript_model: a loaded ``vosk.Model``.

    Returns:
        tuple: (log message, WebVTT captions, all_text). ``all_text`` is
        always empty on this path — the text lives in the captions.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    # Vosk models expect 16 kHz mono PCM.
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transript_model, desired_sample_rate)
    rec.SetWords(True)  # include per-word timestamps in the results

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        audio = convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # chunk duration
        )
        msg += "\nRunning inference."
        results = []
        get_word_result_from_data(results, audio, rec)
        for res in results:
            # Parse each JSON result once (the original parsed it twice).
            data = json.loads(res)
            words = data.get("result")
            text = data.get("text")
            if not words:
                continue
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
        # NOTE(review): a dead triple-quoted string holding a disabled
        # words_to_vtt() call (referencing an undefined ``last_word_added``)
        # was removed here — it was a no-op expression statement.
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
332

333

334
def main_stt_transcript(norm_mp3_file, duration, transript_model):
    """STT transcription.

    Cuts the audio into TRANSCRIPTION_AUDIO_SPLIT_TIME-second windows (with
    a TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH overlap), runs the STT model on
    each window and converts the candidate transcripts into WebVTT captions.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transript_model.sampleRate()
    webvtt = WebVTT()
    last_word_added = ""
    metadata = None
    all_text = ""
    # Each window overlaps the next by the max sentence length so sentences
    # cut at a chunk boundary can be re-merged.
    window = TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
            end_trim = duration
        else:
            end_trim = start_trim + window

        if start_trim + window < duration:
            dur = window
        else:
            dur = duration - start_trim

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence: %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
396

397

398
def change_previous_end_caption(webvtt, start_caption):
    """Clamp the previous caption's end so it never overlaps the next one.

    If the last caption in ``webvtt`` ends after ``start_caption`` (seconds),
    rewrite its end timestamp to ``start_caption``. No-op when there are no
    captions yet or when there is no overlap.
    """
    if not webvtt.captions:
        return
    parsed = dt.datetime.strptime(webvtt.captions[-1].end, "%H:%M:%S.%f")
    previous_end_seconds = timedelta(
        hours=parsed.hour,
        minutes=parsed.minute,
        seconds=parsed.second,
        microseconds=parsed.microsecond,
    ).total_seconds()
    if previous_end_seconds > start_caption:
        webvtt.captions[-1].end = format_time_caption(start_caption)
410

411

412
def format_time_caption(time_caption):
    """Format a time in seconds as a WebVTT timestamp (``HH:MM:SS.mmm``).

    Args:
        time_caption: seconds from the start of the media (any value
            accepted by ``float()``).

    Returns:
        str: the timestamp truncated to millisecond precision.
    """
    # Anchor on the Unix epoch explicitly: datetime.utcfromtimestamp() is
    # deprecated since Python 3.12; this produces byte-identical output.
    return (
        dt.datetime(1970, 1, 1) + timedelta(seconds=float(time_caption))
    ).strftime("%H:%M:%S.%f")[:-3]
417

418

419
def get_text_caption(text_caption, last_word_added):
    """Get the text for a caption.

    Drops everything up to and including the first occurrence of
    ``last_word_added`` (words already emitted in the previous caption);
    returns the list unchanged when that word is not present.
    """
    if last_word_added in text_caption:
        first_index = text_caption.index(last_word_added)
        return text_caption[first_index + 1 :]
    return text_caption
426

427

428
def words_from_candidate_transcript(metadata):
    """Get words list from transcription.

    Rebuilds whole words from the per-character tokens of an STT candidate
    transcript. Each entry is a dict with keys ``word``, ``start_time`` and
    ``duration`` (both rounded to 4 decimals).
    """
    current_word = ""
    word_list = []
    current_start = 0
    last_index = len(metadata.tokens) - 1
    for idx, token in enumerate(metadata.tokens):
        if token.text != " ":
            # First character of a new word: remember its start time.
            if not current_word:
                current_start = token.start_time
            current_word += token.text
        # A word ends at a space or at the very last token.
        if token.text == " " or idx == last_index:
            # Clamp negative durations (single-char words) to zero.
            word_duration = max(token.start_time - current_start, 0)
            word_list.append(
                {
                    "word": current_word,
                    "start_time": round(current_start, 4),
                    "duration": round(word_duration, 4),
                }
            )
            # Reset accumulators for the next word.
            current_word = ""
            current_start = 0

    return word_list
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc