• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 0ccff160-387e-4fce-9fce-d00219a1eea1

20 Feb 2025 10:06PM UTC coverage: 86.883% (+0.02%) from 86.859%
0ccff160-387e-4fce-9fce-d00219a1eea1

push

circleci

web-flow
Logging mask credentials (#12292)

15 of 15 new or added lines in 1 file covered. (100.0%)

1 existing line in 1 file now uncovered.

61581 of 70878 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.87
/localstack-core/localstack/services/cloudwatch/provider.py
1
import json
1✔
2
import logging
1✔
3
import uuid
1✔
4
from typing import Any, Optional
1✔
5
from xml.sax.saxutils import escape
1✔
6

7
from moto.cloudwatch import cloudwatch_backends
1✔
8
from moto.cloudwatch.models import CloudWatchBackend, FakeAlarm, MetricDatum
1✔
9

10
from localstack.aws.accounts import get_account_id_from_access_key_id
1✔
11
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
12
from localstack.aws.api.cloudwatch import (
1✔
13
    AlarmNames,
14
    AmazonResourceName,
15
    CloudwatchApi,
16
    DescribeAlarmsInput,
17
    DescribeAlarmsOutput,
18
    GetMetricDataInput,
19
    GetMetricDataOutput,
20
    GetMetricStatisticsInput,
21
    GetMetricStatisticsOutput,
22
    ListTagsForResourceOutput,
23
    PutCompositeAlarmInput,
24
    PutMetricAlarmInput,
25
    StateValue,
26
    TagKeyList,
27
    TagList,
28
    TagResourceOutput,
29
    UntagResourceOutput,
30
)
31
from localstack.aws.connect import connect_to
1✔
32
from localstack.constants import DEFAULT_AWS_ACCOUNT_ID
1✔
33
from localstack.http import Request
1✔
34
from localstack.services import moto
1✔
35
from localstack.services.cloudwatch.alarm_scheduler import AlarmScheduler
1✔
36
from localstack.services.edge import ROUTER
1✔
37
from localstack.services.plugins import SERVICE_PLUGINS, ServiceLifecycleHook
1✔
38
from localstack.utils.aws import arns
1✔
39
from localstack.utils.aws.arns import extract_account_id_from_arn, lambda_function_name
1✔
40
from localstack.utils.aws.request_context import (
1✔
41
    extract_access_key_id_from_auth_header,
42
    extract_region_from_auth_header,
43
)
44
from localstack.utils.patch import patch
1✔
45
from localstack.utils.strings import camel_to_snake_case
1✔
46
from localstack.utils.sync import poll_condition
1✔
47
from localstack.utils.tagging import TaggingService
1✔
48
from localstack.utils.threads import start_worker_thread
1✔
49

50
# Internal HTTP route under which CloudwatchProvider.get_raw_metrics is registered on the ROUTER.
PATH_GET_RAW_METRICS = "/_aws/cloudwatch/metrics/raw"
1✔
51
# Older location of the raw-metrics route.
# NOTE(review): not referenced anywhere in this module - confirm whether it is still used elsewhere.
DEPRECATED_PATH_GET_RAW_METRICS = "/cloudwatch/metrics/raw"
1✔
52
# State reason that marks an alarm as not yet evaluated (presumably the value moto assigns on alarm
# creation - TODO confirm). Alarms carrying this reason are treated as INSUFFICIENT_DATA in this module.
MOTO_INITIAL_UNCHECKED_REASON = "Unchecked: Initial alarm creation"
1✔
53

54
LOG = logging.getLogger(__name__)
1✔
55

56

57
@patch(target=FakeAlarm.update_state)
def update_state(target, self, reason, reason_data, state_value):
    """
    Patched ``FakeAlarm.update_state`` that additionally triggers alarm actions.

    The original implementation (``target``) is invoked first. Afterwards, if
    actions are enabled and the state value actually changed, every action ARN
    registered for the new state is dispatched: SNS topics get a message
    published, Lambda functions are invoked, and any other service is only
    logged as not implemented.

    :param target: the original (unpatched) ``update_state`` implementation
    :param self: the alarm whose state is being updated
    :param reason: reason for the state change, forwarded to ``target``
    :param reason_data: reason data forwarded to ``target``; ``None`` becomes ``""``
    :param state_value: the new state value, forwarded to ``target``
    """
    if reason_data is None:
        reason_data = ""

    # snapshot the previous state before the original implementation overwrites it
    previous_reason = self.state_reason
    previous_timestamp = self.state_updated_timestamp
    # an alarm still carrying the initial "unchecked" reason counts as INSUFFICIENT_DATA
    previous_state = (
        StateValue.INSUFFICIENT_DATA
        if previous_reason == MOTO_INITIAL_UNCHECKED_REASON
        else self.state_value
    )

    target(self, reason, reason_data, state_value)

    # check the state and trigger required actions
    if not self.actions_enabled:
        return
    if previous_state == self.state_value:
        return

    if self.state_value == "OK":
        action_arns = self.ok_actions
    elif self.state_value == "ALARM":
        action_arns = self.alarm_actions
    else:
        action_arns = self.insufficient_data_actions

    for action_arn in action_arns:
        arn_parts = arns.parse_arn(action_arn)

        if arn_parts["service"] == "sns":
            sns_client = connect_to(
                region_name=arn_parts["region"], aws_access_key_id=arn_parts["account"]
            ).sns
            sns_client.publish(
                TopicArn=action_arn,
                Subject=f"""{self.state_value}: "{self.name}" in {self.region_name}""",
                Message=create_message_response_update_state_sns(self, previous_state),
            )
            continue

        if arn_parts["service"] == "lambda":
            lambda_client = connect_to(
                region_name=arn_parts["region"], aws_access_key_id=arn_parts["account"]
            ).lambda_
            payload = create_message_response_update_state_lambda(
                self, previous_state, previous_reason, previous_timestamp
            )
            lambda_client.invoke(FunctionName=lambda_function_name(action_arn), Payload=payload)
            continue

        # TODO: support other actions
        LOG.warning(
            "Action for service %s not implemented, action '%s' will not be triggered.",
            arn_parts["service"],
            action_arn,
        )
101

102

103
@patch(target=CloudWatchBackend.put_metric_alarm)
def put_metric_alarm(
    target,
    self,
    name: str,
    namespace: str,
    metric_name: str,
    comparison_operator: str,
    evaluation_periods: int,
    period: int,
    threshold: float,
    statistic: str,
    description: str,
    dimensions: list[dict[str, str]],
    alarm_actions: list[str],
    metric_data_queries: Optional[list[Any]] = None,
    datapoints_to_alarm: Optional[int] = None,
    extended_statistic: Optional[str] = None,
    ok_actions: Optional[list[str]] = None,
    insufficient_data_actions: Optional[list[str]] = None,
    unit: Optional[str] = None,
    actions_enabled: bool = True,
    treat_missing_data: Optional[str] = None,
    evaluate_low_sample_count_percentile: Optional[str] = None,
    threshold_metric_id: Optional[str] = None,
    rule: Optional[str] = None,
    tags: Optional[list[dict[str, str]]] = None,
) -> FakeAlarm:
    """
    Patched ``CloudWatchBackend.put_metric_alarm`` that XML-escapes the description.

    A non-empty ``description`` is passed through ``xml.sax.saxutils.escape``;
    every other argument is handed to the original implementation (``target``)
    unchanged and in the same positional order.

    :return: whatever the original implementation returns
    """
    safe_description = escape(description) if description else description

    # order must match the positional parameters of the original implementation
    forwarded = (
        name,
        namespace,
        metric_name,
        comparison_operator,
        evaluation_periods,
        period,
        threshold,
        statistic,
        safe_description,
        dimensions,
        alarm_actions,
        metric_data_queries,
        datapoints_to_alarm,
        extended_statistic,
        ok_actions,
        insufficient_data_actions,
        unit,
        actions_enabled,
        treat_missing_data,
        evaluate_low_sample_count_percentile,
        threshold_metric_id,
        rule,
        tags,
    )
    return target(self, *forwarded)
159

160

161
def create_metric_data_query_from_alarm(alarm: "FakeAlarm"):
    """
    Build a single-entry metric data query list describing the alarm's metric.

    The result is embedded in the Lambda alarm-action payload, which is
    serialized with ``json.dumps``. The alarm's dimensions are therefore
    converted into a plain ``{name: value}`` mapping; the dimension objects
    themselves are not JSON serializable.

    :param alarm: alarm providing ``namespace``, ``metric_name``, ``dimensions``,
        ``period`` and ``statistic``
    :return: list with one query dict, identified by a freshly generated UUID
    """
    # TODO may need to be adapted for other use cases
    #  verified return value with a snapshot test
    dimensions = {dimension.name: dimension.value for dimension in alarm.dimensions or []}
    return [
        {
            "id": str(uuid.uuid4()),
            "metricStat": {
                "metric": {
                    "namespace": alarm.namespace,
                    "name": alarm.metric_name,
                    "dimensions": dimensions,
                },
                "period": int(alarm.period),
                "stat": alarm.statistic,
            },
            "returnData": True,
        }
    ]
179

180

181
def create_message_response_update_state_lambda(
    alarm: FakeAlarm, old_state, old_state_reason, old_state_timestamp
):
    """
    Create the JSON payload passed to a Lambda function on an alarm state change.

    :param alarm: the alarm after its state has been updated
    :param old_state: state value before the update
    :param old_state_reason: state reason before the update
    :param old_state_timestamp: state-updated timestamp before the update
    :return: the payload serialized as a JSON string
    """
    current_state = {
        "value": alarm.state_value,
        "reason": alarm.state_reason,
        "timestamp": alarm.state_updated_timestamp,
    }
    previous_state = {
        "value": old_state,
        "reason": old_state_reason,
        "timestamp": old_state_timestamp,
    }
    configuration = {
        "description": alarm.description or "",
        # fall back to a query derived from the alarm when none are configured
        # TODO: add test with metric_data_queries
        "metrics": alarm.metric_data_queries or create_metric_data_query_from_alarm(alarm),
    }

    event = {
        "accountId": extract_account_id_from_arn(alarm.alarm_arn),
        "alarmArn": alarm.alarm_arn,
        "alarmData": {
            "alarmName": alarm.name,
            "state": current_state,
            "previousState": previous_state,
            "configuration": configuration,
        },
        "time": alarm.state_updated_timestamp,
        "region": alarm.region_name,
        "source": "aws.cloudwatch",
    }
    return json.dumps(event)
1✔
212

213

214
def create_message_response_update_state_sns(alarm, old_state):
    """
    Create the JSON message published to an SNS topic on an alarm state change.

    :param alarm: the alarm after its state has been updated
    :param old_state: state value before the update
    :return: the message serialized as a JSON string
    """
    # Dimensions not serializable
    dimensions = [{"value": d.value, "name": d.name} for d in alarm.dimensions or []]

    # collect trigger details
    trigger = {
        "MetricName": alarm.metric_name or "",
        "Namespace": alarm.namespace or "",
        "Unit": alarm.unit or None,  # testing with AWS revealed this currently returns None
        "Period": int(alarm.period) if alarm.period else 0,
        "EvaluationPeriods": int(alarm.evaluation_periods) if alarm.evaluation_periods else 0,
        "ComparisonOperator": alarm.comparison_operator or "",
        "Threshold": float(alarm.threshold) if alarm.threshold else 0.0,
        "TreatMissingData": alarm.treat_missing_data or "",
        "EvaluateLowSampleCountPercentile": alarm.evaluate_low_sample_count_percentile or "",
        "Dimensions": dimensions or "",
    }

    if alarm.statistic:
        trigger["StatisticType"] = "Statistic"
        # AWS returns uppercase
        trigger["Statistic"] = camel_to_snake_case(alarm.statistic).upper()
    elif alarm.extended_statistic:
        trigger["StatisticType"] = "ExtendedStatistic"
        trigger["ExtendedStatistic"] = alarm.extended_statistic

    # the long-name for 'region' should be used - as we don't have it, we use the short name
    # which needs to be slightly changed to make snapshot tests work
    region_label = alarm.region_name.replace("-", " ").capitalize()

    message = {
        "AWSAccountId": extract_account_id_from_arn(alarm.alarm_arn),
        "OldStateValue": old_state,
        "AlarmName": alarm.name,
        "AlarmDescription": alarm.description or "",
        "AlarmConfigurationUpdatedTimestamp": alarm.configuration_updated_timestamp,
        "NewStateValue": alarm.state_value,
        "NewStateReason": alarm.state_reason,
        "StateChangeTime": alarm.state_updated_timestamp,
        "Region": region_label,
        "AlarmArn": alarm.alarm_arn,
        "OKActions": alarm.ok_actions or [],
        "AlarmActions": alarm.alarm_actions or [],
        "InsufficientDataActions": alarm.insufficient_data_actions or [],
        "Trigger": trigger,
    }
    return json.dumps(message)
1✔
264

265

266
class ValidationError(CommonServiceException):
    """Service exception raised with code ``ValidationError`` when request validation fails."""

    def __init__(self, message: str):
        status_code = 400
        sender_fault = True
        super().__init__("ValidationError", message, status_code, sender_fault)
×
269

270

271
def _set_alarm_actions(context, alarm_names, enabled):
    """
    Set ``actions_enabled`` on the named alarms of the request's account and region.

    Names without a matching alarm in the backend are skipped silently.

    :param context: request context providing ``account_id`` and ``region``
    :param alarm_names: names of the alarms to update
    :param enabled: value assigned to each alarm's ``actions_enabled``
    """
    alarms = cloudwatch_backends[context.account_id][context.region].alarms
    for alarm_name in alarm_names:
        if existing_alarm := alarms.get(alarm_name):
            existing_alarm.actions_enabled = enabled
1✔
277

278

279
def _cleanup_describe_output(alarm):
    """
    Normalize a single alarm entry of a DescribeAlarms response in place.

    - drops an empty ``Metrics`` list
    - drops ``StateReasonData`` when it is ``"{}"`` or an empty string
    - reports ``INSUFFICIENT_DATA`` for alarms that still carry the initial
      "unchecked" state reason

    :param alarm: alarm dict from the response; modified in place
    """
    if "Metrics" in alarm and len(alarm["Metrics"]) == 0:
        del alarm["Metrics"]

    # a missing key yields None, which never matches the tuple below
    if alarm.get("StateReasonData") in ("{}", ""):
        del alarm["StateReasonData"]

    still_unchecked = alarm.get("StateReason", "") == MOTO_INITIAL_UNCHECKED_REASON
    if still_unchecked and alarm.get("StateValue") != StateValue.INSUFFICIENT_DATA:
        alarm["StateValue"] = StateValue.INSUFFICIENT_DATA
1✔
290

291

292
class CloudwatchProvider(CloudwatchApi, ServiceLifecycleHook):
    """
    Cloudwatch provider.

    Most operations are forwarded to moto via ``moto.call_moto``; this class adds
    resource tagging, alarm scheduling, request validation and response clean-up
    on top of it.

    LIMITATIONS:
        - no alarm rule evaluation
    """

    def __init__(self):
        self.tags = TaggingService()
        # created lazily by start_alarm_scheduler, reset to None on shutdown
        self.alarm_scheduler = None

    def on_after_init(self):
        """Register the raw-metrics route and start the alarm scheduler."""
        ROUTER.add(PATH_GET_RAW_METRICS, self.get_raw_metrics)
        self.start_alarm_scheduler()

    def on_before_state_reset(self):
        """Stop the alarm scheduler before the state is reset."""
        self.shutdown_alarm_scheduler()

    def on_after_state_reset(self):
        """Start a fresh alarm scheduler after the state was reset."""
        self.start_alarm_scheduler()

    def on_before_state_load(self):
        """Stop the alarm scheduler before state is loaded."""
        self.shutdown_alarm_scheduler()

    def on_after_state_load(self):
        """Start the alarm scheduler and re-schedule existing alarms once the service runs."""
        self.start_alarm_scheduler()

        def restart_alarms(*args):
            # executed in a worker thread: wait for the service to be reported as running
            poll_condition(lambda: SERVICE_PLUGINS.is_running("cloudwatch"))
            self.alarm_scheduler.restart_existing_alarms()

        start_worker_thread(restart_alarms)

    def on_before_stop(self):
        """Stop the alarm scheduler before the service stops."""
        self.shutdown_alarm_scheduler()

    def start_alarm_scheduler(self):
        """Create the alarm scheduler unless one is already active."""
        if not self.alarm_scheduler:
            LOG.debug("starting cloudwatch scheduler")
            self.alarm_scheduler = AlarmScheduler()

    def shutdown_alarm_scheduler(self):
        """Shut down the active alarm scheduler, if any, and forget it."""
        LOG.debug("stopping cloudwatch scheduler")
        # tolerate repeated shutdowns (or a shutdown before any start) instead of
        # failing with an AttributeError on None
        if self.alarm_scheduler:
            self.alarm_scheduler.shutdown_scheduler()
        self.alarm_scheduler = None

    def delete_alarms(self, context: RequestContext, alarm_names: AlarmNames, **kwargs) -> None:
        """Delete the alarms in moto and remove their scheduler entries."""
        moto.call_moto(context)
        for alarm_name in alarm_names:
            arn = arns.cloudwatch_alarm_arn(alarm_name, context.account_id, context.region)
            self.alarm_scheduler.delete_scheduler_for_alarm(arn)

    def get_raw_metrics(self, request: Request):
        """
        Return the raw ``MetricDatum`` entries stored for the caller's account and region.

        Region and access key are read from the request's auth header; the default
        account id is used when no account can be derived from the access key.

        :param request: the incoming HTTP request
        :return: dict with a ``metrics`` list, one entry per stored datum
        """
        region = extract_region_from_auth_header(request.headers)
        access_key_id = extract_access_key_id_from_auth_header(request.headers)
        account_id = get_account_id_from_access_key_id(access_key_id) or DEFAULT_AWS_ACCOUNT_ID

        backend = cloudwatch_backends[account_id][region]
        # TODO handle aggregated metrics as well (MetricAggregatedDatum)
        stored_data = backend.metric_data if backend else []

        metrics = [
            {
                "ns": datum.namespace,
                "n": datum.name,
                "v": datum.value,
                "t": datum.timestamp,
                "d": [{"n": d.name, "v": d.value} for d in datum.dimensions],
                "account": account_id,
                "region": region,
            }
            for datum in stored_data
            if isinstance(datum, MetricDatum)
        ]
        return {"metrics": metrics}

    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceOutput:
        """Return the tags stored for ``resource_arn`` (empty list when none are stored)."""
        tags = self.tags.list_tags_for_resource(resource_arn)
        return ListTagsForResourceOutput(Tags=tags.get("Tags", []))

    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> UntagResourceOutput:
        """Remove the given tag keys from ``resource_arn``."""
        self.tags.untag_resource(resource_arn, tag_keys)
        return UntagResourceOutput()

    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
    ) -> TagResourceOutput:
        """Attach the given tags to ``resource_arn``."""
        self.tags.tag_resource(resource_arn, tags)
        return TagResourceOutput()

    @handler("GetMetricData", expand=False)
    def get_metric_data(
        self, context: RequestContext, request: GetMetricDataInput
    ) -> GetMetricDataOutput:
        """
        Forward the request to moto and adjust result labels and messages.

        Each result gets the query's explicit ``Label`` or, if absent, the metric
        name of its ``MetricStat``; an empty ``Messages`` list is added if missing.
        """
        result = moto.call_moto(context)
        # moto currently uses hardcoded label metric_name + stat
        # parity tests shows that default is MetricStat, but there might also be a label explicitly set
        for index, metric_query in enumerate(request["MetricDataQueries"]):
            label = metric_query.get("Label") or metric_query.get("MetricStat", {}).get(
                "Metric", {}
            ).get("MetricName")
            if label:
                result["MetricDataResults"][index]["Label"] = label
        if "Messages" not in result:
            # parity tests reveals that an empty messages list is added
            result["Messages"] = []
        return result

    @handler("PutMetricAlarm", expand=False)
    def put_metric_alarm(
        self,
        context: RequestContext,
        request: PutMetricAlarmInput,
    ) -> None:
        """
        Validate the request, create the alarm in moto, tag it and schedule its evaluation.

        :raises ValidationError: for an unsupported ``TreatMissingData``, ``Period``
            or ``Statistic`` value
        """
        # missing will be the default, when not set (but it will not explicitly be set)
        if request.get("TreatMissingData", "missing") not in [
            "breaching",
            "notBreaching",
            "ignore",
            "missing",
        ]:
            raise ValidationError(
                f"The value {request['TreatMissingData']} is not supported for TreatMissingData parameter. Supported values are [breaching, notBreaching, ignore, missing]."
            )
        # do some sanity checks:
        # Valid values for Period are 10, 30, and any multiple of 60.
        period = request.get("Period")
        if period and period not in (10, 30) and period % 60 != 0:
            raise ValidationError("Period must be 10, 30 or a multiple of 60")

        statistic = request.get("Statistic")
        if statistic and statistic not in [
            "SampleCount",
            "Average",
            "Sum",
            "Minimum",
            "Maximum",
        ]:
            raise ValidationError(
                f"Value '{statistic}' at 'statistic' failed to satisfy constraint: Member must satisfy enum value set: [Maximum, SampleCount, Sum, Minimum, Average]"
            )

        moto.call_moto(context)

        name = request.get("AlarmName")
        arn = arns.cloudwatch_alarm_arn(name, context.account_id, context.region)
        self.tags.tag_resource(arn, request.get("Tags"))
        self.alarm_scheduler.schedule_metric_alarm(arn)

    @handler("PutCompositeAlarm", expand=False)
    def put_composite_alarm(
        self,
        context: RequestContext,
        request: PutCompositeAlarmInput,
    ) -> None:
        """
        Store a composite alarm in the moto backend.

        The alarm is only stored; a warning is logged because its rule is not evaluated.
        """
        backend = cloudwatch_backends[context.account_id][context.region]
        backend.put_metric_alarm(
            name=request.get("AlarmName"),
            namespace=None,
            metric_name=None,
            metric_data_queries=None,
            comparison_operator=None,
            evaluation_periods=None,
            datapoints_to_alarm=None,
            period=None,
            threshold=None,
            statistic=None,
            extended_statistic=None,
            description=request.get("AlarmDescription"),
            dimensions=[],
            alarm_actions=request.get("AlarmActions", []),
            ok_actions=request.get("OKActions", []),
            insufficient_data_actions=request.get("InsufficientDataActions", []),
            unit=None,
            # AWS enables actions by default when the field is omitted; passing None
            # here would silently disable them
            actions_enabled=request.get("ActionsEnabled", True),
            treat_missing_data=None,
            evaluate_low_sample_count_percentile=None,
            threshold_metric_id=None,
            rule=request.get("AlarmRule"),
            tags=request.get("Tags", []),
        )
        LOG.warning(
            "Composite Alarms configuration is not yet supported, alarm state will not be evaluated"
        )

    @handler("EnableAlarmActions")
    def enable_alarm_actions(
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
    ) -> None:
        """Enable actions for the named alarms."""
        _set_alarm_actions(context, alarm_names, enabled=True)

    @handler("DisableAlarmActions")
    def disable_alarm_actions(
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
    ) -> None:
        """Disable actions for the named alarms."""
        _set_alarm_actions(context, alarm_names, enabled=False)

    @handler("DescribeAlarms", expand=False)
    def describe_alarms(
        self, context: RequestContext, request: DescribeAlarmsInput
    ) -> DescribeAlarmsOutput:
        """Forward the request to moto and clean up every returned alarm entry."""
        response = moto.call_moto(context)

        # either list may be absent or empty; treat both cases as "nothing to clean up"
        for alarm_list_key in ("CompositeAlarms", "MetricAlarms"):
            for alarm in response.get(alarm_list_key) or []:
                _cleanup_describe_output(alarm)

        return response

    @handler("GetMetricStatistics", expand=False)
    def get_metric_statistics(
        self, context: RequestContext, request: GetMetricStatisticsInput
    ) -> GetMetricStatisticsOutput:
        """Forward the request to moto and strip empty ``ExtendedStatistics`` entries."""
        response = moto.call_moto(context)

        # cleanup -> ExtendedStatistics is not included in AWS response if it returned empty
        for datapoint in response.get("Datapoints") or []:
            if "ExtendedStatistics" in datapoint and not datapoint.get("ExtendedStatistics"):
                datapoint.pop("ExtendedStatistics")

        return response
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc