• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19156361236

06 Nov 2025 09:29PM UTC coverage: 86.884% (-0.03%) from 86.914%
19156361236

push

github

web-flow
UNC-99 apigateway accept-encoding header wrong (#13350)

68494 of 78834 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.34
/localstack-core/localstack/services/logs/provider.py
1
import base64
1✔
2
import copy
1✔
3
import io
1✔
4
import json
1✔
5
import logging
1✔
6
from collections.abc import Callable
1✔
7
from gzip import GzipFile
1✔
8

9
from moto.core.utils import unix_time_millis
1✔
10
from moto.logs.models import LogEvent, LogsBackend
1✔
11
from moto.logs.models import LogGroup as MotoLogGroup
1✔
12
from moto.logs.models import LogStream as MotoLogStream
1✔
13

14
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
15
from localstack.aws.api.logs import (
1✔
16
    AmazonResourceName,
17
    DescribeLogGroupsRequest,
18
    DescribeLogGroupsResponse,
19
    DescribeLogStreamsRequest,
20
    DescribeLogStreamsResponse,
21
    Entity,
22
    InputLogEvents,
23
    InvalidParameterException,
24
    KmsKeyId,
25
    ListLogGroupsRequest,
26
    ListLogGroupsResponse,
27
    ListTagsForResourceResponse,
28
    ListTagsLogGroupResponse,
29
    LogGroupClass,
30
    LogGroupName,
31
    LogGroupSummary,
32
    LogsApi,
33
    LogStreamName,
34
    PutLogEventsResponse,
35
    ResourceNotFoundException,
36
    SequenceToken,
37
    TagKeyList,
38
    TagList,
39
    Tags,
40
)
41
from localstack.aws.connect import connect_to
1✔
42
from localstack.services import moto
1✔
43
from localstack.services.logs.models import get_moto_logs_backend, logs_stores
1✔
44
from localstack.services.moto import call_moto
1✔
45
from localstack.services.plugins import ServiceLifecycleHook
1✔
46
from localstack.utils.aws import arns
1✔
47
from localstack.utils.aws.client_types import ServicePrincipal
1✔
48
from localstack.utils.bootstrap import is_api_enabled
1✔
49
from localstack.utils.numbers import is_number
1✔
50
from localstack.utils.patch import patch
1✔
51

52
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
1✔
53

54

55
class LogsProvider(LogsApi, ServiceLifecycleHook):
    """CloudWatch Logs provider that layers LocalStack-specific behaviour on top of moto.

    Requests are either delegated to moto (``call_moto``) or answered directly from the
    moto backend state. Tags are kept in ``logs_stores``, keyed by resource ARN.
    """

    def __init__(self):
        super().__init__()
        self.cw_client = connect_to().cloudwatch

    def put_log_events(
        self,
        context: RequestContext,
        log_group_name: LogGroupName,
        log_stream_name: LogStreamName,
        log_events: InputLogEvents,
        sequence_token: SequenceToken | None = None,
        entity: Entity | None = None,
        **kwargs,
    ) -> PutLogEventsResponse:
        """Publish metric data for matching metric filters, then let moto store the events.

        Metric filters are only evaluated when the ``cloudwatch`` API is enabled. Errors
        raised while creating the CloudWatch client or putting metric data are logged and
        do not abort the request.

        :return: the response produced by moto for this request
        """
        logs_backend = get_moto_logs_backend(context.account_id, context.region)
        metric_filters = logs_backend.filters.metric_filters if is_api_enabled("cloudwatch") else []
        for metric_filter in metric_filters:
            pattern = metric_filter.get("filterPattern", "")
            transformations = metric_filter.get("metricTransformations", [])
            matches = get_pattern_matcher(pattern)
            for log_event in log_events:
                if not matches(pattern, log_event):
                    continue
                for tf in transformations:
                    value = tf.get("metricValue") or "1"
                    if "$size" in value:
                        # the offending value is passed as a lazy %-argument so that the
                        # log record can actually be formatted
                        LOG.info(
                            "Expression not yet supported for log filter metricValue: %s", value
                        )
                    # non-numeric metric values fall back to 1
                    value = float(value) if is_number(value) else 1
                    data = [{"MetricName": tf["metricName"], "Value": value}]
                    try:
                        client = connect_to(
                            aws_access_key_id=context.account_id, region_name=context.region
                        ).cloudwatch
                        client.put_metric_data(Namespace=tf["metricNamespace"], MetricData=data)
                    except Exception as e:
                        LOG.info(
                            "Unable to put metric data for matching CloudWatch log events: %s", e
                        )
        return call_moto(context)

    @handler("DescribeLogGroups", expand=False)
    def describe_log_groups(
        self, context: RequestContext, request: DescribeLogGroupsRequest
    ) -> DescribeLogGroupsResponse:
        """Return all log groups sorted by name, optionally filtered by prefix or pattern.

        :raises InvalidParameterException: if both ``logGroupNamePrefix`` and
            ``logGroupNamePattern`` are set
        """
        region_backend = get_moto_logs_backend(context.account_id, context.region)

        prefix: str | None = request.get("logGroupNamePrefix", "")
        pattern: str | None = request.get("logGroupNamePattern", "")

        if pattern and prefix:
            raise InvalidParameterException(
                "LogGroup name prefix and LogGroup name pattern are mutually exclusive parameters."
            )

        # work on a deep copy of the backend's groups rather than the live objects
        moto_groups = copy.deepcopy(dict(region_backend.groups)).values()

        groups = [
            {"logGroupClass": LogGroupClass.STANDARD} | group.to_describe_dict()
            for group in sorted(moto_groups, key=lambda g: g.name)
            if not (prefix or pattern)
            or (prefix and group.name.startswith(prefix))
            or (pattern and pattern in group.name)
        ]

        return DescribeLogGroupsResponse(logGroups=groups)

    @handler("DescribeLogStreams", expand=False)
    def describe_log_streams(
        self, context: RequestContext, request: DescribeLogStreamsRequest
    ) -> DescribeLogStreamsResponse:
        """Resolve ``logGroupIdentifier`` to a group name and delegate to moto.

        :raises CommonServiceException: (``ValidationException``) if both ``logGroupName``
            and ``logGroupIdentifier`` are set
        """
        log_group_name: str | None = request.get("logGroupName")
        log_group_identifier: str | None = request.get("logGroupIdentifier")

        if log_group_identifier and log_group_name:
            raise CommonServiceException(
                "ValidationException",
                "LogGroup name and LogGroup ARN are mutually exclusive parameters.",
            )
        request_copy = copy.deepcopy(request)
        if log_group_identifier:
            request_copy.pop("logGroupIdentifier")
            # identifier can be arn or name; the name is the last ":"-separated segment
            request_copy["logGroupName"] = log_group_identifier.split(":")[-1]

        return moto.call_moto_with_request(context, request_copy)

    @handler("ListLogGroups", expand=False)
    def list_log_groups(
        self, context: RequestContext, request: ListLogGroupsRequest
    ) -> ListLogGroupsResponse:
        """Return name/ARN summaries of all log groups, sorted by name.

        If ``logGroupNamePattern`` is set, only groups whose name contains it are returned.
        """
        pattern: str | None = request.get("logGroupNamePattern")
        region_backend: LogsBackend = get_moto_logs_backend(context.account_id, context.region)
        moto_groups = copy.deepcopy(region_backend.groups).values()
        groups = [
            LogGroupSummary(
                logGroupName=group.name, logGroupArn=group.arn, logGroupClass=LogGroupClass.STANDARD
            )
            for group in sorted(moto_groups, key=lambda g: g.name)
            if not pattern or pattern in group.name
        ]
        return ListLogGroupsResponse(logGroups=groups)

    def create_log_group(
        self,
        context: RequestContext,
        log_group_name: LogGroupName,
        kms_key_id: KmsKeyId | None = None,
        tags: Tags | None = None,
        log_group_class: LogGroupClass | None = None,
        **kwargs,
    ) -> None:
        """Create the log group through moto and record any given tags under its ARN."""
        call_moto(context)
        if tags:
            resource_arn = arns.log_group_arn(
                group_name=log_group_name, account_id=context.account_id, region_name=context.region
            )
            store = logs_stores[context.account_id][context.region]
            store.TAGS.setdefault(resource_arn, {}).update(tags)

    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceResponse:
        """Return the tags stored for ``resource_arn`` (empty if none are stored)."""
        self._check_resource_arn_tagging(resource_arn)
        store = logs_stores[context.account_id][context.region]
        tags = store.TAGS.get(resource_arn, {})
        return ListTagsForResourceResponse(tags=tags)

    def list_tags_log_group(
        self, context: RequestContext, log_group_name: LogGroupName, **kwargs
    ) -> ListTagsLogGroupResponse:
        """Return the tags stored for the named log group.

        :raises ResourceNotFoundException: if the log group does not exist
        """
        # deprecated implementation, new one: list_tags_for_resource
        self._verify_log_group_exists(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        resource_arn = arns.log_group_arn(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        store = logs_stores[context.account_id][context.region]
        tags = store.TAGS.get(resource_arn, {})
        return ListTagsLogGroupResponse(tags=tags)

    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> None:
        """Remove the given tag keys from ``resource_arn``; unknown keys are ignored."""
        self._check_resource_arn_tagging(resource_arn)
        store = logs_stores[context.account_id][context.region]
        tags_stored = store.TAGS.get(resource_arn, {})
        for tag in tag_keys:
            tags_stored.pop(tag, None)

    def untag_log_group(
        self, context: RequestContext, log_group_name: LogGroupName, tags: TagList, **kwargs
    ) -> None:
        """Remove the given tag keys from the named log group; unknown keys are ignored.

        :raises ResourceNotFoundException: if the log group does not exist
        """
        # deprecated implementation -> new one: untag_resource
        self._verify_log_group_exists(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        resource_arn = arns.log_group_arn(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        store = logs_stores[context.account_id][context.region]
        tags_stored = store.TAGS.get(resource_arn, {})
        for tag in tags:
            tags_stored.pop(tag, None)

    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: Tags, **kwargs
    ) -> None:
        """Add or overwrite tags for ``resource_arn``."""
        self._check_resource_arn_tagging(resource_arn)
        store = logs_stores[context.account_id][context.region]
        # setdefault (not get): a resource without an existing entry must get one,
        # otherwise the update lands in a temporary dict and the tags are lost
        store.TAGS.setdefault(resource_arn, {}).update(tags or {})

    def tag_log_group(
        self, context: RequestContext, log_group_name: LogGroupName, tags: Tags, **kwargs
    ) -> None:
        """Add or overwrite tags for the named log group.

        :raises ResourceNotFoundException: if the log group does not exist
        """
        # deprecated implementation -> new one: tag_resource
        self._verify_log_group_exists(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        resource_arn = arns.log_group_arn(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        store = logs_stores[context.account_id][context.region]
        # setdefault (not get): see tag_resource — tags for a group that has no entry
        # yet must be persisted in the store
        store.TAGS.setdefault(resource_arn, {}).update(tags or {})

    def _verify_log_group_exists(
        self, group_name: LogGroupName, account_id: str, region_name: str
    ) -> None:
        """Raise ``ResourceNotFoundException`` unless moto knows a log group named ``group_name``."""
        store = get_moto_logs_backend(account_id, region_name)
        if group_name not in store.groups:
            raise ResourceNotFoundException()

    def _check_resource_arn_tagging(self, resource_arn):
        """Validate the target of a tagging operation identified by ``resource_arn``."""
        service = arns.extract_service_from_arn(resource_arn)
        region = arns.extract_region_from_arn(resource_arn)
        account = arns.extract_account_id_from_arn(resource_arn)

        # AWS currently only supports tagging for Log Group and Destinations
        # LS: we only verify if log group exists, and create tags for other resources
        # NOTE(review): extract_service_from_arn presumably returns the ARN's service
        # segment (e.g. "logs"), in which case this condition can never be true and the
        # existence check is skipped — confirm whether the resource part was intended.
        if service.lower().startswith("log-group:"):
            self._verify_log_group_exists(
                service.split(":")[-1], account_id=account, region_name=region
            )
262

263

264
def get_pattern_matcher(pattern: str) -> Callable[[str, dict], bool]:
    """Return a callable ``(pattern, log_event) -> bool`` used to match log events.

    This default implementation accepts every event regardless of the pattern.
    Can be patched by plugins to return a more sophisticated pattern matcher.
    """

    def _accept_all(_pattern, _log_event):
        return True

    return _accept_all
1✔
267

268

269
@patch(LogsBackend.put_subscription_filter)
def moto_put_subscription_filter(fn, self, *args, **kwargs):
    """Validate the destination of a subscription filter, then register it on the log group.

    Positional ``args`` are read as ``(log_group_name, filter_name, filter_pattern,
    destination_arn, role_arn)``. The destination must be a Lambda function, Kinesis
    stream or Firehose delivery stream that can be looked up with the resolved client.

    :raises ResourceNotFoundException: if the log group does not exist
    :raises InvalidParameterException: if the destination is of an unsupported kind or
        the lookup of the destination fails
    """
    log_group_name, filter_name = args[0], args[1]
    filter_pattern, destination_arn, role_arn = args[2], args[3], args[4]

    target_group = self.groups.get(log_group_name)
    source_arn = arns.log_group_arn(log_group_name, self.account_id, self.region_name)

    if not target_group:
        raise ResourceNotFoundException("The specified log group does not exist.")

    destination = arns.parse_arn(destination_arn)

    # use the assumed role when one is given, otherwise the destination's own account
    if role_arn:
        client_factory = connect_to.with_assumed_role(
            role_arn=role_arn,
            service_principal=ServicePrincipal.logs,
            region_name=destination["region"],
        )
    else:
        client_factory = connect_to(
            aws_access_key_id=destination["account"], region_name=destination["region"]
        )

    is_lambda = ":lambda:" in destination_arn
    is_kinesis = ":kinesis:" in destination_arn
    is_firehose = ":firehose:" in destination_arn

    if not (is_lambda or is_kinesis or is_firehose):
        raise InvalidParameterException(
            f"PutSubscriptionFilter operation cannot work with destinationArn for vendor {destination['service']}"
        )

    if is_lambda:
        lambda_client = client_factory.lambda_.request_metadata(
            source_arn=source_arn, service_principal=ServicePrincipal.logs
        )
        try:
            lambda_client.get_function(FunctionName=destination_arn)
        except Exception:
            raise InvalidParameterException(
                "destinationArn for vendor lambda cannot be used with roleArn"
            )
    elif is_kinesis:
        kinesis_client = client_factory.kinesis.request_metadata(
            source_arn=source_arn, service_principal=ServicePrincipal.logs
        )
        kinesis_stream = arns.kinesis_stream_name(destination_arn)
        try:
            # Kinesis-Local DescribeStream does not support StreamArn param, so use StreamName instead
            kinesis_client.describe_stream(StreamName=kinesis_stream)
        except Exception:
            raise InvalidParameterException(
                "Could not deliver message to specified Kinesis stream. "
                "Ensure that the Kinesis stream exists and is ACTIVE."
            )
    else:
        firehose_client = client_factory.firehose.request_metadata(
            source_arn=source_arn, service_principal=ServicePrincipal.logs
        )
        delivery_stream = arns.firehose_name(destination_arn)
        try:
            firehose_client.describe_delivery_stream(DeliveryStreamName=delivery_stream)
        except Exception:
            raise InvalidParameterException(
                "Could not deliver message to specified Firehose stream. "
                "Ensure that the Firehose stream exists and is ACTIVE."
            )

    # propagate the pattern to every stream that already exists in the group
    if filter_pattern:
        for existing_stream in target_group.streams.values():
            existing_stream.filter_pattern = filter_pattern

    target_group.put_subscription_filter(filter_name, filter_pattern, destination_arn, role_arn)
1✔
342

343

344
@patch(MotoLogStream.put_log_events, pass_target=False)
def moto_put_log_events(self: "MotoLogStream", log_events):
    """Store ``log_events`` on the stream and forward them to subscription filter destinations.

    For every subscription filter of the owning log group, the events (restricted to those
    matching the filter pattern, if one is set) are wrapped in a ``DATA_MESSAGE`` payload,
    gzip-compressed and delivered to the Lambda, Kinesis or Firehose destination.

    :param log_events: iterable of event dicts; each is read for its ``"message"`` here
    :return: the new upload sequence token, zero-padded to 56 digits
    """
    # TODO: call/patch upstream method here, instead of duplicating the code!
    self.last_ingestion_time = int(unix_time_millis())
    self.stored_bytes += sum(len(log_event["message"]) for log_event in log_events)
    stored_events = [LogEvent(self.last_ingestion_time, log_event) for log_event in log_events]
    self.events += stored_events
    self.upload_sequence_token += 1

    # apply filter_pattern -> only forward what matches the pattern
    for subscription_filter in self.log_group.subscription_filters.values():
        # Every filter starts from the full incoming batch. Dedicated per-iteration
        # names are used so that the filtered/re-shaped events of one subscription
        # filter never leak into the evaluation of the next one.
        events_to_forward = stored_events
        if subscription_filter.filter_pattern:
            # TODO only patched in pro
            matches = get_pattern_matcher(subscription_filter.filter_pattern)
            events_to_forward = [
                LogEvent(self.last_ingestion_time, raw_event)
                for raw_event in log_events
                if matches(subscription_filter.filter_pattern, raw_event)
            ]

        destination_arn = subscription_filter.destination_arn
        if not (events_to_forward and destination_arn):
            continue

        payload_events = [
            {
                "id": str(forwarded.event_id),
                "timestamp": forwarded.timestamp,
                "message": forwarded.message,
            }
            for forwarded in events_to_forward
        ]

        data = {
            "messageType": "DATA_MESSAGE",
            "owner": self.account_id,  # AWS Account ID of the originating log data
            "logGroup": self.log_group.name,
            "logStream": self.log_stream_name,
            "subscriptionFilters": [subscription_filter.name],
            "logEvents": payload_events,
        }

        # gzip the compact JSON document once and reuse the bytes for all destinations
        output = io.BytesIO()
        with GzipFile(fileobj=output, mode="w") as f:
            f.write(json.dumps(data, separators=(",", ":")).encode("utf-8"))
        payload_gz_encoded = output.getvalue()
        lambda_event = {"awslogs": {"data": base64.b64encode(payload_gz_encoded).decode("utf-8")}}

        log_group_arn = arns.log_group_arn(self.log_group.name, self.account_id, self.region)
        arn_data = arns.parse_arn(destination_arn)

        if subscription_filter.role_arn:
            factory = connect_to.with_assumed_role(
                role_arn=subscription_filter.role_arn,
                service_principal=ServicePrincipal.logs,
                region_name=arn_data["region"],
            )
        else:
            factory = connect_to(
                aws_access_key_id=arn_data["account"], region_name=arn_data["region"]
            )

        if ":lambda:" in destination_arn:
            client = factory.lambda_.request_metadata(
                source_arn=log_group_arn, service_principal=ServicePrincipal.logs
            )
            client.invoke(FunctionName=destination_arn, Payload=json.dumps(lambda_event))

        if ":kinesis:" in destination_arn:
            client = factory.kinesis.request_metadata(
                source_arn=log_group_arn, service_principal=ServicePrincipal.logs
            )
            stream_name = arns.kinesis_stream_name(destination_arn)
            client.put_record(
                StreamName=stream_name,
                Data=payload_gz_encoded,
                PartitionKey=self.log_group.name,
            )

        if ":firehose:" in destination_arn:
            client = factory.firehose.request_metadata(
                source_arn=log_group_arn, service_principal=ServicePrincipal.logs
            )
            firehose_name = arns.firehose_name(destination_arn)
            client.put_record(
                DeliveryStreamName=firehose_name,
                Record={"Data": payload_gz_encoded},
            )

    return f"{self.upload_sequence_token:056d}"
1✔
432

433

434
@patch(MotoLogStream.filter_log_events)
def moto_filter_log_events(
    filter_log_events, self, start_time, end_time, filter_pattern, *args, **kwargs
):
    """Filter a stream's events by time via moto, then apply ``filter_pattern`` locally.

    The wrapped moto method is always invoked with ``filter_pattern=None``; when a
    pattern was requested, the matcher from ``get_pattern_matcher`` is applied to the
    events moto returned.
    """
    # moto's own pattern handling is bypassed by always passing None here
    unfiltered = filter_log_events(
        self, *args, start_time=start_time, end_time=end_time, filter_pattern=None, **kwargs
    )

    if filter_pattern:
        is_match = get_pattern_matcher(filter_pattern)
        return [candidate for candidate in unfiltered if is_match(filter_pattern, candidate)]

    return unfiltered
×
448

449

450
@patch(MotoLogGroup.create_log_stream)
def moto_create_log_stream(target, self, log_stream_name):
    """Create the stream via moto and initialise its ``filter_pattern``.

    The new stream takes the ``filterPattern`` of the group's first subscription
    filter, or ``None`` when the group has no subscription filters.
    """
    target(self, log_stream_name)
    new_stream = self.streams[log_stream_name]
    group_filters = self.describe_subscription_filters()
    if group_filters:
        new_stream.filter_pattern = group_filters[0]["filterPattern"]
    else:
        new_stream.filter_pattern = None
1✔
456

457

458
@patch(MotoLogGroup.to_describe_dict)
def moto_to_describe_dict(target, self):
    """Build the describe-dict of a log group without calling the wrapped moto method.

    ``retentionInDays`` and ``kmsKeyId`` are only included when set on the group.
    """
    # reported race condition in https://github.com/localstack/localstack/issues/8011
    # -> sum up storedBytes over a deep copy of the "streams" dict instead of the live one
    streams_snapshot = copy.deepcopy(self.streams)
    description = dict(
        arn=f"{self.arn}:*",
        logGroupArn=self.arn,
        creationTime=self.creation_time,
        logGroupName=self.name,
        metricFilterCount=0,
        storedBytes=sum(stream.stored_bytes for stream in streams_snapshot.values()),
    )
    optional_fields = (
        ("retentionInDays", self.retention_in_days),
        ("kmsKeyId", self.kms_key_id),
    )
    for field_name, field_value in optional_fields:
        if field_value:
            description[field_name] = field_value
    return description
1✔
476

477

478
@patch(MotoLogGroup.get_log_events)
def moto_get_log_events(
    target, self, log_stream_name, start_time, end_time, limit, next_token, start_from_head
):
    """Delegate to moto's ``get_log_events`` after checking that the stream exists.

    :raises ResourceNotFoundException: if ``log_stream_name`` is not a stream of this group
    """
    if log_stream_name in self.streams:
        return target(
            self, log_stream_name, start_time, end_time, limit, next_token, start_from_head
        )
    raise ResourceNotFoundException("The specified log stream does not exist.")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc