• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / d166ec6d-e321-44a4-963c-9d9dd94aa89d

11 Mar 2025 05:46PM UTC coverage: 86.936% (+0.04%) from 86.901%
d166ec6d-e321-44a4-963c-9d9dd94aa89d

push

circleci

web-flow
chore: improve test snapshot (#12367)

62152 of 71492 relevant lines covered (86.94%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.45
/localstack-core/localstack/utils/analytics/usage.py
1
"""
2
[DEPRECATED] This module is deprecated in favor of `localstack.utils.analytics.metrics`.
3
"""
4

5
import datetime
1✔
6
import math
1✔
7
from collections import defaultdict
1✔
8
from itertools import count
1✔
9
from typing import Any
1✔
10

11
from localstack import config
1✔
12
from localstack.runtime import hooks
1✔
13
from localstack.utils.analytics import get_session_id
1✔
14
from localstack.utils.analytics.events import Event, EventMetadata
1✔
15
from localstack.utils.analytics.publisher import AnalyticsClientPublisher
1✔
16

17
# Counters have to register with the registry
18
collector_registry: dict[str, Any] = dict()
1✔
19

20

21
# TODO: introduce some base abstraction for the counters after gather some initial experience working with it
22
#  we could probably do intermediate aggregations over time to avoid unbounded counters for very long LS sessions
23
#  for now, we can recommend to use config.DISABLE_EVENTS=1
24

25

26
class UsageSetCounter:
1✔
27
    """
28
    [DEPRECATED] Use `localstack.utils.analytics.metrics.Counter` instead.
29
    Use this counter to count occurrences of unique values
30

31
    Example:
32
        my_feature_counter = UsageSetCounter("lambda:runtime")
33
        my_feature_counter.record("python3.7")
34
        my_feature_counter.record("nodejs16.x")
35
        my_feature_counter.record("nodejs16.x")
36
        my_feature_counter.aggregate() # returns {"python3.7": 1, "nodejs16.x": 2}
37
    """
38

39
    state: dict[str, int]
1✔
40
    _counter: dict[str, count]
1✔
41
    namespace: str
1✔
42

43
    def __init__(self, namespace: str):
1✔
44
        self.enabled = not config.DISABLE_EVENTS
1✔
45
        self.state = {}
1✔
46
        self._counter = defaultdict(lambda: count(1))
1✔
47
        self.namespace = namespace
1✔
48
        collector_registry[namespace] = self
1✔
49

50
    def record(self, value: str):
1✔
51
        if self.enabled:
1✔
52
            self.state[value] = next(self._counter[value])
1✔
53

54
    def aggregate(self) -> dict:
1✔
55
        return self.state
1✔
56

57

58
class UsageCounter:
1✔
59
    """
60
    [DEPRECATED] Use `localstack.utils.analytics.metrics.Counter` instead.
61
    Use this counter to count numeric values
62

63
    Example:
64
        my__counter = UsageCounter("lambda:somefeature")
65
        my_counter.increment()
66
        my_counter.increment()
67
        my_counter.aggregate()  # returns {"count": 2}
68
    """
69

70
    state: int
1✔
71
    namespace: str
1✔
72

73
    def __init__(self, namespace: str):
1✔
74
        self.enabled = not config.DISABLE_EVENTS
1✔
75
        self.state = 0
1✔
76
        self._counter = count(1)
1✔
77
        self.namespace = namespace
1✔
78
        collector_registry[namespace] = self
1✔
79

80
    def increment(self):
1✔
81
        # TODO: we should instead have different underlying datastructures to store the state, and have no-op operations
82
        #  when config.DISABLE_EVENTS is set
83
        if self.enabled:
1✔
84
            self.state = next(self._counter)
1✔
85

86
    def aggregate(self) -> dict:
1✔
87
        # TODO: should we just keep `count`? "sum" might need to be kept for historical data?
88
        return {"count": self.state, "sum": self.state}
1✔
89

90

91
class TimingStats:
1✔
92
    """
93
    Use this counter to measure numeric values and perform aggregations
94

95
    Available aggregations: min, max, sum, mean, median, count
96

97
    Example:
98
        my_feature_counter = TimingStats("lambda:somefeature", aggregations=["min", "max", "sum", "count"])
99
        my_feature_counter.record_value(512)
100
        my_feature_counter.record_value(256)
101
        my_feature_counter.aggregate()  # returns {"min": 256, "max": 512, "sum": 768, "count": 2}
102
    """
103

104
    state: list[int | float]
1✔
105
    namespace: str
1✔
106
    aggregations: list[str]
1✔
107

108
    def __init__(self, namespace: str, aggregations: list[str]):
1✔
109
        self.enabled = not config.DISABLE_EVENTS
×
110
        self.state = []
×
111
        self.namespace = namespace
×
112
        self.aggregations = aggregations
×
113
        collector_registry[namespace] = self
×
114

115
    def record_value(self, value: int | float):
1✔
116
        if self.enabled:
×
117
            self.state.append(value)
×
118

119
    def aggregate(self) -> dict:
1✔
120
        result = {}
×
121
        if self.state:
×
122
            for aggregation in self.aggregations:
×
123
                if aggregation == "sum":
×
124
                    result[aggregation] = sum(self.state)
×
125
                elif aggregation == "min":
×
126
                    result[aggregation] = min(self.state)
×
127
                elif aggregation == "max":
×
128
                    result[aggregation] = max(self.state)
×
129
                elif aggregation == "mean":
×
130
                    result[aggregation] = sum(self.state) / len(self.state)
×
131
                elif aggregation == "median":
×
132
                    median_index = math.floor(len(self.state) / 2)
×
133
                    result[aggregation] = sorted(self.state)[median_index]
×
134
                elif aggregation == "count":
×
135
                    result[aggregation] = len(self.state)
×
136
                else:
137
                    raise Exception(f"Unsupported aggregation: {aggregation}")
×
138
        return result
×
139

140

141
def aggregate() -> dict:
1✔
142
    aggregated_payload = {}
1✔
143
    for ns, collector in collector_registry.items():
1✔
144
        agg = collector.aggregate()
1✔
145
        if agg:
1✔
146
            aggregated_payload[ns] = agg
1✔
147
    return aggregated_payload
1✔
148

149

150
@hooks.on_infra_shutdown()
1✔
151
def aggregate_and_send():
1✔
152
    """
153
    Aggregates data from all registered usage trackers and immediately sends the aggregated result to the analytics service.
154
    """
155
    if config.DISABLE_EVENTS:
1✔
156
        return
×
157

158
    metadata = EventMetadata(
1✔
159
        session_id=get_session_id(),
160
        client_time=str(datetime.datetime.now()),
161
    )
162

163
    aggregated_payload = aggregate()
1✔
164

165
    if aggregated_payload:
1✔
166
        publisher = AnalyticsClientPublisher()
1✔
167
        publisher.publish(
1✔
168
            [Event(name="ls:usage_analytics", metadata=metadata, payload=aggregated_payload)]
169
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc