• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19844934392

01 Dec 2025 07:55PM UTC coverage: 86.945% (+0.1%) from 86.821%
19844934392

push

github

web-flow
Update ASF APIs, provider signatures, disable lambda patches (#13444)

Co-authored-by: LocalStack Bot <localstack-bot@users.noreply.github.com>
Co-authored-by: Silvio Vasiljevic <silvio.vasiljevic@gmail.com>

69707 of 80174 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.39
/localstack-core/localstack/services/logs/provider.py
1
import base64
1✔
2
import copy
1✔
3
import io
1✔
4
import json
1✔
5
import logging
1✔
6
from collections.abc import Callable
1✔
7
from gzip import GzipFile
1✔
8

9
from moto.core.utils import unix_time_millis
1✔
10
from moto.logs.models import LogEvent, LogsBackend
1✔
11
from moto.logs.models import LogGroup as MotoLogGroup
1✔
12
from moto.logs.models import LogStream as MotoLogStream
1✔
13

14
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
15
from localstack.aws.api.logs import (
1✔
16
    AmazonResourceName,
17
    DeletionProtectionEnabled,
18
    DescribeLogGroupsRequest,
19
    DescribeLogGroupsResponse,
20
    DescribeLogStreamsRequest,
21
    DescribeLogStreamsResponse,
22
    Entity,
23
    InputLogEvents,
24
    InvalidParameterException,
25
    KmsKeyId,
26
    ListLogGroupsRequest,
27
    ListLogGroupsResponse,
28
    ListTagsForResourceResponse,
29
    ListTagsLogGroupResponse,
30
    LogGroupClass,
31
    LogGroupName,
32
    LogGroupSummary,
33
    LogsApi,
34
    LogStreamName,
35
    PutLogEventsResponse,
36
    ResourceNotFoundException,
37
    SequenceToken,
38
    TagKeyList,
39
    TagList,
40
    Tags,
41
)
42
from localstack.aws.connect import connect_to
1✔
43
from localstack.services import moto
1✔
44
from localstack.services.logs.models import get_moto_logs_backend, logs_stores
1✔
45
from localstack.services.moto import call_moto
1✔
46
from localstack.services.plugins import ServiceLifecycleHook
1✔
47
from localstack.state import StateVisitor
1✔
48
from localstack.utils.aws import arns
1✔
49
from localstack.utils.aws.client_types import ServicePrincipal
1✔
50
from localstack.utils.bootstrap import is_api_enabled
1✔
51
from localstack.utils.numbers import is_number
1✔
52
from localstack.utils.patch import patch
1✔
53

54
# Module-level logger, named after this module per the standard logging convention.
LOG = logging.getLogger(__name__)
1✔
55

56

57
class LogsProvider(LogsApi, ServiceLifecycleHook):
    """CloudWatch Logs provider.

    Most operations are delegated to moto; this class adds metric-filter
    forwarding to CloudWatch, resource tagging support, and request validation
    that moto does not implement.
    """

    def __init__(self):
        super().__init__()
        # Default CloudWatch client (default account/region); per-request
        # clients are created in put_log_events with the caller's credentials.
        self.cw_client = connect_to().cloudwatch

    def accept_state_visitor(self, visitor: StateVisitor):
        """Expose both the moto backend state and the LocalStack tag store to
        the state visitor (used for persistence/reset)."""
        from moto.logs.models import logs_backends

        visitor.visit(logs_backends)
        visitor.visit(logs_stores)

    def put_log_events(
        self,
        context: RequestContext,
        log_group_name: LogGroupName,
        log_stream_name: LogStreamName,
        log_events: InputLogEvents,
        sequence_token: SequenceToken | None = None,
        entity: Entity | None = None,
        **kwargs,
    ) -> PutLogEventsResponse:
        """Store log events via moto; additionally publish CloudWatch metric
        data for every metric filter whose pattern matches an incoming event.

        Metric forwarding is skipped entirely when the cloudwatch API is
        disabled.
        """
        logs_backend = get_moto_logs_backend(context.account_id, context.region)
        metric_filters = logs_backend.filters.metric_filters if is_api_enabled("cloudwatch") else []
        for metric_filter in metric_filters:
            pattern = metric_filter.get("filterPattern", "")
            transformations = metric_filter.get("metricTransformations", [])
            matches = get_pattern_matcher(pattern)
            for log_event in log_events:
                if matches(pattern, log_event):
                    for tf in transformations:
                        value = tf.get("metricValue") or "1"
                        if "$size" in value:
                            # fix: the original call passed `value` without a %s
                            # placeholder, which makes the logging module raise a
                            # string-formatting error instead of logging
                            LOG.info(
                                "Expression not yet supported for log filter metricValue: %s",
                                value,
                            )
                        # non-numeric metric values fall back to a count of 1
                        value = float(value) if is_number(value) else 1
                        data = [{"MetricName": tf["metricName"], "Value": value}]
                        try:
                            client = connect_to(
                                aws_access_key_id=context.account_id, region_name=context.region
                            ).cloudwatch
                            client.put_metric_data(Namespace=tf["metricNamespace"], MetricData=data)
                        except Exception as e:
                            # best-effort: metric forwarding must not fail the PutLogEvents call
                            # (fix: added the missing %s placeholder here as well)
                            LOG.info(
                                "Unable to put metric data for matching CloudWatch log events: %s",
                                e,
                            )
        return call_moto(context)

    @handler("DescribeLogGroups", expand=False)
    def describe_log_groups(
        self, context: RequestContext, request: DescribeLogGroupsRequest
    ) -> DescribeLogGroupsResponse:
        """List log groups, supporting the mutually-exclusive name-prefix and
        name-pattern filters. NOTE(review): pagination (nextToken/limit) is not
        implemented here — the full filtered list is always returned."""
        region_backend = get_moto_logs_backend(context.account_id, context.region)

        prefix: str | None = request.get("logGroupNamePrefix", "")
        pattern: str | None = request.get("logGroupNamePattern", "")

        if pattern and prefix:
            raise InvalidParameterException(
                "LogGroup name prefix and LogGroup name pattern are mutually exclusive parameters."
            )

        # deep-copy to avoid mutating / racing with moto's live backend state
        moto_groups = copy.deepcopy(dict(region_backend.groups)).values()

        groups = [
            # moto does not track a log group class; report STANDARD unless the
            # describe dict itself provides one
            {"logGroupClass": LogGroupClass.STANDARD} | group.to_describe_dict()
            for group in sorted(moto_groups, key=lambda g: g.name)
            if not (prefix or pattern)
            or (prefix and group.name.startswith(prefix))
            or (pattern and pattern in group.name)
        ]

        return DescribeLogGroupsResponse(logGroups=groups)

    @handler("DescribeLogStreams", expand=False)
    def describe_log_streams(
        self, context: RequestContext, request: DescribeLogStreamsRequest
    ) -> DescribeLogStreamsResponse:
        """Validate the mutually-exclusive group name/identifier parameters,
        normalize the identifier to a plain name, and delegate to moto."""
        log_group_name: str | None = request.get("logGroupName")
        log_group_identifier: str | None = request.get("logGroupIdentifier")

        if log_group_identifier and log_group_name:
            raise CommonServiceException(
                "ValidationException",
                "LogGroup name and LogGroup ARN are mutually exclusive parameters.",
            )
        request_copy = copy.deepcopy(request)
        if log_group_identifier:
            request_copy.pop("logGroupIdentifier")
            # identifier can be arn or name; the name is the last ARN segment
            request_copy["logGroupName"] = log_group_identifier.split(":")[-1]

        return moto.call_moto_with_request(context, request_copy)

    @handler("ListLogGroups", expand=False)
    def list_log_groups(
        self, context: RequestContext, request: ListLogGroupsRequest
    ) -> ListLogGroupsResponse:
        """Return summaries of all log groups, optionally filtered by a
        substring pattern, sorted by name."""
        pattern: str | None = request.get("logGroupNamePattern")
        region_backend: LogsBackend = get_moto_logs_backend(context.account_id, context.region)
        moto_groups = copy.deepcopy(region_backend.groups).values()
        groups = [
            LogGroupSummary(
                logGroupName=group.name, logGroupArn=group.arn, logGroupClass=LogGroupClass.STANDARD
            )
            for group in sorted(moto_groups, key=lambda g: g.name)
            if not pattern or pattern in group.name
        ]
        return ListLogGroupsResponse(logGroups=groups)

    def create_log_group(
        self,
        context: RequestContext,
        log_group_name: LogGroupName,
        kms_key_id: KmsKeyId | None = None,
        tags: Tags | None = None,
        log_group_class: LogGroupClass | None = None,
        deletion_protection_enabled: DeletionProtectionEnabled | None = None,
        **kwargs,
    ) -> None:
        """Create the log group in moto, then record any creation-time tags in
        the LocalStack tag store (moto does not manage our tags)."""
        call_moto(context)
        if tags:
            resource_arn = arns.log_group_arn(
                group_name=log_group_name, account_id=context.account_id, region_name=context.region
            )
            store = logs_stores[context.account_id][context.region]
            store.TAGS.setdefault(resource_arn, {}).update(tags)

    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceResponse:
        """Return the tags stored for the given resource ARN."""
        self._check_resource_arn_tagging(resource_arn)
        store = logs_stores[context.account_id][context.region]
        tags = store.TAGS.get(resource_arn, {})
        return ListTagsForResourceResponse(tags=tags)

    def list_tags_log_group(
        self, context: RequestContext, log_group_name: LogGroupName, **kwargs
    ) -> ListTagsLogGroupResponse:
        # deprecated implementation, new one: list_tags_for_resource
        self._verify_log_group_exists(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        resource_arn = arns.log_group_arn(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        store = logs_stores[context.account_id][context.region]
        tags = store.TAGS.get(resource_arn, {})
        return ListTagsLogGroupResponse(tags=tags)

    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> None:
        """Remove the given tag keys from the resource; missing keys are ignored."""
        self._check_resource_arn_tagging(resource_arn)
        store = logs_stores[context.account_id][context.region]
        # .get with a throwaway default is fine here: popping from an empty
        # dict for an untagged resource is a harmless no-op
        tags_stored = store.TAGS.get(resource_arn, {})
        for tag in tag_keys:
            tags_stored.pop(tag, None)

    def untag_log_group(
        self, context: RequestContext, log_group_name: LogGroupName, tags: TagList, **kwargs
    ) -> None:
        # deprecated implementation -> new one: untag_resource
        self._verify_log_group_exists(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        resource_arn = arns.log_group_arn(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        store = logs_stores[context.account_id][context.region]
        tags_stored = store.TAGS.get(resource_arn, {})
        for tag in tags:
            tags_stored.pop(tag, None)

    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: Tags, **kwargs
    ) -> None:
        """Add/overwrite tags on the resource."""
        self._check_resource_arn_tagging(resource_arn)
        store = logs_stores[context.account_id][context.region]
        # fix: was `store.TAGS.get(resource_arn, {}).update(...)`, which updates a
        # throwaway dict and silently drops tags for a not-yet-tagged ARN;
        # setdefault (as used in create_log_group) persists the entry
        store.TAGS.setdefault(resource_arn, {}).update(tags or {})

    def tag_log_group(
        self, context: RequestContext, log_group_name: LogGroupName, tags: Tags, **kwargs
    ) -> None:
        # deprecated implementation -> new one: tag_resource
        self._verify_log_group_exists(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        resource_arn = arns.log_group_arn(
            group_name=log_group_name, account_id=context.account_id, region_name=context.region
        )
        store = logs_stores[context.account_id][context.region]
        # fix: same throwaway-dict bug as in tag_resource — use setdefault so
        # tags stick for log groups that have no tag entry yet
        store.TAGS.setdefault(resource_arn, {}).update(tags or {})

    def _verify_log_group_exists(self, group_name: LogGroupName, account_id: str, region_name: str):
        """Raise ResourceNotFoundException unless the log group exists in moto."""
        store = get_moto_logs_backend(account_id, region_name)
        if group_name not in store.groups:
            raise ResourceNotFoundException()

    def _check_resource_arn_tagging(self, resource_arn):
        """Validate the target of a tagging operation.

        AWS currently only supports tagging for Log Group and Destinations.
        LS: we only verify if the log group exists, and create tags for other
        resources without validation.
        """
        service = arns.extract_service_from_arn(resource_arn)
        region = arns.extract_region_from_arn(resource_arn)
        account = arns.extract_account_id_from_arn(resource_arn)

        # NOTE(review): this assumes extract_service_from_arn returns the
        # resource portion ("log-group:<name>") rather than the service name
        # ("logs") — if it returns "logs", this branch is unreachable; confirm
        # the helper's semantics
        if service.lower().startswith("log-group:"):
            self._verify_log_group_exists(
                service.split(":")[-1], account_id=account, region_name=region
            )
271

272

273
def get_pattern_matcher(pattern: str) -> Callable[[str, dict], bool]:
    """Returns a pattern matcher. Can be patched by plugins to return a more sophisticated pattern matcher."""

    def _match_all(_pattern: str, _log_event: dict) -> bool:
        # community default: every event matches every pattern
        return True

    return _match_all
1✔
276

277

278
@patch(LogsBackend.put_subscription_filter)
def moto_put_subscription_filter(fn, self, *args, **kwargs):
    """Patched moto PutSubscriptionFilter that validates the destination.

    Before storing the filter, verifies that the Lambda function, Kinesis
    stream, or Firehose delivery stream named by ``destination_arn`` actually
    exists (using the filter's role when one is given), and raises
    InvalidParameterException otherwise. Also seeds ``filter_pattern`` on all
    existing streams of the log group.
    """
    # positional layout of moto's put_subscription_filter arguments
    log_group_name = args[0]
    filter_name = args[1]
    filter_pattern = args[2]
    destination_arn = args[3]
    role_arn = args[4]

    log_group = self.groups.get(log_group_name)
    log_group_arn = arns.log_group_arn(log_group_name, self.account_id, self.region_name)

    if not log_group:
        raise ResourceNotFoundException("The specified log group does not exist.")

    arn_data = arns.parse_arn(destination_arn)

    # build the client factory either under the filter's role or directly
    # against the destination's account/region
    if role_arn:
        factory = connect_to.with_assumed_role(
            role_arn=role_arn,
            service_principal=ServicePrincipal.logs,
            region_name=arn_data["region"],
        )
    else:
        factory = connect_to(aws_access_key_id=arn_data["account"], region_name=arn_data["region"])

    if ":lambda:" in destination_arn:
        client = factory.lambda_.request_metadata(
            source_arn=log_group_arn, service_principal=ServicePrincipal.logs
        )
        # existence check: any failure is surfaced as an invalid destination
        try:
            client.get_function(FunctionName=destination_arn)
        except Exception:
            raise InvalidParameterException(
                "destinationArn for vendor lambda cannot be used with roleArn"
            )

    elif ":kinesis:" in destination_arn:
        client = factory.kinesis.request_metadata(
            source_arn=log_group_arn, service_principal=ServicePrincipal.logs
        )
        stream_name = arns.kinesis_stream_name(destination_arn)
        try:
            # Kinesis-Local DescribeStream does not support StreamArn param, so use StreamName instead
            client.describe_stream(StreamName=stream_name)
        except Exception:
            raise InvalidParameterException(
                "Could not deliver message to specified Kinesis stream. "
                "Ensure that the Kinesis stream exists and is ACTIVE."
            )

    elif ":firehose:" in destination_arn:
        client = factory.firehose.request_metadata(
            source_arn=log_group_arn, service_principal=ServicePrincipal.logs
        )
        firehose_name = arns.firehose_name(destination_arn)
        try:
            client.describe_delivery_stream(DeliveryStreamName=firehose_name)
        except Exception:
            raise InvalidParameterException(
                "Could not deliver message to specified Firehose stream. "
                "Ensure that the Firehose stream exists and is ACTIVE."
            )

    else:
        # only lambda/kinesis/firehose destinations are supported
        raise InvalidParameterException(
            f"PutSubscriptionFilter operation cannot work with destinationArn for vendor {arn_data['service']}"
        )

    # propagate the pattern to existing streams so their put_log_events can filter
    if filter_pattern:
        for stream in log_group.streams.values():
            stream.filter_pattern = filter_pattern

    # finally persist the filter on moto's log group model
    log_group.put_subscription_filter(filter_name, filter_pattern, destination_arn, role_arn)
1✔
351

352

353
@patch(MotoLogStream.put_log_events, pass_target=False)
def moto_put_log_events(self: "MotoLogStream", log_events):
    """Replacement for moto's LogStream.put_log_events.

    Stores the incoming events on the stream, then forwards the events that
    match each subscription filter's pattern to the filter's destination
    (Lambda invoke, Kinesis put_record, or Firehose put_record).

    :param log_events: list of dicts with "timestamp" and "message" keys
    :return: the next upload sequence token, zero-padded to 56 digits
    """
    # TODO: call/patch upstream method here, instead of duplicating the code!
    self.last_ingestion_time = int(unix_time_millis())
    self.stored_bytes += sum(len(log_event["message"]) for log_event in log_events)
    stored_events = [LogEvent(self.last_ingestion_time, log_event) for log_event in log_events]
    self.events += stored_events
    self.upload_sequence_token += 1

    for subscription_filter in self.log_group.subscription_filters.values():
        # apply filter_pattern -> only forward what matches the pattern.
        # fix: reset to the full event set for every filter — the previous code
        # reused the same `events`/`log_events` variables across iterations, so a
        # later filter without a pattern forwarded the earlier filter's filtered
        # (and already re-formatted) events
        events = stored_events
        if subscription_filter.filter_pattern:
            # TODO only patched in pro
            matches = get_pattern_matcher(subscription_filter.filter_pattern)
            events = [
                LogEvent(self.last_ingestion_time, event)
                for event in log_events
                if matches(subscription_filter.filter_pattern, event)
            ]

        if events and subscription_filter.destination_arn:
            destination_arn = subscription_filter.destination_arn
            formatted_log_events = [
                {
                    "id": str(event.event_id),
                    "timestamp": event.timestamp,
                    "message": event.message,
                }
                for event in events
            ]

            # payload in the CloudWatch Logs subscription DATA_MESSAGE format
            data = {
                "messageType": "DATA_MESSAGE",
                "owner": self.account_id,  # AWS Account ID of the originating log data
                "logGroup": self.log_group.name,
                "logStream": self.log_stream_name,
                "subscriptionFilters": [subscription_filter.name],
                "logEvents": formatted_log_events,
            }

            # gzip-compress the JSON payload once; Kinesis/Firehose receive it
            # raw, Lambda receives it base64-encoded under "awslogs.data"
            output = io.BytesIO()
            with GzipFile(fileobj=output, mode="w") as f:
                f.write(json.dumps(data, separators=(",", ":")).encode("utf-8"))
            payload_gz_encoded = output.getvalue()
            event = {"awslogs": {"data": base64.b64encode(payload_gz_encoded).decode("utf-8")}}

            log_group_arn = arns.log_group_arn(self.log_group.name, self.account_id, self.region)
            arn_data = arns.parse_arn(destination_arn)

            # use the filter's role when present, else the destination's account/region
            if subscription_filter.role_arn:
                factory = connect_to.with_assumed_role(
                    role_arn=subscription_filter.role_arn,
                    service_principal=ServicePrincipal.logs,
                    region_name=arn_data["region"],
                )
            else:
                factory = connect_to(
                    aws_access_key_id=arn_data["account"], region_name=arn_data["region"]
                )

            if ":lambda:" in destination_arn:
                client = factory.lambda_.request_metadata(
                    source_arn=log_group_arn, service_principal=ServicePrincipal.logs
                )
                client.invoke(FunctionName=destination_arn, Payload=json.dumps(event))

            if ":kinesis:" in destination_arn:
                client = factory.kinesis.request_metadata(
                    source_arn=log_group_arn, service_principal=ServicePrincipal.logs
                )
                stream_name = arns.kinesis_stream_name(destination_arn)
                client.put_record(
                    StreamName=stream_name,
                    Data=payload_gz_encoded,
                    PartitionKey=self.log_group.name,
                )

            if ":firehose:" in destination_arn:
                client = factory.firehose.request_metadata(
                    source_arn=log_group_arn, service_principal=ServicePrincipal.logs
                )
                firehose_name = arns.firehose_name(destination_arn)
                client.put_record(
                    DeliveryStreamName=firehose_name,
                    Record={"Data": payload_gz_encoded},
                )

    return f"{self.upload_sequence_token:056d}"
1✔
441

442

443
@patch(MotoLogStream.filter_log_events)
def moto_filter_log_events(
    filter_log_events, self, start_time, end_time, filter_pattern, *args, **kwargs
):
    """Delegate to moto with filter_pattern=None (moto currently raises on a
    None pattern), then apply the pattern matcher ourselves on the result."""
    all_events = filter_log_events(
        self, *args, start_time=start_time, end_time=end_time, filter_pattern=None, **kwargs
    )

    if filter_pattern:
        is_match = get_pattern_matcher(filter_pattern)
        return [candidate for candidate in all_events if is_match(filter_pattern, candidate)]

    return all_events
×
457

458

459
@patch(MotoLogGroup.create_log_stream)
def moto_create_log_stream(target, self, log_stream_name):
    """Create the stream via moto, then seed its filter_pattern from the log
    group's first subscription filter (None when the group has no filters)."""
    target(self, log_stream_name)
    new_stream = self.streams[log_stream_name]
    subscription_filters = self.describe_subscription_filters()
    if subscription_filters:
        new_stream.filter_pattern = subscription_filters[0]["filterPattern"]
    else:
        new_stream.filter_pattern = None
1✔
465

466

467
@patch(MotoLogGroup.to_describe_dict)
def moto_to_describe_dict(target, self):
    """Build the DescribeLogGroups entry for this group, replacing moto's
    implementation entirely (the patch target is intentionally not called)."""
    # reported race condition in https://github.com/localstack/localstack/issues/8011
    # snapshot "streams" before summing storedBytes to avoid concurrent mutation
    streams_snapshot = copy.deepcopy(self.streams)
    total_stored_bytes = sum(stream.stored_bytes for stream in streams_snapshot.values())

    described = {
        "arn": f"{self.arn}:*",
        "logGroupArn": self.arn,
        "creationTime": self.creation_time,
        "logGroupName": self.name,
        "metricFilterCount": 0,
        "storedBytes": total_stored_bytes,
    }
    # optional attributes are only reported when set, mirroring AWS behavior
    if self.retention_in_days:
        described["retentionInDays"] = self.retention_in_days
    if self.kms_key_id:
        described["kmsKeyId"] = self.kms_key_id
    return described
1✔
485

486

487
@patch(MotoLogGroup.get_log_events)
def moto_get_log_events(
    target, self, log_stream_name, start_time, end_time, limit, next_token, start_from_head
):
    """Guard moto's get_log_events with an existence check so a missing stream
    raises ResourceNotFoundException instead of surfacing a lookup error."""
    if log_stream_name in self.streams:
        return target(
            self, log_stream_name, start_time, end_time, limit, next_token, start_from_head
        )
    raise ResourceNotFoundException("The specified log stream does not exist.")
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc