• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 5462629459

pending completion
5462629459

Pull #899

github

web-flow
Merge e759dacb6 into c94f0e331
Pull Request #899: [DONE] Ptitloup/feature new encoding

400 of 400 new or added lines in 14 files covered. (100.0%)

8968 of 12675 relevant lines covered (70.75%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pod/video_encode_transcript/transcript_model.py
1
import numpy as np
×
2
import shlex
×
3
import subprocess
×
4
import json
×
5

6
import sys
×
7
import os
×
8
from timeit import default_timer as timer
×
9
import datetime as dt
×
10
from datetime import timedelta
×
11

12
from webvtt import WebVTT, Caption
×
13

14
try:
×
15
    from shhlex import quote
×
16
except ImportError:
×
17
    from pipes import quote
×
18

19
import logging
×
20

21
from ..custom import settings_local
×
22

23
DEBUG = getattr(settings_local, "DEBUG", False)
×
24

25
TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
×
26
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
×
27
if USE_TRANSCRIPTION:
×
28
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
×
29
    if TRANSCRIPTION_TYPE == "VOSK":
×
30
        from vosk import Model, KaldiRecognizer
×
31
    elif TRANSCRIPTION_TYPE == "STT":
×
32
        from stt import Model
×
33

34
TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
×
35
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
×
36
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
37
)
38

39
TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
×
40
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
41
)  # 10min
42
# time in sec for phrase length
43
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
×
44
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 3
45
)
46
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
×
47
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
48
)
49
log = logging.getLogger(__name__)
×
50

51

52
def get_model(lang):
×
53
    """Get model for STT or Vosk software to transcript audio."""
54
    transript_model = Model(TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"])
×
55
    if TRANSCRIPTION_TYPE == "STT":
×
56
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("beam_width"):
×
57
            transript_model.setBeamWidth(
×
58
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["beam_width"]
59
            )
60
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("scorer"):
×
61
            print(
×
62
                "Loading scorer from files {}".format(
63
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
64
                ),
65
                file=sys.stderr,
66
            )
67
            scorer_load_start = timer()
×
68
            transript_model.enableExternalScorer(
×
69
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
70
            )
71
            scorer_load_end = timer() - scorer_load_start
×
72
            print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
×
73
            if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get(
×
74
                "lm_alpha"
75
            ) and TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("lm_beta"):
76
                transript_model.setScorerAlphaBeta(
×
77
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_alpha"],
78
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_beta"],
79
                )
80
    return transript_model
×
81

82

83
def start_transcripting(mp3filepath, duration, lang):
×
84
    """
85
    Normalize the audio if set, get the model according to the lang and start transcript.
86
    """
87
    if TRANSCRIPTION_NORMALIZE:
×
88
        mp3filepath = normalize_mp3(mp3filepath)
×
89
    transript_model = get_model(lang)
×
90
    msg, webvtt, all_text = start_main_transcript(
×
91
        mp3filepath, duration, transript_model
92
    )
93
    if DEBUG:
×
94
        print(msg)
×
95
        print(webvtt)
×
96
        print("\n%s\n" % all_text)
×
97

98
    return msg, webvtt
×
99

100

101
def start_main_transcript(mp3filepath, duration, transript_model):
×
102
    """Call transcription depending software type."""
103
    if TRANSCRIPTION_TYPE == "STT":
×
104
        msg, webvtt, all_text = main_stt_transcript(
×
105
            mp3filepath, duration, transript_model
106
        )
107
    elif TRANSCRIPTION_TYPE == "VOSK":
×
108
        msg, webvtt, all_text = main_vosk_transcript(
×
109
            mp3filepath, duration, transript_model
110
        )
111
    return msg, webvtt, all_text
×
112

113

114
def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
×
115
    """Convert audio to subaudio and add good sample rate."""
116
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
×
117
        quote(audio_path), desired_sample_rate
118
    )
119
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
×
120
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)
×
121

122
    try:
×
123
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
×
124

125
    except subprocess.CalledProcessError as e:
×
126
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
×
127
    except OSError as e:
×
128
        raise OSError(
×
129
            e.errno,
130
            "SoX not found, use {}hz files or install it: {}".format(
131
                desired_sample_rate, e.strerror
132
            ),
133
        )
134

135
    return np.frombuffer(output, np.int16)
×
136

137

138
def normalize_mp3(mp3filepath):
×
139
    """Normalize the audio to good format and sound level."""
140
    filename, file_extension = os.path.splitext(mp3filepath)
×
141
    mp3normfile = "{}{}{}".format(filename, "_norm", file_extension)
×
142
    normalize_cmd = "ffmpeg-normalize {} ".format(quote(mp3filepath))
×
143
    normalize_cmd += "-c:a libmp3lame -b:a 192k --normalization-type ebu "
×
144
    # normalize_cmd += \
145
    # '--loudness-range-target 7.0 --true-peak 0.0 --offset 0.0 '
146
    normalize_cmd += "--target-level {} -f -o {}".format(
×
147
        TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
148
    )
149
    if DEBUG:
×
150
        print(normalize_cmd)
×
151
    try:
×
152
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
×
153
        return mp3normfile
×
154
    except subprocess.CalledProcessError as e:
×
155
        log.error("ffmpeg-normalize returned non-zero status: {}".format(e.stderr))
×
156
        return mp3filepath
×
157
    except OSError as e:
×
158
        log.error("ffmpeg-normalize not found {}".format(e.strerror))
×
159
        return mp3filepath
×
160

161

162
# #################################
163
# TRANSCRIPT VIDEO : MAIN FUNCTION
164
# #################################
165

166

167
def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
×
168
    """Convert audio to the good sample rate."""
169
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
×
170
        quote(audio_path), desired_sample_rate
171
    )
172
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
×
173
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)
×
174

175
    try:
×
176
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)
×
177

178
    except subprocess.CalledProcessError as e:
×
179
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
×
180
    except OSError as e:
×
181
        raise OSError(
×
182
            e.errno,
183
            "SoX not found, use {}hz files or install it: {}".format(
184
                desired_sample_rate, e.strerror
185
            ),
186
        )
187
    return output
×
188

189

190
def get_word_result_from_data(results, audio, rec):
×
191
    """Get subsound from audio and add transcription to result parameter."""
192
    while True:
193
        data = audio.stdout.read(4000)
×
194
        if len(data) == 0:
×
195
            break
×
196
        if rec.AcceptWaveform(data):
×
197
            results.append(rec.Result())
×
198
    results.append(rec.Result())
×
199

200

201
def words_to_vtt(
×
202
    words,
203
    start_trim,
204
    duration,
205
    is_first_caption,
206
    text_caption,
207
    start_caption,
208
    last_word_added,
209
    all_text,
210
    webvtt,
211
):
212
    """Convert word and time to webvtt captions."""
213
    for index, word in enumerate(words):
×
214
        start_key = "start_time"
×
215
        word_duration = word.get("duration", 0)
×
216
        last_word = words[-1]
×
217
        last_word_duration = last_word.get("duration", 0)
×
218
        if TRANSCRIPTION_TYPE == "VOSK":
×
219
            start_key = "start"
×
220
            word_duration = word["end"] - word["start"]
×
221
            last_word_duration = words[-1]["end"] - words[-1]["start"]
×
222
        next_word = None
×
223
        blank_duration = 0
×
224
        if word != words[-1] and (index + 1) < len(words):
×
225
            next_word = words[index + 1]
×
226
            blank_duration = ((next_word[start_key]) - start_caption) - (
×
227
                ((word[start_key]) - start_caption) + word_duration
228
            )
229
        all_text += word["word"] + " "
×
230
        # word : <class 'dict'> {'word': 'bonjour', 'start ':
231
        # 0.58, 'duration': 7.34}
232
        text_caption.append(word["word"])
×
233
        if not (
×
234
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
235
            and (
236
                next_word is not None
237
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
238
            )
239
        ):
240
            # on créé le caption
241
            if is_first_caption:
×
242
                # A revoir, fusion de la nouvelle ligne avec
243
                # l'ancienne...
244
                is_first_caption = False
×
245
                text_caption = get_text_caption(text_caption, last_word_added)
×
246

247
            stop_caption = word[start_key] + word_duration
×
248

249
            # on evite le chevauchement
250
            change_previous_end_caption(webvtt, start_caption)
×
251

252
            caption = Caption(
×
253
                format_time_caption(start_caption),
254
                format_time_caption(stop_caption),
255
                " ".join(text_caption),
256
            )
257

258
            webvtt.captions.append(caption)
×
259
            # on remet tout à zero pour la prochaine phrase
260
            start_caption = word[start_key]
×
261
            text_caption = []
×
262
            last_word_added = word["word"]
×
263
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
×
264
        # on ajoute ici la dernière phrase de la vidéo
265
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
×
266
        caption = Caption(
×
267
            format_time_caption(start_caption),
268
            format_time_caption(stop_caption),
269
            " ".join(text_caption),
270
        )
271
        webvtt.captions.append(caption)
×
272
    return all_text, webvtt
×
273

274

275
def main_vosk_transcript(norm_mp3_file, duration, transript_model):
×
276
    """Vosk transcription."""
277
    msg = ""
×
278
    inference_start = timer()
×
279
    msg += "\nInference start %0.3fs." % inference_start
×
280
    desired_sample_rate = 16000
×
281

282
    rec = KaldiRecognizer(transript_model, desired_sample_rate)
×
283
    rec.SetWords(True)
×
284

285
    webvtt = WebVTT()
×
286
    all_text = ""
×
287
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
×
288
        audio = convert_vosk_samplerate(
×
289
            norm_mp3_file,
290
            desired_sample_rate,
291
            start_trim,
292
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
293
        )
294
        msg += "\nRunning inference."
×
295
        results = []
×
296
        get_word_result_from_data(results, audio, rec)
×
297
        for res in results:
×
298
            words = json.loads(res).get("result")
×
299
            text = json.loads(res).get("text")
×
300
            if not words:
×
301
                continue
×
302
            start_caption = words[0]["start"]
×
303
            stop_caption = words[-1]["end"]
×
304
            caption = Caption(
×
305
                format_time_caption(start_caption),
306
                format_time_caption(stop_caption),
307
                text,
308
            )
309
            webvtt.captions.append(caption)
×
310
            """
×
311
            text_caption = []
312
            is_first_caption = True
313
            all_text, webvtt = words_to_vtt(
314
                words,
315
                start_trim,
316
                duration,
317
                is_first_caption,
318
                text_caption,
319
                start_caption,
320
                last_word_added,
321
                all_text,
322
                webvtt,
323
            )
324
            """
325
    inference_end = timer() - inference_start
×
326

327
    msg += "\nInference took %0.3fs." % inference_end
×
328
    return msg, webvtt, all_text
×
329

330

331
def main_stt_transcript(norm_mp3_file, duration, transript_model):
×
332
    """STT transcription."""
333
    msg = ""
×
334
    inference_start = timer()
×
335
    msg += "\nInference start %0.3fs." % inference_start
×
336
    desired_sample_rate = transript_model.sampleRate()
×
337
    webvtt = WebVTT()
×
338
    last_word_added = ""
×
339
    metadata = None
×
340
    all_text = ""
×
341
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
×
342
        end_trim = (
×
343
            duration
344
            if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration
345
            else (
346
                start_trim
347
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
348
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
349
            )
350
        )
351

352
        dur = (
×
353
            (TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
354
            if (
355
                (
356
                    start_trim
357
                    + TRANSCRIPTION_AUDIO_SPLIT_TIME
358
                    + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
359
                )
360
                < duration
361
            )
362
            else (duration - start_trim)
363
        )
364

365
        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)
×
366

367
        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
×
368
        msg += "\nRunning inference."
×
369

370
        metadata = transript_model.sttWithMetadata(audio)
×
371

372
        for transcript in metadata.transcripts:
×
373
            msg += "\nConfidence : %s" % transcript.confidence
×
374
            words = words_from_candidate_transcript(transcript)
×
375
            start_caption = start_trim + words[0]["start_time"]
×
376
            text_caption = []
×
377
            is_first_caption = True
×
378
            all_text, webvtt = words_to_vtt(
×
379
                words,
380
                start_trim,
381
                duration,
382
                is_first_caption,
383
                text_caption,
384
                start_caption,
385
                last_word_added,
386
                all_text,
387
                webvtt,
388
            )
389
    inference_end = timer() - inference_start
×
390

391
    msg += "\nInference took %0.3fs." % inference_end
×
392
    return msg, webvtt, all_text
×
393

394

395
def change_previous_end_caption(webvtt, start_caption):
×
396
    """Change the end time for caption."""
397
    if len(webvtt.captions) > 0:
×
398
        prev_end = dt.datetime.strptime(webvtt.captions[-1].end, "%H:%M:%S.%f")
×
399
        td_prev_end = timedelta(
×
400
            hours=prev_end.hour,
401
            minutes=prev_end.minute,
402
            seconds=prev_end.second,
403
            microseconds=prev_end.microsecond,
404
        ).total_seconds()
405
        if td_prev_end > start_caption:
×
406
            webvtt.captions[-1].end = format_time_caption(start_caption)
×
407

408

409
def format_time_caption(time_caption):
×
410
    """Format time for webvtt caption."""
411
    return (
×
412
        dt.datetime.utcfromtimestamp(0) + timedelta(seconds=float(time_caption))
413
    ).strftime("%H:%M:%S.%f")[:-3]
414

415

416
def get_text_caption(text_caption, last_word_added):
×
417
    """get the text for a caption."""
418
    try:
×
419
        first_index = text_caption.index(last_word_added)
×
420
        return text_caption[first_index + 1 :]
×
421
    except ValueError:
×
422
        return text_caption
×
423

424

425
def words_from_candidate_transcript(metadata):
×
426
    """Get words list from transcription."""
427
    word = ""
×
428
    word_list = []
×
429
    word_start_time = 0
×
430
    # Loop through each character
431
    for i, token in enumerate(metadata.tokens):
×
432
        # Append character to word if it's not a space
433
        if token.text != " ":
×
434
            if len(word) == 0:
×
435
                # Log the start time of the new word
436
                word_start_time = token.start_time
×
437

438
            word = word + token.text
×
439
        # Word boundary is either a space or the last character in the array
440
        if token.text == " " or i == len(metadata.tokens) - 1:
×
441
            word_duration = token.start_time - word_start_time
×
442

443
            if word_duration < 0:
×
444
                word_duration = 0
×
445

446
            each_word = dict()
×
447
            each_word["word"] = word
×
448
            each_word["start_time"] = round(word_start_time, 4)
×
449
            each_word["duration"] = round(word_duration, 4)
×
450

451
            word_list.append(each_word)
×
452
            # Reset
453
            word = ""
×
454
            word_start_time = 0
×
455

456
    return word_list
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc