• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22334798432

23 Feb 2026 06:42PM UTC coverage: 86.956% (-0.02%) from 86.973%
22334798432

push

github

web-flow
S3: regenerate test snapshots & parity fixes (#13824)

69831 of 80306 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.71
/localstack-core/localstack/services/transcribe/provider.py
1
import datetime
1✔
2
import json
1✔
3
import logging
1✔
4
import threading
1✔
5
import wave
1✔
6
from functools import cache
1✔
7
from pathlib import Path
1✔
8
from typing import Any
1✔
9
from zipfile import ZipFile
1✔
10

11
from localstack import config
1✔
12
from localstack.aws.api import RequestContext, handler
1✔
13
from localstack.aws.api.transcribe import (
1✔
14
    BadRequestException,
15
    ConflictException,
16
    GetTranscriptionJobResponse,
17
    LanguageCode,
18
    ListTranscriptionJobsResponse,
19
    MaxResults,
20
    MediaFormat,
21
    NextToken,
22
    NotFoundException,
23
    StartTranscriptionJobRequest,
24
    StartTranscriptionJobResponse,
25
    TranscribeApi,
26
    Transcript,
27
    TranscriptionJob,
28
    TranscriptionJobName,
29
    TranscriptionJobStatus,
30
    TranscriptionJobSummary,
31
)
32
from localstack.aws.connect import connect_to
1✔
33
from localstack.constants import HUGGING_FACE_ENDPOINT
1✔
34
from localstack.packages.ffmpeg import ffmpeg_package
1✔
35
from localstack.services.s3.utils import (
1✔
36
    get_bucket_and_key_from_presign_url,
37
    get_bucket_and_key_from_s3_uri,
38
)
39
from localstack.services.transcribe.models import TranscribeStore, transcribe_stores
1✔
40
from localstack.services.transcribe.packages import vosk_package
1✔
41
from localstack.state import StateVisitor
1✔
42
from localstack.utils.files import new_tmp_file
1✔
43
from localstack.utils.http import download
1✔
44
from localstack.utils.run import run
1✔
45
from localstack.utils.threads import start_thread
1✔
46

47
LOG = logging.getLogger(__name__)

# AWS caps batch transcription at four hours of audio (or 2 GB) per API call, and
# streaming connections may likewise stay open for at most four hours.
# See https://aws.amazon.com/transcribe/faqs/
MAX_AUDIO_DURATION_SECONDS = 4 * 60 * 60

# Base URL of the Vosk model archives on the Hugging Face hub; a model archive is
# fetched as "<base><model name>.zip".
VOSK_MODELS_URL = HUGGING_FACE_ENDPOINT + "/vosk-models/resolve/main/"

# AWS language code -> name of the (small) Vosk model used to transcribe it.
# Insertion order matters: it is the order in which supported codes are reported to clients.
# See https://docs.aws.amazon.com/transcribe/latest/dg/supported-languages.html
LANGUAGE_MODELS = {
    LanguageCode.ca_ES: "vosk-model-small-ca-0.4",
    LanguageCode.cs_CZ: "vosk-model-small-cs-0.4-rhasspy",
    LanguageCode.en_GB: "vosk-model-small-en-gb-0.15",
    LanguageCode.en_IN: "vosk-model-small-en-in-0.4",
    LanguageCode.en_US: "vosk-model-small-en-us-0.15",
    LanguageCode.fa_IR: "vosk-model-small-fa-0.42",
    LanguageCode.fr_FR: "vosk-model-small-fr-0.22",
    LanguageCode.de_DE: "vosk-model-small-de-0.15",
    LanguageCode.es_ES: "vosk-model-small-es-0.42",
    LanguageCode.gu_IN: "vosk-model-small-gu-0.42",
    LanguageCode.hi_IN: "vosk-model-small-hi-0.22",
    LanguageCode.it_IT: "vosk-model-small-it-0.22",
    LanguageCode.ja_JP: "vosk-model-small-ja-0.22",
    LanguageCode.kk_KZ: "vosk-model-small-kz-0.15",
    LanguageCode.ko_KR: "vosk-model-small-ko-0.22",
    LanguageCode.nl_NL: "vosk-model-small-nl-0.22",
    LanguageCode.pl_PL: "vosk-model-small-pl-0.22",
    LanguageCode.pt_BR: "vosk-model-small-pt-0.3",
    LanguageCode.ru_RU: "vosk-model-small-ru-0.22",
    LanguageCode.te_IN: "vosk-model-small-te-0.42",
    LanguageCode.tr_TR: "vosk-model-small-tr-0.3",
    LanguageCode.uk_UA: "vosk-model-small-uk-v3-nano",
    LanguageCode.uz_UZ: "vosk-model-small-uz-0.22",
    LanguageCode.vi_VN: "vosk-model-small-vn-0.4",
    LanguageCode.zh_CN: "vosk-model-small-cn-0.22",
}

# Downloaded Vosk models are extracted below LocalStack's cache directory.
LANGUAGE_MODEL_DIR = Path(config.dirs.cache, "vosk")

# ffprobe `format_name` values mapped to the AWS media formats they correspond to.
# See https://docs.aws.amazon.com/transcribe/latest/dg/how-input.html
SUPPORTED_FORMAT_NAMES = {
    "amr": MediaFormat.amr,
    "flac": MediaFormat.flac,
    "mp3": MediaFormat.mp3,
    "mov,mp4,m4a,3gp,3g2,mj2": MediaFormat.mp4,
    "ogg": MediaFormat.ogg,
    "matroska,webm": MediaFormat.webm,
    "wav": MediaFormat.wav,
}

# Serializes model downloads so concurrent jobs never fetch/extract into the same directory at once.
_DL_LOCK = threading.Lock()
102

103

104
class TranscribeProvider(TranscribeApi):
    # Lifetime of the presigned transcript URLs handed out to clients (15 minutes).
    _TRANSCRIPT_URL_EXPIRES_IN = 60 * 15

    def accept_state_visitor(self, visitor: StateVisitor) -> None:
        """Expose Transcribe state to ``visitor``: moto's ``transcribe_backends`` and LocalStack's stores."""
        from moto.transcribe.models import transcribe_backends

        visitor.visit(transcribe_backends)
        visitor.visit(transcribe_stores)

    def get_transcription_job(
        self, context: RequestContext, transcription_job_name: TranscriptionJobName, **kwargs: Any
    ) -> GetTranscriptionJobResponse:
        """
        Return a stored transcription job with a freshly presigned transcript URL.

        The presigned URL stored on the job expires, so a new one is generated for the same
        output bucket/key on every call and written back onto the stored job.

        :raises NotFoundException: if no job with this name exists in the account/region
        """
        store = transcribe_stores[context.account_id][context.region]

        job = store.transcription_jobs.get(transcription_job_name)
        if not job:
            raise NotFoundException(
                "The requested job couldn't be found. Check the job name and try your request again."
            )

        # The stored URI is a presigned URL; recover the output location from it.
        output_bucket, output_key = get_bucket_and_key_from_presign_url(
            job["Transcript"]["TranscriptFileUri"]  # type: ignore[index,arg-type]
        )
        job["Transcript"]["TranscriptFileUri"] = self._presign_transcript_url(  # type: ignore[index]
            output_bucket, output_key
        )
        return GetTranscriptionJobResponse(TranscriptionJob=job)

    @staticmethod
    @cache
    def _setup_vosk() -> None:
        """Install Vosk (once per process, thanks to ``@cache``) and silence its logging."""
        vosk_package.install()

        from vosk import SetLogLevel  # type: ignore[import-not-found]  # noqa

        # Suppress Vosk logging
        SetLogLevel(-1)

    @handler("StartTranscriptionJob", expand=False)
    def start_transcription_job(  # type: ignore[override]
        self,
        context: RequestContext,
        request: StartTranscriptionJobRequest,
    ) -> StartTranscriptionJobResponse:
        """
        Register a new transcription job and process it on a background thread.

        The job is stored as QUEUED and returned immediately; ``_run_transcription_job``
        performs the actual transcription.

        :raises BadRequestException: if the language code is missing or has no Vosk model
        :raises ConflictException: if a job with the same name already exists
        """
        job_name = request["TranscriptionJobName"]
        media = request["Media"]
        language_code = request.get("LanguageCode")

        if not language_code:
            raise BadRequestException("Language code is missing")

        if language_code not in LANGUAGE_MODELS:
            # Join the codes explicitly: formatting ``dict.keys()`` directly would leak
            # Python reprs (``dict_keys([<LanguageCode...>])``) into the API error message.
            supported_codes = ", ".join(LANGUAGE_MODELS)
            raise BadRequestException(f"Language code must be one of {supported_codes}")

        store = transcribe_stores[context.account_id][context.region]

        if job_name in store.transcription_jobs:
            raise ConflictException(
                "The requested job name already exists. Use a different job name."
            )

        s3_path = media["MediaFileUri"]
        # Without an explicit output bucket, the transcript goes next to the input media.
        output_bucket = request.get("OutputBucketName") or get_bucket_and_key_from_s3_uri(s3_path)[0]  # type: ignore[arg-type]
        output_key = request.get("OutputKey") or f"{job_name}.json"

        # The output location is stored as a presigned URL; it is parsed back into
        # bucket/key when the transcript is written and when the job is fetched.
        transcript = Transcript(
            TranscriptFileUri=self._presign_transcript_url(output_bucket, output_key)
        )

        now = datetime.datetime.utcnow()
        job = TranscriptionJob(
            TranscriptionJobName=job_name,
            LanguageCode=language_code,
            Media=media,
            CreationTime=now,
            StartTime=now,
            TranscriptionJobStatus=TranscriptionJobStatus.QUEUED,
            Transcript=transcript,
        )
        store.transcription_jobs[job_name] = job

        start_thread(self._run_transcription_job, (store, job_name))

        return StartTranscriptionJobResponse(TranscriptionJob=job)

    def list_transcription_jobs(
        self,
        context: RequestContext,
        status: TranscriptionJobStatus | None = None,
        job_name_contains: TranscriptionJobName | None = None,
        next_token: NextToken | None = None,
        max_results: MaxResults | None = None,
        **kwargs: Any,
    ) -> ListTranscriptionJobsResponse:
        """
        List summaries of the jobs in the caller's account/region.

        :param status: if given, only jobs currently in this status are returned
        :param job_name_contains: if given, only jobs whose name contains this string
            (case-insensitive) are returned
        """
        # TODO: pagination (next_token / max_results) is not implemented; all matches are returned.
        store = transcribe_stores[context.account_id][context.region]
        name_filter = job_name_contains.lower() if job_name_contains else None

        summaries = []
        for job in store.transcription_jobs.values():
            if status and job["TranscriptionJobStatus"] != status:
                continue
            if name_filter and name_filter not in job["TranscriptionJobName"].lower():  # type: ignore[union-attr]
                continue
            summaries.append(
                TranscriptionJobSummary(
                    TranscriptionJobName=job["TranscriptionJobName"],
                    LanguageCode=job["LanguageCode"],
                    CreationTime=job["CreationTime"],
                    StartTime=job["StartTime"],
                    TranscriptionJobStatus=job["TranscriptionJobStatus"],
                    CompletionTime=job.get("CompletionTime"),
                    FailureReason=job.get("FailureReason"),
                )
            )

        response = ListTranscriptionJobsResponse(TranscriptionJobSummaries=summaries)
        if status:
            # The response echoes the status filter that was applied.
            response["Status"] = status
        return response

    def delete_transcription_job(
        self, context: RequestContext, transcription_job_name: TranscriptionJobName, **kwargs: Any
    ) -> None:
        """
        Remove a job from the store (the transcript object in S3 is left untouched).

        :raises NotFoundException: if no job with this name exists in the account/region
        """
        store = transcribe_stores[context.account_id][context.region]

        if store.transcription_jobs.pop(transcription_job_name, None) is None:
            raise NotFoundException(
                "The requested job couldn't be found. Check the job name and try your request again."
            )

    #
    # Utils
    #

    @classmethod
    def _presign_transcript_url(cls, bucket: str, key: str) -> str:
        """Return a presigned ``get_object`` URL for the transcript at ``bucket``/``key``."""
        return connect_to().s3.generate_presigned_url(
            "get_object",
            Params={"Bucket": bucket, "Key": key},
            ExpiresIn=cls._TRANSCRIPT_URL_EXPIRES_IN,
        )

    @staticmethod
    def download_model(name: str) -> str:
        """
        Download a Vosk language model to the LocalStack cache directory and return its path.

        Does nothing if the model is already present (a non-empty model directory). While Vosk
        can also download a model if it is not available locally, it saves it to the
        non-configurable location ~/.cache/vosk.

        :param name: Vosk model name (a value of ``LANGUAGE_MODELS``)
        :returns: path of the extracted model directory
        :raises RuntimeError: if the model archive could not be downloaded from any source
        """
        model_path = LANGUAGE_MODEL_DIR / name
        model_zip_path = Path(f"{model_path}.zip")

        with _DL_LOCK:
            # check if model path exists and is not empty
            if model_path.exists() and any(model_path.iterdir()):
                LOG.debug("Using a pre-downloaded language model: %s", model_path)
                return str(model_path)

            # exist_ok: a previous failed download/extraction can leave an empty model
            # directory behind, which must not make every later attempt fail.
            model_path.mkdir(parents=True, exist_ok=True)

            LOG.debug("Downloading language model: %s", model_path.name)

            from vosk import MODEL_PRE_URL  # noqa

            try:
                # Try Vosk's own model host first, then the Hugging Face mirror.
                for url in (MODEL_PRE_URL, VOSK_MODELS_URL):
                    try:
                        # NOTE(review): TLS verification is disabled for model downloads.
                        download(
                            f"{url}{model_path.name}.zip", str(model_zip_path), verify_ssl=False
                        )
                    except Exception as e:
                        LOG.warning("Failed to download model from %s: %s", url, e)
                        continue
                    break
                else:
                    raise RuntimeError(
                        f"Failed to download language model {model_path.name} from any source"
                    )

                LOG.debug("Extracting language model: %s", model_path.name)
                # The archive contains a top-level directory named after the model.
                with ZipFile(model_zip_path, "r") as model_ref:
                    model_ref.extractall(model_path.parent)
            finally:
                # Drop the (possibly partial) archive whether or not extraction succeeded.
                model_zip_path.unlink(missing_ok=True)

        return str(model_path)

    @staticmethod
    def _build_transcript_output(job_name: str, tc_result: dict[str, Any]) -> dict[str, Any]:
        """
        Convert a Vosk ``FinalResult`` into the AWS Transcribe transcript JSON layout.

        Vosk leaves out the ``result`` word list when no words were recognized (e.g. silent
        audio), so a missing list is treated as empty instead of failing the job.
        """
        items = [
            {
                "start_time": word["start"],
                "end_time": word["end"],
                "type": "pronunciation",
                "alternatives": [
                    {
                        "confidence": word["conf"],
                        "content": word["word"],
                    }
                ],
            }
            for word in tc_result.get("result", [])
        ]
        return {
            "jobName": job_name,
            "status": TranscriptionJobStatus.COMPLETED,
            "results": {
                "transcripts": [
                    {
                        "transcript": tc_result.get("text", ""),
                    }
                ],
                "items": items,
            },
        }

    #
    # Threads
    #

    def _run_transcription_job(self, args: tuple[TranscribeStore, str]) -> None:
        """
        Execute a queued job; runs on the thread started by ``start_transcription_job``.

        The job runs in these steps:

        1. Download the media from S3.
        2. Probe the media with ffprobe.
        3. Transcode it to 16 kHz mono WAV with ffmpeg.
        4. Transcribe it with Vosk.
        5. Write an AWS-style transcript JSON to the job's output location.

        Any error marks the job FAILED with a ``FailureReason`` rather than propagating.
        """
        store, job_name = args

        job = store.transcription_jobs[job_name]
        job["StartTime"] = datetime.datetime.utcnow()
        job["TranscriptionJobStatus"] = TranscriptionJobStatus.IN_PROGRESS

        # Set before raising to report an AWS-style message instead of str(exc).
        failure_reason = None
        # Temporary media files created for this job; removed when the job ends either way.
        tmp_files: list[str] = []

        try:
            LOG.debug("Starting transcription: %s", job_name)

            # Get file from S3
            file_path = new_tmp_file()
            tmp_files.append(file_path)
            s3_client = connect_to().s3
            s3_path: str = job["Media"]["MediaFileUri"]  # type: ignore[index,assignment]
            bucket, _, key = s3_path.removeprefix("s3://").partition("/")
            s3_client.download_file(Bucket=bucket, Key=key, Filename=file_path)

            ffmpeg_package.install()
            ffmpeg_bin = ffmpeg_package.get_installer().get_ffmpeg_path()
            ffprobe_bin = ffmpeg_package.get_installer().get_ffprobe_path()

            LOG.debug("Determining media format")
            # TODO set correct failure_reason if ffprobe execution fails
            ffprobe_output = json.loads(
                run(  # type: ignore[arg-type]
                    f"{ffprobe_bin} -show_streams -show_format -print_format json -hide_banner -v error {file_path}"
                )
            )
            format_name = ffprobe_output["format"]["format_name"]
            LOG.debug("Media format detected as: %s", format_name)

            # Validate before the lookup: indexing first would raise a bare KeyError and
            # the job would fail with an unhelpful reason instead of this message.
            if format_name not in SUPPORTED_FORMAT_NAMES:
                failure_reason = f"Unsupported media format: {format_name}"
                raise RuntimeError(failure_reason)
            job["MediaFormat"] = SUPPORTED_FORMAT_NAMES[format_name]

            duration = ffprobe_output["format"]["duration"]
            if float(duration) > MAX_AUDIO_DURATION_SECONDS:
                failure_reason = "Invalid file size: file size too large. Maximum audio duration is 4.000000 hours.Check the length of the file and try your request again."
                raise RuntimeError(failure_reason)

            # Determine the sample rate of input audio if possible
            for stream in ffprobe_output["streams"]:
                if stream["codec_type"] == "audio":
                    job["MediaSampleRateHertz"] = int(stream["sample_rate"])

            wav_path = new_tmp_file(suffix=".wav")
            tmp_files.append(wav_path)
            LOG.debug("Transcoding media to wav")
            # TODO set correct failure_reason if ffmpeg execution fails
            run(
                f"{ffmpeg_bin} -y -nostdin -loglevel quiet -i '{file_path}' -ar 16000 -ac 1 '{wav_path}'"
            )

            # Context manager guarantees the WAV handle is closed on every path.
            with wave.open(wav_path, "rb") as audio:
                # Check if file is valid wav
                if (
                    audio.getnchannels() != 1
                    or audio.getsampwidth() != 2
                    or audio.getcomptype() != "NONE"
                ):
                    failure_reason = (
                        "Audio file must be mono PCM WAV format. Transcoding may have failed. "
                    )
                    raise RuntimeError(failure_reason)

                # Prepare transcriber
                language_code: str = job["LanguageCode"]  # type: ignore[assignment]
                model_name = LANGUAGE_MODELS[language_code]  # type: ignore[index]
                self._setup_vosk()
                model_path = self.download_model(model_name)
                from vosk import KaldiRecognizer, Model  # noqa

                model = Model(model_path=model_path, model_name=model_name)

                tc = KaldiRecognizer(model, audio.getframerate())
                tc.SetWords(True)
                tc.SetPartialWords(True)

                # Feed the audio to the recognizer in chunks until no frames remain.
                while data := audio.readframes(4000):
                    tc.AcceptWaveform(data)

            tc_result = json.loads(tc.FinalResult())
            output = self._build_transcript_output(job_name, tc_result)

            # Save to S3
            output_s3_path: str = job["Transcript"]["TranscriptFileUri"]  # type: ignore[index,assignment]
            output_bucket, output_key = get_bucket_and_key_from_presign_url(output_s3_path)
            s3_client.put_object(Bucket=output_bucket, Key=output_key, Body=json.dumps(output))

            # Update job details
            job["CompletionTime"] = datetime.datetime.utcnow()
            job["TranscriptionJobStatus"] = TranscriptionJobStatus.COMPLETED
            # NOTE(review): this replaces the input format recorded above with wav (the
            # transcoded format); confirm this is intended parity behavior.
            job["MediaFormat"] = MediaFormat.wav

            LOG.info("Transcription job completed: %s", job_name)

        except Exception as exc:
            job["FailureReason"] = failure_reason or str(exc)
            job["TranscriptionJobStatus"] = TranscriptionJobStatus.FAILED

            LOG.error(
                "Transcription job %s failed: %s",
                job_name,
                job["FailureReason"],
                exc_info=LOG.isEnabledFor(logging.DEBUG),
            )
        finally:
            for tmp_file in tmp_files:
                Path(tmp_file).unlink(missing_ok=True)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc