• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / fb695d20-3f14-49c6-99d2-b1b76030971b

03 Apr 2025 02:31PM UTC coverage: 86.835% (-0.02%) from 86.857%
fb695d20-3f14-49c6-99d2-b1b76030971b

push

circleci

web-flow
add VerifiedPermissions to the client types (#12474)

1 of 2 new or added lines in 1 file covered. (50.0%)

21 existing lines in 9 files now uncovered.

63550 of 73185 relevant lines covered (86.83%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.0
/localstack-core/localstack/utils/cloudwatch/cloudwatch_util.py
1
import logging
1✔
2
import time
1✔
3
from datetime import datetime, timezone
1✔
4
from itertools import islice
1✔
5
from typing import Optional, TypedDict
1✔
6

7
from werkzeug import Response as WerkzeugResponse
1✔
8

9
from localstack.aws.connect import connect_to
1✔
10
from localstack.utils.bootstrap import is_api_enabled
1✔
11
from localstack.utils.strings import to_str
1✔
12
from localstack.utils.time import now_utc
1✔
13

14
LOG = logging.getLogger(__name__)
1✔
15

16

17
# ---------------
18
# Lambda and SQS metrics
19
# ---------------
20
class SqsMetricBatchData(TypedDict, total=False):
    """
    A single SQS metric entry, as consumed by publish_sqs_metric_batch.

    All keys are optional (total=False).
    """

    # name of the metric to publish
    MetricName: str
    # name of the queue, published as the "QueueName" dimension
    QueueName: str
    # metric value; typed as float for consistency with publish_sqs_metric (ints remain valid)
    Value: Optional[float]
    # unit of the metric value
    Unit: Optional[str]
1✔
25

26

27
def dimension_lambda(kwargs):
    """
    Builds the CloudWatch dimensions list for a Lambda function.

    :param kwargs mapping from which the function name is resolved via _func_name
    :return a single-element list holding the "FunctionName" dimension
    """
    function_dimension = {"Name": "FunctionName", "Value": _func_name(kwargs)}
    return [function_dimension]
1✔
30

31

32
def publish_lambda_metric(
    metric, value, kwargs, account_id: Optional[str] = None, region_name: Optional[str] = None
):
    """
    Publishes a single Lambda metric to CloudWatch using the namespace "AWS/Lambda".

    Does nothing if the CloudWatch API is not enabled. Any error raised while building or
    sending the metric is logged and not propagated.

    :param metric The metric name to be used
    :param value The value of the metric data
    :param kwargs mapping used to resolve the "FunctionName" dimension (see dimension_lambda)
    :param account_id The account id that should be used for CloudWatch
    :param region_name The region that should be used for CloudWatch
    """
    # publish metric only if CloudWatch service is available
    if not is_api_enabled("cloudwatch"):
        return
    cw_client = connect_to(aws_access_key_id=account_id, region_name=region_name).cloudwatch
    try:
        cw_client.put_metric_data(
            Namespace="AWS/Lambda",
            MetricData=[
                {
                    "MetricName": metric,
                    "Dimensions": dimension_lambda(kwargs),
                    # datetime.utcnow() is deprecated; request an aware UTC timestamp directly
                    "Timestamp": datetime.now(timezone.utc),
                    "Value": value,
                }
            ],
        )
    except Exception as e:
        LOG.info('Unable to put metric data for metric "%s" to CloudWatch: %s', metric, e)
×
53

54

55
def publish_sqs_metric_batch(
    account_id: str, region: str, sqs_metric_batch_data: list[SqsMetricBatchData]
):
    """
    Publishes SQS metrics to CloudWatch in batches using the namespace "AWS/SQS"
    See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-available-cloudwatch-metrics.html

    Does nothing if the CloudWatch API is not enabled. All entries share one timestamp taken
    when the function is called. A failing batch is logged and the remaining batches are
    still attempted.

    :param account_id The account id that should be used for CloudWatch
    :param region The region that should be used for CloudWatch
    :param sqs_metric_batch_data data to be published
    """
    if not is_api_enabled("cloudwatch"):
        return

    cw_client = connect_to(region_name=region, aws_access_key_id=account_id).cloudwatch
    # datetime.utcnow() is deprecated; request an aware UTC timestamp directly
    timestamp = datetime.now(timezone.utc)
    # to be on the safe-side: only insert up to 1000 data metrics with one put-metric-data call
    batch_size = 1000
    for start in range(0, len(sqs_metric_batch_data), batch_size):
        # slice the current batch directly instead of re-walking the list from the beginning
        metric_data = [
            {
                "MetricName": d.get("MetricName"),
                "Dimensions": [{"Name": "QueueName", "Value": d.get("QueueName")}],
                "Unit": d.get("Unit", "Count"),
                "Timestamp": timestamp,
                "Value": d.get("Value", 1),
            }
            for d in sqs_metric_batch_data[start : start + batch_size]
        ]

        try:
            cw_client.put_metric_data(Namespace="AWS/SQS", MetricData=metric_data)
        except Exception as e:
            LOG.info("Unable to put metric data for metrics to CloudWatch: %s", e)
1✔
96

97

98
def publish_sqs_metric(
    account_id: str,
    region: str,
    queue_name: str,
    metric: str,
    value: float = 1,
    unit: str = "Count",
):
    """
    Publishes the metrics for SQS to CloudWatch using the namespace "AWS/SQS"
    See also: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-available-cloudwatch-metrics.html

    Does nothing if the CloudWatch API is not enabled. Errors raised by the put-metric-data
    call are logged and not propagated.

    :param account_id The account id that should be used for CloudWatch
    :param region The region that should be used for CloudWatch
    :param queue_name The name of the queue
    :param metric The metric name to be used
    :param value The value of the metric data, default: 1
    :param unit The unit of the metric data, default: "Count"
    """
    if not is_api_enabled("cloudwatch"):
        return
    cw_client = connect_to(region_name=region, aws_access_key_id=account_id).cloudwatch
    try:
        cw_client.put_metric_data(
            Namespace="AWS/SQS",
            MetricData=[
                {
                    "MetricName": metric,
                    "Dimensions": [{"Name": "QueueName", "Value": queue_name}],
                    "Unit": unit,
                    # datetime.utcnow() is deprecated; request an aware UTC timestamp directly
                    "Timestamp": datetime.now(timezone.utc),
                    "Value": value,
                }
            ],
        )
    except Exception as e:
        LOG.info('Unable to put metric data for metric "%s" to CloudWatch: %s', metric, e)
×
134

135

136
def publish_lambda_duration(time_before, kwargs):
    """
    Publishes the Lambda "Duration" metric.

    :param time_before value of now_utc() captured before the invocation
    :param kwargs mapping forwarded to publish_lambda_metric to resolve the function name
    """
    elapsed = now_utc() - time_before
    publish_lambda_metric("Duration", elapsed, kwargs)
×
139

140

141
def publish_lambda_error(time_before, kwargs):
    """
    Records a failed Lambda invocation by publishing "Invocations" and "Errors" with value 1.

    :param time_before accepted for signature parity with the other publishers; not used here
    :param kwargs mapping forwarded to publish_lambda_metric to resolve the function name
    """
    for metric_name in ("Invocations", "Errors"):
        publish_lambda_metric(metric_name, 1, kwargs)
×
144

145

146
def publish_lambda_result(time_before, result, kwargs):
    """
    Publishes the metrics for a finished Lambda invocation.

    A werkzeug response with a status code of 400 or above is reported as an error;
    every other result is counted as a plain invocation.

    :param time_before passed through to publish_lambda_error
    :param result the value returned by the monitored function
    :param kwargs mapping forwarded to the metric publishers to resolve the function name
    """
    is_error_response = isinstance(result, WerkzeugResponse) and result.status_code >= 400
    if not is_error_response:
        publish_lambda_metric("Invocations", 1, kwargs)
        return None
    return publish_lambda_error(time_before, kwargs)
×
150

151

152
def store_cloudwatch_logs(
    logs_client,
    log_group_name,
    log_stream_name,
    log_output,
    start_time=None,
    auto_create_group: Optional[bool] = True,
):
    """
    Stores the given log output as log events in a CloudWatch Logs stream.

    Does nothing if the logs API is not enabled. The output is split into lines, empty lines
    are dropped, and each remaining line becomes one log event.

    :param logs_client client used for create_log_group, create_log_stream and put_log_events
    :param log_group_name name of the target log group
    :param log_stream_name name of the target log stream
    :param log_output the log output to store (converted with to_str)
    :param start_time start timestamp in milliseconds; defaults to the current time if falsy
    :param auto_create_group whether to create the log group first, default: True
    :raises Exception any create_log_group error other than "ResourceAlreadyExistsException",
        and any error raised by put_log_events
    """
    if not is_api_enabled("logs"):
        return
    start_time = start_time or int(time.time() * 1000)
    log_output = to_str(log_output)

    if auto_create_group:
        # make sure that the log group exists, create it if not
        try:
            logs_client.create_log_group(logGroupName=log_group_name)
        except Exception as e:
            # an already existing log group is fine, everything else is a real error
            if "ResourceAlreadyExistsException" not in str(e):
                raise

    # create a new log stream for this lambda invocation
    try:
        logs_client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
    except Exception as e:  # TODO: narrow down
        # still best-effort, but leave a trace instead of swallowing the error silently
        LOG.debug(
            'Unable to create log stream "%s" in log group "%s": %s',
            log_stream_name,
            log_group_name,
            e,
        )

    # store new log events under the log stream
    finish_time = int(time.time() * 1000)
    # fix for log lines that were merged into a single line, e.g., "log line 1 ... \x1b[32mEND RequestId ..."
    log_output = log_output.replace("\\x1b", "\n\\x1b")
    log_output = log_output.replace("\x1b", "\n\x1b")
    log_lines = log_output.split("\n")
    # str.split always returns at least one element, so this never divides by zero
    time_diff_per_line = float(finish_time - start_time) / float(len(log_lines))
    # simple heuristic: assume log lines were emitted in regular intervals; empty lines are
    # skipped but still count towards the line index used for the timestamp
    log_events = [
        {"timestamp": int(start_time + float(i) * time_diff_per_line), "message": line}
        for i, line in enumerate(log_lines)
        if line
    ]
    if not log_events:
        return
    logs_client.put_log_events(
        logGroupName=log_group_name, logStreamName=log_stream_name, logEvents=log_events
    )
202

203

204
# ---------------
205
# Helper methods
206
# ---------------
207

208

209
def _func_name(kwargs):
    """
    Resolves the Lambda function name from the given keyword arguments.

    Uses kwargs["func_name"] if it is set and non-empty. Otherwise the name is taken from
    kwargs["func_arn"]: the text after ":function:" up to the next ":".

    :param kwargs mapping that contains "func_name" and/or "func_arn"
    :return the function name
    :raises ValueError if no name is given and "func_arn" is missing or has no ":function:" part
    """
    func_name = kwargs.get("func_name")
    if func_name:
        return func_name
    func_arn = kwargs.get("func_arn") or ""
    _, separator, resource = func_arn.partition(":function:")
    if not separator:
        # fail with a descriptive error instead of an opaque AttributeError/IndexError
        raise ValueError(f"Unable to determine function name from ARN: {func_arn!r}")
    return resource.split(":")[0]
1✔
214

215

216
def publish_result(ns, time_before, result, kwargs):
    """
    Publishes the metrics for a successful call in the given namespace.

    Only the "lambda" namespace is handled; any other namespace is logged and ignored.

    :param ns the namespace the monitored function belongs to
    :param time_before forwarded to publish_lambda_result
    :param result the value returned by the monitored function
    :param kwargs the keyword arguments the monitored function was called with
    """
    if ns != "lambda":
        LOG.info("Unexpected CloudWatch namespace: %s", ns)
        return
    publish_lambda_result(time_before, result, kwargs)
×
221

222

223
def publish_error(ns, time_before, e, kwargs):
    """
    Publishes the metrics for a failed call in the given namespace.

    Only the "lambda" namespace is handled; any other namespace is logged and ignored.

    :param ns the namespace the monitored function belongs to
    :param time_before forwarded to publish_lambda_error
    :param e the exception raised by the monitored function (not used here)
    :param kwargs the keyword arguments the monitored function was called with
    """
    if ns != "lambda":
        LOG.info("Unexpected CloudWatch namespace: %s", ns)
        return
    publish_lambda_error(time_before, kwargs)
×
228

229

230
def cloudwatched(ns):
    """
    @cloudwatched(...) decorator for annotating methods to be monitored via CloudWatch

    After the decorated function returns, its result is passed to publish_result. If the
    function (or the publishing of its result) raises, publish_error is called and the
    exception is re-raised unchanged.

    :param ns the namespace passed on to publish_result / publish_error
    :return a decorator that wraps the given function
    """
    import functools

    def wrapping(func):
        # keep the wrapped function's name, docstring and other metadata
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            time_before = now_utc()
            try:
                result = func(*args, **kwargs)
                publish_result(ns, time_before, result, kwargs)
            except Exception as e:
                publish_error(ns, time_before, e, kwargs)
                # bare raise keeps the original traceback intact
                raise
            return result

        return wrapped

    return wrapping
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc