• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

snowplow / snowplow-python-tracker / 3956992538

pending completion
3956992538

Pull #312

github

GitHub
Merge bbb276e87 into ecca49d70
Pull Request #312: Release/0.13.0

511 of 582 new or added lines in 15 files covered. (87.8%)

2 existing lines in 1 file now uncovered.

2391 of 2522 relevant lines covered (94.81%)

11.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

46.88
/snowplow_tracker/celery/celery_emitter.py
1
# """
2
#     celery_emitter.py
3

4
#     Copyright (c) 2013-2022 Snowplow Analytics Ltd. All rights reserved.
5

6
#     This program is licensed to you under the Apache License Version 2.0,
7
#     and you may not use this file except in compliance with the Apache License
8
#     Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
9
#     http://www.apache.org/licenses/LICENSE-2.0.
10

11
#     Unless required by applicable law or agreed to in writing,
12
#     software distributed under the Apache License Version 2.0 is distributed on
13
#     an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14
#     express or implied. See the Apache License Version 2.0 for the specific
15
#     language governing permissions and limitations there under.
16

17
#     Authors: Anuj More, Alex Dean, Fred Blundun, Paul Boocock
18
#     Copyright: Copyright (c) 2013-2022 Snowplow Analytics Ltd
19
#     License: Apache License Version 2.0
20
# """
21

22
import logging
12✔
23
from typing import Any, Optional
12✔
24

25
from snowplow_tracker.emitters import Emitter
12✔
26
from snowplow_tracker.typing import HttpProtocol, Method
12✔
27

28
_CELERY_OPT = True
12✔
29
try:
12✔
30
    from celery import Celery
12✔
31
except ImportError:
12✔
32
    _CELERY_OPT = False
12✔
33

34
# logging
35
logging.basicConfig()
12✔
36
logger = logging.getLogger(__name__)
12✔
37
logger.setLevel(logging.INFO)
12✔
38

39

40
class CeleryEmitter(Emitter):
12✔
41
    """
42
        Uses a Celery worker to send HTTP requests asynchronously.
43
        Works like the base Emitter class,
44
        but on_success and on_failure callbacks cannot be set.
45
    """
46
    if _CELERY_OPT:
12✔
47

48
        celery_app = None
×
49

50
        def __init__(
×
51
                self,
52
                endpoint: str,
53
                protocol: HttpProtocol = "http",
54
                port: Optional[int] = None,
55
                method: Method = "post",
56
                batch_size: Optional[int] = None,
57
                byte_limit: Optional[int] = None) -> None:
NEW
58
            super(CeleryEmitter, self).__init__(endpoint, protocol, port, method, batch_size, None, None, byte_limit)
×
59

60
            try:
×
61
                # Check whether a custom Celery configuration module named "snowplow_celery_config" exists
62
                import snowplow_celery_config
×
63
                self.celery_app = Celery()
×
64
                self.celery_app.config_from_object(snowplow_celery_config)
×
65
            except ImportError:
×
66
                # Otherwise configure Celery with default settings
67
                self.celery_app = Celery("Snowplow", broker="redis://guest@localhost//")
×
68

69
            self.async_flush = self.celery_app.task(self.async_flush)
×
70

71
        def flush(self) -> None:
×
72
            """
73
            Schedules a flush task
74
            """
75
            self.async_flush.delay()
×
76
            logger.info("Scheduled a Celery task to flush the event queue")
×
77

78
        def async_flush(self) -> None:
×
79
            super(CeleryEmitter, self).flush()
×
80

81
    else:
82

83
        def __new__(cls, *args: Any, **kwargs: Any) -> 'CeleryEmitter':
12✔
84
            logger.error("CeleryEmitter is not available. Please install snowplow-tracker with celery extra dependency.")
×
85
            raise RuntimeError('CeleryEmitter is not available. To use: `pip install snowplow-tracker[celery]`')
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc