• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 16820655284

07 Aug 2025 05:03PM UTC coverage: 86.841% (-0.05%) from 86.892%
16820655284

push

github

web-flow
CFNV2: support CDK bootstrap and deployment (#12967)

32 of 38 new or added lines in 5 files covered. (84.21%)

2013 existing lines in 125 files now uncovered.

66606 of 76699 relevant lines covered (86.84%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.77
/localstack-core/localstack/services/cloudwatch/cloudwatch_database_helper.py
1
import logging
1✔
2
import os
1✔
3
import sqlite3
1✔
4
import threading
1✔
5
from datetime import UTC, datetime
1✔
6

7
from localstack import config
1✔
8
from localstack.aws.api.cloudwatch import MetricData, MetricDataQuery, ScanBy
1✔
9
from localstack.utils.files import mkdir
1✔
10

11
# Module-level logger for the CloudWatch metrics database helper.
LOG = logging.getLogger(__name__)
12

13
# Maps a CloudWatch statistic name to the SQLite aggregate expression that is
# interpolated into the metric queries. The expressions operate on the "value"
# and "count" columns of the combined (single + aggregated) result set.
STAT_TO_SQLITE_AGGREGATION_FUNC = {
    "Sum": "SUM(value)",
    "Average": "SUM(value)",  # we need to calculate the avg manually as we have also a table with aggregated data
    "Minimum": "MIN(value)",
    "Maximum": "MAX(value)",
    "SampleCount": "Sum(count)",
}
20

21
# Maps a CloudWatch statistic name to the column of the AGGREGATED_METRICS
# table that holds the corresponding pre-aggregated value. Used to project the
# aggregated table into the same shape as the single-metrics table in queries.
STAT_TO_SQLITE_COL_NAME_HELPER = {
    "Sum": "sum",
    "Average": "sum",
    "Minimum": "min",
    "Maximum": "max",
    "SampleCount": "sample_count",
}
28

29

30
class CloudwatchDatabase:
    """SQLite-backed storage for CloudWatch metric data.

    Raw data points are stored in the ``SINGLE_METRICS`` table (one row per
    sample) and pre-aggregated statistic values in ``AGGREGATED_METRICS``;
    read queries combine both tables with a UNION. All database access is
    serialized through ``DATABASE_LOCK``; read paths open the database via a
    read-only URI.
    """

    DB_NAME = "metrics.db"
    CLOUDWATCH_DATA_ROOT: str = os.path.join(config.dirs.data, "cloudwatch")
    METRICS_DB: str = os.path.join(CLOUDWATCH_DATA_ROOT, DB_NAME)
    # URI form so that read-only connections can be opened (mode=ro)
    METRICS_DB_READ_ONLY: str = f"file:{METRICS_DB}?mode=ro"
    TABLE_SINGLE_METRICS = "SINGLE_METRICS"
    TABLE_AGGREGATED_METRICS = "AGGREGATED_METRICS"
    DATABASE_LOCK: threading.RLock

    def __init__(self):
        """Create the metrics database, tables and indexes unless the database file already exists."""
        self.DATABASE_LOCK = threading.RLock()
        if os.path.exists(self.METRICS_DB):
            LOG.debug("database for metrics already exists (%s)", self.METRICS_DB)
            return

        mkdir(self.CLOUDWATCH_DATA_ROOT)
        with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB) as conn:
            cur = conn.cursor()
            # columns shared by both tables; "timestamp" is stored as a unix epoch
            common_columns = """
                    "id"                        INTEGER,
                    "account_id"                TEXT,
                    "region"                    TEXT,
                    "metric_name"                TEXT,
                    "namespace"                 TEXT,
                    "timestamp"                    NUMERIC,
                    "dimensions"                TEXT,
                    "unit"                        TEXT,
                    "storage_resolution"        INTEGER
                """
            cur.execute(
                f"""
                CREATE TABLE "{self.TABLE_SINGLE_METRICS}" (
                    {common_columns},
                    "value"                        NUMERIC,
                    PRIMARY KEY("id")
                );
                """
            )

            cur.execute(
                f"""
                CREATE TABLE "{self.TABLE_AGGREGATED_METRICS}" (
                    {common_columns},
                    "sample_count"          NUMERIC,
                    "sum"                        NUMERIC,
                    "min"                        NUMERIC,
                    "max"                        NUMERIC,
                    PRIMARY KEY("id")
                );
                """
            )
            # create indexes on the columns every read query filters on
            cur.executescript(
                """
                CREATE INDEX idx_single_metrics_comp ON SINGLE_METRICS (metric_name, namespace);
                CREATE INDEX idx_aggregated_metrics_comp ON AGGREGATED_METRICS (metric_name, namespace);
                """
            )
            conn.commit()

    def add_metric_data(
        self, account_id: str, region: str, namespace: str, metric_data: MetricData
    ):
        """Insert the given metric data for one account/region/namespace.

        Plain values (``Value``, or ``Values`` with optional ``Counts``) are
        written to the single-metrics table, one row per occurrence;
        ``StatisticValues`` are written to the aggregated-metrics table.

        :param account_id: AWS account id the metrics belong to
        :param region: region name the metrics belong to
        :param namespace: CloudWatch namespace of the metrics
        :param metric_data: list of MetricDatum-shaped dicts
        """

        def _get_current_unix_timestamp_utc():
            # datetime.utcnow() is deprecated since Python 3.12; now(UTC)
            # returns the equivalent timezone-aware timestamp
            return int(datetime.now(UTC).timestamp())

        for metric in metric_data:
            # fall back to "now" if the caller did not provide a timestamp
            unix_timestamp = (
                self._convert_timestamp_to_unix(metric.get("Timestamp"))
                if metric.get("Timestamp")
                else _get_current_unix_timestamp_utc()
            )

            inserts = []
            if metric.get("Value") is not None:
                inserts.append({"Value": metric.get("Value"), "TimesToInsert": 1})
            elif metric.get("Values"):
                # "Counts" defaults to one occurrence per entry in "Values"
                counts = metric.get("Counts", [1] * len(metric.get("Values")))
                inserts = [
                    {"Value": value, "TimesToInsert": int(counts[indexValue])}
                    for indexValue, value in enumerate(metric.get("Values"))
                ]
            all_data = []
            for insert in inserts:
                times_to_insert = insert.get("TimesToInsert")

                data = (
                    account_id,
                    region,
                    metric.get("MetricName"),
                    namespace,
                    unix_timestamp,
                    self._get_ordered_dimensions_with_separator(metric.get("Dimensions")),
                    metric.get("Unit"),
                    metric.get("StorageResolution"),
                    insert.get("Value"),
                )
                # a count of N is materialized as N identical rows
                all_data.extend([data] * times_to_insert)

            if all_data:
                with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB) as conn:
                    cur = conn.cursor()
                    query = f"INSERT INTO {self.TABLE_SINGLE_METRICS} (account_id, region, metric_name, namespace, timestamp, dimensions, unit, storage_resolution, value) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
                    cur.executemany(query, all_data)
                    conn.commit()

            if statistic_values := metric.get("StatisticValues"):
                with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB) as conn:
                    cur = conn.cursor()
                    cur.execute(
                        f"""INSERT INTO {self.TABLE_AGGREGATED_METRICS}
                    ("account_id", "region", "metric_name", "namespace", "timestamp", "dimensions", "unit", "storage_resolution", "sample_count", "sum", "min", "max")
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                        (
                            account_id,
                            region,
                            metric.get("MetricName"),
                            namespace,
                            unix_timestamp,
                            self._get_ordered_dimensions_with_separator(metric.get("Dimensions")),
                            metric.get("Unit"),
                            metric.get("StorageResolution"),
                            statistic_values.get("SampleCount"),
                            statistic_values.get("Sum"),
                            statistic_values.get("Minimum"),
                            statistic_values.get("Maximum"),
                        ),
                    )

                    conn.commit()

    def get_units_for_metric_data_stat(
        self,
        account_id: str,
        region: str,
        start_time: datetime,
        end_time: datetime,
        metric_name: str,
        namespace: str,
    ):
        """Return the distinct units recorded for a metric in the given time range.

        Rows with a NULL unit are represented by the marker string
        ``"NULL_VALUE"``; the marker is also returned when no rows match.

        :return: list of unit strings (possibly containing "NULL_VALUE")
        """
        # prepare SQL query
        start_time_unix = self._convert_timestamp_to_unix(start_time)
        end_time_unix = self._convert_timestamp_to_unix(end_time)

        data = (
            account_id,
            region,
            namespace,
            metric_name,
            start_time_unix,
            end_time_unix,
        )

        sql_query = f"""
        SELECT GROUP_CONCAT(unit) AS unit_values
        FROM(
            SELECT
                DISTINCT COALESCE(unit, 'NULL_VALUE') AS unit
            FROM (
                SELECT
                account_id, region, metric_name, namespace, timestamp, unit
                FROM {self.TABLE_SINGLE_METRICS}
                UNION ALL
                SELECT
                account_id, region, metric_name, namespace, timestamp, unit
                FROM {self.TABLE_AGGREGATED_METRICS}
            ) AS combined
            WHERE account_id = ? AND region = ?
            AND namespace = ? AND metric_name = ?
            AND timestamp >= ? AND timestamp < ?
        ) AS subquery
        """
        with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB_READ_ONLY, uri=True) as conn:
            cur = conn.cursor()
            cur.execute(
                sql_query,
                data,
            )
            result_row = cur.fetchone()
            return result_row[0].split(",") if result_row[0] else ["NULL_VALUE"]

    def get_metric_data_stat(
        self,
        account_id: str,
        region: str,
        query: MetricDataQuery,
        start_time: datetime,
        end_time: datetime,
        scan_by: str,
    ) -> dict[str, list]:
        """Evaluate one MetricDataQuery over [start_time, end_time).

        The range is split into ``Period``-sized windows; for each window the
        requested statistic is computed over the union of single and
        aggregated metrics. Windows without data points are skipped.

        :param scan_by: ScanBy value; None or TimestampDescending reverses the result order
        :return: dict with parallel "timestamps" and "values" lists
        """
        metric_stat = query.get("MetricStat")
        metric = metric_stat.get("Metric")
        period = metric_stat.get("Period")
        stat = metric_stat.get("Stat")
        dimensions = metric.get("Dimensions", [])
        unit = metric_stat.get("Unit")

        # prepare SQL query
        start_time_unix = self._convert_timestamp_to_unix(start_time)
        end_time_unix = self._convert_timestamp_to_unix(end_time)

        data = (
            account_id,
            region,
            metric.get("Namespace"),
            metric.get("MetricName"),
        )

        dimension_filter = "AND dimensions is null " if not dimensions else "AND dimensions LIKE ? "
        if dimensions:
            data = data + (
                self._get_ordered_dimensions_with_separator(dimensions, for_search=True),
            )

        unit_filter = ""
        if unit:
            if unit == "NULL_VALUE":
                # marker produced by get_units_for_metric_data_stat for NULL units
                unit_filter = "AND unit IS NULL"
            else:
                unit_filter = "AND unit = ? "
                data += (unit,)

        sql_query = f"""
        SELECT
            {STAT_TO_SQLITE_AGGREGATION_FUNC[stat]},
            SUM(count)
        FROM (
            SELECT
            value, 1 as count,
            account_id, region, metric_name, namespace, timestamp, dimensions, unit, storage_resolution
            FROM {self.TABLE_SINGLE_METRICS}
            UNION ALL
            SELECT
            {STAT_TO_SQLITE_COL_NAME_HELPER[stat]} as value, sample_count as count,
            account_id, region, metric_name, namespace, timestamp, dimensions, unit, storage_resolution
            FROM {self.TABLE_AGGREGATED_METRICS}
        ) AS combined
        WHERE account_id = ? AND region = ?
        AND namespace = ? AND metric_name = ?
        {dimension_filter}
        {unit_filter}
        AND timestamp >= ? AND timestamp < ?
        ORDER BY timestamp ASC
        """

        timestamps = []
        values = []
        query_params = []

        # Prepare all the query parameters
        while start_time_unix < end_time_unix:
            next_start_time = start_time_unix + period
            query_params.append(data + (start_time_unix, next_start_time))
            start_time_unix = next_start_time

        all_results = []
        with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB_READ_ONLY, uri=True) as conn:
            cur = conn.cursor()
            # evaluate many windows per round trip by UNION ALL-ing the
            # per-window query; 500 keeps us below SQLite's parameter limits
            batch_size = 500
            for i in range(0, len(query_params), batch_size):
                batch = query_params[i : i + batch_size]
                cur.execute(
                    f"""
                            SELECT * FROM (
                                {" UNION ALL ".join(["SELECT * FROM (" + sql_query + ")"] * len(batch))}
                            )
                        """,
                    sum(batch, ()),  # flatten the list of tuples in batch into a single tuple
                )
                all_results.extend(cur.fetchall())

        # Process results outside the lock
        for i, result_row in enumerate(all_results):
            if result_row[1]:
                # "Average" is computed manually: SUM(value) / SUM(count)
                calculated_result = (
                    result_row[0] / result_row[1] if stat == "Average" else result_row[0]
                )
                timestamps.append(query_params[i][-2])  # start_time_unix
                values.append(calculated_result)

        # The while loop always gives us the timestamps in ascending order as we start with the start_time
        # and increase it by the period until we reach the end_time
        # If we want the timestamps in descending order we need to reverse the list
        if scan_by is None or scan_by == ScanBy.TimestampDescending:
            timestamps = timestamps[::-1]
            values = values[::-1]

        return {
            "timestamps": timestamps,
            "values": values,
        }

    def list_metrics(
        self,
        account_id: str,
        region: str,
        namespace: str,
        metric_name: str,
        dimensions: list[dict[str, str]],
    ) -> dict:
        """List distinct metrics for an account/region, optionally filtered.

        :param namespace: optional namespace filter (exact match)
        :param metric_name: optional metric-name filter (exact match)
        :param dimensions: optional dimension filter (LIKE match on the serialized dimensions)
        :return: dict with a "metrics" list of {metric_name, namespace, dimensions}
        """
        data = (account_id, region)

        namespace_filter = ""
        if namespace:
            namespace_filter = " AND namespace = ?"
            data = data + (namespace,)

        metric_name_filter = ""
        if metric_name:
            metric_name_filter = " AND metric_name = ?"
            data = data + (metric_name,)

        dimension_filter = "" if not dimensions else " AND dimensions LIKE ? "
        if dimensions:
            data = data + (
                self._get_ordered_dimensions_with_separator(dimensions, for_search=True),
            )

        # use the class constants for table names, consistent with the other queries
        query = f"""
            SELECT DISTINCT metric_name, namespace, dimensions
            FROM (
                SELECT metric_name, namespace, dimensions, account_id, region, timestamp
                FROM {self.TABLE_SINGLE_METRICS}
                UNION
                SELECT metric_name, namespace, dimensions, account_id, region, timestamp
                FROM {self.TABLE_AGGREGATED_METRICS}
            ) AS combined
            WHERE account_id = ? AND region = ?
            {namespace_filter}
            {metric_name_filter}
            {dimension_filter}
            ORDER BY timestamp DESC
        """
        with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB_READ_ONLY, uri=True) as conn:
            cur = conn.cursor()

            cur.execute(
                query,
                data,
            )
            metrics_result = [
                {
                    "metric_name": r[0],
                    "namespace": r[1],
                    "dimensions": self._restore_dimensions_from_string(r[2]),
                }
                for r in cur.fetchall()
            ]

            return {"metrics": metrics_result}

    def clear_tables(self):
        """Delete all rows from both metric tables and reclaim the disk space."""
        with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB) as conn:
            cur = conn.cursor()
            cur.execute(f"DELETE FROM {self.TABLE_SINGLE_METRICS}")
            cur.execute(f"DELETE FROM {self.TABLE_AGGREGATED_METRICS}")
            conn.commit()
            # VACUUM must run outside a transaction, after the deletes are committed
            cur.execute("VACUUM")
            conn.commit()

    def _get_ordered_dimensions_with_separator(self, dims: list[dict] | None, for_search=False):
        """
        Returns a string with the dimensions in the format "Name=Value\tName=Value\tName=Value" in order to store the metric
        with the dimensions in a single column in the database

        :param dims: List of dimensions in the format [{"Name": "name", "Value": "value"}, ...]
        :param for_search: If True, the dimensions will be formatted in a way that can be used in a LIKE query to search. Default is False. Example: " %{Name}={Value}% "
        :return: String with the dimensions in the format "Name=Value\tName=Value\tName=Value"
        """
        if not dims:
            return None
        # sort a copy so the caller's list is not mutated as a side effect
        dims = sorted(dims, key=lambda d: d["Name"])
        dimensions = ""
        if not for_search:
            for d in dims:
                # AWS does not allow ASCII control characters, so "\t" is a safe separator
                dimensions += f"{d['Name']}={d['Value']}\t"
        else:
            for d in dims:
                dimensions += f"%{d.get('Name')}={d.get('Value', '')}%"

        return dimensions

    def _restore_dimensions_from_string(self, dimensions: str):
        """Parse a "\t"-separated "Name=Value" string back into a list of dimension dicts.

        :param dimensions: string produced by _get_ordered_dimensions_with_separator, or None/empty
        :return: list of {"Name": ..., "Value": ...} dicts, or None for empty input
        """
        if not dimensions:
            return None
        dims = []
        for d in dimensions.split("\t"):
            if not d:
                continue
            # split only on the first "=" so that values containing "=" round-trip correctly
            name, value = d.split("=", 1)
            dims.append({"Name": name, "Value": value})

        return dims

    def _convert_timestamp_to_unix(
        self, timestamp: datetime
    ):  # TODO verify if this is the standard format, might need to convert
        """Convert a datetime to an integer unix epoch timestamp."""
        return int(timestamp.timestamp())

    def get_all_metric_data(self):
        """Return every row of the single-metrics table in the compact v2 export shape.

        Shape of each entry:
            {
                "ns": namespace,
                "n": metric name,
                "v": value,
                "t": timestamp,
                "d": serialized dimensions string,
                "account": account id,  # new for v2
                "region": region name,  # new for v2
            }
        """
        with self.DATABASE_LOCK, sqlite3.connect(self.METRICS_DB_READ_ONLY, uri=True) as conn:
            cur = conn.cursor()
            query = f"SELECT namespace, metric_name, value, timestamp, dimensions, account_id, region from {self.TABLE_SINGLE_METRICS}"
            cur.execute(query)
            metrics_result = [
                {
                    "ns": r[0],
                    "n": r[1],
                    "v": r[2],
                    "t": r[3],
                    "d": r[4],
                    "account": r[5],
                    "region": r[6],
                }
                for r in cur.fetchall()
            ]
            # TODO add aggregated metrics (was not handled by v1 either)
            return metrics_result
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc