• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 3d7e8107-7269-48e8-a3dd-0ccb8cee5b40

28 Feb 2025 06:48PM UTC coverage: 86.906% (+0.04%) from 86.863%
3d7e8107-7269-48e8-a3dd-0ccb8cee5b40

push

circleci

web-flow
Fix vulnerabilities in lambda runtime init (#12316)

1 of 1 new or added line in 1 file covered. (100.0%)

20 existing lines in 6 files now uncovered.

61891 of 71216 relevant lines covered (86.91%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.89
/localstack-core/localstack/utils/analytics/usage.py
1
"""
2
[DEPRECATED] This module is deprecated in favor of `localstack.utils.analytics.metrics`.
3
"""
4

5
import datetime
1✔
6
import math
1✔
7
from collections import defaultdict
1✔
8
from itertools import count
1✔
9
from typing import Any
1✔
10

11
from localstack import config
1✔
12
from localstack.runtime import hooks
1✔
13
from localstack.utils.analytics import get_session_id
1✔
14
from localstack.utils.analytics.events import Event, EventMetadata
1✔
15
from localstack.utils.analytics.publisher import AnalyticsClientPublisher
1✔
16

17
# Counters have to register with the registry
18
collector_registry: dict[str, Any] = dict()
1✔
19

20
# TODO: introduce some base abstraction for the counters after gather some initial experience working with it
21
#  we could probably do intermediate aggregations over time to avoid unbounded counters for very long LS sessions
22
#  for now, we can recommend to use config.DISABLE_EVENTS=1
23

24

25
class UsageSetCounter:
1✔
26
    """
27
    [DEPRECATED] Use `localstack.utils.analytics.metrics.Counter` instead.
28
    Use this counter to count occurrences of unique values
29

30
    Example:
31
        my_feature_counter = UsageSetCounter("lambda:runtime")
32
        my_feature_counter.record("python3.7")
33
        my_feature_counter.record("nodejs16.x")
34
        my_feature_counter.record("nodejs16.x")
35
        my_feature_counter.aggregate() # returns {"python3.7": 1, "nodejs16.x": 2}
36
    """
37

38
    state: dict[str, int]
1✔
39
    _counter: dict[str, count]
1✔
40
    namespace: str
1✔
41

42
    def __init__(self, namespace: str):
1✔
43
        self.enabled = not config.DISABLE_EVENTS
1✔
44
        self.state = {}
1✔
45
        self._counter = defaultdict(lambda: count(1))
1✔
46
        self.namespace = namespace
1✔
47
        collector_registry[namespace] = self
1✔
48

49
    def record(self, value: str):
1✔
50
        if self.enabled:
1✔
51
            self.state[value] = next(self._counter[value])
1✔
52

53
    def aggregate(self) -> dict:
1✔
54
        return self.state
1✔
55

56

57
class UsageCounter:
1✔
58
    """
59
    [DEPRECATED] Use `localstack.utils.analytics.metrics.Counter` instead.
60
    Use this counter to count numeric values
61

62
    Example:
63
        my__counter = UsageCounter("lambda:somefeature")
64
        my_counter.increment()
65
        my_counter.increment()
66
        my_counter.aggregate()  # returns {"count": 2}
67
    """
68

69
    state: int
1✔
70
    namespace: str
1✔
71

72
    def __init__(self, namespace: str):
1✔
73
        self.enabled = not config.DISABLE_EVENTS
1✔
74
        self.state = 0
1✔
75
        self._counter = count(1)
1✔
76
        self.namespace = namespace
1✔
77
        collector_registry[namespace] = self
1✔
78

79
    def increment(self):
1✔
80
        # TODO: we should instead have different underlying datastructures to store the state, and have no-op operations
81
        #  when config.DISABLE_EVENTS is set
82
        if self.enabled:
1✔
83
            self.state = next(self._counter)
1✔
84

85
    def aggregate(self) -> dict:
1✔
86
        # TODO: should we just keep `count`? "sum" might need to be kept for historical data?
87
        return {"count": self.state, "sum": self.state}
1✔
88

89

90
class TimingStats:
1✔
91
    """
92
    Use this counter to measure numeric values and perform aggregations
93

94
    Available aggregations: min, max, sum, mean, median, count
95

96
    Example:
97
        my_feature_counter = TimingStats("lambda:somefeature", aggregations=["min", "max", "sum", "count"])
98
        my_feature_counter.record_value(512)
99
        my_feature_counter.record_value(256)
100
        my_feature_counter.aggregate()  # returns {"min": 256, "max": 512, "sum": 768, "count": 2}
101
    """
102

103
    state: list[int | float]
1✔
104
    namespace: str
1✔
105
    aggregations: list[str]
1✔
106

107
    def __init__(self, namespace: str, aggregations: list[str]):
1✔
UNCOV
108
        self.enabled = not config.DISABLE_EVENTS
×
109
        self.state = []
×
110
        self.namespace = namespace
×
UNCOV
111
        self.aggregations = aggregations
×
UNCOV
112
        collector_registry[namespace] = self
×
113

114
    def record_value(self, value: int | float):
1✔
115
        if self.enabled:
×
116
            self.state.append(value)
×
117

118
    def aggregate(self) -> dict:
1✔
119
        result = {}
×
120
        if self.state:
×
121
            for aggregation in self.aggregations:
×
122
                match aggregation:
×
123
                    case "sum":
×
124
                        result[aggregation] = sum(self.state)
×
125
                    case "min":
×
126
                        result[aggregation] = min(self.state)
×
127
                    case "max":
×
128
                        result[aggregation] = max(self.state)
×
129
                    case "mean":
×
130
                        result[aggregation] = sum(self.state) / len(self.state)
×
131
                    case "median":
×
132
                        median_index = math.floor(len(self.state) / 2)
×
UNCOV
133
                        result[aggregation] = sorted(self.state)[median_index]
×
UNCOV
134
                    case "count":
×
UNCOV
135
                        result[aggregation] = len(self.state)
×
UNCOV
136
                    case _:
×
UNCOV
137
                        raise Exception(f"Unsupported aggregation: {aggregation}")
×
UNCOV
138
        return result
×
139

140

141
def aggregate() -> dict:
1✔
142
    aggregated_payload = {}
1✔
143
    for ns, collector in collector_registry.items():
1✔
144
        agg = collector.aggregate()
1✔
145
        if agg:
1✔
146
            aggregated_payload[ns] = agg
1✔
147
    return aggregated_payload
1✔
148

149

150
@hooks.on_infra_shutdown()
1✔
151
def aggregate_and_send():
1✔
152
    """
153
    Aggregates data from all registered usage trackers and immediately sends the aggregated result to the analytics service.
154
    """
155
    if config.DISABLE_EVENTS:
1✔
UNCOV
156
        return
×
157

158
    metadata = EventMetadata(
1✔
159
        session_id=get_session_id(),
160
        client_time=str(datetime.datetime.now()),
161
    )
162

163
    aggregated_payload = aggregate()
1✔
164

165
    if aggregated_payload:
1✔
166
        publisher = AnalyticsClientPublisher()
1✔
167
        publisher.publish(
1✔
168
            [Event(name="ls:usage_analytics", metadata=metadata, payload=aggregated_payload)]
169
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc