• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / snowplow-python-tracker / 3956992538

pending completion
3956992538

Pull #312

github

GitHub
Merge bbb276e87 into ecca49d70
Pull Request #312: Release/0.13.0

511 of 582 new or added lines in 15 files covered. (87.8%)

2 existing lines in 1 file now uncovered.

2391 of 2522 relevant lines covered (94.81%)

11.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.59
/snowplow_tracker/emitter_configuration.py
1
# """
2
#     emitter_configuration.py
3

4
#     Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
5

6
#     This program is licensed to you under the Apache License Version 2.0,
7
#     and you may not use this file except in compliance with the Apache License
8
#     Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
9
#     http://www.apache.org/licenses/LICENSE-2.0.
10

11
#     Unless required by applicable law or agreed to in writing,
12
#     software distributed under the Apache License Version 2.0 is distributed on
13
#     an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
#     express or implied. See the Apache License Version 2.0 for the specific
15
#     language governing permissions and limitations there under.
16

17
#     Authors: Jack Keene, Anuj More, Alex Dean, Fred Blundun, Paul Boocock
18
#     Copyright: Copyright (c) 2013-2022 Snowplow Analytics Ltd
19
#     License: Apache License Version 2.0
20
# """
21

22
from typing import Optional, Union, Tuple, Dict
12✔
23
from snowplow_tracker.typing import SuccessCallback, FailureCallback
12✔
24
from snowplow_tracker.event_store import EventStore
12✔
25

26

27
class EmitterConfiguration(object):
    """
    Configuration for the emitter that sends events to the Snowplow collector.

    Every setting is exposed as a property. The size-like settings
    (batch_size, byte_limit, buffer_capacity) are validated when assigned;
    the remaining settings are stored as given.
    """

    def __init__(
        self,
        batch_size: Optional[int] = None,
        on_success: Optional[SuccessCallback] = None,
        on_failure: Optional[FailureCallback] = None,
        byte_limit: Optional[int] = None,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
        buffer_capacity: Optional[int] = None,
        custom_retry_codes: Optional[Dict[int, bool]] = None,
        event_store: Optional[EventStore] = None,
    ) -> None:
        """
        Configuration for the emitter that sends events to the Snowplow collector.
        :param batch_size:      The maximum number of queued events before the buffer is flushed. Default is 10.
        :type  batch_size:      int | None
        :param on_success:      Callback executed after every HTTP request in a flush has status code 200
                                Gets passed the number of events flushed.
        :type  on_success:      function | None
        :param on_failure:      Callback executed if at least one HTTP request in a flush has status code other than 200
                                Gets passed two arguments:
                                1) The number of events which were successfully sent
                                2) If method is "post": The unsent data in string form;
                                   If method is "get":  An array of dictionaries corresponding to the unsent events' payloads
        :type  on_failure:      function | None
        :param byte_limit:      The size event list after reaching which queued events will be flushed
        :type  byte_limit:      int | None
        :param request_timeout: Timeout for the HTTP requests. Can be set either as single float value which
                                applies to both "connect" AND "read" timeout, or as tuple with two float values
                                which specify the "connect" and "read" timeouts separately
        :type  request_timeout: float | tuple | None
        :param buffer_capacity: The maximum capacity of the event buffer. The default buffer capacity is 10 000 events.
                                When the buffer is full new events are lost.
        :type  buffer_capacity: int | None
        :param custom_retry_codes: Set custom retry rules for HTTP status codes received in emit responses from the Collector.
                                   By default, retry will not occur for status codes 400, 401, 403, 410 or 422. This can be overridden here.
                                   Note that 2xx codes will never retry as they are considered successful.
                                   When omitted (or None) a new empty dict is created for this instance.
        :type  custom_retry_codes: dict | None
        :param event_store:     Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events.
        :type  event_store:     EventStore | None
        :raises ValueError:     If batch_size, byte_limit or buffer_capacity is neither None nor a non-negative int.
        """

        # Assign through the properties so the setters' validation runs.
        self.batch_size = batch_size
        self.on_success = on_success
        self.on_failure = on_failure
        self.byte_limit = byte_limit
        self.request_timeout = request_timeout
        self.buffer_capacity = buffer_capacity
        self.custom_retry_codes = custom_retry_codes
        self.event_store = event_store

    @staticmethod
    def _check_optional_size(name: str, value: Optional[int]) -> Optional[int]:
        """
        Validate a size-like setting and return it unchanged.
        :param  name:   Setting name, used as the prefix of the error message
        :type   name:   str
        :param  value:  Candidate value; None means "not configured" and is accepted
        :type   value:  int | None
        :raises ValueError: If value is not None and is not a non-negative int
        """
        if value is None:
            return None
        if not isinstance(value, int):
            raise ValueError("{0} must be of type int".format(name))
        # Zero is deliberately accepted: only negative values are rejected.
        if value < 0:
            raise ValueError("{0} must be greater than or equal to 0".format(name))
        return value

    @property
    def batch_size(self) -> Optional[int]:
        """
        The maximum number of queued events before the buffer is flushed. Default is 10.
        """
        return self._batch_size

    @batch_size.setter
    def batch_size(self, value: Optional[int]):
        self._batch_size = self._check_optional_size("batch_size", value)

    @property
    def on_success(self) -> Optional[SuccessCallback]:
        """
        Callback executed after every HTTP request in a flush has status code 200. Gets passed the number of events flushed.
        """
        return self._on_success

    @on_success.setter
    def on_success(self, value: Optional[SuccessCallback]):
        self._on_success = value

    @property
    def on_failure(self) -> Optional[FailureCallback]:
        """
        Callback executed if at least one HTTP request in a flush has status code other than 200
        Gets passed two arguments:
        1) The number of events which were successfully sent
        2) If method is "post": The unsent data in string form;
           If method is "get":  An array of dictionaries corresponding to the unsent events' payloads
        """
        return self._on_failure

    @on_failure.setter
    def on_failure(self, value: Optional[FailureCallback]):
        self._on_failure = value

    @property
    def byte_limit(self) -> Optional[int]:
        """
        The size event list after reaching which queued events will be flushed
        """
        return self._byte_limit

    @byte_limit.setter
    def byte_limit(self, value: Optional[int]):
        self._byte_limit = self._check_optional_size("byte_limit", value)

    @property
    def request_timeout(self) -> Optional[Union[float, Tuple[float, float]]]:
        """
        Timeout for the HTTP requests. Can be set either as single float value which
        applies to both "connect" AND "read" timeout, or as tuple with two float values
        which specify the "connect" and "read" timeouts separately
        """
        return self._request_timeout

    @request_timeout.setter
    def request_timeout(self, value: Optional[Union[float, Tuple[float, float]]]):
        self._request_timeout = value

    @property
    def buffer_capacity(self) -> Optional[int]:
        """
        The maximum capacity of the event buffer. The default buffer capacity is 10 000 events.
        When the buffer is full new events are lost.
        """
        return self._buffer_capacity

    @buffer_capacity.setter
    def buffer_capacity(self, value: Optional[int]):
        self._buffer_capacity = self._check_optional_size("buffer_capacity", value)

    @property
    def custom_retry_codes(self) -> Dict[int, bool]:
        """
        Custom retry rules for HTTP status codes received in emit responses from the Collector.
        """
        return self._custom_retry_codes

    @custom_retry_codes.setter
    def custom_retry_codes(self, value: Optional[Dict[int, bool]]):
        # A fresh dict per instance: set_retry_code mutates this mapping in
        # place, so a shared default object would leak rules between
        # configurations. A caller-supplied dict is stored as-is.
        self._custom_retry_codes = {} if value is None else value

    def set_retry_code(self, status_code: int, retry: bool = True) -> bool:
        """
        Add a retry rule for HTTP status code received from emit responses from the Collector.
        :param  status_code:    HTTP response code
        :type   status_code:    int
        :param  retry:  Set the status_code to retry (True) or not retry (False). Default is True
        :type   retry:  bool
        :return:        True if the rule was stored; False if an argument was rejected
                        (a message describing the problem is printed in that case)
        :rtype:         bool
        """
        if not isinstance(status_code, int):
            print("status_code must be of type int")
            return False

        if not isinstance(retry, bool):
            print("retry must be of type bool")
            return False

        # 2xx responses count as successful, so a retry rule for them is refused.
        if 200 <= status_code < 300:
            print(
                "custom_retry_codes should not include codes for succesful requests (2XX codes)"
            )
            return False

        self.custom_retry_codes[status_code] = retry

        return status_code in self.custom_retry_codes

    @property
    def event_store(self) -> Optional[EventStore]:
        """
        Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events.
        """
        return self._event_store

    @event_store.setter
    def event_store(self, value: Optional[EventStore]):
        self._event_store = value
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc