• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / snowplow-python-tracker / 3956992538

pending completion
3956992538

Pull #312

github

GitHub
Merge bbb276e87 into ecca49d70
Pull Request #312: Release/0.13.0

511 of 582 new or added lines in 15 files covered. (87.8%)

2 existing lines in 1 file now uncovered.

2391 of 2522 relevant lines covered (94.81%)

11.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.84
/snowplow_tracker/event_store.py
1
# """
2
#     event_store.py
3

4
#     Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
5

6
#     This program is licensed to you under the Apache License Version 2.0,
7
#     and you may not use this file except in compliance with the Apache License
8
#     Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
9
#     http://www.apache.org/licenses/LICENSE-2.0.
10

11
#     Unless required by applicable law or agreed to in writing,
12
#     software distributed under the Apache License Version 2.0 is distributed on
13
#     an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
#     express or implied. See the Apache License Version 2.0 for the specific
15
#     language governing permissions and limitations there under.
16

17
#     Authors: Jack Keene, Anuj More, Alex Dean, Fred Blundun, Paul Boocock
18
#     Copyright: Copyright (c) 2013-2022 Snowplow Analytics Ltd
19
#     License: Apache License Version 2.0
20
# """
21

22
from typing_extensions import Protocol
12✔
23
from snowplow_tracker.typing import PayloadDict, PayloadDictList
12✔
24
from logging import Logger
12✔
25

26

27
class EventStore(Protocol):
12✔
28
    """
29
    EventStore protocol. For buffering events in the Emitter.
30
    """
31

32
    def add_event(payload: PayloadDict) -> bool:
12✔
33
        """
34
        Add PayloadDict to buffer. Returns True if successful.
35

36
        :param payload: The payload to add
37
        :type  payload: PayloadDict
38
        :rtype  bool
39
        """
NEW
40
        ...
×
41

42
    def get_events_batch() -> PayloadDictList:
12✔
43
        """
44
        Get a list of all the PayloadDicts in the buffer.
45

46
        :rtype  PayloadDictList
47
        """
NEW
48
        ...
×
49

50
    def cleanup(batch: PayloadDictList, need_retry: bool) -> None:
12✔
51
        """
52
        Removes sent events from the event store. If events need to be retried they are re-added to the buffer.
53

54
        :param  batch:  The events to be removed from the buffer
55
        :type   batch:  PayloadDictList
56
        :param  need_retry  Whether the events should be re-sent or not
57
        :type   need_retry  bool
58
        """
NEW
59
        ...
×
60

61
    def size() -> int:
12✔
62
        """
63
        Returns the number of events in the buffer
64

65
        :rtype  int
66
        """
NEW
67
        ...
×
68

69

70
class InMemoryEventStore(EventStore):
12✔
71
    """
72
    Create a InMemoryEventStore object with custom buffer capacity. The default is 10,000 events.
73
    """
74

75
    def __init__(self, logger: Logger, buffer_capacity: int = 10000) -> None:
12✔
76
        """
77
        :param  logger: Logging module
78
        :type   logger: Logger
79
        :param  buffer_capacity:    The maximum capacity of the event buffer.
80
                                    When the buffer is full new events are lost.
81
        :type   buffer_capacity     int
82
        """
83
        self.event_buffer = []
12✔
84
        self.buffer_capacity = buffer_capacity
12✔
85
        self.logger = logger
12✔
86

87
    def add_event(self, payload: PayloadDict) -> bool:
12✔
88
        """
89
        Add PayloadDict to buffer.
90

91
        :param payload: The payload to add
92
        :type  payload: PayloadDict
93
        """
94
        if self._buffer_capacity_reached():
12✔
95
            self.logger.error("Event buffer is full, dropping event.")
12✔
96
            return False
12✔
97

98
        self.event_buffer.append(payload)
12✔
99
        return True
12✔
100

101
    def get_events_batch(self) -> PayloadDictList:
12✔
102
        """
103
        Get a list of all the PayloadDicts in the in the buffer.
104

105
        :rtype  PayloadDictList
106
        """
107
        batch = self.event_buffer
12✔
108
        self.event_buffer = []
12✔
109
        return batch
12✔
110

111
    def cleanup(self, batch: PayloadDictList, need_retry: bool) -> None:
12✔
112
        """
113
        Removes sent events from the InMemoryEventStore buffer. If events need to be retried they are re-added to the buffer.
114

115
        :param  batch:  The events to be removed from the buffer
116
        :type   batch:  PayloadDictList
117
        :param  need_retry  Whether the events should be re-sent or not
118
        :type   need_retry  bool
119
        """
120
        if not need_retry:
12✔
121
            return
12✔
122

123
        for event in batch:
12✔
124
            if not event in self.event_buffer:
12✔
125
                if not self.add_event(event):
12✔
NEW
126
                    return
×
127

128
    def size(self) -> int:
12✔
129
        """
130
        Returns the number of events in the buffer
131

132
        :rtype  int
133
        """
134
        return len(self.event_buffer)
12✔
135

136
    def _buffer_capacity_reached(self) -> bool:
12✔
137
        """
138
        Returns true if buffer capacity is reached
139

140
        :rtype: bool
141
        """
142
        return self.size() >= self.buffer_capacity
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc