EsupPortail / Esup-Pod, build 11591830518 (push, via GitHub web-flow)

29 Oct 2024 03:29PM UTC. Coverage: 70.755%, unchanged from the previous build.

# [RELEASE] Esup-Pod 3.8.2

## New functionalities:
* Use Whisper to generate VTT subtitles (#1187)
* Remove the BBB module and add the sipmediagw feature (#1190)
* Add the NOTIFY_SENDER boolean parameter (#1192)
* Add fields to the recording REST response (#1193)
* Return the thumbnail URL, width and height in the oEmbed response (#1194)
* Add an optional proxy URL for requests coming from Aristote (#1218)

## Bugs corrected:
* Change all `filter_fields = ()` to `filterset_fields = []`, as `filter_fields` is deprecated (#1191); see the first sketch after this list
* Fix the multi-carousel (#1200)
* Regroup videos by theme when ORGANIZE_BY_THEME = True (#1203)
* Fix a bug in the video dressing ("habillage") feature (#1211)
* Fix the migration script for BBB (#1214)
* Improve the RSS feeds (#1215)
* Fix the BBB meeting deletion link (issue #1216)
* Do not crop thumbnails (#1217)
* Use `get_thumbnail` to serve video thumbnails through the caching system, so that the video folder URL is never publicly exposed (#1221); see the second sketch after this list
* Add a toolbar to the theme description editor of a channel (issue #1185)
* Fix assigning channels and themes to a set of videos (issue #1106)
* Manage restricted video access rights in playlists
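
A minimal sketch of the `filter_fields` to `filterset_fields` migration (#1191) on a hypothetical DRF viewset; the model path, serializer and field names are illustrative, not Esup-Pod's actual code:

```python
from django_filters.rest_framework import DjangoFilterBackend
from rest_framework import serializers, viewsets

from pod.video.models import Video  # model path assumed for illustration


class VideoSerializer(serializers.ModelSerializer):
    class Meta:
        model = Video
        fields = ["id", "title", "owner"]


class VideoViewSet(viewsets.ReadOnlyModelViewSet):
    queryset = Video.objects.all()
    serializer_class = VideoSerializer
    filter_backends = [DjangoFilterBackend]
    # filter_fields = ("owner", "type")   # deprecated spelling, removed upstream
    filterset_fields = ["owner", "type"]  # current django-filter spelling
```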

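For #1221, a sketch of the `get_thumbnail` pattern using sorl-thumbnail's documented `get_thumbnail(file, geometry, **options)` API; the view name and model fields are assumptions, not the actual Esup-Pod view:

```python
from django.shortcuts import get_object_or_404, redirect
from sorl.thumbnail import get_thumbnail

from pod.video.models import Video  # model path assumed for illustration


def video_thumbnail(request, video_id):
    """Serve a cached thumbnail without exposing the video folder URL."""
    video = get_object_or_404(Video, pk=video_id)
    # get_thumbnail resizes once, stores the result in the thumbnail cache
    # and returns a cache entry; only that cache URL reaches the client.
    thumb = get_thumbnail(video.thumbnail, "320x180", quality=85)
    return redirect(thumb.url)
```
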
## Accessibility improvements: (#1219)
* Reword some redundant title strings to name the targeted object
* Remove some redundant titles
* Correct i18n strings
* Fix duplicated ids
* Remove a broken aria-label id
* Add the flatpage title in an h1 for accessibility, and remove the h1 title from the legal notice and accessibility statement pages

## Quality of Code:
* Minor code formatting
* Add missing DocStrings
* Upgrade GitGuardian config version

57 of 113 new or added lines in 21 files covered. (50.44%)

924 existing lines in 19 files now uncovered.

12010 of 16974 relevant lines covered (70.76%)

0.71 hits per line

## Source file: pod/video_encode_transcript/transcript_model.py (0.0% covered)

No line in this file is executed by the test suite. The lines flagged as new in this build are the `webvtt` and `whisper.utils.get_writer` imports and the Whisper transcription block in `main_whisper_transcript`.

```python
import numpy as np
import shlex
import subprocess
import json

import sys
import os
from timeit import default_timer as timer
import datetime as dt
from datetime import timedelta
import webvtt
from webvtt import WebVTT, Caption
from shlex import quote

import logging

try:
    from ..custom import settings_local
except ImportError:
    from .. import settings as settings_local

from .encoding_utils import sec_to_timestamp

DEBUG = getattr(settings_local, "DEBUG", False)

TRANSCRIPTION_MODEL_PARAM = getattr(settings_local, "TRANSCRIPTION_MODEL_PARAM", False)
USE_TRANSCRIPTION = getattr(settings_local, "USE_TRANSCRIPTION", False)
if USE_TRANSCRIPTION:
    TRANSCRIPTION_TYPE = getattr(settings_local, "TRANSCRIPTION_TYPE", "VOSK")
    if TRANSCRIPTION_TYPE == "VOSK":
        from vosk import Model, KaldiRecognizer
    elif TRANSCRIPTION_TYPE == "STT":
        from stt import Model
    elif TRANSCRIPTION_TYPE == "WHISPER":
        import whisper
        from whisper.utils import get_writer

TRANSCRIPTION_NORMALIZE = getattr(settings_local, "TRANSCRIPTION_NORMALIZE", False)
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = getattr(
    settings_local, "TRANSCRIPTION_NORMALIZE_TARGET_LEVEL", -16.0
)

TRANSCRIPTION_AUDIO_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_AUDIO_SPLIT_TIME", 600
)  # 10 min
# time in sec for phrase length
TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH", 2
)
TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME = getattr(
    settings_local, "TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME", 0.5
)
log = logging.getLogger(__name__)
```
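
Every tunable above is resolved through `getattr(settings_local, ...)`, so a deployment overrides them in `pod/custom/settings_local.py`, with the package settings as fallback. A hypothetical configuration, consistent with the keys the module reads but with invented values:

```python
# pod/custom/settings_local.py: illustrative values, not a shipped default
USE_TRANSCRIPTION = True
TRANSCRIPTION_TYPE = "WHISPER"  # "VOSK", "STT" or "WHISPER"
TRANSCRIPTION_MODEL_PARAM = {
    "WHISPER": {
        # per-language entries, indexed by the lang code passed around below
        "fr": {"model": "small", "download_root": "/usr/local/share/whisper"},
    },
}
TRANSCRIPTION_NORMALIZE = True
TRANSCRIPTION_NORMALIZE_TARGET_LEVEL = -16.0
TRANSCRIPTION_AUDIO_SPLIT_TIME = 600  # seconds per transcription window
```
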
```python
def get_model(lang):
    """Get the model for the STT or Vosk software to transcribe audio."""
    transript_model = Model(TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"])
    if TRANSCRIPTION_TYPE == "STT":
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("beam_width"):
            transript_model.setBeamWidth(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["beam_width"]
            )
        if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("scorer"):
            print(
                "Loading scorer from files {}".format(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
                ),
                file=sys.stderr,
            )
            scorer_load_start = timer()
            transript_model.enableExternalScorer(
                TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["scorer"]
            )
            scorer_load_end = timer() - scorer_load_start
            print("Loaded scorer in {:.3}s.".format(scorer_load_end), file=sys.stderr)
            if TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get(
                "lm_alpha"
            ) and TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang].get("lm_beta"):
                transript_model.setScorerAlphaBeta(
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_alpha"],
                    TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["lm_beta"],
                )
    return transript_model


def start_transcripting(mp3filepath, duration, lang):
    """
    Start direct transcription.

    Normalize the audio if configured, get the model matching the lang
    and start the transcription.
    """
    if TRANSCRIPTION_NORMALIZE:
        mp3filepath = normalize_mp3(mp3filepath)
    if TRANSCRIPTION_TYPE == "WHISPER":
        msg, webvtt, all_text = main_whisper_transcript(mp3filepath, duration, lang)
    else:
        transript_model = get_model(lang)
        msg, webvtt, all_text = start_main_transcript(
            mp3filepath, duration, transript_model
        )
    if DEBUG:
        print(msg)
        print(webvtt)
        print("\n%s\n" % all_text)

    return msg, webvtt


def start_main_transcript(mp3filepath, duration, transript_model):
    """Call the transcription matching the configured software type."""
    if TRANSCRIPTION_TYPE == "STT":
        msg, webvtt, all_text = main_stt_transcript(
            mp3filepath, duration, transript_model
        )
    elif TRANSCRIPTION_TYPE == "VOSK":
        msg, webvtt, all_text = main_vosk_transcript(
            mp3filepath, duration, transript_model
        )
    return msg, webvtt, all_text
```
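
`start_transcripting` is the entry point of this module; a hypothetical driver, with invented paths, would be:

```python
# Hypothetical driver: transcribe a 90-second French MP3 (paths invented).
msg, vtt = start_transcripting("/data/media/audio.mp3", 90, "fr")
vtt.save("/data/media/audio_captions.vtt")  # webvtt-py's WebVTT.save()
print(msg)  # the inference/timing log accumulated above
```
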
```python
def convert_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Extract a sub-range of the audio and convert it to the desired sample rate."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.check_output(shlex.split(sox_cmd), stderr=subprocess.PIPE)

    except subprocess.CalledProcessError as e:
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )

    if TRANSCRIPTION_TYPE == "WHISPER":
        return np.frombuffer(output, np.int16).flatten().astype(np.float32) / 32768.0
    else:
        return np.frombuffer(output, np.int16)


def normalize_mp3(mp3filepath):
    """Normalize the audio to the expected format and sound level."""
    filename, file_extension = os.path.splitext(mp3filepath)
    mp3normfile = "{}{}{}".format(filename, "_norm", file_extension)
    normalize_cmd = "ffmpeg-normalize {} ".format(quote(mp3filepath))
    normalize_cmd += "-c:a libmp3lame -b:a 192k --normalization-type ebu "
    # normalize_cmd += \
    # '--loudness-range-target 7.0 --true-peak 0.0 --offset 0.0 '
    normalize_cmd += "--target-level {} -f -o {}".format(
        TRANSCRIPTION_NORMALIZE_TARGET_LEVEL, quote(mp3normfile)
    )
    if DEBUG:
        print(normalize_cmd)
    try:
        subprocess.check_output(shlex.split(normalize_cmd), stderr=subprocess.PIPE)
        return mp3normfile
    except subprocess.CalledProcessError as e:
        log.error("ffmpeg-normalize returned non-zero status: {}".format(e.stderr))
        return mp3filepath
    except OSError as e:
        log.error("ffmpeg-normalize not found {}".format(e.strerror))
        return mp3filepath
```
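
The WHISPER branch of `convert_samplerate` rescales the 16-bit PCM into the float range Whisper expects; a quick, runnable check of that arithmetic:

```python
import numpy as np

# int16 PCM spans [-32768, 32767]; dividing by 32768.0 maps it into [-1.0, 1.0),
# the float32 range Whisper expects (same expression as the branch above).
pcm = np.array([-32768, 0, 32767], dtype=np.int16)
print(pcm.astype(np.float32) / 32768.0)  # prints [-1. 0. 0.9999695]
```
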
```python
# #################################
# TRANSCRIPT VIDEO: MAIN FUNCTION
# #################################


def convert_vosk_samplerate(audio_path, desired_sample_rate, trim_start, duration):
    """Convert the audio to the desired sample rate, streaming through SoX."""
    sox_cmd = "sox {} --type raw --bits 16 --channels 1 --rate {} ".format(
        quote(audio_path), desired_sample_rate
    )
    sox_cmd += "--encoding signed-integer --endian little --compression 0.0 "
    sox_cmd += "--no-dither - trim {} {}".format(trim_start, duration)

    try:
        output = subprocess.Popen(shlex.split(sox_cmd), stdout=subprocess.PIPE)

    except subprocess.CalledProcessError as e:
        # NB: Popen itself never raises CalledProcessError, so this branch is
        # effectively dead code; only the OSError below is reachable.
        raise RuntimeError("SoX returned non-zero status: {}".format(e.stderr))
    except OSError as e:
        raise OSError(
            e.errno,
            "SoX not found, use {}hz files or install it: {}".format(
                desired_sample_rate, e.strerror
            ),
        )
    return output


def get_word_result_from_data(results, audio, rec):
    """Read chunks from the audio stream and append recognizer results to `results`."""
    while True:
        data = audio.stdout.read(4000)
        if len(data) == 0:
            break
        if rec.AcceptWaveform(data):
            results.append(rec.Result())
    results.append(rec.Result())
```
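
Each string appended by `get_word_result_from_data` is a Vosk JSON result; with `rec.SetWords(True)` (set in `main_vosk_transcript` further down) it carries per-word timings. An illustrative payload with invented values, parsed the same way the code below parses it:

```python
import json

# Illustrative rec.Result() payload when SetWords(True) is enabled.
res = (
    '{"result": [{"word": "bonjour", "start": 0.58, "end": 0.92, "conf": 0.99}],'
    ' "text": "bonjour"}'
)
words = json.loads(res).get("result")
print(words[0]["start"], words[0]["end"] - words[0]["start"])  # caption start, word duration
```
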
```python
def words_to_vtt(
    words,
    start_trim,
    duration,
    is_first_caption,
    text_caption,
    start_caption,
    last_word_added,
    all_text,
    webvtt,
):
    """Convert words and timings to WebVTT captions."""
    for index, word in enumerate(words):
        start_key = "start_time"
        word_duration = word.get("duration", 0)
        last_word = words[-1]
        last_word_duration = last_word.get("duration", 0)
        if TRANSCRIPTION_TYPE == "VOSK":
            start_key = "start"
            word_duration = word["end"] - word["start"]
            last_word_duration = words[-1]["end"] - words[-1]["start"]
        next_word = None
        blank_duration = 0
        if word != words[-1] and (index + 1) < len(words):
            next_word = words[index + 1]
            blank_duration = ((next_word[start_key]) - start_caption) - (
                ((word[start_key]) - start_caption) + word_duration
            )
        all_text += word["word"] + " "
        # word: <class 'dict'> {'word': 'bonjour', 'start_time':
        # 0.58, 'duration': 7.34}
        text_caption.append(word["word"])
        if not (
            (((word[start_key]) - start_caption) < TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            and (
                next_word is not None
                and (blank_duration < TRANSCRIPTION_STT_SENTENCE_BLANK_SPLIT_TIME)
            )
        ):
            # create the caption
            if is_first_caption:
                # To review: merging of the new line with the old one...
                is_first_caption = False
                text_caption = get_text_caption(text_caption, last_word_added)

            stop_caption = word[start_key] + word_duration

            # avoid overlapping with the previous caption
            change_previous_end_caption(webvtt, start_caption)

            caption = Caption(
                sec_to_timestamp(start_caption),
                sec_to_timestamp(stop_caption),
                " ".join(text_caption),
            )

            webvtt.captions.append(caption)
            # reset everything for the next sentence
            start_caption = word[start_key]
            text_caption = []
            last_word_added = word["word"]
    if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration:
        # append the last sentence of the video here
        stop_caption = start_trim + words[-1][start_key] + last_word_duration
        caption = Caption(
            sec_to_timestamp(start_caption),
            sec_to_timestamp(stop_caption),
            " ".join(text_caption),
        )
        webvtt.captions.append(caption)
    return all_text, webvtt
```
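
`words_to_vtt` leans on `sec_to_timestamp` from `.encoding_utils`, whose source is not shown on this page. Since `change_previous_end_caption` near the end of the file parses caption ends with `"%H:%M:%S.%f"`, a stand-in with the same contract would be:

```python
def sec_to_timestamp(total_seconds: float) -> str:
    """Stand-in for pod's encoding_utils helper: seconds -> 'HH:MM:SS.mmm'."""
    ms_total = int(round(total_seconds * 1000))
    hours, rem = divmod(ms_total, 3_600_000)
    minutes, rem = divmod(rem, 60_000)
    seconds, ms = divmod(rem, 1000)
    return "{:02d}:{:02d}:{:02d}.{:03d}".format(hours, minutes, seconds, ms)

print(sec_to_timestamp(75.5))  # 00:01:15.500
```
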
```python
def main_vosk_transcript(norm_mp3_file, duration, transript_model):
    """Vosk transcription."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = 16000

    rec = KaldiRecognizer(transript_model, desired_sample_rate)
    rec.SetWords(True)

    webvtt = WebVTT()
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        audio = convert_vosk_samplerate(
            norm_mp3_file,
            desired_sample_rate,
            start_trim,
            TRANSCRIPTION_AUDIO_SPLIT_TIME,  # dur
        )
        msg += "\nRunning inference."
        results = []
        get_word_result_from_data(results, audio, rec)
        for res in results:
            words = json.loads(res).get("result")
            text = json.loads(res).get("text")
            if not words:
                continue
            start_caption = words[0]["start"]
            stop_caption = words[-1]["end"]
            caption = Caption(
                sec_to_timestamp(start_caption),
                sec_to_timestamp(stop_caption),
                text,
            )
            webvtt.captions.append(caption)
            """
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
            """
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
```
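
The Vosk loop above touches only two pieces of the webvtt-py API, `Caption(start, end, text)` and `captions.append`; in isolation, with an invented output path:

```python
from webvtt import WebVTT, Caption

vtt = WebVTT()
vtt.captions.append(Caption("00:00:00.580", "00:00:00.920", "bonjour"))
vtt.save("/tmp/demo.vtt")  # writes a valid WEBVTT file with one cue
```
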
```python
def main_stt_transcript(norm_mp3_file, duration, transript_model):
    """STT transcription."""
    msg = ""
    inference_start = timer()
    msg += "\nInference start %0.3fs." % inference_start
    desired_sample_rate = transript_model.sampleRate()
    webvtt = WebVTT()
    last_word_added = ""
    metadata = None
    all_text = ""
    for start_trim in range(0, duration, TRANSCRIPTION_AUDIO_SPLIT_TIME):
        end_trim = (
            duration
            if start_trim + TRANSCRIPTION_AUDIO_SPLIT_TIME > duration
            else (
                start_trim
                + TRANSCRIPTION_AUDIO_SPLIT_TIME
                + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
            )
        )

        dur = (
            (TRANSCRIPTION_AUDIO_SPLIT_TIME + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH)
            if (
                (
                    start_trim
                    + TRANSCRIPTION_AUDIO_SPLIT_TIME
                    + TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH
                )
                < duration
            )
            else (duration - start_trim)
        )

        msg += "\ntake audio from %s to %s - %s" % (start_trim, end_trim, dur)

        audio = convert_samplerate(norm_mp3_file, desired_sample_rate, start_trim, dur)
        msg += "\nRunning inference."

        metadata = transript_model.sttWithMetadata(audio)

        for transcript in metadata.transcripts:
            msg += "\nConfidence: %s" % transcript.confidence
            words = words_from_candidate_transcript(transcript)
            start_caption = start_trim + words[0]["start_time"]
            text_caption = []
            is_first_caption = True
            all_text, webvtt = words_to_vtt(
                words,
                start_trim,
                duration,
                is_first_caption,
                text_caption,
                start_caption,
                last_word_added,
                all_text,
                webvtt,
            )
    inference_end = timer() - inference_start

    msg += "\nInference took %0.3fs." % inference_end
    return msg, webvtt, all_text
```
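
To make the windowing arithmetic above concrete: with an invented 1500 s file and the defaults `TRANSCRIPTION_AUDIO_SPLIT_TIME = 600` and `TRANSCRIPTION_STT_SENTENCE_MAX_LENGTH = 2`, each window overlaps the next by 2 s so a sentence straddling a cut is not lost:

```python
duration, split, overlap = 1500, 600, 2

for start_trim in range(0, duration, split):
    # same expression as main_stt_transcript's `dur`
    dur = (split + overlap) if (start_trim + split + overlap) < duration else (duration - start_trim)
    print(start_trim, dur)
# 0 602
# 600 602
# 1200 300
```
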
```python
def main_whisper_transcript(norm_mp3_file, duration, lang):
    """Whisper transcription."""
    msg = ""
    all_text = ""
    inference_start = timer()
    desired_sample_rate = 16000
    msg += "\nInference start %0.3fs." % inference_start

    model = whisper.load_model(
        TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang]["model"],
        download_root=TRANSCRIPTION_MODEL_PARAM[TRANSCRIPTION_TYPE][lang][
            "download_root"
        ],
    )
    audio = convert_samplerate(norm_mp3_file, desired_sample_rate, 0, duration)
    transcription = model.transcribe(
        audio, language=lang, initial_prompt="prompt", word_timestamps=True
    )
    dirname = os.path.dirname(norm_mp3_file)
    filename = os.path.basename(norm_mp3_file).replace(".mp3", ".vtt")
    vtt_writer = get_writer("vtt", dirname)
    word_options = {"highlight_words": False, "max_line_count": 2, "max_line_width": 40}
    vtt_writer(transcription, filename, word_options)
    wvtt = webvtt.read(os.path.join(dirname, filename))
    inference_end = timer() - inference_start
    msg += "\nInference took %0.3fs." % inference_end
    return msg, wvtt, all_text
```
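
The same Whisper path reduced to a standalone sketch; the model name and paths are invented, but the calls mirror the function above:

```python
import whisper
from whisper.utils import get_writer

model = whisper.load_model("small", download_root="/usr/local/share/whisper")
# transcribe() also accepts a file path; the module above instead feeds it the
# float32 numpy array produced by convert_samplerate().
result = model.transcribe("/data/audio.mp3", language="fr", word_timestamps=True)
writer = get_writer("vtt", "/data")
writer(result, "audio.vtt", {"highlight_words": False, "max_line_count": 2, "max_line_width": 40})
```
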
```python
def change_previous_end_caption(webvtt, start_caption):
    """Shorten the previous caption's end time so captions do not overlap."""
    if len(webvtt.captions) > 0:
        prev_end = dt.datetime.strptime(webvtt.captions[-1].end, "%H:%M:%S.%f")
        td_prev_end = timedelta(
            hours=prev_end.hour,
            minutes=prev_end.minute,
            seconds=prev_end.second,
            microseconds=prev_end.microsecond,
        ).total_seconds()
        if td_prev_end > start_caption:
            webvtt.captions[-1].end = sec_to_timestamp(start_caption)


def get_text_caption(text_caption, last_word_added):
    """Get the text for a caption."""
    try:
        first_index = text_caption.index(last_word_added)
        return text_caption[first_index + 1 :]
    except ValueError:
        return text_caption


def words_from_candidate_transcript(metadata):
    """Get the list of words from a candidate transcription."""
    word = ""
    word_list = []
    word_start_time = 0
    # Loop through each character
    for i, token in enumerate(metadata.tokens):
        # Append character to word if it's not a space
        if token.text != " ":
            if len(word) == 0:
                # Log the start time of the new word
                word_start_time = token.start_time

            word = word + token.text
        # Word boundary is either a space or the last character in the array
        if token.text == " " or i == len(metadata.tokens) - 1:
            word_duration = token.start_time - word_start_time

            if word_duration < 0:
                word_duration = 0

            each_word = dict()
            each_word["word"] = word
            each_word["start_time"] = round(word_start_time, 4)
            each_word["duration"] = round(word_duration, 4)

            word_list.append(each_word)
            # Reset
            word = ""
            word_start_time = 0

    return word_list
```
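
A toy run of `words_from_candidate_transcript`: the real `metadata` comes from STT, so `SimpleNamespace` stands in for its per-character token objects here:

```python
from types import SimpleNamespace

tokens = [SimpleNamespace(text=c, start_time=0.1 * i) for i, c in enumerate("hi yo")]
meta = SimpleNamespace(tokens=tokens)
print(words_from_candidate_transcript(meta))
# [{'word': 'hi', 'start_time': 0.0, 'duration': 0.2},
#  {'word': 'yo', 'start_time': 0.3, 'duration': 0.1}]
```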