• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

localstack / localstack / 19558051963

20 Nov 2025 05:48PM UTC coverage: 86.859% (-0.05%) from 86.907%
19558051963

push

github

web-flow
Sns:v2 publish (#13399)

199 of 279 new or added lines in 5 files covered. (71.33%)

168 existing lines in 9 files now uncovered.

68851 of 79268 relevant lines covered (86.86%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.57
/localstack-core/localstack/utils/analytics/publisher.py
1
import abc
1✔
2
import atexit
1✔
3
import logging
1✔
4
import threading
1✔
5

6
from localstack import config
1✔
7
from localstack.utils.batching import AsyncBatcher
1✔
8
from localstack.utils.threads import FuncThread, start_thread, start_worker_thread
1✔
9

10
from .client import AnalyticsClient
1✔
11
from .events import Event, EventHandler
1✔
12
from .metadata import get_client_metadata
1✔
13

14
LOG = logging.getLogger(__name__)
1✔
15

16

17
class Publisher(abc.ABC):
1✔
18
    """
19
    A publisher takes a batch of events and publishes them to a destination.
20
    """
21

22
    def publish(self, events: list[Event]):
1✔
23
        raise NotImplementedError
24

25
    def close(self):
1✔
UNCOV
26
        pass
×
27

28

29
class AnalyticsClientPublisher(Publisher):
1✔
30
    client: AnalyticsClient
1✔
31

32
    def __init__(self, client: AnalyticsClient = None) -> None:
1✔
33
        super().__init__()
1✔
34
        self.client = client or AnalyticsClient()
1✔
35

36
    def publish(self, events: list[Event]):
1✔
37
        self.client.append_events(events)
1✔
38

39
    def close(self):
1✔
UNCOV
40
        self.client.close()
×
41

42

43
class Printer(Publisher):
1✔
44
    """
45
    Publisher that prints serialized events to stdout.
46
    """
47

48
    def publish(self, events: list[Event]):
1✔
UNCOV
49
        for event in events:
×
50
            print(event.asdict())
×
51

52

53
class GlobalAnalyticsBus(EventHandler):
1✔
54
    _batcher: AsyncBatcher[Event]
1✔
55
    _client: AnalyticsClient
1✔
56
    _worker_thread: FuncThread | None
1✔
57

58
    def __init__(self, client: AnalyticsClient = None, flush_size=20, flush_interval=10) -> None:
1✔
59
        self._client = client or AnalyticsClient()
1✔
60
        self._publisher = AnalyticsClientPublisher(self._client)
1✔
61
        self._batcher = AsyncBatcher(
1✔
62
            self._handle_batch,
63
            max_batch_size=flush_size,
64
            max_flush_interval=flush_interval,
65
        )
66

67
        self._started = False
1✔
68
        self._startup_mutex = threading.Lock()
1✔
69
        self._worker_thread = None
1✔
70

71
        self.force_tracking = False  # allow class to ignore all other tracking config
1✔
72
        self.tracking_disabled = False  # disables tracking if global config would otherwise track
1✔
73

74
    def _handle_batch(self, batch: list[Event]):
1✔
75
        """Method that satisfies the BatchHandler[Event] protocol and is passed to AsyncBatcher."""
76
        try:
1✔
77
            self._publisher.publish(batch)
1✔
UNCOV
78
        except Exception:
×
79
            # currently we're just dropping events if something goes wrong during publishing
UNCOV
80
            if config.DEBUG_ANALYTICS:
×
UNCOV
81
                LOG.exception("error while publishing analytics events")
×
82

83
    @property
1✔
84
    def is_tracking_disabled(self):
1✔
85
        if self.force_tracking:
1✔
86
            return False
1✔
87

88
        # don't track if event tracking is disabled globally
89
        if config.DISABLE_EVENTS:
1✔
UNCOV
90
            return True
×
91
        # don't track for internal test runs (like integration tests)
92
        if config.is_local_test_mode():
1✔
93
            return True
1✔
UNCOV
94
        if self.tracking_disabled:
×
UNCOV
95
            return True
×
96

UNCOV
97
        return False
×
98

99
    def handle(self, event: Event):
1✔
100
        """
101
        Publish an event to the global analytics event publisher.
102
        """
103
        if self.is_tracking_disabled:
1✔
104
            if config.DEBUG_ANALYTICS:
1✔
UNCOV
105
                LOG.debug("tracking disabled, skipping event %s", event)
×
106
            return
1✔
107

108
        if not self._started:
1✔
109
            # we make sure the batching worker is started
110
            self._start()
1✔
111

112
        self._batcher.add(event)
1✔
113

114
    def _start(self):
1✔
115
        with self._startup_mutex:
1✔
116
            if self._started:
1✔
UNCOV
117
                return
×
118
            self._started = True
1✔
119

120
            # startup has to run async, otherwise first call to handle() could block a long time.
121
            start_worker_thread(self._do_start_retry)
1✔
122

123
    def _do_start_retry(self, *_):
1✔
124
        # TODO: actually retry
125
        try:
1✔
126
            if config.DEBUG_ANALYTICS:
1✔
UNCOV
127
                LOG.debug("trying to register session with analytics backend")
×
128
            response = self._client.start_session(get_client_metadata())
1✔
129
            if config.DEBUG_ANALYTICS:
1✔
UNCOV
130
                LOG.debug("session endpoint returned: %s", response)
×
131

132
            if not response.track_events():
1✔
133
                if config.DEBUG_ANALYTICS:
1✔
UNCOV
134
                    LOG.debug("gracefully disabling analytics tracking")
×
135
                self.tracking_disabled = True
1✔
136

137
        except Exception:
1✔
138
            self.tracking_disabled = True
1✔
139
            if config.DEBUG_ANALYTICS:
1✔
140
                LOG.exception("error while registering session. disabling tracking")
×
141
            return
1✔
142

143
        self._worker_thread = start_thread(self._run, name="global-analytics-bus")
1✔
144

145
        # given the "Global" nature of this class, we register a global atexit hook to make sure all events are flushed
146
        # when localstack shuts down.
147
        def _do_close():
1✔
UNCOV
148
            self.close_sync(timeout=2)
×
149

150
        atexit.register(_do_close)
1✔
151

152
    def _run(self, *_):
1✔
153
        # main control loop, simply runs the batcher
154
        self._batcher.run()
1✔
155

156
    def close_sync(self, timeout=None):
1✔
UNCOV
157
        self._batcher.close()
×
158

UNCOV
159
        if self._worker_thread:
×
160
            self._worker_thread.join(timeout=timeout)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc