• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / snowplow-python-tracker / 3956992538

pending completion
3956992538

Pull #312

github

GitHub
Merge bbb276e87 into ecca49d70
Pull Request #312: Release/0.13.0

511 of 582 new or added lines in 15 files covered. (87.8%)

2 existing lines in 1 file now uncovered.

2391 of 2522 relevant lines covered (94.81%)

11.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.45
/snowplow_tracker/test/unit/test_emitters.py
1
# """
2
#     test_emitters.py
3

4
#     Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
5

6
#     This program is licensed to you under the Apache License Version 2.0,
7
#     and you may not use this file except in compliance with the Apache License
8
#     Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
9
#     http://www.apache.org/licenses/LICENSE-2.0.
10

11
#     Unless required by applicable law or agreed to in writing,
12
#     software distributed under the Apache License Version 2.0 is distributed on
13
#     an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
#     express or implied. See the Apache License Version 2.0 for the specific
15
#     language governing permissions and limitations there under.
16

17
#     Authors: Anuj More, Alex Dean, Fred Blundun, Paul Boocock
18
#     Copyright: Copyright (c) 2013-2022 Snowplow Analytics Ltd
19
#     License: Apache License Version 2.0
20
# """
21

22

23
import time
12✔
24
import unittest
12✔
25
import unittest.mock as mock
12✔
26
from freezegun import freeze_time
12✔
27
from typing import Any
12✔
28
from requests import ConnectTimeout
12✔
29

30
from snowplow_tracker.emitters import Emitter, AsyncEmitter, DEFAULT_MAX_LENGTH
12✔
31

32

33
# helpers
34
def mocked_flush(*args: Any) -> None:
12✔
35
    pass
12✔
36

37

38
def mocked_send_events(*args: Any) -> None:
12✔
39
    pass
12✔
40

41

42
def mocked_http_success(*args: Any) -> bool:
12✔
UNCOV
43
    return True
×
44

45

46
def mocked_http_failure(*args: Any) -> bool:
12✔
UNCOV
47
    return False
×
48

49
def mocked_http_response_success(*args: Any) -> int:
12✔
50
    return 200
12✔
51

52
def mocked_http_response_failure(*args: Any) -> int:
12✔
53
    return 400
12✔
54

55
def mocked_http_response_failure_retry(*args: Any) -> int:
12✔
56
    return 500
12✔
57

58
class TestEmitters(unittest.TestCase):
12✔
59

60
    def setUp(self) -> None:
12✔
61
        pass
12✔
62

63
    def test_init(self) -> None:
12✔
64
        e = Emitter('0.0.0.0')
12✔
65
        self.assertEqual(e.endpoint, 'https://0.0.0.0/com.snowplowanalytics.snowplow/tp2')
12✔
66
        self.assertEqual(e.method, 'post')
12✔
67
        self.assertEqual(e.batch_size, 10)
12✔
68
        self.assertEqual(e.event_store.event_buffer, [])
12✔
69
        self.assertIsNone(e.byte_limit)
12✔
70
        self.assertIsNone(e.bytes_queued)
12✔
71
        self.assertIsNone(e.on_success)
12✔
72
        self.assertIsNone(e.on_failure)
12✔
73
        self.assertFalse(e.timer.is_active())
12✔
74
        self.assertIsNone(e.request_timeout)
12✔
75

76
    def test_init_batch_size(self) -> None:
12✔
77
        e = Emitter('0.0.0.0', batch_size=10)
12✔
78
        self.assertEqual(e.batch_size, 10)
12✔
79

80
    def test_init_post(self) -> None:
12✔
81
        e = Emitter('0.0.0.0')
12✔
82
        self.assertEqual(e.batch_size, DEFAULT_MAX_LENGTH)
12✔
83

84
    def test_init_byte_limit(self) -> None:
12✔
85
        e = Emitter('0.0.0.0', byte_limit=512)
12✔
86
        self.assertEqual(e.bytes_queued, 0)
12✔
87

88
    def test_init_requests_timeout(self) -> None:
12✔
89
        e = Emitter('0.0.0.0', request_timeout=(2.5, 5))
12✔
90
        self.assertEqual(e.request_timeout, (2.5, 5))
12✔
91

92
    def test_as_collector_uri(self) -> None:
12✔
93
        uri = Emitter.as_collector_uri('0.0.0.0')
12✔
94
        self.assertEqual(uri, 'https://0.0.0.0/com.snowplowanalytics.snowplow/tp2')
12✔
95

96
    def test_as_collector_uri_get(self) -> None:
12✔
97
        uri = Emitter.as_collector_uri('0.0.0.0', method='get')
12✔
98
        self.assertEqual(uri, 'https://0.0.0.0/i')
12✔
99

100
    def test_as_collector_uri_port(self) -> None:
12✔
101
        uri = Emitter.as_collector_uri('0.0.0.0', port=9090)
12✔
102
        self.assertEqual(uri, 'https://0.0.0.0:9090/com.snowplowanalytics.snowplow/tp2')
12✔
103

104
    def test_as_collector_uri_http(self) -> None:
12✔
105
        uri = Emitter.as_collector_uri('0.0.0.0', protocol="http")
12✔
106
        self.assertEqual(uri, 'http://0.0.0.0/com.snowplowanalytics.snowplow/tp2')
12✔
107

108
    def test_as_collector_uri_empty_string(self) -> None:
12✔
109
        with self.assertRaises(ValueError):
12✔
110
            Emitter.as_collector_uri('')
12✔
111

112
    def test_as_collector_uri_endpoint_protocol(self) -> None:
12✔
113
        uri = Emitter.as_collector_uri("https://0.0.0.0")
12✔
114
        self.assertEqual(uri, "https://0.0.0.0/com.snowplowanalytics.snowplow/tp2")
12✔
115

116
    def test_as_collector_uri_endpoint_protocol_http(self) -> None:
12✔
117
        uri = Emitter.as_collector_uri("http://0.0.0.0")
12✔
118
        self.assertEqual(uri, "http://0.0.0.0/com.snowplowanalytics.snowplow/tp2")
12✔
119
        
120
    @mock.patch('snowplow_tracker.Emitter.flush')
12✔
121
    def test_input_no_flush(self, mok_flush: Any) -> None:
12✔
122
        mok_flush.side_effect = mocked_flush
12✔
123

124
        e = Emitter('0.0.0.0', method="get", batch_size=2)
12✔
125
        nvPairs = {"n0": "v0", "n1": "v1"}
12✔
126
        e.input(nvPairs)
12✔
127

128
        self.assertEqual(len(e.event_store.event_buffer), 1)
12✔
129
        self.assertDictEqual(nvPairs, e.event_store.event_buffer[0])
12✔
130
        self.assertIsNone(e.byte_limit)
12✔
131
        self.assertFalse(e.reached_limit())
12✔
132
        mok_flush.assert_not_called()
12✔
133

134
    @mock.patch('snowplow_tracker.Emitter.flush')
12✔
135
    def test_input_flush_byte_limit(self, mok_flush: Any) -> None:
12✔
136
        mok_flush.side_effect = mocked_flush
12✔
137

138
        e = Emitter('0.0.0.0', method="get", batch_size=2, byte_limit=16)
12✔
139
        nvPairs = {"n0": "v0", "n1": "v1"}
12✔
140
        e.input(nvPairs)
12✔
141

142
        self.assertEqual(len(e.event_store.event_buffer), 1)
12✔
143
        self.assertDictEqual(nvPairs, e.event_store.event_buffer[0])
12✔
144
        self.assertTrue(e.reached_limit())
12✔
145
        self.assertEqual(mok_flush.call_count, 1)
12✔
146

147
    @mock.patch('snowplow_tracker.Emitter.flush')
12✔
148
    def test_input_flush_buffer(self, mok_flush: Any) -> None:
12✔
149
        mok_flush.side_effect = mocked_flush
12✔
150

151
        e = Emitter('0.0.0.0', method="get", batch_size=2, byte_limit=1024)
12✔
152
        nvPairs = {"n0": "v0", "n1": "v1"}
12✔
153
        e.input(nvPairs)
12✔
154

155
        self.assertEqual(len(e.event_store.event_buffer), 1)
12✔
156
        self.assertFalse(e.reached_limit())
12✔
157
        self.assertDictEqual(nvPairs, e.event_store.event_buffer[0])
12✔
158

159
        nextPairs = {"n0": "v0"}
12✔
160
        e.input(nextPairs)
12✔
161
        # since we mock flush, the buffer is not empty
162
        self.assertEqual(e.event_store.event_buffer, [nvPairs, nextPairs])
12✔
163
        self.assertTrue(e.reached_limit())
12✔
164
        self.assertEqual(mok_flush.call_count, 1)
12✔
165

166
    @mock.patch('snowplow_tracker.Emitter.flush')
12✔
167
    def test_input_bytes_queued(self, mok_flush: Any) -> None:
12✔
168
        mok_flush.side_effect = mocked_flush
12✔
169

170
        e = Emitter('0.0.0.0', method="get", batch_size=2, byte_limit=1024)
12✔
171
        nvPairs = {"n0": "v0", "n1": "v1"}
12✔
172
        e.input(nvPairs)
12✔
173

174
        self.assertEqual(len(e.event_store.event_buffer), 1)
12✔
175
        self.assertEqual(e.bytes_queued, 24)
12✔
176

177
        e.input(nvPairs)
12✔
178
        self.assertEqual(e.bytes_queued, 48)
12✔
179

180
    @mock.patch('snowplow_tracker.Emitter.flush')
12✔
181
    def test_input_bytes_post(self, mok_flush: Any) -> None:
12✔
182
        mok_flush.side_effect = mocked_flush
12✔
183

184
        e = Emitter('0.0.0.0')
12✔
185
        nvPairs = {"testString": "test", "testNum": 2.72}
12✔
186
        e.input(nvPairs)
12✔
187

188
        self.assertEqual(e.event_store.event_buffer, [{"testString": "test", "testNum": "2.72"}])
12✔
189

190
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
191
    def test_flush(self, mok_send_events: Any) -> None:
12✔
192
        mok_send_events.side_effect = mocked_http_response_success
12✔
193

194
        e = Emitter('0.0.0.0', batch_size=2, byte_limit=None)
12✔
195
        nvPairs = {"n": "v"}
12✔
196
        e.input(nvPairs)
12✔
197
        e.input(nvPairs)
12✔
198

199
        self.assertEqual(mok_send_events.call_count, 1)
12✔
200
        self.assertEqual(len(e.event_store.event_buffer), 0)
12✔
201

202
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
203
    def test_flush_bytes_queued(self, mok_send_events: Any) -> None:
12✔
204
        mok_send_events.side_effect = mocked_http_response_success
12✔
205

206
        e = Emitter('0.0.0.0', batch_size=2, byte_limit=256)
12✔
207
        nvPairs = {"n": "v"}
12✔
208
        e.input(nvPairs)
12✔
209
        e.input(nvPairs)
12✔
210

211
        self.assertEqual(mok_send_events.call_count, 1)
12✔
212
        self.assertEqual(len(e.event_store.event_buffer), 0)
12✔
213
        self.assertEqual(e.bytes_queued, 0)
12✔
214

215
    @freeze_time("2021-04-14 00:00:02")  # unix: 1618358402000
12✔
216
    def test_attach_sent_tstamp(self) -> None:
12✔
217
        e = Emitter('0.0.0.0')
12✔
218
        ev_list = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
219

220
        e.attach_sent_timestamp(ev_list)
12✔
221
        reduced = True
12✔
222
        for ev in ev_list:
12✔
223
            reduced = reduced and "stm" in ev.keys() and ev["stm"] == "1618358402000"
12✔
224
        self.assertTrue(reduced)
12✔
225

226
    @mock.patch('snowplow_tracker.Emitter.flush')
12✔
227
    def test_flush_timer(self, mok_flush: Any) -> None:
12✔
228
        mok_flush.side_effect = mocked_flush
12✔
229

230
        e = Emitter('0.0.0.0', batch_size=10)
12✔
231
        ev_list = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
232
        for i in ev_list:
12✔
233
            e.input(i)
12✔
234

235
        e.set_flush_timer(3)
12✔
236
        self.assertEqual(len(e.event_store.event_buffer), 3)
12✔
237
        time.sleep(5)
12✔
238
        self.assertGreaterEqual(mok_flush.call_count, 1)
12✔
239

240
    @mock.patch('snowplow_tracker.Emitter.http_get')
12✔
241
    def test_send_events_get_success(self, mok_http_get: Any) -> None:
12✔
242
        mok_http_get.side_effect = mocked_http_response_success
12✔
243
        mok_success = mock.Mock(return_value="success mocked")
12✔
244
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
245

246
        e = Emitter('0.0.0.0', method="get", batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
247

248
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
249
        e.send_events(evBuffer)
12✔
250
        mok_success.assert_called_once_with(evBuffer)
12✔
251
        mok_failure.assert_not_called()
12✔
252

253
    @mock.patch('snowplow_tracker.Emitter.http_get')
12✔
254
    def test_send_events_get_failure(self, mok_http_get: Any) -> None:
12✔
255
        mok_http_get.side_effect = mocked_http_response_failure
12✔
256
        mok_success = mock.Mock(return_value="success mocked")
12✔
257
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
258

259
        e = Emitter('0.0.0.0', method="get", batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
260

261
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
262
        e.send_events(evBuffer)
12✔
263
        mok_success.assert_not_called()
12✔
264
        mok_failure.assert_called_once_with(0, evBuffer)
12✔
265

266
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
267
    def test_send_events_post_success(self, mok_http_post: Any) -> None:
12✔
268
        mok_http_post.side_effect = mocked_http_response_success
12✔
269
        mok_success = mock.Mock(return_value="success mocked")
12✔
270
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
271

272
        e = Emitter('0.0.0.0', batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
273

274
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
275
        e.send_events(evBuffer)
12✔
276
        mok_success.assert_called_once_with(evBuffer)
12✔
277
        mok_failure.assert_not_called()
12✔
278

279
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
280
    def test_send_events_post_failure(self, mok_http_post: Any) -> None:
12✔
281
        mok_http_post.side_effect = mocked_http_response_failure
12✔
282
        mok_success = mock.Mock(return_value="success mocked")
12✔
283
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
284

285
        e = Emitter('0.0.0.0', batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
286

287
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
288
        e.send_events(evBuffer)
12✔
289
        mok_success.assert_not_called()
12✔
290
        mok_failure.assert_called_with(0, evBuffer)
12✔
291

292
    @mock.patch('snowplow_tracker.emitters.requests.post')
12✔
293
    def test_http_post_connect_timeout_error(self, mok_post_request: Any) -> None:
12✔
294
        mok_post_request.side_effect = ConnectTimeout
12✔
295
        e = Emitter('0.0.0.0')
12✔
296
        response = e.http_post("dummy_string")
12✔
297
        post_succeeded = Emitter.is_good_status_code(response)
12✔
298

299
        self.assertFalse(post_succeeded)
12✔
300

301
    @mock.patch('snowplow_tracker.emitters.requests.post')
12✔
302
    def test_http_get_connect_timeout_error(self, mok_post_request: Any) -> None:
12✔
303
        mok_post_request.side_effect = ConnectTimeout
12✔
304
        e = Emitter('0.0.0.0', method='get')
12✔
305
        response = e.http_get({"a": "b"})
12✔
306
        get_succeeded = Emitter.is_good_status_code(response)
12✔
307
        self.assertFalse(get_succeeded)
12✔
308

309
    ###
310
    # AsyncEmitter
311
    ###
312
    @mock.patch('snowplow_tracker.AsyncEmitter.flush')
12✔
313
    def test_async_emitter_input(self, mok_flush: Any) -> None:
12✔
314
        mok_flush.side_effect = mocked_flush
12✔
315

316
        ae = AsyncEmitter('0.0.0.0', port=9090, method="get", batch_size=3, thread_count=5)
12✔
317
        self.assertTrue(ae.queue.empty())
12✔
318

319
        ae.input({"a": "aa"})
12✔
320
        ae.input({"b": "bb"})
12✔
321
        self.assertEqual(len(ae.event_store.event_buffer), 2)
12✔
322
        self.assertTrue(ae.queue.empty())
12✔
323
        mok_flush.assert_not_called()
12✔
324

325
        ae.input({"c": "cc"})  # meet buffer size
12✔
326
        self.assertEqual(mok_flush.call_count, 1)
12✔
327

328
    @mock.patch('snowplow_tracker.AsyncEmitter.send_events')
12✔
329
    def test_async_emitter_sync_flash(self, mok_send_events: Any) -> None:
12✔
330
        mok_send_events.side_effect = mocked_send_events
12✔
331

332
        ae = AsyncEmitter('0.0.0.0', port=9090, method="get", batch_size=3, thread_count=5, byte_limit=1024)
12✔
333
        self.assertTrue(ae.queue.empty())
12✔
334

335
        ae.input({"a": "aa"})
12✔
336
        ae.input({"b": "bb"})
12✔
337
        self.assertEqual(len(ae.event_store.event_buffer), 2)
12✔
338
        self.assertTrue(ae.queue.empty())
12✔
339
        mok_send_events.assert_not_called()
12✔
340

341
        ae.sync_flush()
12✔
342
        self.assertEqual(len(ae.event_store.event_buffer), 0)
12✔
343
        self.assertEqual(ae.bytes_queued, 0)
12✔
344
        self.assertEqual(mok_send_events.call_count, 1)
12✔
345

346
    @mock.patch('snowplow_tracker.Emitter.http_get')
12✔
347
    def test_async_send_events_get_success(self, mok_http_get: Any) -> None:
12✔
348
        mok_http_get.side_effect = mocked_http_response_success
12✔
349
        mok_success = mock.Mock(return_value="success mocked")
12✔
350
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
351

352
        ae = AsyncEmitter('0.0.0.0', method="get", batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
353

354
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
355
        ae.send_events(evBuffer)
12✔
356
        mok_success.assert_called_once_with(evBuffer)
12✔
357
        mok_failure.assert_not_called()
12✔
358

359
    @mock.patch('snowplow_tracker.Emitter.http_get')
12✔
360
    def test_async_send_events_get_failure(self, mok_http_get: Any) -> None:
12✔
361
        mok_http_get.side_effect = mocked_http_response_failure
12✔
362
        mok_success = mock.Mock(return_value="success mocked")
12✔
363
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
364

365
        ae = AsyncEmitter('0.0.0.0', method="get", batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
366

367
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
368
        ae.send_events(evBuffer)
12✔
369
        mok_success.assert_not_called()
12✔
370
        mok_failure.assert_called_once_with(0, evBuffer)
12✔
371

372
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
373
    def test_async_send_events_post_success(self, mok_http_post: Any) -> None:
12✔
374
        mok_http_post.side_effect = mocked_http_response_success
12✔
375
        mok_success = mock.Mock(return_value="success mocked")
12✔
376
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
377

378
        ae = Emitter('0.0.0.0', batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
379

380
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
381
        ae.send_events(evBuffer)
12✔
382
        mok_success.assert_called_once_with(evBuffer)
12✔
383
        mok_failure.assert_not_called()
12✔
384

385
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
386
    def test_async_send_events_post_failure(self, mok_http_post: Any) -> None:
12✔
387
        mok_http_post.side_effect = mocked_http_response_failure
12✔
388
        mok_success = mock.Mock(return_value="success mocked")
12✔
389
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
390

391
        ae = Emitter('0.0.0.0', batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
392

393
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
394
        ae.send_events(evBuffer)
12✔
395
        mok_success.assert_not_called()
12✔
396
        mok_failure.assert_called_with(0, evBuffer)
12✔
397

398
    # Unicode
399
    @mock.patch('snowplow_tracker.AsyncEmitter.flush')
12✔
400
    def test_input_unicode_get(self, mok_flush: Any) -> None:
12✔
401
        mok_flush.side_effect = mocked_flush
12✔
402

403
        payload = {"unicode": u'\u0107', "alsoAscii": "abc"}
12✔
404
        ae = AsyncEmitter('0.0.0.0', method="get", batch_size=2)
12✔
405
        ae.input(payload)
12✔
406

407
        self.assertEqual(len(ae.event_store.event_buffer), 1)
12✔
408
        self.assertDictEqual(payload, ae.event_store.event_buffer[0])
12✔
409

410
    @mock.patch('snowplow_tracker.AsyncEmitter.flush')
12✔
411
    def test_input_unicode_post(self, mok_flush: Any) -> None:
12✔
412
        mok_flush.side_effect = mocked_flush
12✔
413

414
        payload = {"unicode": u'\u0107', "alsoAscii": "abc"}
12✔
415
        ae = AsyncEmitter('0.0.0.0', batch_size=2)
12✔
416
        ae.input(payload)
12✔
417

418
        self.assertEqual(len(ae.event_store.event_buffer), 1)
12✔
419
        self.assertDictEqual(payload, ae.event_store.event_buffer[0])
12✔
420

421
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
422
    def test_send_events_post_retry(self, mok_http_post: Any) -> None:
12✔
423
        mok_http_post.side_effect = mocked_http_response_failure_retry
12✔
424
        mok_success = mock.Mock(return_value="success mocked")
12✔
425
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
426

427
        e = Emitter('0.0.0.0', batch_size=10, on_success=mok_success, on_failure=mok_failure)
12✔
428
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
429
        e.send_events(evBuffer)
12✔
430
        
431
        mok_http_post.side_effect = mocked_http_response_success
12✔
432
        time.sleep(5)
12✔
433

434
        mok_failure.assert_called_with(0, evBuffer)
12✔
435
        mok_success.assert_called_with(evBuffer)
12✔
436

437
    @mock.patch('snowplow_tracker.Emitter.http_get')
12✔
438
    def test_send_events_get_retry(self, mok_http_get: Any) -> None:
12✔
439
        mok_http_get.side_effect = mocked_http_response_failure_retry
12✔
440
        mok_success = mock.Mock(return_value="success mocked")
12✔
441
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
442

443
        e = Emitter('0.0.0.0', method='get', batch_size=1, on_success=mok_success, on_failure=mok_failure)
12✔
444
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
445
        e.send_events(evBuffer)
12✔
446
        
447
        mok_http_get.side_effect = mocked_http_response_success
12✔
448
        time.sleep(5)
12✔
449

450
        mok_failure.assert_called_with(0, evBuffer)
12✔
451
        mok_success.assert_called_with(evBuffer)
12✔
452

453
    @mock.patch('snowplow_tracker.Emitter.http_get')
12✔
454
    def test_send_events_get_no_retry(self, mok_http_get: Any) -> None:
12✔
455
        mok_http_get.side_effect = mocked_http_response_failure
12✔
456
        mok_success = mock.Mock(return_value="success mocked")
12✔
457
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
458

459
        e = Emitter('0.0.0.0', method='get', batch_size=1, on_success=mok_success, on_failure=mok_failure)
12✔
460
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
461
        e.send_events(evBuffer)
12✔
462
        
463
        mok_failure.assert_called_once_with(0, evBuffer)
12✔
464
        mok_success.assert_not_called()
12✔
465

466
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
467
    def test_send_events_post_no_retry(self, mok_http_post: Any) -> None:
12✔
468
        mok_http_post.side_effect = mocked_http_response_failure
12✔
469
        mok_success = mock.Mock(return_value="success mocked")
12✔
470
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
471

472
        e = Emitter('0.0.0.0', method='get', batch_size=1, on_success=mok_success, on_failure=mok_failure)
12✔
473
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
474
        e.send_events(evBuffer)
12✔
475
        
476
        mok_failure.assert_called_once_with(0, evBuffer)
12✔
477
        mok_success.assert_not_called()
12✔
478

479
    @mock.patch('snowplow_tracker.Emitter.http_post')
12✔
480
    def test_send_events_post_custom_retry(self, mok_http_post: Any) -> None:
12✔
481
        mok_http_post.side_effect = mocked_http_response_failure
12✔
482
        mok_success = mock.Mock(return_value="success mocked")
12✔
483
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
484

485
        e = Emitter('0.0.0.0', batch_size=10, on_success=mok_success, on_failure=mok_failure, custom_retry_codes={400: True})
12✔
486
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
487
        e.send_events(evBuffer)
12✔
488
        
489
        mok_http_post.side_effect = mocked_http_response_success
12✔
490
        time.sleep(5)
12✔
491

492
        mok_failure.assert_called_with(0, evBuffer)
12✔
493
        mok_success.assert_called_with(evBuffer)
12✔
494

495
    @mock.patch('snowplow_tracker.Emitter.http_get')
12✔
496
    def test_send_events_get_custom_retry(self, mok_http_get: Any) -> None:
12✔
497
        mok_http_get.side_effect = mocked_http_response_failure
12✔
498
        mok_success = mock.Mock(return_value="success mocked")
12✔
499
        mok_failure = mock.Mock(return_value="failure mocked")
12✔
500

501
        e = Emitter('0.0.0.0', method='get',batch_size=10, on_success=mok_success, on_failure=mok_failure, custom_retry_codes={400: True})
12✔
502
        evBuffer = [{"a": "aa"}, {"b": "bb"}, {"c": "cc"}]
12✔
503
        e.send_events(evBuffer)
12✔
504
        
505
        mok_http_get.side_effect = mocked_http_response_success
12✔
506
        time.sleep(5)
12✔
507

508
        mok_failure.assert_called_with(0, evBuffer)
12✔
509
        mok_success.assert_called_with(evBuffer)
12✔
510

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc