• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 5424233138

pending completion
5424233138

Pull #899

github

web-flow
Merge bfc949828 into b8d45a116
Pull Request #899: [WIP] Ptitloup/feature new encoding

336 of 336 new or added lines in 8 files covered. (100.0%)

9028 of 12629 relevant lines covered (71.49%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.54
/pod/video_encode_transcript/transcript_model.py
1
import numpy as np
1✔
2
import shlex
1✔
3
import subprocess
1✔
4
import json
1✔
5

6
import sys
1✔
7
import os
1✔
8
from timeit import default_timer as timer
1✔
9
import datetime as dt
1✔
10
from datetime import timedelta
1✔
11

12
from webvtt import WebVTT, Caption
1✔
13

14
# `quote` safely escapes a path for inclusion in a shell command string.
# BUG FIX: the original read `from shhlex import quote` — a typo inherited
# from the DeepSpeech example client — so the import always failed and the
# code silently fell back to the deprecated `pipes.quote` (removed in
# Python 3.13).  Import from the real `shlex` module instead.
try:
    from shlex import quote
except ImportError:  # pragma: no cover — shlex.quote exists on Python 3.3+
    from pipes import quote
18

19
import logging
1✔
20

21
from .. import settings
1✔
22

23
# Mirror Django's DEBUG flag; gates the verbose print() tracing below.
DEBUG = getattr(settings, "DEBUG", False)

# Per-engine, per-language model configuration, e.g.
# {"VOSK": {"fr": {"model": "/path/to/model"}}} — consumed by get_model().
TRANSCRIPTION_MODEL_PARAM = getattr(settings, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings, "USE_TRANSCRIPTION", False)
if USE_TRANSCRIPTION:
    # Import the speech-to-text backend lazily so deployments that never
    # transcribe do not need vosk/stt installed.
    TRANSCRIPTION_TYPE = getattr(settings, "TRANSCRIPTION_TYPE", "VOSK")
    if TRANSCRIPTION_TYPE == "VOSK":
        from vosk import Model, KaldiRecognizer
    elif TRANSCRIPTION_TYPE == "STT":
        from stt import Model

# Whether to loudness-normalize the MP3 with ffmpeg-normalize first.
TRANSCRIPTION_NORMALIZE = getattr(settings, "TRANSCRIPTION_NORMALIZE", False)
# EBU R128 target level (dB) passed to ffmpeg-normalize.
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
    settings, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
)

# Length (seconds) of each audio chunk fed to the recognizer.
TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
    settings, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
)  # 10min
# time in sec for phrase length
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
    settings, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 3
)
# Silence (seconds) between two words that forces a caption split.
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
    settings, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
)
log = logging.getLogger(__name__)
50

51

52
def get_model(lang):
    """Build and configure the speech-to-text model for *lang*.

    Reads the per-language settings from TRANSCRIPTION_MODEL_PARAM and,
    for the STT engine, applies the optional beam width, external scorer
    and scorer alpha/beta tuning parameters.
    """
    # Hoist the per-language settings dict instead of re-indexing it on
    # every access.
    params = TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]
    transript_model = Model(params["model"])
    if TRANSCRIPTION_TYPE == "STT":
        if params.get("beam_width"):
            transript_model.setBeamWidth(params["beam_width"])
        if params.get("scorer"):
            print(
                "Loading scorer from files {}".format(params["scorer"]),
                file=sys.stderr,
            )
            scorer_load_start = timer()
            transript_model.enableExternalScorer(params["scorer"])
            scorer_load_end = timer() - scorer_load_start
            print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
            if params.get("lm_alpha") and params.get("lm_beta"):
                transript_model.setScorerAlphaBeta(
                    params["lm_alpha"], params["lm_beta"]
                )
    return transript_model
80

81

82
def start_transcripting(mp3filepath, duration, lang):
    """Run the full transcription pipeline on an MP3 file.

    Optionally loudness-normalizes the audio first, loads the language
    model, transcribes, and returns (message log, WebVTT captions).
    """
    if TRANSCRIPTION_NORMALIZE:
        mp3filepath = normalize_mp3(mp3filepath)
    model = get_model(lang)
    msg, webvtt, all_text = start_main_transcript(mp3filepath, duration, model)
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)
    return msg, webvtt
95

96

97
def start_main_transcript(mp3filepath, duration, transript_model):
    """Dispatch transcription to the engine selected by TRANSCRIPTION_TYPE.

    Returns:
        tuple: (msg, webvtt, all_text).

    Raises:
        ValueError: if TRANSCRIPTION_TYPE is neither "STT" nor "VOSK".
            Previously an unknown type surfaced as a confusing
            UnboundLocalError on the return statement.
    """
    if TRANSCRIPTION_TYPE == "STT":
        msg, webvtt, all_text = main_stt_transcript(
            mp3filepath, duration, transript_model
        )
    elif TRANSCRIPTION_TYPE == "VOSK":
        msg, webvtt, all_text = main_vosk_transcript(
            mp3filepath, duration, transript_model
        )
    else:
        raise ValueError(
            "Unsupported TRANSCRIPTION_TYPE: %s" % TRANSCRIPTION_TYPE
        )
    return msg, webvtt, all_text
107

108

109
def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Decode *audio_path* to raw 16-bit mono PCM at *desired_sample_rate*.

    Shells out to SoX, converting only the window starting at
    *trim_start* and lasting *duration* seconds, and returns the samples
    as a numpy int16 array.

    Raises:
        RuntimeError: if SoX exits with a non-zero status.
        OSError: if the sox binary is not installed.
    """
    sox_cmd = (
        "sox {} --type raw --bits 16 --channels 1 --rate {} "
        "--encoding signed-integer --endian little --compression 0.0 "
        "--no-dither - trim {} {}"
    ).format(quote(audio_path), desired_sample_rate, trim_start, duration)

    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return np.frombuffer(output, np.int16)
130

131

132
def normalize_mp3(mp3filepath):
    """Loudness-normalize an MP3 with ffmpeg-normalize (EBU R128 mode).

    Writes "<name>_norm<ext>" next to the input file and returns its
    path.  On any failure the error is logged and the original path is
    returned, so transcription proceeds on the un-normalized audio.
    """
    base, ext = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(base, "_norm", ext)
    normalize_cmd = (
        "ffmpeg-normalize {} "
        "-c:a libmp3lame -b:a 192k --normalization-type ebu "
        "--target-level {} -f -o {}"
    ).format(
        quote(mp3filepath),
        TRANSCRIPTION_NORMALIZE_TARGET_LEVEL,
        quote(mp3normfile),
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        log.error("ffmpeg-normalize returned non-zero status: {}".format(e.stderr))
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found {}".format(e.strerror))
        return mp3filepath
    return mp3normfile
153

154

155
# #################################
156
# TRANSCRIPT VIDEO : MAIN FUNCTION
157
# #################################
158

159

160
def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Stream *audio_path* as raw 16-bit mono PCM at *desired_sample_rate*.

    Unlike convert_samplerate(), SoX is started asynchronously and the
    Popen object is returned so the caller can read decoded audio from
    its stdout pipe in chunks (see get_word_result_from_data()).

    Raises:
        OSError: if the sox binary is not installed.
    """
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)
    # NOTE: subprocess.Popen never raises CalledProcessError (only the
    # check_output/check_call helpers do), so the original handler for
    # it was unreachable dead code and has been removed.
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return output
180

181

182
def get_word_result_from_data(results, audio, rec):
    """Feed PCM chunks from *audio*'s stdout into the Vosk recognizer.

    Reads the SoX pipe 4000 bytes at a time; every time the recognizer
    finalizes an utterance its JSON result is appended to *results*.

    BUG FIX: once the stream is exhausted, FinalResult() is appended to
    flush the audio still buffered in the recognizer.  The original
    called Result() here, which does not finalize the pending utterance,
    so the tail of each chunk could be lost.
    """
    while True:
        data = audio.stdout.read(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    results.append(rec.FinalResult())
190

191

192
def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Group recognized *words* into WebVTT captions.

    A caption is closed once the accumulated speech reaches
    TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH seconds AND the silence after
    the current word reaches TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME
    (or the last word of the list is reached).

    Args:
        words: word dicts from the recognizer.  STT words carry
            "start_time"/"duration"; VOSK words carry "start"/"end".
        start_trim: offset (s) of the current audio chunk in the video.
        duration: total duration (s) of the audio.
        is_first_caption: True while the first caption of this chunk may
            still overlap words already emitted by the previous chunk.
        text_caption: words accumulated for the caption being built.
        start_caption: start time (s) of the caption being built.
        last_word_added: last word written by the previous chunk, used to
            de-duplicate the chunk-overlap region.
        all_text: transcript text accumulated so far.
        webvtt: WebVTT object new captions are appended to.

    Returns:
        tuple: updated (all_text, webvtt).
    """
    for index, word in enumerate(words):
        # Normalize field access between the STT and VOSK word formats.
        start_key = "start_time"
        word_duration = word.get("duration", 0)
        last_word = words[-1]
        last_word_duration = last_word.get("duration", 0)
        if TRANSCRIPTION_TYPE == "VOSK":
            start_key = "start"
            word_duration = word["end"] - word["start"]
            last_word_duration = words[-1]["end"] - words[-1]["start"]
        next_word = None
        blank_duration = 0
        if word != words[-1] and (index + 1) < len(words):
            next_word = words[index + 1]
            # Silence between the end of this word and the start of the next.
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word : <class 'dict'> {'word': 'bonjour', 'start ':
        # 0.58, 'duration': 7.34}
        text_caption.append(word["word"])
        if not (
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            and (
                next_word is not None
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
            )
        ):
            # Close the current caption.
            if is_first_caption:
                # To review: merging of the new line with the old one...
                # Drop the words already emitted by the previous chunk.
                is_first_caption = False
                text_caption = get_text_caption(text_caption, last_word_added)

            stop_caption = word[start_key] + word_duration

            # Avoid overlapping the previous caption.
            change_previous_end_caption(webvtt, start_caption)

            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                " ".join(text_caption),
            )

            webvtt.captions.append(caption)
            # Reset everything for the next sentence.
            start_caption = word[start_key]
            text_caption = []
            last_word_added = word["word"]
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # This is the last chunk: flush the final sentence of the video.
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
        caption = Caption(
            format_time_caption(start_caption),
            format_time_caption(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
263

264

265
def main_vosk_transcript(norm_mp3_file, duration, transript_model):
    """Transcribe *norm_mp3_file* with the VOSK engine.

    The audio is decoded in TRANSCRIPTION_AUDIO_SPLIT_TIME-second chunks
    via SoX; each finalized VOSK result becomes one WebVTT caption.

    Returns:
        tuple: (msg, webvtt, all_text) — progress log, WebVTT object and
        the concatenated transcript text.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        audio = convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        )
        msg += "\nRunning inference."
        results = []
        get_word_result_from_data(results, audio, rec)
        for res in results:
            # Parse each JSON result once (the original parsed it twice).
            content = json.loads(res)
            words = content.get("result")
            text = content.get("text")
            if not words:
                continue
            # BUG FIX: accumulate the transcript text — it was extracted
            # but never added to all_text, so the DEBUG dump stayed empty.
            all_text += text + " "
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
318

319

320
def main_stt_transcript(norm_mp3_file, duration, transript_model):
    """Transcribe *norm_mp3_file* with the STT engine.

    The audio is processed in chunks of TRANSCRIPTION_AUDIO_SPLIT_TIME
    seconds; each chunk (except the last) is extended by
    TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH seconds so sentences cut at a
    chunk boundary can be de-duplicated by words_to_vtt().

    Returns:
        tuple: (msg, webvtt, all_text) — progress log, WebVTT object and
        the concatenated transcript text.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transript_model.sampleRate()
    webvtt = WebVTT()
    last_word_added = ""
    metadata = None
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        # End of this chunk, extended by the sentence-overlap margin
        # unless that would run past the end of the audio.
        end_trim = (
            duration
            if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration
            else (
                start_trim
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            )
        )

        # Number of seconds to decode for this chunk (clamped at the end
        # of the audio).
        dur = (
            (TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            if (
                (
                    start_trim
                    + TRANSCRIPTION_AUDIO_SPLIT_TIME
                    + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
                )
                < duration
            )
            else (duration - start_trim)
        )

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence : %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            # Caption times are relative to the whole video, so offset by
            # the chunk start.
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
381

382

383
def change_previous_end_caption(webvtt, start_caption):
    """Clamp the previous caption's end so it never overlaps *start_caption*.

    If the last caption in *webvtt* ends after the new caption's start
    time (seconds), its end timestamp is rewritten to that start time.
    """
    if not webvtt.captions:
        return
    previous = webvtt.captions[-1]
    parsed = dt.datetime.strptime(previous.end, "%H:%M:%S.%f")
    previous_end_seconds = timedelta(
        hours=parsed.hour,
        minutes=parsed.minute,
        seconds=parsed.second,
        microseconds=parsed.microsecond,
    ).total_seconds()
    if previous_end_seconds > start_caption:
        previous.end = format_time_caption(start_caption)
394

395

396
def format_time_caption(time_caption):
    """Convert a time offset in seconds to a WebVTT "HH:MM:SS.mmm" string."""
    epoch = dt.datetime.utcfromtimestamp(0)
    stamp = epoch + timedelta(seconds=float(time_caption))
    # strftime yields microseconds; WebVTT wants milliseconds, so drop 3.
    return stamp.strftime("%H:%M:%S.%f")[:-3]
400

401

402
def get_text_caption(text_caption, last_word_added):
    """Drop the words up to and including *last_word_added*.

    Used when a new recognition window overlaps the previous one: the
    words already emitted (through the last word of the previous
    caption) are removed.  If the word is absent, the list is returned
    unchanged.
    """
    if last_word_added in text_caption:
        cut = text_caption.index(last_word_added)
        return text_caption[cut + 1:]
    return text_caption
408

409

410
def words_from_candidate_transcript(metadata):
    """Rebuild whole words from the per-character tokens of a transcript.

    Each token carries a single character and its start time; words are
    delimited by space tokens or by the end of the token stream.

    Returns:
        list[dict]: one dict per word with keys "word", "start_time" and
        "duration" (times rounded to 4 decimal places).
    """
    word_list = []
    chars = []
    word_start_time = 0
    total = len(metadata.tokens)
    for i, token in enumerate(metadata.tokens):
        if token.text != " ":
            if not chars:
                # First character of a new word: remember where it begins.
                word_start_time = token.start_time
            chars.append(token.text)
        # A word ends at a space token or at the very last token.
        if token.text == " " or i == total - 1:
            word_duration = max(token.start_time - word_start_time, 0)
            word_list.append(
                {
                    "word": "".join(chars),
                    "start_time": round(word_start_time, 4),
                    "duration": round(word_duration, 4),
                }
            )
            # Reset the accumulator for the next word.
            chars = []
            word_start_time = 0
    return word_list
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc