• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 20565403496

29 Dec 2025 05:11AM UTC coverage: 84.103% (-2.8%) from 86.921%
20565403496

Pull #13567

github

web-flow
Merge 4816837a5 into 2417384aa
Pull Request #13567: Update ASF APIs

67166 of 79862 relevant lines covered (84.1%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.11
/localstack-core/localstack/services/cloudwatch/provider.py
1
import json
1✔
2
import logging
1✔
3
import uuid
1✔
4
from datetime import datetime
1✔
5
from typing import Any
1✔
6

7
from moto.cloudwatch import cloudwatch_backends
1✔
8
from moto.cloudwatch.models import Alarm, CloudWatchBackend, MetricDatum
1✔
9

10
from localstack.aws.accounts import get_account_id_from_access_key_id
1✔
11
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
12
from localstack.aws.api.cloudwatch import (
1✔
13
    AlarmNames,
14
    AmazonResourceName,
15
    CloudwatchApi,
16
    DescribeAlarmsInput,
17
    DescribeAlarmsOutput,
18
    GetMetricDataInput,
19
    GetMetricDataOutput,
20
    GetMetricStatisticsInput,
21
    GetMetricStatisticsOutput,
22
    ListTagsForResourceOutput,
23
    PutCompositeAlarmInput,
24
    PutMetricAlarmInput,
25
    StateValue,
26
    TagKeyList,
27
    TagList,
28
    TagResourceOutput,
29
    UntagResourceOutput,
30
)
31
from localstack.aws.connect import connect_to
1✔
32
from localstack.constants import DEFAULT_AWS_ACCOUNT_ID
1✔
33
from localstack.http import Request
1✔
34
from localstack.services import moto
1✔
35
from localstack.services.cloudwatch.alarm_scheduler import AlarmScheduler
1✔
36
from localstack.services.edge import ROUTER
1✔
37
from localstack.services.plugins import SERVICE_PLUGINS, ServiceLifecycleHook
1✔
38
from localstack.state import StateVisitor
1✔
39
from localstack.utils.aws import arns
1✔
40
from localstack.utils.aws.arns import extract_account_id_from_arn, lambda_function_name
1✔
41
from localstack.utils.aws.request_context import (
1✔
42
    extract_access_key_id_from_auth_header,
43
    extract_region_from_auth_header,
44
)
45
from localstack.utils.patch import patch
1✔
46
from localstack.utils.strings import camel_to_snake_case
1✔
47
from localstack.utils.sync import poll_condition
1✔
48
from localstack.utils.tagging import TaggingService
1✔
49
from localstack.utils.threads import start_worker_thread
1✔
50

51
# Internal HTTP path under which CloudwatchProvider.on_after_init registers the
# raw-metrics endpoint (served by CloudwatchProvider.get_raw_metrics).
PATH_GET_RAW_METRICS = "/_aws/cloudwatch/metrics/raw"
1✔
52
# Older, un-prefixed path for the raw-metrics endpoint (judging by the name).
# NOTE(review): no route is registered for it in the code visible here — confirm
# whether it is still used elsewhere.
DEPRECATED_PATH_GET_RAW_METRICS = "/cloudwatch/metrics/raw"
1✔
53
# State reason used below to recognise alarms that have not been evaluated yet;
# such alarms are treated/reported as INSUFFICIENT_DATA.
# Presumably the text moto assigns when an alarm is created — confirm against moto.
MOTO_INITIAL_UNCHECKED_REASON = "Unchecked: Initial alarm creation"
1✔
54

55
# Module logger: reports unsupported alarm actions and scheduler start/stop.
LOG = logging.getLogger(__name__)
1✔
56

57

58
@patch(target=Alarm.update_state)
def update_state(target, self, reason, reason_data, state_value):
    """Patched ``Alarm.update_state`` that also fires the alarm's actions.

    The original method is invoked first (with ``reason_data`` defaulting to an
    empty string). If the alarm's state actually changed and actions are
    enabled, every action ARN configured for the new state is triggered:
    SNS topics receive a published message, Lambda functions are invoked.
    Actions targeting any other service are only logged.

    :param target: the original (unpatched) ``Alarm.update_state``
    :param self: the alarm whose state is being updated
    :param reason: human readable reason for the state change
    :param reason_data: additional reason data, may be ``None``
    :param state_value: the new state value
    """
    # Remember the pre-update values, they are needed for the notifications.
    previous_reason = self.state_reason
    previous_timestamp = self.state_updated_timestamp
    # An alarm that still carries the initial "unchecked" reason is treated as
    # having been in INSUFFICIENT_DATA, regardless of the stored state value.
    previous_state = (
        StateValue.INSUFFICIENT_DATA
        if previous_reason == MOTO_INITIAL_UNCHECKED_REASON
        else self.state_value
    )

    target(self, reason, "" if reason_data is None else reason_data, state_value)

    # check the state and trigger required actions
    state_unchanged = previous_state == self.state_value
    if state_unchanged or not self.actions_enabled:
        return

    if self.state_value == "OK":
        action_arns = self.ok_actions
    elif self.state_value == "ALARM":
        action_arns = self.alarm_actions
    else:
        action_arns = self.insufficient_data_actions

    for action_arn in action_arns:
        parsed = arns.parse_arn(action_arn)
        target_service = parsed["service"]

        if target_service not in ("sns", "lambda"):
            # TODO: support other actions
            LOG.warning(
                "Action for service %s not implemented, action '%s' will not be triggered.",
                target_service,
                action_arn,
            )
            continue

        # the client is created in the region/account of the action's ARN
        clients = connect_to(region_name=parsed["region"], aws_access_key_id=parsed["account"])
        if target_service == "sns":
            clients.sns.publish(
                TopicArn=action_arn,
                Subject=f"""{self.state_value}: "{self.name}" in {self.region_name}""",
                Message=create_message_response_update_state_sns(self, previous_state),
            )
        else:
            clients.lambda_.invoke(
                FunctionName=lambda_function_name(action_arn),
                Payload=create_message_response_update_state_lambda(
                    self, previous_state, previous_reason, previous_timestamp
                ),
            )
102

103

104
@patch(target=CloudWatchBackend.put_metric_alarm)
def put_metric_alarm(
    target,
    self,
    name: str,
    namespace: str,
    metric_name: str,
    comparison_operator: str,
    evaluation_periods: int,
    period: int,
    threshold: float,
    statistic: str,
    description: str,
    dimensions: list[dict[str, str]],
    alarm_actions: list[str],
    metric_data_queries: list[Any] | None = None,
    datapoints_to_alarm: int | None = None,
    extended_statistic: str | None = None,
    ok_actions: list[str] | None = None,
    insufficient_data_actions: list[str] | None = None,
    unit: str | None = None,
    actions_enabled: bool = True,
    treat_missing_data: str | None = None,
    evaluate_low_sample_count_percentile: str | None = None,
    threshold_metric_id: str | None = None,
    rule: str | None = None,
    tags: list[dict[str, str]] | None = None,
) -> Alarm:
    """Patched ``CloudWatchBackend.put_metric_alarm`` that only forwards its arguments.

    The wrapper declares defaults for every parameter from
    ``metric_data_queries`` onwards and then hands all values, positionally and
    in declaration order, to the original implementation.

    :param target: the original (unpatched) ``put_metric_alarm``
    :param self: the moto CloudWatch backend
    :return: whatever the original implementation returns (annotated as ``Alarm``)
    """
    # Order matters: the original is called positionally.
    forwarded = (
        name,
        namespace,
        metric_name,
        comparison_operator,
        evaluation_periods,
        period,
        threshold,
        statistic,
        description,
        dimensions,
        alarm_actions,
        metric_data_queries,
        datapoints_to_alarm,
        extended_statistic,
        ok_actions,
        insufficient_data_actions,
        unit,
        actions_enabled,
        treat_missing_data,
        evaluate_low_sample_count_percentile,
        threshold_metric_id,
        rule,
        tags,
    )
    return target(self, *forwarded)
158

159

160
def create_metric_data_query_from_alarm(alarm: Alarm):
    """Build the ``metrics`` section of an alarm notification from a plain metric alarm.

    Used as fallback when the alarm has no explicit metric data queries.

    :param alarm: alarm providing ``namespace``, ``metric_name``, ``dimensions``,
        ``period`` and ``statistic``
    :return: a list with a single metric query dict; the query gets a fresh
        random UUID as ``id`` and contains only JSON-serializable values
    """
    # TODO may need to be adapted for other use cases
    #  verified return value with a snapshot test
    raw_dimensions = alarm.dimensions or {}
    if isinstance(raw_dimensions, dict):
        # already a name -> value mapping (or empty): use as is
        dimensions = raw_dimensions
    else:
        # The alarm stores dimension objects exposing ``name``/``value``; they are
        # not JSON serializable, so flatten them into a name -> value mapping.
        dimensions = {dimension.name: dimension.value for dimension in raw_dimensions}

    return [
        {
            "id": str(uuid.uuid4()),
            "metricStat": {
                "metric": {
                    "namespace": alarm.namespace,
                    "name": alarm.metric_name,
                    "dimensions": dimensions,
                },
                "period": int(alarm.period),
                "stat": alarm.statistic,
            },
            "returnData": True,
        }
    ]
178

179

180
def create_message_response_update_state_lambda(
    alarm: Alarm, old_state, old_state_reason, old_state_timestamp
):
    """Create the JSON payload sent to a Lambda function on an alarm state change.

    :param alarm: the alarm after its state was updated
    :param old_state: state value before the update
    :param old_state_reason: state reason before the update
    :param old_state_timestamp: timestamp of the previous state update
    :return: the payload serialized as JSON string
    """
    changed_at = _to_iso_8601_datetime_with_nanoseconds(alarm.state_updated_timestamp)

    current_state = {
        "value": alarm.state_value,
        "reason": alarm.state_reason,
        "timestamp": changed_at,
    }
    previous_state = {
        "value": old_state,
        "reason": old_state_reason,
        "timestamp": _to_iso_8601_datetime_with_nanoseconds(old_state_timestamp),
    }
    # Fall back to a query derived from the alarm itself if none is configured.
    # TODO: add test with metric_data_queries
    metrics = alarm.metric_data_queries or create_metric_data_query_from_alarm(alarm)
    configuration = {
        "description": alarm.description or "",
        "metrics": metrics,
    }

    payload = {
        "accountId": extract_account_id_from_arn(alarm.alarm_arn),
        "alarmArn": alarm.alarm_arn,
        "alarmData": {
            "alarmName": alarm.name,
            "state": current_state,
            "previousState": previous_state,
            "configuration": configuration,
        },
        "time": changed_at,
        "region": alarm.region_name,
        "source": "aws.cloudwatch",
    }
    return json.dumps(payload)
×
211

212

213
def create_message_response_update_state_sns(alarm, old_state):
    """Create the JSON message published to an SNS topic on an alarm state change.

    :param alarm: the alarm after its state was updated
    :param old_state: state value before the update
    :return: the message serialized as JSON string; the alarm configuration
        details are placed under the ``Trigger`` key
    """
    to_iso = _to_iso_8601_datetime_with_nanoseconds

    message = {
        "AWSAccountId": extract_account_id_from_arn(alarm.alarm_arn),
        "OldStateValue": old_state,
        "AlarmName": alarm.name,
        "AlarmDescription": alarm.description or "",
        "AlarmConfigurationUpdatedTimestamp": to_iso(alarm.configuration_updated_timestamp),
        "NewStateValue": alarm.state_value,
        "NewStateReason": alarm.state_reason,
        "StateChangeTime": to_iso(alarm.state_updated_timestamp),
        # the long-name for 'region' should be used - as we don't have it, we use the short name
        # which needs to be slightly changed to make snapshot tests work
        "Region": alarm.region_name.replace("-", " ").capitalize(),
        "AlarmArn": alarm.alarm_arn,
        "OKActions": alarm.ok_actions or [],
        "AlarmActions": alarm.alarm_actions or [],
        "InsufficientDataActions": alarm.insufficient_data_actions or [],
    }

    # collect trigger details
    trigger = {
        "MetricName": alarm.metric_name or "",
        "Namespace": alarm.namespace or "",
        "Unit": alarm.unit or None,  # testing with AWS revealed this currently returns None
        "Period": int(alarm.period) if alarm.period else 0,
        "EvaluationPeriods": int(alarm.evaluation_periods) if alarm.evaluation_periods else 0,
        "ComparisonOperator": alarm.comparison_operator or "",
        "Threshold": float(alarm.threshold) if alarm.threshold else 0.0,
        "TreatMissingData": alarm.treat_missing_data or "",
        "EvaluateLowSampleCountPercentile": alarm.evaluate_low_sample_count_percentile or "",
    }

    # Dimensions not serializable -> convert them to plain dicts
    dimension_entries = [
        {"value": dimension.value, "name": dimension.name}
        for dimension in (alarm.dimensions or [])
    ]
    # an empty string is emitted when the alarm has no dimensions
    trigger["Dimensions"] = dimension_entries or ""

    if alarm.statistic:
        trigger["StatisticType"] = "Statistic"
        # AWS returns uppercase
        trigger["Statistic"] = camel_to_snake_case(alarm.statistic).upper()
    elif alarm.extended_statistic:
        trigger["StatisticType"] = "ExtendedStatistic"
        trigger["ExtendedStatistic"] = alarm.extended_statistic

    message["Trigger"] = trigger

    return json.dumps(message)
×
265

266

267
class ValidationError(CommonServiceException):
    """Service exception raised with the code ``ValidationError`` for invalid request input."""

    def __init__(self, message: str):
        # Positional arguments after the message are 400 and True — presumably
        # the HTTP status code and the sender-fault flag; confirm against
        # CommonServiceException.
        super().__init__(
            "ValidationError",
            message,
            400,
            True,
        )
×
270

271

272
def _to_iso_8601_datetime_with_nanoseconds(date: datetime | None) -> str | None:
1✔
273
    if date is not None:
×
274
        return date.strftime("%Y-%m-%dT%H:%M:%S.%f000Z")
×
275

276

277
def _set_alarm_actions(context, alarm_names, enabled):
    """Set ``actions_enabled`` on every named alarm of the request's account and region.

    Names for which the backend holds no alarm are skipped silently.

    :param context: request context providing ``account_id`` and ``region``
    :param alarm_names: names of the alarms to modify
    :param enabled: the value to assign to ``actions_enabled``
    """
    backend = cloudwatch_backends[context.account_id][context.region]
    for alarm_name in alarm_names:
        if found := backend.alarms.get(alarm_name):
            found.actions_enabled = enabled
×
283

284

285
def _cleanup_describe_output(alarm):
    """Normalize a single alarm entry of a DescribeAlarms response in place.

    - drops an empty ``Metrics`` list
    - drops ``StateReasonData`` if it is ``"{}"`` or an empty string
    - reports ``INSUFFICIENT_DATA`` as state for alarms that still carry the
      initial "unchecked" state reason

    :param alarm: the alarm dict to modify
    """
    if "Metrics" in alarm and not len(alarm["Metrics"]):
        alarm.pop("Metrics")

    # a missing key yields None, which matches neither placeholder
    if alarm.get("StateReasonData") in ("{}", ""):
        alarm.pop("StateReasonData")

    never_evaluated = alarm.get("StateReason", "") == MOTO_INITIAL_UNCHECKED_REASON
    if never_evaluated and alarm.get("StateValue") != StateValue.INSUFFICIENT_DATA:
        alarm["StateValue"] = StateValue.INSUFFICIENT_DATA
×
296

297

298
class CloudwatchProvider(CloudwatchApi, ServiceLifecycleHook):
    """
    Cloudwatch provider.

    Most operations are delegated to moto; this class adds tagging, request
    validation, response cleanup and the scheduling of alarm evaluation.

    LIMITATIONS:
        - no alarm rule evaluation
    """

    def __init__(self):
        self.tags = TaggingService()
        # created in start_alarm_scheduler(), reset to None on shutdown
        self.alarm_scheduler = None

    def accept_state_visitor(self, visitor: StateVisitor):
        """Expose the moto CloudWatch backends to the given state visitor."""
        visitor.visit(cloudwatch_backends)

    def on_after_init(self):
        """Register the raw-metrics endpoint on the edge router."""
        ROUTER.add(PATH_GET_RAW_METRICS, self.get_raw_metrics)

    def on_before_start(self):
        self.start_alarm_scheduler()

    def on_before_state_reset(self):
        self.shutdown_alarm_scheduler()

    def on_after_state_reset(self):
        self.start_alarm_scheduler()

    def on_before_state_load(self):
        self.shutdown_alarm_scheduler()

    def on_after_state_load(self):
        """Start a fresh scheduler and re-schedule the loaded alarms in the background."""
        self.start_alarm_scheduler()

        def restart_alarms(*args):
            # wait until the service is reported as running before re-scheduling
            poll_condition(lambda: SERVICE_PLUGINS.is_running("cloudwatch"))
            self.alarm_scheduler.restart_existing_alarms()

        start_worker_thread(restart_alarms)

    def on_before_stop(self):
        self.shutdown_alarm_scheduler()

    def start_alarm_scheduler(self):
        """Create the alarm scheduler unless one already exists."""
        if not self.alarm_scheduler:
            LOG.debug("starting cloudwatch scheduler")
            self.alarm_scheduler = AlarmScheduler()

    def shutdown_alarm_scheduler(self):
        """Shut down and discard the alarm scheduler, if there is one."""
        if self.alarm_scheduler:
            LOG.debug("stopping cloudwatch scheduler")
            self.alarm_scheduler.shutdown_scheduler()
            self.alarm_scheduler = None

    def delete_alarms(self, context: RequestContext, alarm_names: AlarmNames, **kwargs) -> None:
        """Delete the alarms via moto and remove their scheduled evaluations."""
        moto.call_moto(context)
        for alarm_name in alarm_names:
            arn = arns.cloudwatch_alarm_arn(alarm_name, context.account_id, context.region)
            self.alarm_scheduler.delete_scheduler_for_alarm(arn)

    def get_raw_metrics(self, request: Request):
        """Return the raw (non-aggregated) metric data stored for the caller.

        Region and access key are taken from the request's auth header; the
        account falls back to the default account if none can be derived.

        :param request: the incoming HTTP request
        :return: dict with a ``metrics`` list, one entry per stored ``MetricDatum``
        """
        region = extract_region_from_auth_header(request.headers)
        account_id = (
            get_account_id_from_access_key_id(
                extract_access_key_id_from_auth_header(request.headers)
            )
            or DEFAULT_AWS_ACCOUNT_ID
        )
        backend = cloudwatch_backends[account_id][region]
        # TODO handle aggregated metrics as well (MetricAggregatedDatum)
        raw_data = (
            [m for m in backend.metric_data if isinstance(m, MetricDatum)] if backend else []
        )

        metrics = [
            {
                "ns": datum.namespace,
                "n": datum.name,
                "v": datum.value,
                "t": datum.timestamp,
                "d": [{"n": d.name, "v": d.value} for d in datum.dimensions],
                "account": account_id,
                "region": region,
            }
            for datum in raw_data
        ]
        return {"metrics": metrics}

    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceOutput:
        """Return the tags recorded for ``resource_arn`` (empty list if none)."""
        tags = self.tags.list_tags_for_resource(resource_arn)
        return ListTagsForResourceOutput(Tags=tags.get("Tags", []))

    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> UntagResourceOutput:
        """Remove the given tag keys from ``resource_arn``."""
        self.tags.untag_resource(resource_arn, tag_keys)
        return UntagResourceOutput()

    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
    ) -> TagResourceOutput:
        """Record the given tags for ``resource_arn``."""
        self.tags.tag_resource(resource_arn, tags)
        return TagResourceOutput()

    @handler("GetMetricData", expand=False)
    def get_metric_data(
        self, context: RequestContext, request: GetMetricDataInput
    ) -> GetMetricDataOutput:
        """Fetch metric data via moto and align labels/messages with AWS.

        :return: moto's response with ``Label`` overridden per query (explicit
            label, else the metric name) and ``Messages`` guaranteed to exist
        """
        result = moto.call_moto(context)
        # moto currently uses hardcoded label metric_name + stat
        # parity tests shows that default is MetricStat, but there might also be a label explicitly set
        for index, metric_query in enumerate(request["MetricDataQueries"]):
            label = metric_query.get("Label") or metric_query.get("MetricStat", {}).get(
                "Metric", {}
            ).get("MetricName")
            if label:
                # results are matched to queries by position
                result["MetricDataResults"][index]["Label"] = label
        if "Messages" not in result:
            # parity tests reveals that an empty messages list is added
            result["Messages"] = []
        return result

    @handler("PutMetricAlarm", expand=False)
    def put_metric_alarm(
        self,
        context: RequestContext,
        request: PutMetricAlarmInput,
    ) -> None:
        """Validate the request, create the alarm via moto, then tag and schedule it.

        :raises ValidationError: for an unsupported ``TreatMissingData``,
            ``Period`` or ``Statistic`` value
        """
        # missing will be the default, when not set (but it will not explicitly be set)
        if request.get("TreatMissingData", "missing") not in [
            "breaching",
            "notBreaching",
            "ignore",
            "missing",
        ]:
            raise ValidationError(
                f"The value {request['TreatMissingData']} is not supported for TreatMissingData parameter. Supported values are [breaching, notBreaching, ignore, missing]."
            )
        # do some sanity checks:
        # Valid period values are 10, 30, and any multiple of 60.
        period = request.get("Period")
        if period and period not in (10, 30) and period % 60 != 0:
            raise ValidationError("Period must be 10, 30 or a multiple of 60")
        if request.get("Statistic"):
            if request.get("Statistic") not in [
                "SampleCount",
                "Average",
                "Sum",
                "Minimum",
                "Maximum",
            ]:
                raise ValidationError(
                    f"Value '{request.get('Statistic')}' at 'statistic' failed to satisfy constraint: Member must satisfy enum value set: [Maximum, SampleCount, Sum, Minimum, Average]"
                )

        moto.call_moto(context)

        name = request.get("AlarmName")
        arn = arns.cloudwatch_alarm_arn(name, context.account_id, context.region)
        self.tags.tag_resource(arn, request.get("Tags"))
        self.alarm_scheduler.schedule_metric_alarm(arn)

    @handler("PutCompositeAlarm", expand=False)
    def put_composite_alarm(
        self,
        context: RequestContext,
        request: PutCompositeAlarmInput,
    ) -> None:
        """Store a composite alarm in the moto backend.

        The alarm is only stored; its rule is not evaluated (a warning is logged).
        """
        backend = cloudwatch_backends[context.account_id][context.region]
        backend.put_metric_alarm(
            name=request.get("AlarmName"),
            namespace=None,
            metric_name=None,
            metric_data_queries=None,
            comparison_operator=None,
            evaluation_periods=None,
            datapoints_to_alarm=None,
            period=None,
            threshold=None,
            statistic=None,
            extended_statistic=None,
            description=request.get("AlarmDescription"),
            dimensions=[],
            alarm_actions=request.get("AlarmActions", []),
            ok_actions=request.get("OKActions", []),
            insufficient_data_actions=request.get("InsufficientDataActions", []),
            unit=None,
            # Actions are enabled unless the request says otherwise; passing None
            # here would override the backend method's default of True and
            # silently disable all alarm actions.
            actions_enabled=request.get("ActionsEnabled", True),
            treat_missing_data=None,
            evaluate_low_sample_count_percentile=None,
            threshold_metric_id=None,
            rule=request.get("AlarmRule"),
            tags=request.get("Tags", []),
        )
        LOG.warning(
            "Composite Alarms configuration is not yet supported, alarm state will not be evaluated"
        )

    @handler("EnableAlarmActions")
    def enable_alarm_actions(
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
    ) -> None:
        """Enable the actions of the named alarms."""
        _set_alarm_actions(context, alarm_names, enabled=True)

    @handler("DisableAlarmActions")
    def disable_alarm_actions(
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
    ) -> None:
        """Disable the actions of the named alarms."""
        _set_alarm_actions(context, alarm_names, enabled=False)

    @handler("DescribeAlarms", expand=False)
    def describe_alarms(
        self, context: RequestContext, request: DescribeAlarmsInput
    ) -> DescribeAlarmsOutput:
        """Describe alarms via moto and clean up every returned alarm entry."""
        response = moto.call_moto(context)

        for composite_alarm in response["CompositeAlarms"]:
            _cleanup_describe_output(composite_alarm)
        for metric_alarm in response["MetricAlarms"]:
            _cleanup_describe_output(metric_alarm)

        return response

    @handler("GetMetricStatistics", expand=False)
    def get_metric_statistics(
        self, context: RequestContext, request: GetMetricStatisticsInput
    ) -> GetMetricStatisticsOutput:
        """Fetch metric statistics via moto and drop empty ``ExtendedStatistics``."""
        response = moto.call_moto(context)

        # cleanup -> ExtendedStatistics is not included in AWS response if it returned empty
        # (a response without "Datapoints" is tolerated and returned unchanged)
        for datapoint in response.get("Datapoints") or []:
            if "ExtendedStatistics" in datapoint and not datapoint.get("ExtendedStatistics"):
                datapoint.pop("ExtendedStatistics")

        return response
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc