• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 22519085314

27 Feb 2026 11:47PM UTC coverage: 86.962% (+0.006%) from 86.956%
22519085314

push

github

web-flow
SNS: update store typing (#13866)

3 of 3 new or added lines in 1 file covered. (100.0%)

388 existing lines in 19 files now uncovered.

69828 of 80297 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.24
/localstack-core/localstack/services/cloudwatch/provider.py
1
import json
1✔
2
import logging
1✔
3
import uuid
1✔
4
from datetime import datetime
1✔
5
from typing import Any
1✔
6

7
from moto.cloudwatch import cloudwatch_backends
1✔
8
from moto.cloudwatch.models import Alarm, CloudWatchBackend, MetricDatum
1✔
9

10
from localstack.aws.accounts import get_account_id_from_access_key_id
1✔
11
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
12
from localstack.aws.api.cloudwatch import (
1✔
13
    AlarmNames,
14
    AmazonResourceName,
15
    CloudwatchApi,
16
    DescribeAlarmsInput,
17
    DescribeAlarmsOutput,
18
    GetMetricDataInput,
19
    GetMetricDataOutput,
20
    GetMetricStatisticsInput,
21
    GetMetricStatisticsOutput,
22
    ListTagsForResourceOutput,
23
    PutCompositeAlarmInput,
24
    PutMetricAlarmInput,
25
    StateValue,
26
    TagKeyList,
27
    TagList,
28
    TagResourceOutput,
29
    UntagResourceOutput,
30
)
31
from localstack.aws.connect import connect_to
1✔
32
from localstack.constants import DEFAULT_AWS_ACCOUNT_ID
1✔
33
from localstack.http import Request
1✔
34
from localstack.services import moto
1✔
35
from localstack.services.cloudwatch.alarm_scheduler import AlarmScheduler
1✔
36
from localstack.services.edge import ROUTER
1✔
37
from localstack.services.plugins import SERVICE_PLUGINS, ServiceLifecycleHook
1✔
38
from localstack.state import StateVisitor
1✔
39
from localstack.utils.aws import arns
1✔
40
from localstack.utils.aws.arns import extract_account_id_from_arn, lambda_function_name
1✔
41
from localstack.utils.aws.request_context import (
1✔
42
    extract_access_key_id_from_auth_header,
43
    extract_region_from_auth_header,
44
)
45
from localstack.utils.patch import patch
1✔
46
from localstack.utils.strings import camel_to_snake_case
1✔
47
from localstack.utils.sync import poll_condition
1✔
48
from localstack.utils.tagging import TaggingService
1✔
49
from localstack.utils.threads import start_worker_thread
1✔
50

51
PATH_GET_RAW_METRICS = "/_aws/cloudwatch/metrics/raw"
1✔
52
DEPRECATED_PATH_GET_RAW_METRICS = "/cloudwatch/metrics/raw"
1✔
53
MOTO_INITIAL_UNCHECKED_REASON = "Unchecked: Initial alarm creation"
1✔
54

55
LOG = logging.getLogger(__name__)
1✔
56

57

58
@patch(target=Alarm.update_state)
1✔
59
def update_state(target, self, reason, reason_data, state_value):
1✔
60
    if reason_data is None:
1✔
61
        reason_data = ""
1✔
62
    if self.state_reason == MOTO_INITIAL_UNCHECKED_REASON:
1✔
63
        old_state = StateValue.INSUFFICIENT_DATA
1✔
64
    else:
65
        old_state = self.state_value
1✔
66

67
    old_state_reason = self.state_reason
1✔
68
    old_state_update_timestamp = self.state_updated_timestamp
1✔
69
    target(self, reason, reason_data, state_value)
1✔
70

71
    # check the state and trigger required actions
72
    if not self.actions_enabled or old_state == self.state_value:
1✔
73
        return
1✔
74
    if self.state_value == "OK":
1✔
75
        actions = self.ok_actions
1✔
76
    elif self.state_value == "ALARM":
1✔
77
        actions = self.alarm_actions
1✔
78
    else:
79
        actions = self.insufficient_data_actions
×
80
    for action in actions:
1✔
81
        data = arns.parse_arn(action)
1✔
82
        if data["service"] == "sns":
1✔
83
            service = connect_to(region_name=data["region"], aws_access_key_id=data["account"]).sns
1✔
84
            subject = f"""{self.state_value}: "{self.name}" in {self.region_name}"""
1✔
85
            message = create_message_response_update_state_sns(self, old_state)
1✔
86
            service.publish(TopicArn=action, Subject=subject, Message=message)
1✔
87
        elif data["service"] == "lambda":
1✔
88
            service = connect_to(
1✔
89
                region_name=data["region"], aws_access_key_id=data["account"]
90
            ).lambda_
91
            message = create_message_response_update_state_lambda(
1✔
92
                self, old_state, old_state_reason, old_state_update_timestamp
93
            )
94
            service.invoke(FunctionName=lambda_function_name(action), Payload=message)
1✔
95
        else:
96
            # TODO: support other actions
97
            LOG.warning(
×
98
                "Action for service %s not implemented, action '%s' will not be triggered.",
99
                data["service"],
100
                action,
101
            )
102

103

104
@patch(target=CloudWatchBackend.put_metric_alarm)
1✔
105
def put_metric_alarm(
1✔
106
    target,
107
    self,
108
    name: str,
109
    namespace: str,
110
    metric_name: str,
111
    comparison_operator: str,
112
    evaluation_periods: int,
113
    period: int,
114
    threshold: float,
115
    statistic: str,
116
    description: str,
117
    dimensions: list[dict[str, str]],
118
    alarm_actions: list[str],
119
    metric_data_queries: list[Any] | None = None,
120
    datapoints_to_alarm: int | None = None,
121
    extended_statistic: str | None = None,
122
    ok_actions: list[str] | None = None,
123
    insufficient_data_actions: list[str] | None = None,
124
    unit: str | None = None,
125
    actions_enabled: bool = True,
126
    treat_missing_data: str | None = None,
127
    evaluate_low_sample_count_percentile: str | None = None,
128
    threshold_metric_id: str | None = None,
129
    rule: str | None = None,
130
    tags: list[dict[str, str]] | None = None,
131
) -> Alarm:
132
    return target(
1✔
133
        self,
134
        name,
135
        namespace,
136
        metric_name,
137
        comparison_operator,
138
        evaluation_periods,
139
        period,
140
        threshold,
141
        statistic,
142
        description,
143
        dimensions,
144
        alarm_actions,
145
        metric_data_queries,
146
        datapoints_to_alarm,
147
        extended_statistic,
148
        ok_actions,
149
        insufficient_data_actions,
150
        unit,
151
        actions_enabled,
152
        treat_missing_data,
153
        evaluate_low_sample_count_percentile,
154
        threshold_metric_id,
155
        rule,
156
        tags,
157
    )
158

159

160
def create_metric_data_query_from_alarm(alarm: Alarm):
1✔
161
    # TODO may need to be adapted for other use cases
162
    #  verified return value with a snapshot test
163
    return [
1✔
164
        {
165
            "id": str(uuid.uuid4()),
166
            "metricStat": {
167
                "metric": {
168
                    "namespace": alarm.namespace,
169
                    "name": alarm.metric_name,
170
                    "dimensions": alarm.dimensions or {},
171
                },
172
                "period": int(alarm.period),
173
                "stat": alarm.statistic,
174
            },
175
            "returnData": True,
176
        }
177
    ]
178

179

180
def create_message_response_update_state_lambda(
1✔
181
    alarm: Alarm, old_state, old_state_reason, old_state_timestamp
182
):
183
    response = {
1✔
184
        "accountId": extract_account_id_from_arn(alarm.alarm_arn),
185
        "alarmArn": alarm.alarm_arn,
186
        "alarmData": {
187
            "alarmName": alarm.name,
188
            "state": {
189
                "value": alarm.state_value,
190
                "reason": alarm.state_reason,
191
                "timestamp": _to_iso_8601_datetime_with_nanoseconds(alarm.state_updated_timestamp),
192
            },
193
            "previousState": {
194
                "value": old_state,
195
                "reason": old_state_reason,
196
                "timestamp": _to_iso_8601_datetime_with_nanoseconds(old_state_timestamp),
197
            },
198
            "configuration": {
199
                "description": alarm.description or "",
200
                "metrics": alarm.metric_data_queries
201
                or create_metric_data_query_from_alarm(
202
                    alarm
203
                ),  # TODO: add test with metric_data_queries
204
            },
205
        },
206
        "time": _to_iso_8601_datetime_with_nanoseconds(alarm.state_updated_timestamp),
207
        "region": alarm.region_name,
208
        "source": "aws.cloudwatch",
209
    }
210
    return json.dumps(response)
1✔
211

212

213
def create_message_response_update_state_sns(alarm, old_state):
1✔
214
    response = {
1✔
215
        "AWSAccountId": extract_account_id_from_arn(alarm.alarm_arn),
216
        "OldStateValue": old_state,
217
        "AlarmName": alarm.name,
218
        "AlarmDescription": alarm.description or "",
219
        "AlarmConfigurationUpdatedTimestamp": _to_iso_8601_datetime_with_nanoseconds(
220
            alarm.configuration_updated_timestamp
221
        ),
222
        "NewStateValue": alarm.state_value,
223
        "NewStateReason": alarm.state_reason,
224
        "StateChangeTime": _to_iso_8601_datetime_with_nanoseconds(alarm.state_updated_timestamp),
225
        # the long-name for 'region' should be used - as we don't have it, we use the short name
226
        # which needs to be slightly changed to make snapshot tests work
227
        "Region": alarm.region_name.replace("-", " ").capitalize(),
228
        "AlarmArn": alarm.alarm_arn,
229
        "OKActions": alarm.ok_actions or [],
230
        "AlarmActions": alarm.alarm_actions or [],
231
        "InsufficientDataActions": alarm.insufficient_data_actions or [],
232
    }
233

234
    # collect trigger details
235
    details = {
1✔
236
        "MetricName": alarm.metric_name or "",
237
        "Namespace": alarm.namespace or "",
238
        "Unit": alarm.unit or None,  # testing with AWS revealed this currently returns None
239
        "Period": int(alarm.period) if alarm.period else 0,
240
        "EvaluationPeriods": int(alarm.evaluation_periods) if alarm.evaluation_periods else 0,
241
        "ComparisonOperator": alarm.comparison_operator or "",
242
        "Threshold": float(alarm.threshold) if alarm.threshold else 0.0,
243
        "TreatMissingData": alarm.treat_missing_data or "",
244
        "EvaluateLowSampleCountPercentile": alarm.evaluate_low_sample_count_percentile or "",
245
    }
246

247
    # Dimensions not serializable
248
    dimensions = []
1✔
249
    if alarm.dimensions:
1✔
250
        for d in alarm.dimensions:
1✔
251
            dimensions.append({"value": d.value, "name": d.name})
1✔
252

253
    details["Dimensions"] = dimensions or ""
1✔
254

255
    if alarm.statistic:
1✔
256
        details["StatisticType"] = "Statistic"
1✔
257
        details["Statistic"] = camel_to_snake_case(alarm.statistic).upper()  # AWS returns uppercase
1✔
258
    elif alarm.extended_statistic:
×
259
        details["StatisticType"] = "ExtendedStatistic"
×
260
        details["ExtendedStatistic"] = alarm.extended_statistic
×
261

262
    response["Trigger"] = details
1✔
263

264
    return json.dumps(response)
1✔
265

266

267
class ValidationError(CommonServiceException):
1✔
268
    def __init__(self, message: str):
1✔
269
        super().__init__("ValidationError", message, 400, True)
×
270

271

272
def _to_iso_8601_datetime_with_nanoseconds(date: datetime | None) -> str | None:
1✔
273
    if date is not None:
1✔
274
        return date.strftime("%Y-%m-%dT%H:%M:%S.%f000Z")
1✔
275

276

277
def _set_alarm_actions(context, alarm_names, enabled):
1✔
278
    backend = cloudwatch_backends[context.account_id][context.region]
1✔
279
    for name in alarm_names:
1✔
280
        alarm = backend.alarms.get(name)
1✔
281
        if alarm:
1✔
282
            alarm.actions_enabled = enabled
1✔
283

284

285
def _cleanup_describe_output(alarm):
1✔
286
    if "Metrics" in alarm and len(alarm["Metrics"]) == 0:
1✔
287
        alarm.pop("Metrics")
1✔
288
    reason_data = alarm.get("StateReasonData")
1✔
289
    if reason_data is not None and reason_data in ("{}", ""):
1✔
290
        alarm.pop("StateReasonData")
1✔
291
    if (
1✔
292
        alarm.get("StateReason", "") == MOTO_INITIAL_UNCHECKED_REASON
293
        and alarm.get("StateValue") != StateValue.INSUFFICIENT_DATA
294
    ):
295
        alarm["StateValue"] = StateValue.INSUFFICIENT_DATA
1✔
296

297

298
class CloudwatchProvider(CloudwatchApi, ServiceLifecycleHook):
1✔
299
    """
300
    Cloudwatch provider.
301

302
    LIMITATIONS:
303
        - no alarm rule evaluation
304
    """
305

306
    def __init__(self):
1✔
307
        self.tags = TaggingService()
1✔
308
        self.alarm_scheduler = AlarmScheduler()
1✔
309

310
    def accept_state_visitor(self, visitor: StateVisitor):
1✔
311
        visitor.visit(cloudwatch_backends)
×
312

313
    def on_after_init(self):
1✔
314
        ROUTER.add(PATH_GET_RAW_METRICS, self.get_raw_metrics)
1✔
315

316
    def on_before_start(self):
1✔
317
        self.alarm_scheduler.start()
1✔
318

319
    def on_before_state_reset(self):
1✔
320
        self.alarm_scheduler.shutdown()
×
321

322
    def on_after_state_reset(self):
1✔
323
        self.alarm_scheduler.start()
×
324

325
    def on_before_state_load(self):
1✔
326
        self.alarm_scheduler.shutdown()
×
327

328
    def on_after_state_load(self):
1✔
329
        self.alarm_scheduler.start()
×
330

331
        def restart_alarms(*args):
×
332
            poll_condition(lambda: SERVICE_PLUGINS.is_running("cloudwatch"))
×
333
            self.alarm_scheduler.restart_existing_alarms()
×
334

335
        start_worker_thread(restart_alarms)
×
336

337
    def on_before_stop(self):
1✔
338
        self.alarm_scheduler.shutdown()
1✔
339

340
    def delete_alarms(self, context: RequestContext, alarm_names: AlarmNames, **kwargs) -> None:
1✔
341
        moto.call_moto(context)
1✔
342
        for alarm_name in alarm_names:
1✔
343
            arn = arns.cloudwatch_alarm_arn(alarm_name, context.account_id, context.region)
1✔
344
            self.alarm_scheduler.delete_scheduler_for_alarm(arn)
1✔
345

346
    def get_raw_metrics(self, request: Request):
1✔
347
        region = extract_region_from_auth_header(request.headers)
1✔
348
        account_id = (
1✔
349
            get_account_id_from_access_key_id(
350
                extract_access_key_id_from_auth_header(request.headers)
351
            )
352
            or DEFAULT_AWS_ACCOUNT_ID
353
        )
354
        backend = cloudwatch_backends[account_id][region]
1✔
355
        if backend:
1✔
356
            result = [m for m in backend.metric_data if isinstance(m, MetricDatum)]
1✔
357
            # TODO handle aggregated metrics as well (MetricAggregatedDatum)
358
        else:
UNCOV
359
            result = []
×
360

361
        result = [
1✔
362
            {
363
                "ns": r.namespace,
364
                "n": r.name,
365
                "v": r.value,
366
                "t": r.timestamp,
367
                "d": [{"n": d.name, "v": d.value} for d in r.dimensions],
368
                "account": account_id,
369
                "region": region,
370
            }
371
            for r in result
372
        ]
373
        return {"metrics": result}
1✔
374

375
    def list_tags_for_resource(
1✔
376
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
377
    ) -> ListTagsForResourceOutput:
378
        tags = self.tags.list_tags_for_resource(resource_arn)
1✔
379
        return ListTagsForResourceOutput(Tags=tags.get("Tags", []))
1✔
380

381
    def untag_resource(
1✔
382
        self,
383
        context: RequestContext,
384
        resource_arn: AmazonResourceName,
385
        tag_keys: TagKeyList,
386
        **kwargs,
387
    ) -> UntagResourceOutput:
388
        self.tags.untag_resource(resource_arn, tag_keys)
1✔
389
        return UntagResourceOutput()
1✔
390

391
    def tag_resource(
1✔
392
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
393
    ) -> TagResourceOutput:
394
        self.tags.tag_resource(resource_arn, tags)
1✔
395
        return TagResourceOutput()
1✔
396

397
    @handler("GetMetricData", expand=False)
1✔
398
    def get_metric_data(
1✔
399
        self, context: RequestContext, request: GetMetricDataInput
400
    ) -> GetMetricDataOutput:
401
        result = moto.call_moto(context)
1✔
402
        # moto currently uses hardcoded label metric_name + stat
403
        # parity tests shows that default is MetricStat, but there might also be a label explicitly set
404
        metric_data_queries = request["MetricDataQueries"]
1✔
405
        for i in range(0, len(metric_data_queries)):
1✔
406
            metric_query = metric_data_queries[i]
1✔
407
            label = metric_query.get("Label") or metric_query.get("MetricStat", {}).get(
1✔
408
                "Metric", {}
409
            ).get("MetricName")
410
            if label:
1✔
411
                result["MetricDataResults"][i]["Label"] = label
1✔
412
        if "Messages" not in result:
1✔
413
            # parity tests reveals that an empty messages list is added
414
            result["Messages"] = []
1✔
415
        return result
1✔
416

417
    @handler("PutMetricAlarm", expand=False)
1✔
418
    def put_metric_alarm(
1✔
419
        self,
420
        context: RequestContext,
421
        request: PutMetricAlarmInput,
422
    ) -> None:
423
        # missing will be the default, when not set (but it will not explicitly be set)
424
        if request.get("TreatMissingData", "missing") not in [
1✔
425
            "breaching",
426
            "notBreaching",
427
            "ignore",
428
            "missing",
429
        ]:
UNCOV
430
            raise ValidationError(
×
431
                f"The value {request['TreatMissingData']} is not supported for TreatMissingData parameter. Supported values are [breaching, notBreaching, ignore, missing]."
432
            )
433
        # do some sanity checks:
434
        if request.get("Period"):
1✔
435
            # Valid values are 10, 30, and any multiple of 60.
436
            value = request.get("Period")
1✔
437
            if value not in (10, 30):
1✔
438
                if value % 60 != 0:
1✔
UNCOV
439
                    raise ValidationError("Period must be 10, 30 or a multiple of 60")
×
440
        if request.get("Statistic"):
1✔
441
            if request.get("Statistic") not in [
1✔
442
                "SampleCount",
443
                "Average",
444
                "Sum",
445
                "Minimum",
446
                "Maximum",
447
            ]:
UNCOV
448
                raise ValidationError(
×
449
                    f"Value '{request.get('Statistic')}' at 'statistic' failed to satisfy constraint: Member must satisfy enum value set: [Maximum, SampleCount, Sum, Minimum, Average]"
450
                )
451

452
        moto.call_moto(context)
1✔
453

454
        name = request.get("AlarmName")
1✔
455
        arn = arns.cloudwatch_alarm_arn(name, context.account_id, context.region)
1✔
456
        self.tags.tag_resource(arn, request.get("Tags"))
1✔
457
        self.alarm_scheduler.schedule_metric_alarm(arn)
1✔
458

459
    @handler("PutCompositeAlarm", expand=False)
1✔
460
    def put_composite_alarm(
1✔
461
        self,
462
        context: RequestContext,
463
        request: PutCompositeAlarmInput,
464
    ) -> None:
465
        backend = cloudwatch_backends[context.account_id][context.region]
1✔
466
        backend.put_metric_alarm(
1✔
467
            name=request.get("AlarmName"),
468
            namespace=None,
469
            metric_name=None,
470
            metric_data_queries=None,
471
            comparison_operator=None,
472
            evaluation_periods=None,
473
            datapoints_to_alarm=None,
474
            period=None,
475
            threshold=None,
476
            statistic=None,
477
            extended_statistic=None,
478
            description=request.get("AlarmDescription"),
479
            dimensions=[],
480
            alarm_actions=request.get("AlarmActions", []),
481
            ok_actions=request.get("OKActions", []),
482
            insufficient_data_actions=request.get("InsufficientDataActions", []),
483
            unit=None,
484
            actions_enabled=request.get("ActionsEnabled"),
485
            treat_missing_data=None,
486
            evaluate_low_sample_count_percentile=None,
487
            threshold_metric_id=None,
488
            rule=request.get("AlarmRule"),
489
            tags=request.get("Tags", []),
490
        )
491
        LOG.warning(
1✔
492
            "Composite Alarms configuration is not yet supported, alarm state will not be evaluated"
493
        )
494

495
    @handler("EnableAlarmActions")
1✔
496
    def enable_alarm_actions(
1✔
497
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
498
    ) -> None:
499
        _set_alarm_actions(context, alarm_names, enabled=True)
1✔
500

501
    @handler("DisableAlarmActions")
1✔
502
    def disable_alarm_actions(
1✔
503
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
504
    ) -> None:
505
        _set_alarm_actions(context, alarm_names, enabled=False)
1✔
506

507
    @handler("DescribeAlarms", expand=False)
1✔
508
    def describe_alarms(
1✔
509
        self, context: RequestContext, request: DescribeAlarmsInput
510
    ) -> DescribeAlarmsOutput:
511
        response = moto.call_moto(context)
1✔
512

513
        for c in response["CompositeAlarms"]:
1✔
514
            _cleanup_describe_output(c)
1✔
515
        for m in response["MetricAlarms"]:
1✔
516
            _cleanup_describe_output(m)
1✔
517

518
        return response
1✔
519

520
    @handler("GetMetricStatistics", expand=False)
1✔
521
    def get_metric_statistics(
1✔
522
        self, context: RequestContext, request: GetMetricStatisticsInput
523
    ) -> GetMetricStatisticsOutput:
524
        response = moto.call_moto(context)
1✔
525

526
        # cleanup -> ExtendendStatics is not included in AWS response if it returned empty
527
        for datapoint in response.get("Datapoints"):
1✔
528
            if "ExtendedStatistics" in datapoint and not datapoint.get("ExtendedStatistics"):
1✔
UNCOV
529
                datapoint.pop("ExtendedStatistics")
×
530

531
        return response
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc