• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

EsupPortail / Esup-Pod / 5976101876

25 Aug 2023 12:51PM UTC coverage: 70.8%. First build
5976101876

Pull #900

github

web-flow
add check configuration to show filter in filter asde template (#917)
Pull Request #900: [WIP] develop #3.4.0

633 of 633 new or added lines in 26 files covered. (100.0%)

9056 of 12791 relevant lines covered (70.8%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/pod/video_encode_transcript/transcript_model.py
1
import numpy as np
×
2
import shlex
×
3
import subprocess
×
4
import json
×
5

6
import sys
×
7
import os
×
8
from timeit import default_timer as timer
×
9
import datetime as dt
×
10
from datetime import timedelta
×
11

12
from webvtt import WebVTT, Caption
×
13

14
try:
    # ``quote`` lives in the standard ``shlex`` module on Python 3.
    from shlex import quote
except ImportError:
    # Legacy fallback for very old interpreters only; the ``pipes`` module
    # was removed in Python 3.13, so it must not be the normal path.
    from pipes import quote
×
18

19
import logging
×
20

21
from ..custom import settings_local
×
22

23
# When true, the functions of this module print commands and results.
DEBUG = getattr(settings_local, "DEBUG", False)

# Model configuration, indexed as
# TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang] by get_model().
TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
# Always defined (it used to exist only when USE_TRANSCRIPTION was true) so
# that the functions of this module can test it without raising NameError.
TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
if USE_TRANSCRIPTION:
    # Only the configured engine is imported.
    if TRANSCRIPTION_TYPE == "VOSK":
        from vosk import Model, KaldiRecognizer
    elif TRANSCRIPTION_TYPE == "STT":
        from stt import Model

# Whether normalize_mp3() is run before transcription, and the value passed
# to ``ffmpeg-normalize --target-level``.
TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
)

# Length, in seconds, of each audio slice sent to the engine.
TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
)  # 10min
# time in sec for phrase length
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 3
)
# Gap between two words above which words_to_vtt() starts a new caption.
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
)
log = logging.getLogger(__name__)
×
50

51

52
def get_model(lang):
    """Get model for STT or Vosk software to transcript audio.

    The settings are read from
    ``TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]``. The ``"model"``
    entry is always passed to ``Model``; for the ``"STT"`` engine the optional
    ``"beam_width"``, ``"scorer"``, ``"lm_alpha"`` and ``"lm_beta"`` entries
    are applied when they are set to a truthy value.

    Args:
        lang: key of the language entry in the model settings.

    Returns:
        The configured model instance.

    Raises:
        KeyError: if the engine, the language or the ``"model"`` entry is
            missing from the settings.
    """
    # Look the per-language settings up once instead of on every access.
    params = TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]
    transript_model = Model(params["model"])
    if TRANSCRIPTION_TYPE != "STT":
        # The tuning below only applies to the STT engine.
        return transript_model

    if params.get("beam_width"):
        transript_model.setBeamWidth(params["beam_width"])

    if params.get("scorer"):
        print(
            "Loading scorer from files {}".format(params["scorer"]),
            file=sys.stderr,
        )
        scorer_load_start = timer()
        transript_model.enableExternalScorer(params["scorer"])
        scorer_load_end = timer() - scorer_load_start
        print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
        # Alpha/beta are only applied together, and only with a scorer.
        if params.get("lm_alpha") and params.get("lm_beta"):
            transript_model.setScorerAlphaBeta(params["lm_alpha"], params["lm_beta"])
    return transript_model
×
81

82

83
def start_transcripting(mp3filepath, duration, lang):
    """Normalize the audio if set, load the model for ``lang`` and transcribe.

    Args:
        mp3filepath: path of the audio file to transcribe.
        duration: duration of the audio, forwarded to the transcription.
        lang: language key used to select the model.

    Returns:
        A ``(msg, webvtt)`` tuple: the accumulated log message and the
        captions object. The full text is only printed when ``DEBUG`` is set.
    """
    audio_path = normalize_mp3(mp3filepath) if TRANSCRIPTION_NORMALIZE else mp3filepath
    msg, webvtt, all_text = start_main_transcript(audio_path, duration, get_model(lang))
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    return msg, webvtt
×
97

98

99
def start_main_transcript(mp3filepath, duration, transript_model):
    """Call the transcription routine matching ``TRANSCRIPTION_TYPE``.

    Args:
        mp3filepath: path of the audio file to transcribe.
        duration: duration of the audio, forwarded to the engine routine.
        transript_model: model returned by ``get_model``.

    Returns:
        The ``(msg, webvtt, all_text)`` tuple produced by the engine routine.

    Raises:
        ValueError: if ``TRANSCRIPTION_TYPE`` is neither ``"STT"`` nor
            ``"VOSK"`` (this used to surface as an ``UnboundLocalError``).
    """
    if TRANSCRIPTION_TYPE == "STT":
        transcribe = main_stt_transcript
    elif TRANSCRIPTION_TYPE == "VOSK":
        transcribe = main_vosk_transcript
    else:
        raise ValueError(
            "Unsupported TRANSCRIPTION_TYPE: {!r}".format(TRANSCRIPTION_TYPE)
        )
    msg, webvtt, all_text = transcribe(mp3filepath, duration, transript_model)
    return msg, webvtt, all_text
×
110

111

112
def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Extract a slice of the audio with SoX at the requested sample rate.

    SoX is asked for raw, 16-bit, mono, signed little-endian samples written
    to its standard output, starting at ``trim_start`` for ``duration``.

    Args:
        audio_path: path of the source audio file.
        desired_sample_rate: value passed to SoX ``--rate``.
        trim_start: first argument of the SoX ``trim`` effect.
        duration: second argument of the SoX ``trim`` effect.

    Returns:
        A numpy ``int16`` array built from the raw SoX output.

    Raises:
        RuntimeError: if SoX exits with a non-zero status.
        OSError: if the SoX executable cannot be started.
    """
    sox_cmd = (
        "sox {} --type raw --bits 16 --channels 1 --rate {} "
        "--encoding signed-integer --endian little --compression 0.0 "
        "--no-dither - trim {} {}"
    ).format(quote(audio_path), desired_sample_rate, trim_start, duration)

    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # Chain the original error so the SoX failure stays in the traceback.
        raise RuntimeError(
            "SoX returned non-zero status: {}".format(e.stderr)
        ) from e
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        ) from e

    return np.frombuffer(output, np.int16)
×
134

135

136
def normalize_mp3(mp3filepath):
    """Normalize the audio level with ``ffmpeg-normalize``.

    The result is written next to the input, with ``_norm`` inserted before
    the file extension.

    Args:
        mp3filepath: path of the audio file to normalize.

    Returns:
        The path of the normalized file, or ``mp3filepath`` unchanged when
        ``ffmpeg-normalize`` fails or cannot be started (the error is logged).
    """
    filename, file_extension = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(filename, "_norm", file_extension)
    # Options left out on purpose by the original author:
    # '--loudness-range-target 7.0 --true-peak 0.0 --offset 0.0 '
    normalize_cmd = (
        "ffmpeg-normalize {} "
        "-c:a libmp3lame -b:a 192k --normalization-type ebu "
        "--target-level {} -f -o {}"
    ).format(
        quote(mp3filepath), TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as e:
        # Best effort: fall back to the original file.
        log.error("ffmpeg-normalize returned non-zero status: %s", e.stderr)
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found %s", e.strerror)
        return mp3filepath
    return mp3normfile
×
158

159

160
# #################################
161
# TRANSCRIPT VIDEO : MAIN FUNCTION
162
# #################################
163

164

165
def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Start SoX to stream a slice of the audio at the requested sample rate.

    SoX is asked for raw, 16-bit, mono, signed little-endian samples written
    to its standard output, starting at ``trim_start`` for ``duration``.

    Args:
        audio_path: path of the source audio file.
        desired_sample_rate: value passed to SoX ``--rate``.
        trim_start: first argument of the SoX ``trim`` effect.
        duration: second argument of the SoX ``trim`` effect.

    Returns:
        The running ``subprocess.Popen`` object; the samples are read from
        its ``stdout`` pipe. The process is not waited for here.

    Raises:
        OSError: if the SoX executable cannot be started.
    """
    sox_cmd = (
        "sox {} --type raw --bits 16 --channels 1 --rate {} "
        "--encoding signed-integer --endian little --compression 0.0 "
        "--no-dither - trim {} {}"
    ).format(quote(audio_path), desired_sample_rate, trim_start, duration)

    # ``Popen`` only starts the process and never raises CalledProcessError,
    # so a launch failure (OSError) is the only error to translate here.
    try:
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        ) from e
    return output
×
186

187

188
def get_word_result_from_data(results, audio, rec):
    """Feed the audio stream to the recognizer and collect its results.

    ``audio.stdout`` is read in chunks of 4000 until it is exhausted. Each
    time ``rec.AcceptWaveform`` returns a truthy value, ``rec.Result()`` is
    appended to ``results``; one more ``rec.Result()`` is appended once the
    stream has ended.

    Args:
        results: list extended in place with the recognizer results.
        audio: object whose ``stdout`` attribute provides ``read(size)``.
        rec: recognizer providing ``AcceptWaveform`` and ``Result``.
    """
    read_chunk = audio.stdout.read
    while True:
        data = read_chunk(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    # NOTE(review): the Vosk examples call ``FinalResult()`` once the stream
    # has ended - confirm that ``Result()`` is intended here.
    results.append(rec.Result())
×
197

198

199
def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert timed words into WebVTT captions appended to ``webvtt``.

    Words are accumulated into the current caption until the caption reaches
    ``TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH``, the silence before the next
    word reaches ``TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME``, or there is
    no next word; the caption is then appended to ``webvtt.captions``.

    Args:
        words: list of word dicts. Each has a ``"word"`` key plus
            ``"start"``/``"end"`` when ``TRANSCRIPTION_TYPE`` is ``"VOSK"``,
            otherwise ``"start_time"`` and an optional ``"duration"``.
        start_trim: unused; kept for interface compatibility.
        duration: unused; kept for interface compatibility.
        is_first_caption: when true, the first caption built is passed
            through ``get_text_caption`` with ``last_word_added``.
        text_caption: list of words already pending for the current caption;
            it is appended to in place.
        start_caption: start time of the current caption.
        last_word_added: last word of the previously emitted caption.
        all_text: text accumulated so far; every word is added to it.
        webvtt: object whose ``captions`` list receives the new captions.

    Returns:
        The ``(all_text, webvtt)`` tuple.
    """
    if not words:
        # Nothing was recognised: there is no caption to build.
        return all_text, webvtt

    # Loop invariants: which keys carry the timing depends on the engine.
    is_vosk = TRANSCRIPTION_TYPE == "VOSK"
    start_key = "start" if is_vosk else "start_time"
    last_word = words[-1]
    last_index = len(words) - 1

    def _duration_of(entry):
        """Return how long the word ``entry`` lasts."""
        if is_vosk:
            return entry["end"] - entry["start"]
        return entry.get("duration", 0)

    # NOTE(review): ``start_caption`` may already include the slice offset
    # while the word times used below do not - confirm against the caller.
    for index, word in enumerate(words):
        word_duration = _duration_of(word)
        next_word = None
        blank_duration = 0
        if word != last_word and index < last_index:
            next_word = words[index + 1]
            # Silence between the end of this word and the start of the next.
            blank_duration = (next_word[start_key] - start_caption) - (
                (word[start_key] - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word : <class 'dict'> {'word': 'bonjour', 'start ':
        # 0.58, 'duration': 7.34}
        text_caption.append(word["word"])

        keep_accumulating = (
            (word[start_key] - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            and next_word is not None
            and blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME
        )
        if keep_accumulating:
            continue

        # Create the caption.
        if is_first_caption:
            # To be reviewed: merge of the new line with the previous one.
            is_first_caption = False
            text_caption = get_text_caption(text_caption, last_word_added)

        stop_caption = word[start_key] + word_duration

        # Avoid overlapping the previous caption.
        change_previous_end_caption(webvtt, start_caption)

        webvtt.captions.append(
            Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                " ".join(text_caption),
            )
        )
        # Reset everything for the next sentence.
        start_caption = word[start_key]
        text_caption = []
        last_word_added = word["word"]

    # The last word always closes a caption above (it has no next word), so
    # no text is pending here: the former trailing caption for the last slice
    # was always empty and is no longer emitted.
    return all_text, webvtt
×
271

272

273
def main_vosk_transcript(norm_mp3_file, duration, transript_model):
    """Vosk transcription.

    The audio is processed in slices of ``TRANSCRIPTION_AUDIO_SPLIT_TIME``
    with a single recognizer; every recognizer result that contains words
    becomes one caption, from the start of its first word to the end of its
    last word.

    Args:
        norm_mp3_file: path of the audio file to transcribe.
        duration: total duration, used as the upper bound of ``range``.
        transript_model: Vosk model given to ``KaldiRecognizer``.

    Returns:
        A ``(msg, webvtt, all_text)`` tuple: the log message, the captions
        object and the recognized text of all captions joined with spaces.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        results = []
        # The context manager closes the pipe and waits for SoX, so no
        # child process is left behind, even on error.
        with convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        ) as audio:
            msg += "\nRunning inference."
            get_word_result_from_data(results, audio, rec)
        for res in results:
            # Decode each result once.
            parsed = json.loads(res)
            words = parsed.get("result")
            text = parsed.get("text")
            if not words:
                continue
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                format_time_caption(start_caption),
                format_time_caption(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
            if text:
                # Keep the full text, which used to be returned empty.
                all_text += text + " "
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
×
327

328

329
def main_stt_transcript(norm_mp3_file, duration, transript_model):
    """STT transcription.

    The audio is processed in slices of ``TRANSCRIPTION_AUDIO_SPLIT_TIME``,
    each extended by ``TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH`` when the audio
    is long enough. The words of every candidate transcript are converted to
    captions with ``words_to_vtt``.

    Args:
        norm_mp3_file: path of the audio file to transcribe.
        duration: total duration, used as the upper bound of ``range``.
        transript_model: STT model providing ``sampleRate`` and
            ``sttWithMetadata``.

    Returns:
        A ``(msg, webvtt, all_text)`` tuple: the log message, the captions
        object and the accumulated text.
    """
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transript_model.sampleRate()
    webvtt = WebVTT()
    # NOTE(review): never updated between slices, since words_to_vtt does
    # not return it - confirm whether that is intended.
    last_word_added = ""
    all_text = ""
    extended_split = (
        TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
    )
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        # ``end_trim`` is only reported in the message below.
        if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
            end_trim = duration
        else:
            end_trim = (
                start_trim
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            )

        # Length actually extracted: the extended slice, or what is left.
        if (
            start_trim
            + TRANSCRIPTION_AUDIO_SPLIT_TIME
            + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
        ) < duration:
            dur = extended_split
        else:
            dur = duration - start_trim

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence : %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            if not words:
                # Nothing recognised in this transcript (``words[0]`` below
                # used to raise IndexError).
                continue
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
×
391

392

393
def change_previous_end_caption(webvtt, start_caption):
    """Shorten the last caption so that it ends no later than ``start_caption``.

    Nothing happens when there is no caption yet, or when the last caption
    already ends at or before ``start_caption``.

    Args:
        webvtt: object whose ``captions`` list holds the captions; the
            ``end`` attribute of the last one is a ``%H:%M:%S.%f`` string.
        start_caption: start time, in seconds, of the next caption.
    """
    if not len(webvtt.captions) > 0:
        return
    previous = webvtt.captions[-1]
    prev_end = dt.datetime.strptime(previous.end, "%H:%M:%S.%f")
    # Convert the parsed time of day back to a number of seconds.
    td_prev_end = timedelta(
        hours=prev_end.hour,
        minutes=prev_end.minute,
        seconds=prev_end.second,
        microseconds=prev_end.microsecond,
    ).total_seconds()
    if td_prev_end > start_caption:
        previous.end = format_time_caption(start_caption)
×
405

406

407
def format_time_caption(time_caption):
    """Format time for webvtt caption.

    Args:
        time_caption: number of seconds, as a number or a numeric string.

    Returns:
        The time formatted as ``HH:MM:SS.mmm``; the fraction is truncated to
        milliseconds. Only the time of day is rendered, so hours wrap at 24.
    """
    # A fixed naive epoch replaces ``datetime.utcfromtimestamp(0)``, which is
    # deprecated since Python 3.12 and yields the same value.
    moment = dt.datetime(1970, 1, 1) + timedelta(seconds=float(time_caption))
    return moment.strftime("%H:%M:%S.%f")[:-3]
412

413

414
def get_text_caption(text_caption, last_word_added):
    """Get the text for a caption.

    Args:
        text_caption: list of words of the caption.
        last_word_added: word to look for in ``text_caption``.

    Returns:
        The words located after the first occurrence of ``last_word_added``,
        or ``text_caption`` itself when the word is not found.
    """
    try:
        first_index = text_caption.index(last_word_added)
    except ValueError:
        return text_caption
    return text_caption[first_index + 1 :]
×
421

422

423
def words_from_candidate_transcript(metadata):
    """Get words list from transcription.

    The tokens of ``metadata`` are single characters; they are grouped into
    words, a word ending at a space token or at the last token.

    Args:
        metadata: object whose ``tokens`` sequence holds items with ``text``
            and ``start_time`` attributes.

    Returns:
        A list of dicts with the keys ``"word"``, ``"start_time"`` and
        ``"duration"`` (both rounded to 4 decimals). A space token that does
        not follow any character yields an entry with an empty word.
    """
    tokens = metadata.tokens
    last_index = len(tokens) - 1
    word_list = []
    word = ""
    word_start_time = 0
    for i, token in enumerate(tokens):
        is_space = token.text == " "
        if not is_space:
            if len(word) == 0:
                # First character: remember when the new word starts.
                word_start_time = token.start_time
            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if is_space or i == last_index:
            # Never report a negative duration.
            word_duration = max(token.start_time - word_start_time, 0)
            word_list.append(
                {
                    "word": word,
                    "start_time": round(word_start_time, 4),
                    "duration": round(word_duration, 4),
                }
            )
            # Reset for the next word.
            word = ""
            word_start_time = 0

    return word_list
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc