• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / snowplow-python-tracker / 3956992538

pending completion
3956992538

Pull #312

github

GitHub
Merge bbb276e87 into ecca49d70
Pull Request #312: Release/0.13.0

511 of 582 new or added lines in 15 files covered. (87.8%)

2 existing lines in 1 file now uncovered.

2391 of 2522 relevant lines covered (94.81%)

11.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.57
/snowplow_tracker/emitters.py
1
# """
2
#     emitters.py
3

4
#     Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
5

6
#     This program is licensed to you under the Apache License Version 2.0,
7
#     and you may not use this file except in compliance with the Apache License
8
#     Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
9
#     http://www.apache.org/licenses/LICENSE-2.0.
10

11
#     Unless required by applicable law or agreed to in writing,
12
#     software distributed under the Apache License Version 2.0 is distributed on
13
#     an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
#     express or implied. See the Apache License Version 2.0 for the specific
15
#     language governing permissions and limitations there under.
16

17
#     Authors: Anuj More, Alex Dean, Fred Blundun, Paul Boocock
18
#     Copyright: Copyright (c) 2013-2022 Snowplow Analytics Ltd
19
#     License: Apache License Version 2.0
20
# """
21

22

23
import logging
12✔
24
import time
12✔
25
import threading
12✔
26
import requests
12✔
27
import random
12✔
28
from typing import Optional, Union, Tuple, Dict
12✔
29
from queue import Queue
12✔
30

31
from snowplow_tracker.self_describing_json import SelfDescribingJson
12✔
32
from snowplow_tracker.typing import (
12✔
33
    PayloadDict,
34
    PayloadDictList,
35
    HttpProtocol,
36
    Method,
37
    SuccessCallback,
38
    FailureCallback,
39
)
40
from snowplow_tracker.contracts import one_of
12✔
41
from snowplow_tracker.event_store import EventStore, InMemoryEventStore
12✔
42

43
# logging
# NOTE(review): basicConfig() and setLevel() run as import-time side effects,
# so importing this module configures the root logger and forces this
# module's logger to INFO.
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Batch size the Emitter falls back to when method is "post" and no
# batch_size is supplied.
DEFAULT_MAX_LENGTH = 10
# Schema URI passed to SelfDescribingJson when wrapping the event list for a
# POST request.
PAYLOAD_DATA_SCHEMA = (
    "iglu:com.snowplowanalytics.snowplow/payload_data/jsonschema/1-0-4"
)
# Accepted values for the `protocol` argument (validated with one_of).
PROTOCOLS = {"http", "https"}
# Accepted values for the `method` argument (validated with one_of).
METHODS = {"get", "post"}
12✔
54

55

56
class Emitter(object):
    """
    Synchronously send Snowplow events to a Snowplow collector
    Supports both GET and POST requests
    """

    def __init__(
        self,
        endpoint: str,
        protocol: HttpProtocol = "https",
        port: Optional[int] = None,
        method: Method = "post",
        batch_size: Optional[int] = None,
        on_success: Optional[SuccessCallback] = None,
        on_failure: Optional[FailureCallback] = None,
        byte_limit: Optional[int] = None,
        request_timeout: Optional[Union[float, Tuple[float, float]]] = None,
        max_retry_delay_seconds: int = 60,
        buffer_capacity: Optional[int] = None,
        custom_retry_codes: Optional[Dict[int, bool]] = None,
        event_store: Optional[EventStore] = None,
    ) -> None:
        """
        :param endpoint:    The collector URL. If the endpoint already starts with "http://" or "https://",
                            that protocol is used; otherwise the `protocol` argument is used.
        :type  endpoint:    string
        :param protocol:    The protocol to use - http or https. Defaults to https.
        :type  protocol:    protocol
        :param port:        The collector port to connect to
        :type  port:        int | None
        :param method:      The HTTP request method. Defaults to post.
        :type  method:      method
        :param batch_size:  The maximum number of queued events before the buffer is flushed.
                            Default is 10 for post and 1 for get. Capped at buffer_capacity when that is set.
        :type  batch_size:  int | None
        :param on_success:  Callback executed when at least one event in a flush was sent with a 2xx status code.
                            Gets passed the list of events that were sent successfully.
        :type  on_success:  function | None
        :param on_failure:  Callback executed when at least one event in a flush was not sent with a 2xx status code.
                            Gets passed two arguments:
                            1) The number of events which were successfully sent
                            2) The list of event payloads which failed to send
        :type  on_failure:  function | None
        :param byte_limit:  The size event list after reaching which queued events will be flushed
        :type  byte_limit:  int | None
        :param request_timeout: Timeout for the HTTP requests. Can be set either as single float value which
                                 applies to both "connect" AND "read" timeout, or as tuple with two float values
                                 which specify the "connect" and "read" timeouts separately
        :type request_timeout:  float | tuple | None
        :param max_retry_delay_seconds:     Set the maximum time between attempts to send failed events to the collector. Default 60 seconds
        :type max_retry_delay_seconds:      int
        :param buffer_capacity: The maximum capacity of the event buffer.
                                When the buffer is full new events are lost.
        :type buffer_capacity: int | None
        :param  custom_retry_codes: Set custom retry rules for HTTP status codes received in emit responses from the Collector.
                                    By default, retry will not occur for status codes 400, 401, 403, 410 or 422. This can be overridden here.
                                    Note that 2xx codes will never retry as they are considered successful.
                                    Defaults to no custom rules.
        :type   custom_retry_codes: dict | None
        :param  event_store:    Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events.
        :type   event_store:    EventStore | None
        """
        one_of(protocol, PROTOCOLS)
        one_of(method, METHODS)

        self.endpoint = Emitter.as_collector_uri(endpoint, protocol, port, method)
        self.method = method

        if event_store is None:
            if buffer_capacity is None:
                event_store = InMemoryEventStore(logger=logger)
            else:
                event_store = InMemoryEventStore(
                    buffer_capacity=buffer_capacity, logger=logger
                )
        self.event_store = event_store

        if batch_size is None:
            # GET sends one event per request, so there is nothing to batch.
            batch_size = DEFAULT_MAX_LENGTH if method == "post" else 1

        # A batch larger than the buffer could never fill up and trigger a flush.
        if buffer_capacity is not None and batch_size > buffer_capacity:
            batch_size = buffer_capacity

        self.batch_size = batch_size
        self.byte_limit = byte_limit
        # bytes_queued is only tracked when a byte limit is configured.
        self.bytes_queued = None if byte_limit is None else 0
        self.request_timeout = request_timeout

        self.on_success = on_success
        self.on_failure = on_failure

        self.lock = threading.RLock()

        self.timer = FlushTimer(emitter=self, repeating=True)
        self.retry_timer = FlushTimer(emitter=self, repeating=False)

        self.max_retry_delay_seconds = max_retry_delay_seconds
        self.retry_delay = 0

        # A fresh dict per instance: a shared `{}` default would leak retry
        # rules between emitters if one of them mutated it.
        self.custom_retry_codes = (
            {} if custom_retry_codes is None else custom_retry_codes
        )
        logger.info("Emitter initialized with endpoint %s", self.endpoint)

    @staticmethod
    def as_collector_uri(
        endpoint: str,
        protocol: HttpProtocol = "https",
        port: Optional[int] = None,
        method: Method = "post",
    ) -> str:
        """
        Build the full collector URI from its parts.

        :param endpoint:  The raw endpoint provided by the user
        :type  endpoint:  string
        :param protocol:  The protocol to use - http or https
        :type  protocol:  protocol
        :param port:      The collector port to connect to
        :type  port:      int | None
        :param method:    Either `get` or `post` HTTP method
        :type  method:    method
        :raises ValueError: if the endpoint is empty
        :rtype:           string
        """
        if len(endpoint) < 1:
            raise ValueError("No endpoint provided.")

        endpoint = endpoint.rstrip("/")

        # Only treat the prefix as a protocol when a "://" separator is
        # actually present; partition also keeps any later "://" intact.
        scheme, separator, remainder = endpoint.partition("://")
        if separator and scheme in PROTOCOLS:
            protocol = scheme
            endpoint = remainder

        path = "/i" if method == "get" else "/com.snowplowanalytics.snowplow/tp2"

        if port is None:
            return protocol + "://" + endpoint + path
        return protocol + "://" + endpoint + ":" + str(port) + path

    def input(self, payload: PayloadDict) -> None:
        """
        Adds an event to the buffer.
        If the maximum size has been reached, flushes the buffer.

        :param payload:   The name-value pairs for the event
        :type  payload:   dict(string:\\*)
        """
        with self.lock:
            if self.bytes_queued is not None:
                self.bytes_queued += len(str(payload))

            if self.method == "post":
                # POST payloads are stored with every value converted to a string.
                self.event_store.add_event(
                    {key: str(value) for key, value in payload.items()}
                )
            else:
                self.event_store.add_event(payload)

            if self.reached_limit():
                self.flush()

    def reached_limit(self) -> bool:
        """
        Checks if event-size or bytes limit are reached

        :rtype: bool
        """
        batch_full = self.event_store.size() >= self.batch_size
        if self.byte_limit is None:
            return batch_full
        return (self.bytes_queued or 0) >= self.byte_limit or batch_full

    def flush(self) -> None:
        """
        Sends a batch of buffered events to the collector.
        Does nothing while a retry timer is pending.
        """
        with self.lock:
            if self.retry_timer.is_active():
                return
            send_events = self.event_store.get_events_batch()
            self.send_events(send_events)
            if self.bytes_queued is not None:
                self.bytes_queued = 0

    def http_post(self, data: str) -> int:
        """
        :param data:  The array of JSONs to be sent
        :type  data:  string
        :return:      The HTTP status code, or -1 if the request raised
        :rtype:       int
        """
        logger.info("Sending POST request to %s...", self.endpoint)
        logger.debug("Payload: %s", data)
        try:
            r = requests.post(
                self.endpoint,
                data=data,
                headers={"Content-Type": "application/json; charset=utf-8"},
                timeout=self.request_timeout,
            )
        except requests.RequestException as e:
            logger.warning(e)
            return -1

        return r.status_code

    def http_get(self, payload: PayloadDict) -> int:
        """
        :param payload:  The event properties
        :type  payload:  dict(string:\\*)
        :return:         The HTTP status code, or -1 if the request raised
        :rtype:          int
        """
        logger.info("Sending GET request to %s...", self.endpoint)
        logger.debug("Payload: %s", payload)
        try:
            r = requests.get(
                self.endpoint, params=payload, timeout=self.request_timeout
            )
        except requests.RequestException as e:
            logger.warning(e)
            return -1

        return r.status_code

    def sync_flush(self) -> None:
        """
        Calls the flush method of the base Emitter class.
        This is guaranteed to be blocking, not asynchronous.
        """
        logger.debug("Starting synchronous flush...")
        self.flush()
        logger.info("Finished synchronous flush")

    @staticmethod
    def is_good_status_code(status_code: int) -> bool:
        """
        :param status_code:  HTTP status code
        :type  status_code:  int
        :return:             True for any 2xx status code
        :rtype:              bool
        """
        return 200 <= status_code < 300

    def send_events(self, evts: PayloadDictList) -> None:
        """
        Sends the events, runs the callbacks, then either schedules a retry
        or cleans up the event store.

        :param evts: Array of events to be sent
        :type  evts: list(dict(string:\\*))
        """
        if not evts:
            logger.info("Skipping flush since buffer is empty")
            return

        logger.info("Attempting to send %s events", len(evts))

        Emitter.attach_sent_timestamp(evts)
        success_events = []
        failure_events = []

        if self.method == "post":
            # The whole batch goes out in one request, so it succeeds or fails together.
            data = SelfDescribingJson(PAYLOAD_DATA_SCHEMA, evts).to_string()
            status_code = self.http_post(data)
            if Emitter.is_good_status_code(status_code):
                success_events += evts
            else:
                failure_events += evts

        elif self.method == "get":
            for evt in evts:
                status_code = self.http_get(evt)
                if Emitter.is_good_status_code(status_code):
                    success_events.append(evt)
                else:
                    failure_events.append(evt)

        if self.on_success is not None and len(success_events) > 0:
            self.on_success(success_events)
        if self.on_failure is not None and len(failure_events) > 0:
            self.on_failure(len(success_events), failure_events)

        # NOTE(review): for GET only the status code of the last request
        # decides whether the batch is retried - confirm this is intended.
        if self._should_retry(status_code):
            self._set_retry_delay()
            self._retry_failed_events(failure_events)
        else:
            self.event_store.cleanup(success_events, False)
            self._reset_retry_delay()

    def _set_retry_timer(self, timeout: float) -> None:
        """
        Set an interval at which failed events will be retried

        :param timeout:   interval in seconds
        :type  timeout:   int | float
        """
        self.retry_timer.start(timeout=timeout)

    def set_flush_timer(self, timeout: float) -> None:
        """
        Set an interval at which the buffer will be flushed
        :param timeout:   interval in seconds
        :type  timeout:   int | float
        """
        self.timer.start(timeout=timeout)

    def cancel_flush_timer(self) -> None:
        """
        Abort automatic async flushing
        """
        self.timer.cancel()

    @staticmethod
    def attach_sent_timestamp(events: PayloadDictList) -> None:
        """
        Attach (by mutating in-place) current timestamp in milliseconds
        as `stm` param

        :param events: Array of events to be sent
        :type  events: list(dict(string:\\*))
        :rtype: None
        """
        # Scale to milliseconds before truncating; truncating first would
        # throw away the sub-second part of the timestamp.
        sent_timestamp = str(int(time.time() * 1000))
        for event in events:
            event["stm"] = sent_timestamp

    def _should_retry(self, status_code: int) -> bool:
        """
        Checks if a request should be retried

        :param  status_code: Response status code
        :type   status_code: int
        :rtype: bool
        """
        if Emitter.is_good_status_code(status_code):
            return False

        # Custom rules take precedence over the built-in no-retry list.
        if status_code in self.custom_retry_codes:
            return self.custom_retry_codes[status_code]

        return status_code not in (400, 401, 403, 410, 422)

    def _set_retry_delay(self) -> None:
        """
        Sets a delay to retry failed events.
        The delay doubles on each call, plus random noise below one second,
        and is capped at max_retry_delay_seconds.
        """
        random_noise = random.random()
        self.retry_delay = min(
            self.retry_delay * 2 + random_noise, self.max_retry_delay_seconds
        )

    def _reset_retry_delay(self) -> None:
        """
        Resets retry delay to 0
        """
        self.retry_delay = 0

    def _retry_failed_events(self, failed_events: PayloadDictList) -> None:
        """
        Hands the failed events back to the event store and starts the retry timer

        :param  failed_events: List of failed events
        :type   failed_events: list
        """
        self.event_store.cleanup(failed_events, True)
        self._set_retry_timer(self.retry_delay)

    def _cancel_retry_timer(self) -> None:
        """
        Cancels a retry timer
        """
        self.retry_timer.cancel()
×
429

430

431
class AsyncEmitter(Emitter):
    """
    Uses threads to send HTTP requests asynchronously
    """

    def __init__(
        self,
        endpoint: str,
        protocol: HttpProtocol = "http",
        port: Optional[int] = None,
        method: Method = "post",
        batch_size: Optional[int] = None,
        on_success: Optional[SuccessCallback] = None,
        on_failure: Optional[FailureCallback] = None,
        thread_count: int = 1,
        byte_limit: Optional[int] = None,
        max_retry_delay_seconds: int = 60,
        buffer_capacity: Optional[int] = None,
        event_store: Optional[EventStore] = None,
    ) -> None:
        """
        :param endpoint:    The collector URL. If the endpoint already starts with "http://" or "https://",
                            that protocol is used; otherwise the `protocol` argument is used.
        :type  endpoint:    string
        :param protocol:    The protocol to use - http or https. Defaults to http.
        :type  protocol:    protocol
        :param port:        The collector port to connect to
        :type  port:        int | None
        :param method:      The HTTP request method
        :type  method:      method
        :param batch_size: The maximum number of queued events before the buffer is flushed. Default is 10.
        :type  batch_size: int | None
        :param on_success:  Callback executed when at least one event in a flush was sent with a 2xx status code.
                            Gets passed the list of events that were sent successfully.
        :type  on_success:  function | None
        :param on_failure:  Callback executed when at least one event in a flush was not sent with a 2xx status code.
                            Gets passed two arguments:
                            1) The number of events which were successfully sent
                            2) The list of event payloads which failed to send
        :type  on_failure:  function | None
        :param thread_count: Number of worker threads to use for HTTP requests
        :type  thread_count: int
        :param byte_limit:  The size event list after reaching which queued events will be flushed
        :type  byte_limit:  int | None
        :param max_retry_delay_seconds:     Set the maximum time between attempts to send failed events to the collector. Default 60 seconds
        :type max_retry_delay_seconds:      int
        :param buffer_capacity: The maximum capacity of the event buffer.
                                When the buffer is full new events are lost.
        :type buffer_capacity: int | None
        :param  event_store:    Stores the event buffer and buffer capacity. Default is an InMemoryEventStore object with buffer_capacity of 10,000 events.
        :type   event_store:    EventStore | None
        """
        # Keyword arguments are required here: the base signature has extra
        # parameters (request_timeout, custom_retry_codes) between these, so
        # passing positionally shifts every value after byte_limit into the
        # wrong parameter.
        super(AsyncEmitter, self).__init__(
            endpoint=endpoint,
            protocol=protocol,
            port=port,
            method=method,
            batch_size=batch_size,
            on_success=on_success,
            on_failure=on_failure,
            byte_limit=byte_limit,
            max_retry_delay_seconds=max_retry_delay_seconds,
            buffer_capacity=buffer_capacity,
            event_store=event_store,
        )
        self.queue = Queue()
        for _ in range(thread_count):
            # Daemon workers so they never keep the interpreter alive on exit.
            t = threading.Thread(target=self.consume)
            t.daemon = True
            t.start()

    def sync_flush(self) -> None:
        """
        Blocks until the event store is empty: repeatedly enqueues a batch
        and waits for the workers to finish processing the queue.
        """
        while True:
            self.flush()
            self.queue.join()
            if self.event_store.size() < 1:
                break

    def flush(self) -> None:
        """
        Takes a batch of events from the event store and puts it on the
        queue for the worker threads to send.
        """
        with self.lock:
            self.queue.put(self.event_store.get_events_batch())
            if self.bytes_queued is not None:
                self.bytes_queued = 0

    def consume(self) -> None:
        """
        Worker loop: takes batches off the queue and sends them, forever.
        """
        while True:
            evts = self.queue.get()
            try:
                self.send_events(evts)
            except Exception:
                # Thread boundary: log and keep the worker alive so later
                # batches are still sent.
                logger.exception("Unexpected error while sending events")
            finally:
                # Always mark the item done, otherwise queue.join() in
                # sync_flush would block forever after a failure.
                self.queue.task_done()
12✔
524

525

526
class FlushTimer(object):
    """
    Internal helper that lets an Emitter schedule a later call to its
    flush method, either once or repeatedly at a fixed interval.
    """

    def __init__(self, emitter: Emitter, repeating: bool):
        self.emitter = emitter
        self.repeating = repeating
        # The currently scheduled threading.Timer, or None when idle.
        self.timer: Optional[threading.Timer] = None
        self.lock = threading.RLock()

    def start(self, timeout: float) -> bool:
        """
        Schedule a flush after `timeout` seconds unless one is already pending.

        :param timeout: interval in seconds
        :type  timeout: int | float
        :return: True if a timer was scheduled, False if one was already set
        :rtype: bool
        """
        with self.lock:
            if self.timer is None:
                self._schedule_timer(timeout=timeout)
                return True
            return False

    def cancel(self) -> None:
        """
        Cancel the pending timer, if any.
        """
        with self.lock:
            pending = self.timer
            if pending is None:
                return
            pending.cancel()
            self.timer = None

    def is_active(self) -> bool:
        """
        :return: True while a timer is scheduled
        :rtype: bool
        """
        with self.lock:
            return self.timer is not None

    def _fire(self, timeout: float) -> None:
        """
        Timer callback: re-arm (repeating) or clear (one-shot) the timer,
        then flush the emitter outside of this timer's lock.
        """
        with self.lock:
            if not self.repeating:
                self.timer = None
            else:
                self._schedule_timer(timeout)

        self.emitter.flush()

    def _schedule_timer(self, timeout: float) -> None:
        """
        Create and start a daemon threading.Timer that calls _fire.
        """
        self.timer = threading.Timer(timeout, self._fire, [timeout])
        self.timer.daemon = True
        self.timer.start()
12✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc