• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 17752573338

15 Sep 2025 10:56PM UTC coverage: 86.879% (+0.03%) from 86.851%
17752573338

push

github

web-flow
CFn: validate during get template (#13139)

6 of 6 new or added lines in 1 file covered. (100.0%)

138 existing lines in 10 files now uncovered.

67201 of 77350 relevant lines covered (86.88%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.98
/localstack-core/localstack/services/cloudwatch/provider_v2.py
1
import datetime
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import threading
1✔
6
import uuid
1✔
7

8
from localstack.aws.api import CommonServiceException, RequestContext, handler
1✔
9
from localstack.aws.api.cloudwatch import (
1✔
10
    AccountId,
11
    ActionPrefix,
12
    AlarmName,
13
    AlarmNamePrefix,
14
    AlarmNames,
15
    AlarmTypes,
16
    AmazonResourceName,
17
    CloudwatchApi,
18
    ContributorId,
19
    DashboardBody,
20
    DashboardName,
21
    DashboardNamePrefix,
22
    DashboardNames,
23
    Datapoint,
24
    DeleteDashboardsOutput,
25
    DescribeAlarmHistoryOutput,
26
    DescribeAlarmsForMetricOutput,
27
    DescribeAlarmsOutput,
28
    DimensionFilters,
29
    Dimensions,
30
    EntityMetricDataList,
31
    ExtendedStatistic,
32
    ExtendedStatistics,
33
    GetDashboardOutput,
34
    GetMetricDataMaxDatapoints,
35
    GetMetricDataOutput,
36
    GetMetricStatisticsOutput,
37
    HistoryItemType,
38
    IncludeLinkedAccounts,
39
    InvalidParameterCombinationException,
40
    InvalidParameterValueException,
41
    LabelOptions,
42
    ListDashboardsOutput,
43
    ListMetricsOutput,
44
    ListTagsForResourceOutput,
45
    MaxRecords,
46
    MetricData,
47
    MetricDataQueries,
48
    MetricDataQuery,
49
    MetricDataResult,
50
    MetricDataResultMessages,
51
    MetricName,
52
    MetricStat,
53
    Namespace,
54
    NextToken,
55
    Period,
56
    PutCompositeAlarmInput,
57
    PutDashboardOutput,
58
    PutMetricAlarmInput,
59
    RecentlyActive,
60
    ResourceNotFound,
61
    ScanBy,
62
    StandardUnit,
63
    StateReason,
64
    StateReasonData,
65
    StateValue,
66
    Statistic,
67
    Statistics,
68
    StrictEntityValidation,
69
    TagKeyList,
70
    TagList,
71
    TagResourceOutput,
72
    Timestamp,
73
    UntagResourceOutput,
74
)
75
from localstack.aws.connect import connect_to
1✔
76
from localstack.http import Request
1✔
77
from localstack.services.cloudwatch.alarm_scheduler import AlarmScheduler
1✔
78
from localstack.services.cloudwatch.cloudwatch_database_helper import CloudwatchDatabase
1✔
79
from localstack.services.cloudwatch.models import (
1✔
80
    CloudWatchStore,
81
    LocalStackAlarm,
82
    LocalStackCompositeAlarm,
83
    LocalStackDashboard,
84
    LocalStackMetricAlarm,
85
    cloudwatch_stores,
86
)
87
from localstack.services.edge import ROUTER
1✔
88
from localstack.services.plugins import SERVICE_PLUGINS, ServiceLifecycleHook
1✔
89
from localstack.state import AssetDirectory, StateVisitor
1✔
90
from localstack.utils.aws import arns
1✔
91
from localstack.utils.aws.arns import extract_account_id_from_arn, lambda_function_name
1✔
92
from localstack.utils.collections import PaginatedList
1✔
93
from localstack.utils.json import CustomEncoder as JSONEncoder
1✔
94
from localstack.utils.strings import camel_to_snake_case
1✔
95
from localstack.utils.sync import poll_condition
1✔
96
from localstack.utils.threads import start_worker_thread
1✔
97
from localstack.utils.time import timestamp_millis
1✔
98

99
# Module logger and the re-entrant lock taken around alarm-store mutations.
LOG = logging.getLogger(__name__)
_STORE_LOCK = threading.RLock()

# Route registered on the internal router to serve the raw stored metric data.
PATH_GET_RAW_METRICS = "/_aws/cloudwatch/metrics/raw"

MOTO_INITIAL_UNCHECKED_REASON = "Unchecked: Initial alarm creation"
HISTORY_VERSION = "1.0"

# Page size used when paginating ListMetrics results.
LIST_METRICS_MAX_RESULTS = 500
# Largest number of datapoints a single GetMetricStatistics request may span.
AWS_MAX_DATAPOINTS_ACCEPTED: int = 1440

# If the values in these fields are not the same, their values are added when generating labels
LABEL_DIFFERENTIATORS = ["Stat", "Period"]
1✔
109

110

111
class ValidationError(CommonServiceException):
    """Service exception reported with the code ``ValidationError`` and status 400."""

    # TODO: check this error against AWS (doesn't exist in the API)
    def __init__(self, message: str):
        error_code = "ValidationError"
        http_status = 400
        # NOTE(review): the trailing ``True`` is presumably the sender-fault flag —
        # confirm against CommonServiceException's signature.
        super().__init__(error_code, message, http_status, True)
1✔
115

116

117
class InvalidParameterCombination(CommonServiceException):
    """Service exception reported with the code ``InvalidParameterCombination`` and status 400."""

    def __init__(self, message: str):
        error_code = "InvalidParameterCombination"
        http_status = 400
        # NOTE(review): the trailing ``True`` is presumably the sender-fault flag —
        # confirm against CommonServiceException's signature.
        super().__init__(error_code, message, http_status, True)
1✔
120

121

122
def _validate_parameters_for_put_metric_data(metric_data: MetricData) -> None:
    """Reject ``PutMetricData`` entries whose value fields contradict each other.

    For every entry (numbered from 1 in the error messages) the following is checked:

    - ``Value`` and ``Values`` are not both given,
    - ``Value`` and ``StatisticValues`` are not both given,
    - ``Values`` and ``Counts`` have the same length when both are given.

    :param metric_data: list of metric datum dicts; ``None`` or an empty list is
        accepted and validates trivially.
    :raises InvalidParameterCombinationException: on mutually exclusive fields.
    :raises InvalidParameterValueException: on a ``Values``/``Counts`` size mismatch.
    """
    for position, metric_item in enumerate(metric_data or [], start=1):
        member = f"MetricData.member.{position}"
        # A presence check rather than truthiness: a Value of 0 is still a specified Value.
        has_value = metric_item.get("Value") is not None
        values = metric_item.get("Values")
        counts = metric_item.get("Counts")

        if has_value and values:
            raise InvalidParameterCombinationException(
                f"The parameters {member}.Value and {member}.Values are mutually exclusive and you have specified both."
            )

        if metric_item.get("StatisticValues") and has_value:
            raise InvalidParameterCombinationException(
                f"The parameters {member}.Value and {member}.StatisticValues are mutually exclusive and you have specified both."
            )

        if values and counts and len(values) != len(counts):
            raise InvalidParameterValueException(
                f"The parameters {member}.Values and {member}.Counts must be of the same size."
            )
142

143

144
class CloudwatchProvider(CloudwatchApi, ServiceLifecycleHook):
1✔
145
    """
146
    Cloudwatch provider.
147

148
    LIMITATIONS:
149
        - simplified composite alarm rule evaluation:
150
            - only OR operator is supported
151
            - only ALARM expression is supported
152
            - only metric alarms can be included in the rule and they should be referenced by ARN only
153
    """
154

155
    def __init__(self):
        """Create the provider with a new metrics database and no alarm scheduler yet."""
        self.cloudwatch_database = CloudwatchDatabase()
        self.store = None
        # Assigned by start_alarm_scheduler() and reset to None by shutdown_alarm_scheduler().
        self.alarm_scheduler: AlarmScheduler = None
1✔
159

160
    @staticmethod
    def get_store(account_id: str, region: str) -> CloudWatchStore:
        """Return the CloudWatch store registered for ``account_id`` and ``region``."""
        account_stores = cloudwatch_stores[account_id]
        return account_stores[region]
1✔
163

164
    def accept_state_visitor(self, visitor: StateVisitor):
        """Let ``visitor`` visit the CloudWatch stores and the database asset directory."""
        visitor.visit(cloudwatch_stores)
        database_assets = AssetDirectory(self.service, CloudwatchDatabase.CLOUDWATCH_DATA_ROOT)
        visitor.visit(database_assets)
×
167

168
    def on_after_init(self):
        """Register the raw-metrics route and start the alarm scheduler."""
        raw_metrics_endpoint = self.get_raw_metrics
        ROUTER.add(PATH_GET_RAW_METRICS, raw_metrics_endpoint)
        self.start_alarm_scheduler()
1✔
171

172
    def on_before_state_reset(self):
        """Stop the alarm scheduler, then clear the metrics database tables."""
        self.shutdown_alarm_scheduler()
        database = self.cloudwatch_database
        database.clear_tables()
×
175

176
    def on_after_state_reset(self):
        """Replace the metrics database with a new instance and restart the alarm scheduler."""
        fresh_database = CloudwatchDatabase()
        self.cloudwatch_database = fresh_database
        self.start_alarm_scheduler()
×
179

180
    def on_before_state_load(self):
        """Stop the alarm scheduler before state is loaded."""
        self.shutdown_alarm_scheduler()
×
182

183
    def on_after_state_load(self):
        """Start the scheduler and re-schedule existing alarms from a worker thread."""
        self.start_alarm_scheduler()

        def cloudwatch_is_running():
            return SERVICE_PLUGINS.is_running("cloudwatch")

        def restart_alarms(*args):
            # Wait for the service to report as running before touching the alarms.
            poll_condition(cloudwatch_is_running)
            self.alarm_scheduler.restart_existing_alarms()

        start_worker_thread(restart_alarms)
×
191

192
    def on_before_stop(self):
        """Stop the alarm scheduler before the service stops."""
        self.shutdown_alarm_scheduler()
1✔
194

195
    def start_alarm_scheduler(self):
        """Create the alarm scheduler unless one is already set."""
        if self.alarm_scheduler:
            return
        LOG.debug("starting cloudwatch scheduler")
        self.alarm_scheduler = AlarmScheduler()
1✔
199

200
    def shutdown_alarm_scheduler(self):
        """Shut down the alarm scheduler and clear the reference to it.

        Does nothing when no scheduler is set, so calling it before
        ``start_alarm_scheduler`` or twice in a row is safe.
        """
        scheduler = self.alarm_scheduler
        if not scheduler:
            return
        LOG.debug("stopping cloudwatch scheduler")
        scheduler.shutdown_scheduler()
        self.alarm_scheduler = None
1✔
204

205
    def delete_alarms(self, context: RequestContext, alarm_names: AlarmNames, **kwargs) -> None:
        """
        Delete alarms.

        For each name, the alarm's scheduler entry is removed and the alarm is
        dropped from the store of the request's account and region. Names with
        no stored alarm are ignored.
        """
        with _STORE_LOCK:
            # The store depends only on the request context, so look it up once.
            store = self.get_store(context.account_id, context.region)
            for alarm_name in alarm_names:
                # obtain alarm ARN from alarm name
                alarm_arn = arns.cloudwatch_alarm_arn(
                    alarm_name, account_id=context.account_id, region_name=context.region
                )
                self.alarm_scheduler.delete_scheduler_for_alarm(alarm_arn)
                store.alarms.pop(alarm_arn, None)
1✔
217

218
    def put_metric_data(
        self,
        context: RequestContext,
        namespace: Namespace,
        metric_data: MetricData = None,
        entity_metric_data: EntityMetricDataList = None,
        strict_entity_validation: StrictEntityValidation = None,
        **kwargs,
    ) -> None:
        """Validate ``metric_data`` and store it in the metrics database.

        ``entity_metric_data`` and ``strict_entity_validation`` are accepted but unused.
        """
        # TODO add support for entity_metric_data and strict_entity_validation
        _validate_parameters_for_put_metric_data(metric_data)

        database = self.cloudwatch_database
        database.add_metric_data(context.account_id, context.region, namespace, metric_data)
233

234
    def get_metric_data(
        self,
        context: RequestContext,
        metric_data_queries: MetricDataQueries,
        start_time: Timestamp,
        end_time: Timestamp,
        next_token: NextToken = None,
        scan_by: ScanBy = None,
        max_datapoints: GetMetricDataMaxDatapoints = None,
        label_options: LabelOptions = None,
        **kwargs,
    ) -> GetMetricDataOutput:
        """Run every query against the metrics database and return one result per query.

        Each result is paginated on its own with a page size of ``max_datapoints``
        (100,800 when not given). The ``NextToken`` in the response is the one
        produced for the last query. ``label_options`` is accepted but unused.
        """
        page_size = max_datapoints or 100_800
        results: list[MetricDataResult] = []
        messages: MetricDataResultMessages = []
        nxt = None

        # Fields whose values differ between the queries get appended to each label.
        label_additions = [
            field
            for field in LABEL_DIFFERENTIATORS
            if len({query["MetricStat"][field] for query in metric_data_queries}) > 1
        ]

        for query in metric_data_queries:
            query_result = self.cloudwatch_database.get_metric_data_stat(
                account_id=context.account_id,
                region=context.region,
                query=query,
                start_time=start_time,
                end_time=end_time,
                scan_by=scan_by,
            )
            if query_messages := query_result.get("messages"):
                messages.extend(query_messages)

            base_label = query.get("Label") or f"{query['MetricStat']['Metric']['MetricName']}"
            # TODO: does this happen even if a label is set in the query?
            label = " ".join(
                [base_label, *(f"{query['MetricStat'][field]}" for field in label_additions)]
            )

            # Paginate
            datapoints = [
                {"Timestamp": timestamp, "Value": value}
                for timestamp, value in zip(
                    query_result.get("timestamps", {}),
                    query_result.get("values", {}),
                    strict=False,
                )
            ]
            page, nxt = PaginatedList(datapoints).get_page(
                lambda entry: entry.get("Timestamp"),
                next_token=next_token,
                page_size=page_size,
            )

            results.append(
                MetricDataResult(
                    Id=query.get("Id"),
                    Label=label,
                    StatusCode="Complete",
                    Timestamps=[entry.get("Timestamp") for entry in page],
                    Values=[entry.get("Value") for entry in page],
                )
            )

        return GetMetricDataOutput(MetricDataResults=results, NextToken=nxt, Messages=messages)
1✔
308

309
    def set_alarm_state(
        self,
        context: RequestContext,
        alarm_name: AlarmName,
        state_value: StateValue,
        state_reason: StateReason,
        state_reason_data: StateReasonData = None,
        **kwargs,
    ) -> None:
        """Move an alarm to ``state_value`` and trigger the actions for that state.

        Nothing happens when the alarm is already in the requested state. Otherwise
        the state is updated, composite alarms are re-evaluated and, if actions are
        enabled, every action ARN configured for the new state is dispatched
        (SNS publish or Lambda invoke; other services are logged and skipped).

        :raises InvalidParameterValueException: ``state_reason_data`` is not valid JSON.
        :raises ResourceNotFound: no alarm with that name exists in the request's store.
        :raises ValidationError: ``state_value`` is not OK, ALARM or INSUFFICIENT_DATA.
        """
        if state_reason_data:
            try:
                state_reason_data = json.loads(state_reason_data)
            except ValueError:
                raise InvalidParameterValueException(
                    "TODO: check right error message: Json was not correctly formatted"
                )

        with _STORE_LOCK:
            store = self.get_store(context.account_id, context.region)
            alarm_arn = arns.cloudwatch_alarm_arn(
                alarm_name, account_id=context.account_id, region_name=context.region
            )
            alarm = store.alarms.get(alarm_arn)
            if not alarm:
                raise ResourceNotFound()

            old_state = alarm.alarm["StateValue"]
            if state_value not in ("OK", "ALARM", "INSUFFICIENT_DATA"):
                raise ValidationError(
                    f"1 validation error detected: Value '{state_value}' at 'stateValue' failed to satisfy constraint: Member must satisfy enum value set: [INSUFFICIENT_DATA, ALARM, OK]"
                )

            # Captured before the update; the Lambda payload needs the previous values.
            old_state_reason = alarm.alarm["StateReason"]
            old_state_update_timestamp = alarm.alarm["StateUpdatedTimestamp"]

            if old_state == state_value:
                return

            alarm.alarm["StateTransitionedTimestamp"] = datetime.datetime.now(datetime.UTC)
            # update startDate (=last ALARM date) - should only update when a new alarm is triggered
            # the date is only updated if we have a reason-data, which is set by an alarm
            if state_reason_data:
                state_reason_data["startDate"] = state_reason_data.get("queryDate")

            self._update_state(context, alarm, state_value, state_reason, state_reason_data)
            self._evaluate_composite_alarms(context, alarm)

            if not alarm.alarm["ActionsEnabled"]:
                return

            # Any state other than OK / ALARM falls back to the insufficient-data actions.
            actions_key = {"OK": "OKActions", "ALARM": "AlarmActions"}.get(
                state_value, "InsufficientDataActions"
            )
            for action in alarm.alarm[actions_key]:
                data = arns.parse_arn(action)
                target_service = data["service"]
                # test for sns - can this be done in a more generic way?
                if target_service not in ("sns", "lambda"):
                    # TODO: support other actions
                    LOG.warning(
                        "Action for service %s not implemented, action '%s' will not be triggered.",
                        data["service"],
                        action,
                    )
                    continue

                clients = connect_to(region_name=data["region"], aws_access_key_id=data["account"])
                if target_service == "sns":
                    sns_client = clients.sns
                    subject = f"""{state_value}: "{alarm_name}" in {context.region}"""
                    message = create_message_response_update_state_sns(alarm, old_state)
                    sns_client.publish(TopicArn=action, Subject=subject, Message=message)
                else:
                    lambda_client = clients.lambda_
                    message = create_message_response_update_state_lambda(
                        alarm, old_state, old_state_reason, old_state_update_timestamp
                    )
                    lambda_client.invoke(FunctionName=lambda_function_name(action), Payload=message)
396

397
    def get_raw_metrics(self, request: Request):
        """Return all stored raw metric data, regardless of account or region.

        This feature was introduced with https://github.com/localstack/localstack/pull/3535.
        It used to require a valid aws-header so that the account-id/region could be
        extracted; with the new implementation all data is returned and the
        account-id/region are added as additional attributes.

        TODO endpoint should be refactored or deprecated at some point
          - result should be paginated
          - include aggregated metrics (but we would also need to change/adapt the shape
            of "metrics" that we return)

        :returns: json {"metrics": [{"ns": "namespace", "n": "metric_name", "v": value, "t": timestamp,
        "d": [<dimensions-key-pair-values>],"account": account, "region": region}]}
        """
        raw_metrics = self.cloudwatch_database.get_all_metric_data()
        return {"metrics": raw_metrics or []}
1✔
409

410
    @handler("PutMetricAlarm", expand=False)
    def put_metric_alarm(self, context: RequestContext, request: PutMetricAlarmInput) -> None:
        """Validate a metric-alarm request, store the alarm and schedule its evaluation.

        :raises ValidationError: unsupported ``TreatMissingData``, ``Period``,
            ``Statistic`` or ``EvaluateLowSampleCountPercentile`` value.
        :raises InvalidParameterValueException: ``ExtendedStatistic`` does not start with "p".
        """
        # missing will be the default, when not set (but it will not explicitly be set)
        treat_missing_data = request.get("TreatMissingData", "missing")
        if treat_missing_data not in ("breaching", "notBreaching", "ignore", "missing"):
            raise ValidationError(
                f"The value {treat_missing_data} is not supported for TreatMissingData parameter. Supported values are [breaching, notBreaching, ignore, missing]."
            )

        # do some sanity checks:
        # Valid values are 10, 30, and any multiple of 60.
        period = request.get("Period")
        if period and period not in (10, 30) and period % 60 != 0:
            raise ValidationError("Period must be 10, 30 or a multiple of 60")

        statistic = request.get("Statistic")
        if statistic and statistic not in ("SampleCount", "Average", "Sum", "Minimum", "Maximum"):
            raise ValidationError(
                f"Value '{statistic}' at 'statistic' failed to satisfy constraint: Member must satisfy enum value set: [Maximum, SampleCount, Sum, Minimum, Average]"
            )

        extended_statistic = request.get("ExtendedStatistic")
        if extended_statistic and not extended_statistic.startswith("p"):
            raise InvalidParameterValueException(
                f"The value {extended_statistic} for parameter ExtendedStatistic is not supported."
            )

        low_sample_option = request.get("EvaluateLowSampleCountPercentile")
        if low_sample_option and low_sample_option not in ("evaluate", "ignore"):
            raise ValidationError(
                f"Option {low_sample_option} is not supported. "
                "Supported options for parameter EvaluateLowSampleCountPercentile are evaluate and ignore."
            )

        with _STORE_LOCK:
            store = self.get_store(context.account_id, context.region)
            # The alarm is built from a shallow copy of the request.
            metric_alarm = LocalStackMetricAlarm(context.account_id, context.region, {**request})
            alarm_arn = metric_alarm.alarm["AlarmArn"]
            store.alarms[alarm_arn] = metric_alarm
            self.alarm_scheduler.schedule_metric_alarm(alarm_arn)
1✔
461

462
    @handler("PutCompositeAlarm", expand=False)
    def put_composite_alarm(self, context: RequestContext, request: PutCompositeAlarmInput) -> None:
        """Store a composite alarm built from ``request``.

        The alarm rule is passed through ``_validate_alarm_rule_expression``; each
        item it returns is logged as a warning and the alarm is stored regardless.
        """
        with _STORE_LOCK:
            store = self.get_store(context.account_id, context.region)
            composite_alarm = LocalStackCompositeAlarm(
                context.account_id, context.region, {**request}
            )

            alarm_rule = composite_alarm.alarm["AlarmRule"]
            # Plain loop: the logging is a side effect, not a value worth collecting.
            for warning in self._validate_alarm_rule_expression(alarm_rule):
                LOG.warning(warning)

            alarm_arn = composite_alarm.alarm["AlarmArn"]
            store.alarms[alarm_arn] = composite_alarm
1✔
476

477
    def describe_alarms(
        self,
        context: RequestContext,
        alarm_names: AlarmNames = None,
        alarm_name_prefix: AlarmNamePrefix = None,
        alarm_types: AlarmTypes = None,
        children_of_alarm_name: AlarmName = None,
        parents_of_alarm_name: AlarmName = None,
        state_value: StateValue = None,
        action_prefix: ActionPrefix = None,
        max_records: MaxRecords = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> DescribeAlarmsOutput:
        """Return the stored alarms of the request's account/region, optionally filtered.

        Supported filters are ``action_prefix`` (matched against each entry of
        ``AlarmActions``), ``alarm_name_prefix``, ``alarm_names`` and ``state_value``.
        Every filter that is supplied is applied. ``alarm_types``,
        ``children_of_alarm_name``, ``parents_of_alarm_name``, ``max_records`` and
        ``next_token`` are accepted but not evaluated.

        Alarms that carry an ``AlarmRule`` are returned as composite alarms, all
        others as metric alarms.
        """
        store = self.get_store(context.account_id, context.region)
        alarms = [a.alarm for a in store.alarms.values()]

        if action_prefix:
            # "AlarmActions" holds a list of action ARNs; match if any starts with the prefix.
            alarms = [
                alarm
                for alarm in alarms
                if any(
                    action.startswith(action_prefix) for action in alarm.get("AlarmActions") or []
                )
            ]
        if alarm_name_prefix:
            alarms = [alarm for alarm in alarms if alarm["AlarmName"].startswith(alarm_name_prefix)]
        if alarm_names:
            alarms = [alarm for alarm in alarms if alarm["AlarmName"] in alarm_names]
        if state_value:
            alarms = [alarm for alarm in alarms if alarm["StateValue"] == state_value]

        # TODO: Pagination
        metric_alarms = [alarm for alarm in alarms if alarm.get("AlarmRule") is None]
        composite_alarms = [alarm for alarm in alarms if alarm.get("AlarmRule") is not None]
        return DescribeAlarmsOutput(CompositeAlarms=composite_alarms, MetricAlarms=metric_alarms)
1✔
508

509
    def describe_alarms_for_metric(
        self,
        context: RequestContext,
        metric_name: MetricName,
        namespace: Namespace,
        statistic: Statistic = None,
        extended_statistic: ExtendedStatistic = None,
        dimensions: Dimensions = None,
        period: Period = None,
        unit: StandardUnit = None,
        **kwargs,
    ) -> DescribeAlarmsForMetricOutput:
        """Return the metric alarms defined on the given metric.

        Only ``LocalStackMetricAlarm`` entries whose ``MetricName`` and ``Namespace``
        match are considered. Each optional argument that is supplied
        (``statistic``, ``extended_statistic``, ``dimensions``, ``period``,
        ``unit``) must additionally equal the alarm's corresponding field.
        """
        store = self.get_store(context.account_id, context.region)

        # Falsy arguments are treated as "not specified" and impose no constraint.
        optional_filters = {
            "Statistic": statistic,
            "ExtendedStatistic": extended_statistic,
            "Dimensions": dimensions,
            "Period": period,
            "Unit": unit,
        }
        required = {
            "MetricName": metric_name,
            "Namespace": namespace,
            **{field: wanted for field, wanted in optional_filters.items() if wanted},
        }

        alarms = [
            a.alarm
            for a in store.alarms.values()
            if isinstance(a, LocalStackMetricAlarm)
            and all(a.alarm.get(field) == wanted for field, wanted in required.items())
        ]
        return DescribeAlarmsForMetricOutput(MetricAlarms=alarms)
1✔
539

540
    def list_tags_for_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, **kwargs
    ) -> ListTagsForResourceOutput:
        """Return the tags recorded for ``resource_arn`` (an empty list if there are none)."""
        tag_store = self.get_store(context.account_id, context.region).TAGS
        listing = tag_store.list_tags_for_resource(resource_arn)
        return ListTagsForResourceOutput(Tags=listing.get("Tags", []))
1✔
546

547
    def untag_resource(
        self,
        context: RequestContext,
        resource_arn: AmazonResourceName,
        tag_keys: TagKeyList,
        **kwargs,
    ) -> UntagResourceOutput:
        """Remove the tags named in ``tag_keys`` from ``resource_arn``."""
        tag_store = self.get_store(context.account_id, context.region).TAGS
        tag_store.untag_resource(resource_arn, tag_keys)
        return UntagResourceOutput()
1✔
557

558
    def tag_resource(
        self, context: RequestContext, resource_arn: AmazonResourceName, tags: TagList, **kwargs
    ) -> TagResourceOutput:
        """Attach ``tags`` to ``resource_arn`` in the store of the request's account/region."""
        tag_store = self.get_store(context.account_id, context.region).TAGS
        tag_store.tag_resource(resource_arn, tags)
        return TagResourceOutput()
1✔
564

565
    def put_dashboard(
        self,
        context: RequestContext,
        dashboard_name: DashboardName,
        dashboard_body: DashboardBody,
        **kwargs,
    ) -> PutDashboardOutput:
        """Create or replace the dashboard stored under ``dashboard_name``.

        :raises InvalidParameterValueException: the name contains anything other
            than alphanumerics, dash and underscore.
        """
        # fullmatch instead of match + "$": "$" also matches just before a trailing
        # newline, which would let a name like "board\n" through.
        if not re.fullmatch(r"[a-zA-Z0-9_-]+", dashboard_name):
            raise InvalidParameterValueException(
                "The value for field DashboardName contains invalid characters. "
                "It can only contain alphanumerics, dash (-) and underscore (_).\n"
            )

        store = self.get_store(context.account_id, context.region)
        store.dashboards[dashboard_name] = LocalStackDashboard(
            context.account_id, context.region, dashboard_name, dashboard_body
        )
        return PutDashboardOutput()
1✔
584

585
    def get_dashboard(
        self, context: RequestContext, dashboard_name: DashboardName, **kwargs
    ) -> GetDashboardOutput:
        """Return the name, body and ARN of a stored dashboard.

        :raises InvalidParameterValueException: no dashboard is stored under that name.
        """
        dashboards = self.get_store(context.account_id, context.region).dashboards
        dashboard = dashboards.get(dashboard_name)
        if dashboard:
            return GetDashboardOutput(
                DashboardName=dashboard_name,
                DashboardBody=dashboard.dashboard_body,
                DashboardArn=dashboard.dashboard_arn,
            )

        raise InvalidParameterValueException(f"Dashboard {dashboard_name} does not exist.")
598

599
    def delete_dashboards(
        self, context: RequestContext, dashboard_names: DashboardNames, **kwargs
    ) -> DeleteDashboardsOutput:
        """Remove the named dashboards; names that are not stored are ignored."""
        store = self.get_store(context.account_id, context.region)
        for name in dashboard_names:
            store.dashboards.pop(name, None)
        return DeleteDashboardsOutput()
1✔
606

607
    def list_dashboards(
        self,
        context: RequestContext,
        dashboard_name_prefix: DashboardNamePrefix = None,
        next_token: NextToken = None,
        **kwargs,
    ) -> ListDashboardsOutput:
        """List stored dashboards whose name starts with ``dashboard_name_prefix``.

        All dashboards are listed when no prefix is given. ``next_token`` is
        accepted but unused; the result is not paginated.
        """
        store = self.get_store(context.account_id, context.region)
        prefix = dashboard_name_prefix or ""

        entries = [
            {
                "DashboardName": name,
                "DashboardArn": dashboard.dashboard_arn,
                "LastModified": dashboard.last_modified,
                "Size": dashboard.size,
            }
            for name, dashboard in store.dashboards.items()
            if name.startswith(prefix)
        ]
        return ListDashboardsOutput(DashboardEntries=entries)
632

633
    def list_metrics(
        self,
        context: RequestContext,
        namespace: Namespace = None,
        metric_name: MetricName = None,
        dimensions: DimensionFilters = None,
        next_token: NextToken = None,
        recently_active: RecentlyActive = None,
        include_linked_accounts: IncludeLinkedAccounts = None,
        owning_account: AccountId = None,
        **kwargs,
    ) -> ListMetricsOutput:
        """Return one page of metrics matching the namespace, name and dimension filters.

        Pages hold up to ``LIST_METRICS_MAX_RESULTS`` entries. ``recently_active``,
        ``include_linked_accounts`` and ``owning_account`` are accepted but unused.
        """
        db_result = self.cloudwatch_database.list_metrics(
            context.account_id,
            context.region,
            namespace,
            metric_name,
            dimensions or [],
        )

        metrics = []
        for row in db_result.get("metrics", []):
            metrics.append(
                {
                    "Namespace": row.get("namespace"),
                    "MetricName": row.get("metric_name"),
                    "Dimensions": row.get("dimensions"),
                }
            )

        def pagination_key(metric):
            # Identifies a metric by namespace, name and dimensions.
            return f"{metric.get('Namespace')}-{metric.get('MetricName')}-{metric.get('Dimensions')}"

        page, nxt = PaginatedList(metrics).get_page(
            pagination_key,
            next_token=next_token,
            page_size=LIST_METRICS_MAX_RESULTS,
        )
        return ListMetricsOutput(Metrics=page, NextToken=nxt)
1✔
668

669
    def get_metric_statistics(
        self,
        context: RequestContext,
        namespace: Namespace,
        metric_name: MetricName,
        start_time: Timestamp,
        end_time: Timestamp,
        period: Period,
        dimensions: Dimensions = None,
        statistics: Statistics = None,
        extended_statistics: ExtendedStatistics = None,
        unit: StandardUnit = None,
        **kwargs,
    ) -> GetMetricStatisticsOutput:
        """Return the requested statistics of a single metric as datapoints.

        The metric database is queried once per (statistic, unit) combination and
        the results are merged into one datapoint per (unit, timestamp) pair. When
        ``unit`` is not given, the units to query are looked up in the metric
        database for the metric and time range.

        ``extended_statistics`` is accepted but not evaluated by this method. When
        ``statistics`` is omitted, no query is run and the datapoint list is empty.

        :param context: request context supplying the account id and region
        :param namespace: namespace of the metric
        :param metric_name: name of the metric, also returned as ``Label``
        :param start_time: start of the queried time range
        :param end_time: end of the queried time range
        :param period: granularity of the returned datapoints
        :param dimensions: dimensions of the metric; treated as empty when omitted
        :param statistics: names of the statistics to compute
        :param extended_statistics: not evaluated
        :param unit: restricts the query to this unit when given
        :raises InvalidParameterValueException: if ``start_time`` is not before
            ``end_time`` (compared at whole-second resolution)
        :raises InvalidParameterCombination: if the time range divided by ``period``
            exceeds ``AWS_MAX_DATAPOINTS_ACCEPTED``
        :return: the collected datapoints and the metric name as label
        """
        start_time_unix = int(start_time.timestamp())
        end_time_unix = int(end_time.timestamp())

        if not start_time_unix < end_time_unix:
            raise InvalidParameterValueException(
                "The parameter StartTime must be less than the parameter EndTime."
            )

        expected_datapoints = (end_time_unix - start_time_unix) / period

        if expected_datapoints > AWS_MAX_DATAPOINTS_ACCEPTED:
            raise InvalidParameterCombination(
                f"You have requested up to {int(expected_datapoints)} datapoints, which exceeds the limit of {AWS_MAX_DATAPOINTS_ACCEPTED}. "
                f"You may reduce the datapoints requested by increasing Period, or decreasing the time range."
            )

        # unit -> timestamp -> {statistic name: value}
        stat_datapoints = {}

        units = (
            [unit]
            if unit
            else self.cloudwatch_database.get_units_for_metric_data_stat(
                account_id=context.account_id,
                region=context.region,
                start_time=start_time,
                end_time=end_time,
                metric_name=metric_name,
                namespace=namespace,
            )
        )

        # `statistics` is optional (a caller may pass only extended statistics), so
        # guard against None instead of failing with a TypeError on iteration
        for stat in statistics or []:
            for selected_unit in units:
                query_result = self.cloudwatch_database.get_metric_data_stat(
                    account_id=context.account_id,
                    region=context.region,
                    start_time=start_time,
                    end_time=end_time,
                    scan_by="TimestampDescending",
                    query=MetricDataQuery(
                        MetricStat=MetricStat(
                            Metric={
                                "MetricName": metric_name,
                                "Namespace": namespace,
                                "Dimensions": dimensions or [],
                            },
                            Period=period,
                            Stat=stat,
                            Unit=selected_unit,
                        )
                    ),
                )

                timestamps = query_result.get("timestamps", [])
                values = query_result.get("values", [])
                for i, timestamp in enumerate(timestamps):
                    # merge the different statistics of one (unit, timestamp) pair
                    point = stat_datapoints.setdefault(selected_unit, {}).setdefault(
                        timestamp, {}
                    )
                    point[stat] = values[i]
                    point["Unit"] = selected_unit

        datapoints: list[Datapoint] = []
        for selected_unit, results in stat_datapoints.items():
            for timestamp, stats in results.items():
                datapoints.append(
                    Datapoint(
                        Timestamp=timestamp,
                        SampleCount=stats.get("SampleCount"),
                        Average=stats.get("Average"),
                        Sum=stats.get("Sum"),
                        Minimum=stats.get("Minimum"),
                        Maximum=stats.get("Maximum"),
                        # the "NULL_VALUE" unit is reported as "None"
                        Unit="None" if selected_unit == "NULL_VALUE" else selected_unit,
                    )
                )

        return GetMetricStatisticsOutput(Datapoints=datapoints, Label=metric_name)
1✔
760

761
    def _update_state(
        self,
        context: RequestContext,
        alarm: LocalStackAlarm,
        state_value: str,
        state_reason: str,
        state_reason_data: dict = None,
    ):
        """Move ``alarm`` to a new state and record the transition in the store.

        A ``StateUpdate`` history item describing the old and new state is appended
        to the store's histories; it is stamped with the alarm's
        ``StateUpdatedTimestamp`` from before this update. Afterwards the alarm's
        state value, state reason, optional state reason data and
        ``StateUpdatedTimestamp`` are overwritten.

        :param context: request context supplying the account id and region
        :param alarm: alarm whose state is updated in place
        :param state_value: new state value
        :param state_reason: reason for the new state
        :param state_reason_data: optional structured reason; for metric alarms a
            ``version`` entry is added to the passed dict
        """
        alarm_data = alarm.alarm
        is_metric_alarm = isinstance(alarm, LocalStackMetricAlarm)

        previous_state = alarm_data["StateValue"]
        previous_reason = alarm_data["StateReason"]
        store = self.get_store(context.account_id, context.region)
        updated_at = datetime.datetime.now()

        # version is not present in state reason data for composite alarm, hence the check
        if state_reason_data and is_metric_alarm:
            state_reason_data["version"] = HISTORY_VERSION

        transition = {
            "version": HISTORY_VERSION,
            "oldState": {"stateValue": previous_state, "stateReason": previous_reason},
            "newState": {
                "stateValue": state_value,
                "stateReason": state_reason,
                "stateReasonData": state_reason_data,
            },
        }
        history_item = {
            "Timestamp": timestamp_millis(alarm_data["StateUpdatedTimestamp"]),
            "HistoryItemType": HistoryItemType.StateUpdate,
            "AlarmName": alarm_data["AlarmName"],
            "HistoryData": json.dumps(transition),
            "HistorySummary": f"Alarm updated from {previous_state} to {state_value}",
            "AlarmType": "MetricAlarm" if is_metric_alarm else "CompositeAlarm",
        }
        store.histories.append(history_item)

        alarm_data["StateValue"] = state_value
        alarm_data["StateReason"] = state_reason
        if state_reason_data:
            alarm_data["StateReasonData"] = json.dumps(state_reason_data)
        alarm_data["StateUpdatedTimestamp"] = updated_at
1✔
802

803
    def disable_alarm_actions(
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
    ) -> None:
        """Switch off the actions of the named alarms.

        :param context: request context supplying the account id and region
        :param alarm_names: names of the alarms to update
        """
        self._set_alarm_actions(context=context, alarm_names=alarm_names, enabled=False)
1✔
807

808
    def enable_alarm_actions(
        self, context: RequestContext, alarm_names: AlarmNames, **kwargs
    ) -> None:
        """Switch on the actions of the named alarms.

        :param context: request context supplying the account id and region
        :param alarm_names: names of the alarms to update
        """
        self._set_alarm_actions(context=context, alarm_names=alarm_names, enabled=True)
1✔
812

813
    def _set_alarm_actions(self, context, alarm_names, enabled):
        """Set ``ActionsEnabled`` to ``enabled`` on every named alarm found in the store.

        Names for which the store holds no alarm are skipped.
        """
        store = self.get_store(context.account_id, context.region)
        for alarm_name in alarm_names:
            arn = arns.cloudwatch_alarm_arn(
                alarm_name, account_id=context.account_id, region_name=context.region
            )
            stored_alarm = store.alarms.get(arn)
            if not stored_alarm:
                continue
            stored_alarm.alarm["ActionsEnabled"] = enabled
1✔
822

823
    def describe_alarm_history(
        self,
        context: RequestContext,
        alarm_name: AlarmName | None = None,
        alarm_contributor_id: ContributorId | None = None,
        alarm_types: AlarmTypes | None = None,
        history_item_type: HistoryItemType | None = None,
        start_date: Timestamp | None = None,
        end_date: Timestamp | None = None,
        max_records: MaxRecords | None = None,
        next_token: NextToken | None = None,
        scan_by: ScanBy | None = None,
        **kwargs,
    ) -> DescribeAlarmHistoryOutput:
        """Return the stored alarm history items that match the given filters.

        Each filter is applied only when it is provided. ``alarm_contributor_id``,
        ``max_records``, ``next_token`` and ``scan_by`` are accepted but not
        evaluated: the result is neither paginated nor re-ordered.

        :param context: request context supplying the account id and region
        :param alarm_name: keep only items of the alarm with this name
        :param alarm_types: keep only items whose ``AlarmType`` is one of these
        :param history_item_type: keep only items with this ``HistoryItemType``
        :param start_date: keep only items with a timestamp at or after this date
        :param end_date: keep only items with a timestamp at or before this date
        :return: the matching history items
        """
        store = self.get_store(context.account_id, context.region)
        history = store.histories
        if alarm_name:
            history = [h for h in history if h["AlarmName"] == alarm_name]
        if alarm_types:
            history = [h for h in history if h.get("AlarmType") in alarm_types]
        if history_item_type:
            history = [h for h in history if h.get("HistoryItemType") == history_item_type]

        def _get_timestamp(item: dict):
            # history items store their timestamp as a string; items without one
            # yield None and are dropped by the date filters below
            if timestamp_string := item.get("Timestamp"):
                return datetime.datetime.fromisoformat(timestamp_string)
            return None

        if start_date:
            history = [h for h in history if (date := _get_timestamp(h)) and date >= start_date]
        if end_date:
            history = [h for h in history if (date := _get_timestamp(h)) and date <= end_date]
        return DescribeAlarmHistoryOutput(AlarmHistoryItems=history)
1✔
852

853
    def _evaluate_composite_alarms(self, context: RequestContext, triggering_alarm):
        """Re-evaluate every composite alarm stored for the request's account and region.

        :param context: request context supplying the account id and region
        :param triggering_alarm: alarm passed on to each composite alarm evaluation
        """
        # TODO either pass store as a parameter or acquire RLock (with _STORE_LOCK:)
        # everything works ok now but better ensure protection of critical section in front of future changes
        store = self.get_store(context.account_id, context.region)
        # iterate over a copy of the stored alarms
        stored_alarms = list(store.alarms.values())
        for candidate in stored_alarms:
            if isinstance(candidate, LocalStackCompositeAlarm):
                self._evaluate_composite_alarm(context, candidate, triggering_alarm)
1✔
861

862
    def _evaluate_composite_alarm(self, context, composite_alarm, triggering_alarm):
        """Evaluate the rule of one composite alarm and update its state if it changed.

        The evaluation is skipped, with a warning, when the rule contains an
        unsupported expression or references an alarm that is not in the store.
        The composite alarm becomes ``ALARM`` as soon as one referenced alarm is in
        ``ALARM`` (that alarm then replaces ``triggering_alarm``), otherwise ``OK``.
        On a state change the state is updated and, if the alarm's actions are
        enabled, its actions are run.
        """
        store = self.get_store(context.account_id, context.region)
        rule = composite_alarm.alarm["AlarmRule"]
        unsupported_expressions = self._validate_alarm_rule_expression(rule)
        if unsupported_expressions:
            LOG.warning(
                "Alarm rule contains unsupported expressions and will not be evaluated: %s",
                unsupported_expressions,
            )
            return

        target_state = StateValue.OK
        # assuming that a rule consists only of ALARM evaluations of metric alarms, with OR logic applied
        for child_arn in self._get_alarm_arns(rule):
            child_alarm = store.alarms.get(child_arn)
            if not child_alarm:
                LOG.warning(
                    "Alarm rule won't be evaluated as there is no alarm with ARN %s",
                    child_arn,
                )
                return
            if child_alarm.alarm["StateValue"] == StateValue.ALARM:
                # the first child in ALARM decides the outcome
                triggering_alarm = child_alarm
                target_state = StateValue.ALARM
                break

        previous_state = composite_alarm.alarm["StateValue"]
        if previous_state == target_state:
            return

        trigger = triggering_alarm.alarm
        trigger_arn = trigger.get("AlarmArn")
        trigger_state = trigger.get("StateValue")
        trigger_transitioned_at = trigger.get("StateTransitionedTimestamp")
        formatted_transition_time = trigger_transitioned_at.strftime("%A %d %B, %Y %H:%M:%S %Z")
        reason = f"{trigger_arn} transitioned to {trigger_state} at {formatted_transition_time}"
        reason_data = {
            "triggeringAlarms": [
                {
                    "arn": trigger_arn,
                    "state": {
                        "value": trigger_state,
                        "timestamp": timestamp_millis(trigger_transitioned_at),
                    },
                }
            ]
        }
        self._update_state(context, composite_alarm, target_state, reason, reason_data)
        if composite_alarm.alarm["ActionsEnabled"]:
            self._run_composite_alarm_actions(
                context, composite_alarm, previous_state, triggering_alarm
            )
920

921
    def _validate_alarm_rule_expression(self, alarm_rule):
1✔
922
        validation_result = []
1✔
923
        alarms_conditions = [alarm.strip() for alarm in alarm_rule.split("OR")]
1✔
924
        for alarm_condition in alarms_conditions:
1✔
925
            if not alarm_condition.startswith("ALARM"):
1✔
926
                validation_result.append(
1✔
927
                    f"Unsupported expression in alarm rule condition {alarm_condition}: Only ALARM expression is supported by Localstack as of now"
928
                )
929
        return validation_result
1✔
930

931
    def _get_alarm_arns(self, composite_alarm_rule):
1✔
932
        # regexp for everything within (" ")
933
        return re.findall(r'\("([^"]*)"\)', composite_alarm_rule)
1✔
934

935
    def _run_composite_alarm_actions(
        self, context, composite_alarm, old_state_value, triggering_alarm
    ):
        """Run the actions configured for the composite alarm's current state.

        The action list is picked by the alarm's ``StateValue``: ``OKActions`` for
        ``OK``, ``AlarmActions`` for ``ALARM``, ``InsufficientDataActions`` for
        anything else. SNS actions publish a state-change message to the topic;
        actions of any other service are logged and skipped.
        """
        alarm_data = composite_alarm.alarm
        new_state_value = alarm_data["StateValue"]
        if new_state_value == StateValue.OK:
            actions_key = "OKActions"
        elif new_state_value == StateValue.ALARM:
            actions_key = "AlarmActions"
        else:
            actions_key = "InsufficientDataActions"

        for action_arn in alarm_data[actions_key]:
            parsed_arn = arns.parse_arn(action_arn)
            if parsed_arn["service"] != "sns":
                # TODO: support other actions
                LOG.warning(
                    "Action for service %s not implemented, action '%s' will not be triggered.",
                    parsed_arn["service"],
                    action_arn,
                )
                continue

            sns_client = connect_to(
                region_name=parsed_arn["region"], aws_access_key_id=parsed_arn["account"]
            ).sns
            subject = f"""{new_state_value}: "{alarm_data["AlarmName"]}" in {context.region}"""
            message = create_message_response_update_composite_alarm_state_sns(
                composite_alarm, triggering_alarm, old_state_value
            )
            sns_client.publish(TopicArn=action_arn, Subject=subject, Message=message)
963

964

965
def create_metric_data_query_from_alarm(alarm: LocalStackMetricAlarm):
    """Build a one-element metric data query list from the alarm's metric settings.

    The single query gets a random UUID as id and is marked to return data.
    Missing or empty dimensions are emitted as an empty dict.
    """
    # TODO may need to be adapted for other use cases
    #  verified return value with a snapshot test
    query_id = str(uuid.uuid4())
    settings = alarm.alarm
    metric = {
        "namespace": settings["Namespace"],
        "name": settings["MetricName"],
        "dimensions": settings.get("Dimensions") or {},
    }
    metric_stat = {
        "metric": metric,
        "period": int(settings["Period"]),
        "stat": settings["Statistic"],
    }
    return [{"id": query_id, "metricStat": metric_stat, "returnData": True}]
983

984

985
def create_message_response_update_state_lambda(
    alarm: LocalStackMetricAlarm, old_state, old_state_reason, old_state_timestamp
):
    """Build the JSON payload describing a metric alarm state change.

    :param alarm: alarm whose current state is reported
    :param old_state: state value before the change
    :param old_state_reason: state reason before the change
    :param old_state_timestamp: timestamp of the previous state
    :return: the payload serialized as a JSON string
    """
    _alarm = alarm.alarm
    # Derive the fallback query only when the alarm has no "Metrics" entry. Passing
    # it as the default of .get() would always evaluate it, and it raises KeyError
    # for alarms that define "Metrics" instead of Namespace/MetricName/Period/Statistic.
    if "Metrics" in _alarm:
        metrics = _alarm["Metrics"]  # TODO: add test with metric_data_queries
    else:
        metrics = create_metric_data_query_from_alarm(alarm)
    response = {
        "accountId": extract_account_id_from_arn(_alarm["AlarmArn"]),
        "alarmArn": _alarm["AlarmArn"],
        "alarmData": {
            "alarmName": _alarm["AlarmName"],
            "state": {
                "value": _alarm["StateValue"],
                "reason": _alarm["StateReason"],
                "timestamp": _alarm["StateUpdatedTimestamp"],
            },
            "previousState": {
                "value": old_state,
                "reason": old_state_reason,
                "timestamp": old_state_timestamp,
            },
            "configuration": {
                "description": _alarm.get("AlarmDescription", ""),
                "metrics": metrics,
            },
        },
        "time": _alarm["StateUpdatedTimestamp"],
        "region": alarm.region,
        "source": "aws.cloudwatch",
    }
    return json.dumps(response, cls=JSONEncoder)
1✔
1016

1017

1018
def create_message_response_update_state_sns(alarm: LocalStackMetricAlarm, old_state: StateValue):
    """Build the JSON message describing a metric alarm state change for SNS.

    :param alarm: alarm whose current state is reported
    :param old_state: state value before the change
    :return: the message serialized as a JSON string, trigger details under ``Trigger``
    """
    config = alarm.alarm
    message = {
        "AWSAccountId": alarm.account_id,
        "OldStateValue": old_state,
        "AlarmName": config["AlarmName"],
        "AlarmDescription": config.get("AlarmDescription"),
        "AlarmConfigurationUpdatedTimestamp": config["AlarmConfigurationUpdatedTimestamp"],
        "NewStateValue": config["StateValue"],
        "NewStateReason": config["StateReason"],
        "StateChangeTime": config["StateUpdatedTimestamp"],
        # the long-name for 'region' should be used - as we don't have it, we use the short name
        # which needs to be slightly changed to make snapshot tests work
        "Region": alarm.region.replace("-", " ").capitalize(),
        "AlarmArn": config["AlarmArn"],
        "OKActions": config.get("OKActions", []),
        "AlarmActions": config.get("AlarmActions", []),
        "InsufficientDataActions": config.get("InsufficientDataActions", []),
    }

    # collect trigger details
    trigger = {
        "MetricName": config.get("MetricName", ""),
        "Namespace": config.get("Namespace", ""),
        "Unit": config.get("Unit", None),  # testing with AWS revealed this currently returns None
        "Period": int(config.get("Period", 0)),
        "EvaluationPeriods": int(config.get("EvaluationPeriods", 0)),
        "ComparisonOperator": config.get("ComparisonOperator", ""),
        "Threshold": float(config.get("Threshold", 0.0)),
        "TreatMissingData": config.get("TreatMissingData", ""),
        "EvaluateLowSampleCountPercentile": config.get("EvaluateLowSampleCountPercentile", ""),
    }

    # Dimensions not serializable; an empty string is emitted when there are none
    serializable_dimensions = [
        {"value": dimension["Value"], "name": dimension["Name"]}
        for dimension in config.get("Dimensions", []) or []
    ]
    trigger["Dimensions"] = serializable_dimensions if serializable_dimensions else ""

    statistic = config.get("Statistic")
    extended_statistic = config.get("ExtendedStatistic")
    if statistic:
        trigger["StatisticType"] = "Statistic"
        trigger["Statistic"] = camel_to_snake_case(statistic).upper()  # AWS returns uppercase
    elif extended_statistic:
        trigger["StatisticType"] = "ExtendedStatistic"
        trigger["ExtendedStatistic"] = extended_statistic

    message["Trigger"] = trigger

    return json.dumps(message, cls=JSONEncoder)
1✔
1072

1073

1074
def create_message_response_update_composite_alarm_state_sns(
    composite_alarm: LocalStackCompositeAlarm,
    triggering_alarm: LocalStackMetricAlarm,
    old_state: StateValue,
):
    """Build the JSON message describing a composite alarm state change for SNS.

    :param composite_alarm: composite alarm whose current state is reported
    :param triggering_alarm: alarm listed as the single ``TriggeringChildren`` entry
    :param old_state: state value of the composite alarm before the change
    :return: the message serialized as a JSON string
    """
    config = composite_alarm.alarm
    message = {
        "AWSAccountId": composite_alarm.account_id,
        "AlarmName": config["AlarmName"],
        "AlarmDescription": config.get("AlarmDescription"),
        "AlarmRule": config.get("AlarmRule"),
        "OldStateValue": old_state,
        "NewStateValue": config["StateValue"],
        "NewStateReason": config["StateReason"],
        "StateChangeTime": config["StateUpdatedTimestamp"],
        # the long-name for 'region' should be used - as we don't have it, we use the short name
        # which needs to be slightly changed to make snapshot tests work
        "Region": composite_alarm.region.replace("-", " ").capitalize(),
        "AlarmArn": config["AlarmArn"],
        "OKActions": config.get("OKActions", []),
        "AlarmActions": config.get("AlarmActions", []),
        "InsufficientDataActions": config.get("InsufficientDataActions", []),
    }

    trigger = triggering_alarm.alarm
    child_entry = {
        "Arn": trigger.get("AlarmArn"),
        "State": {
            "Value": trigger["StateValue"],
            "Timestamp": trigger["StateUpdatedTimestamp"],
        },
    }
    message["TriggeringChildren"] = [child_entry]

    return json.dumps(message, cls=JSONEncoder)
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc