• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 2009fc4e-e31d-490f-88f3-dd98faad493a

14 Mar 2025 12:28PM UTC coverage: 86.958% (+0.03%) from 86.933%
2009fc4e-e31d-490f-88f3-dd98faad493a

push

circleci

web-flow
Docker Utils: Expose the build logs (#12376)

6 of 7 new or added lines in 2 files covered. (85.71%)

8 existing lines in 2 files now uncovered.

62320 of 71667 relevant lines covered (86.96%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.92
/localstack-core/localstack/utils/analytics/metrics.py
1
from __future__ import annotations
1✔
2

3
import datetime
1✔
4
import logging
1✔
5
import threading
1✔
6
from abc import ABC, abstractmethod
1✔
7
from collections import defaultdict
1✔
8
from typing import Dict, List, Optional, Tuple, Union, overload
1✔
9

10
from localstack import config
1✔
11
from localstack.runtime import hooks
1✔
12
from localstack.utils.analytics import get_session_id
1✔
13
from localstack.utils.analytics.events import Event, EventMetadata
1✔
14
from localstack.utils.analytics.publisher import AnalyticsClientPublisher
1✔
15

16
LOG = logging.getLogger(__name__)
1✔
17

18

19
class MetricRegistry:
1✔
20
    """
21
    A Singleton class responsible for managing all registered metrics.
22
    Provides methods for retrieving and collecting metrics.
23
    """
24

25
    _instance: "MetricRegistry" = None
1✔
26
    _mutex: threading.Lock = threading.Lock()
1✔
27

28
    def __new__(cls):
1✔
29
        # avoid locking if the instance already exist
30
        if cls._instance is None:
1✔
31
            with cls._mutex:
1✔
32
                # Prevents race conditions when multiple threads enter the first check simultaneously
33
                if cls._instance is None:
1✔
34
                    cls._instance = super().__new__(cls)
1✔
35
        return cls._instance
1✔
36

37
    def __init__(self):
1✔
38
        if not hasattr(self, "_registry"):
1✔
39
            self._registry = dict()
1✔
40

41
    @property
1✔
42
    def registry(self) -> Dict[str, "Metric"]:
1✔
43
        return self._registry
1✔
44

45
    def register(self, metric: Metric) -> None:
1✔
46
        """
47
        Registers a new metric.
48

49
        :param metric: The metric instance to register.
50
        :type metric: Metric
51
        :raises TypeError: If the provided metric is not an instance of `Metric`.
52
        :raises ValueError: If a metric with the same name already exists.
53
        """
54
        if not isinstance(metric, Metric):
1✔
55
            raise TypeError("Only subclasses of `Metric` can be registered.")
×
56

57
        if metric.name in self._registry:
1✔
58
            raise ValueError(f"Metric '{metric.name}' already exists.")
1✔
59

60
        self._registry[metric.name] = metric
1✔
61

62
    def collect(self) -> Dict[str, List[Dict[str, Union[str, int]]]]:
1✔
63
        """
64
        Collects all registered metrics.
65
        """
66
        return {
1✔
67
            "metrics": [
68
                metric
69
                for metric_instance in self._registry.values()
70
                for metric in metric_instance.collect()
71
            ]
72
        }
73

74

75
class Metric(ABC):
1✔
76
    """
77
    Base class for all metrics (e.g., Counter, Gauge).
78

79
    Each subclass must implement the `collect()` method.
80
    """
81

82
    _name: str
1✔
83

84
    def __init__(self, name: str):
1✔
85
        if not name or name.strip() == "":
1✔
86
            raise ValueError("Metric name must be non-empty string.")
1✔
87

88
        self._name = name
1✔
89

90
    @property
1✔
91
    def name(self) -> str:
1✔
92
        return self._name
1✔
93

94
    @abstractmethod
1✔
95
    def collect(self) -> List[Dict[str, Union[str, int]]]:
1✔
96
        """
97
        Collects and returns metric data. Subclasses must implement this to return collected metric data.
98
        """
99
        pass
×
100

101

102
class BaseCounter:
1✔
103
    """
104
    A thread-safe counter for any kind of tracking.
105
    This class should not be instantiated directly, use the Counter class instead.
106
    """
107

108
    _mutex: threading.Lock
1✔
109
    _count: int
1✔
110

111
    def __init__(self):
1✔
112
        super(BaseCounter, self).__init__()
1✔
113
        self._mutex = threading.Lock()
1✔
114
        self._count = 0
1✔
115

116
    @property
1✔
117
    def count(self) -> int:
1✔
118
        return self._count
1✔
119

120
    def increment(self, value: int = 1) -> None:
1✔
121
        """Increments the counter unless events are disabled."""
122
        if config.DISABLE_EVENTS:
1✔
123
            return
1✔
124

125
        if value <= 0:
1✔
UNCOV
126
            raise ValueError("Increment value must be positive.")
×
127

128
        with self._mutex:
1✔
129
            self._count += value
1✔
130

131
    def reset(self) -> None:
1✔
132
        """Resets the counter to zero unless events are disabled."""
133
        if config.DISABLE_EVENTS:
1✔
UNCOV
134
            return
×
135

136
        with self._mutex:
1✔
137
            self._count = 0
1✔
138

139

140
class CounterMetric(Metric, BaseCounter):
1✔
141
    """
142
    A thread-safe counter for tracking occurrences of an event without labels.
143
    This class should not be instantiated directly, use the Counter class instead.
144
    """
145

146
    _namespace: Optional[str]
1✔
147
    _type: str
1✔
148

149
    def __init__(self, name: str, namespace: Optional[str] = ""):
1✔
150
        Metric.__init__(self, name=name)
1✔
151
        BaseCounter.__init__(self)
1✔
152

153
        self._namespace = namespace.strip() if namespace else ""
1✔
154
        self._type = "counter"
1✔
155
        MetricRegistry().register(self)
1✔
156

157
    def collect(self) -> List[Dict[str, Union[str, int]]]:
1✔
158
        """Collects the metric unless events are disabled."""
159
        if config.DISABLE_EVENTS:
1✔
160
            return list()
1✔
161

162
        if self._count == 0:
1✔
163
            # Return an empty list if the count is 0, as there are no metrics to send to the analytics backend.
164
            return list()
1✔
165
        return [
1✔
166
            {
167
                "namespace": self._namespace,
168
                "name": self.name,
169
                "value": self._count,
170
                "type": self._type,
171
            }
172
        ]
173

174

175
class LabeledCounterMetric(Metric):
1✔
176
    """
177
    A labeled counter that tracks occurrences of an event across different label combinations.
178
    This class should not be instantiated directly, use the Counter class instead.
179
    """
180

181
    _namespace: Optional[str]
1✔
182
    _type: str
1✔
183
    _unit: str
1✔
184
    _labels: list[str]
1✔
185
    _label_values: Tuple[Optional[Union[str, float]], ...]
1✔
186
    _counters_by_label_values: defaultdict[Tuple[Optional[Union[str, float]], ...], BaseCounter]
1✔
187

188
    def __init__(self, name: str, labels: List[str], namespace: Optional[str] = ""):
1✔
189
        super(LabeledCounterMetric, self).__init__(name=name)
1✔
190

191
        if not labels:
1✔
192
            raise ValueError("At least one label is required; the labels list cannot be empty.")
1✔
193

194
        if any(not label for label in labels):
1✔
UNCOV
195
            raise ValueError("Labels must be non-empty strings.")
×
196

197
        if len(labels) > 8:
1✔
198
            raise ValueError("A maximum of 8 labels are allowed.")
1✔
199

200
        self._namespace = namespace.strip() if namespace else ""
1✔
201
        self._type = "counter"
1✔
202
        self._labels = labels
1✔
203
        self._counters_by_label_values = defaultdict(BaseCounter)
1✔
204
        MetricRegistry().register(self)
1✔
205

206
    def labels(self, **kwargs: Union[str, float, None]) -> BaseCounter:
1✔
207
        """
208
        Create a scoped counter instance with specific label values.
209

210
        This method assigns values to the predefined labels of a labeled counter and returns
211
        a BaseCounter object that allows tracking metrics for that specific
212
        combination of label values.
213

214
        :raises ValueError:
215
            - If the set of keys provided labels does not match the expected set of labels.
216
        """
217
        if set(self._labels) != set(kwargs.keys()):
1✔
218
            raise ValueError(f"Expected labels {self._labels}, got {list(kwargs.keys())}")
1✔
219

220
        _label_values = tuple(kwargs[label] for label in self._labels)
1✔
221

222
        return self._counters_by_label_values[_label_values]
1✔
223

224
    def _as_list(self) -> List[Dict[str, Union[str, int]]]:
1✔
225
        num_labels = len(self._labels)
1✔
226

227
        static_key_label_value = [f"label_{i + 1}_value" for i in range(num_labels)]
1✔
228
        static_key_label = [f"label_{i + 1}" for i in range(num_labels)]
1✔
229

230
        collected_metrics = []
1✔
231

232
        for label_values, counter in self._counters_by_label_values.items():
1✔
233
            if counter.count == 0:
1✔
234
                continue  # Skip items with a count of 0, as they should not be sent to the analytics backend.
1✔
235

236
            if len(label_values) != num_labels:
1✔
UNCOV
237
                raise ValueError(
×
238
                    f"Label count mismatch: expected {num_labels} labels {self._labels}, "
239
                    f"but got {len(label_values)} values {label_values}."
240
                )
241

242
            collected_metrics.append(
1✔
243
                {
244
                    "namespace": self._namespace,
245
                    "name": self.name,
246
                    "value": counter.count,
247
                    "type": self._type,
248
                    **dict(zip(static_key_label_value, label_values)),
249
                    **dict(zip(static_key_label, self._labels)),
250
                }
251
            )
252

253
        return collected_metrics
1✔
254

255
    def collect(self) -> List[Dict[str, Union[str, int]]]:
1✔
256
        if config.DISABLE_EVENTS:
1✔
257
            return list()
1✔
258
        return self._as_list()
1✔
259

260

261
class Counter:
1✔
262
    """
263
    A factory class for creating counter instances.
264

265
    This class provides a flexible way to create either a simple counter
266
    (`CounterMetric`) or a labeled counter (`LabeledCounterMetric`) based on
267
    whether labels are provided.
268
    """
269

270
    @overload
1✔
271
    def __new__(cls, name: str, namespace: Optional[str] = "") -> CounterMetric:
1✔
UNCOV
272
        return CounterMetric(namespace=namespace, name=name)
×
273

274
    @overload
1✔
275
    def __new__(
1✔
276
        cls, name: str, labels: List[str], namespace: Optional[str] = ""
277
    ) -> LabeledCounterMetric:
UNCOV
278
        return LabeledCounterMetric(namespace=namespace, name=name, labels=labels)
×
279

280
    def __new__(
1✔
281
        cls, name: str, namespace: Optional[str] = "", labels: Optional[List[str]] = None
282
    ) -> Union[CounterMetric, LabeledCounterMetric]:
283
        if labels is not None:
1✔
284
            return LabeledCounterMetric(namespace=namespace, name=name, labels=labels)
1✔
285
        return CounterMetric(namespace=namespace, name=name)
1✔
286

287

288
@hooks.on_infra_shutdown()
1✔
289
def publish_metrics() -> None:
1✔
290
    """
291
    Collects all the registered metrics and immediately sends them to the analytics service.
292
    Skips execution if event tracking is disabled (`config.DISABLE_EVENTS`).
293

294
    This function is automatically triggered on infrastructure shutdown.
295
    """
296
    if config.DISABLE_EVENTS:
1✔
UNCOV
297
        return
×
298

299
    collected_metrics = MetricRegistry().collect()
1✔
300
    if not collected_metrics["metrics"]:  # Skip publishing if no metrics remain after filtering
1✔
301
        return
1✔
302

303
    metadata = EventMetadata(
1✔
304
        session_id=get_session_id(),
305
        client_time=str(datetime.datetime.now()),
306
    )
307

308
    if collected_metrics:
1✔
309
        publisher = AnalyticsClientPublisher()
1✔
310
        publisher.publish([Event(name="ls_metrics", metadata=metadata, payload=collected_metrics)])
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc