• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

HowFast / apm-python / 06b7e795-a650-4df1-9f26-225db7666945

24 Dec 2024 12:58AM UTC coverage: 92.333%. Remained the same
06b7e795-a650-4df1-9f26-225db7666945

Pull #21

circleci

web-flow
Bump jinja2 from 3.1.4 to 3.1.5

Bumps [jinja2](https://github.com/pallets/jinja) from 3.1.4 to 3.1.5.
- [Release notes](https://github.com/pallets/jinja/releases)
- [Changelog](https://github.com/pallets/jinja/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/jinja/compare/3.1.4...3.1.5)

---
updated-dependencies:
- dependency-name: jinja2
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #21: Bump jinja2 from 3.1.4 to 3.1.5

53 of 61 branches covered (86.89%)

Branch coverage included in aggregate %.

224 of 239 relevant lines covered (93.72%)

0.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.28
/howfast_apm/runner.py
1
import requests
1✔
2
from logging import getLogger
1✔
3
from threading import Thread
1✔
4
from typing import List, Dict, Any
1✔
5

6
from queue import Queue, Empty
1✔
7
from .config import HOWFAST_APM_COLLECTOR_URL
1✔
8

9
logger = getLogger("howfast_apm")
1✔
10

11

12
class Runner(Thread):
    """Thread dedicated to sending performance events stored in the queue to the API.

    Points are read from ``queue``, accumulated in ``current_batch`` and posted
    to ``HOWFAST_APM_COLLECTOR_URL`` in groups of at most ``batch_size``.
    """

    # The DSN of the application
    app_id: str

    # If the queue is empty, wait up to X seconds before checking if the thread has to stop.
    # Whatever this value, an incoming element in the queue will be picked up as soon as it arrives.
    sleep_delay = 0.5

    # Group points before sending them to the API
    batch_size = 100

    # Maximum number of seconds to wait for the collector before giving up on a request.
    # Without a timeout, a stalled connection would block this thread forever and the
    # queue would keep growing in the host application.
    request_timeout = 10

    # Local list of the points to be sent to the API
    current_batch: List[Dict[str, Any]]

    def __init__(self, queue: Queue, app_id: str):
        """Create the runner thread (it is not started here).

        Args:
            queue: queue from which performance points are read.
            app_id: DSN of the application, sent along with every batch.
        """
        super().__init__(
            name="HowFast APM",
            # The entire Python program exits when no alive non-daemon threads are left, and we
            # don't want this thread to block the program from exiting.
            # TODO: block until the queue is empty?
            # TODO: don't abruptly kill this thread when the process stops, but instead set the
            # `stop` property
            daemon=True,
        )
        self.queue = queue
        self.app_id = app_id
        self.current_batch = []
        # TODO: stop mechanism?
        self.stop = False
        logger.debug("APM thread starting...")

    def run(self):
        """Keep collecting and sending batches until ``stop`` is no longer ``False``."""
        while self.stop is False:
            self.run_once()

    def run_once(self):
        """Collect up to ``batch_size`` points from the queue and send them, if any."""
        try:
            for _ in range(self.batch_size):
                # Try to get N=batch_size points from the queue. As soon as the queue is empty,
                # queue.get will block up to self.sleep_delay. Once the queue is still empty
                # after the sleep delay OR batch_size points have been retrieved from the queue,
                # we proceed to sending the batch. With the defaults, this strategy means that
                # the thread will wait up to N=100 * 0.5s = 50s before sending points to the
                # API, if 100 points arrive with slightly less than 0.5s between them.
                point = self.queue.get(timeout=self.sleep_delay)
                self.current_batch.append(point)
        except Empty:
            # The queue drained before the batch was full: send what we have (if anything).
            pass
        except Exception:
            logger.error("Runner crashed:", exc_info=True)
            return

        if self.current_batch:
            # If the queue was empty, the current batch will be empty and we don't need to
            # send the batch
            self._send_batch_robust()

    @staticmethod
    def serialize_point(point: dict) -> dict:
        """Prepare the point to be sent to the API.

        Args:
            point: mapping holding the keys read below; ``time_request_started`` must
                provide an ``isoformat()`` method.

        Returns:
            A new dict with the mandatory fields, plus ``url_rule`` when it is truthy
            and ``is_not_found`` when it is present and not ``None``.
        """
        serialized_point = {
            'method': point['method'],
            'uri': point['uri'],
            'time_request_started': point['time_request_started'].isoformat(),
            'time_elapsed': point['time_elapsed'],
            'interactions': point['interactions'],
            'response_status': point['response_status'],
            'endpoint_name': point['endpoint_name'],
        }
        # Save some space in the request body if we don't have interesting information
        url_rule = point.get('url_rule')
        if url_rule:
            serialized_point['url_rule'] = url_rule
        is_not_found = point.get('is_not_found')
        if is_not_found is not None:
            serialized_point['is_not_found'] = is_not_found

        return serialized_point

    def _send_batch_robust(self, attempts=1, max_attempts=3) -> None:
        """Send the current batch, retrying until ``max_attempts`` tries have been made.

        Args:
            attempts: number of the first try (1-based), used for logging and counting.
            max_attempts: total number of tries after which the batch is dropped.
        """
        while True:
            try:
                self.send_batch()
                return
            except Exception:
                # Print an error, and don't die
                logger.error(
                    "Runner was unable to send performance data (try %d/%d)",
                    attempts,
                    max_attempts,
                    exc_info=True,
                )
            if attempts >= max_attempts:
                break
            # Retry sending the batch
            attempts += 1

        # We've retried enough times, let's just drop the batch :(
        logger.warning(
            "Unable to send the batch after %d attempts, dropping %d points",
            max_attempts,
            len(self.current_batch),
        )
        # Drop the points
        self.current_batch = []

    def send_batch(self) -> None:
        """Post the whole current batch to the collector and empty it.

        The batch is emptied as soon as the request completes, whatever the response
        status; a non-200 response is only logged. Exceptions raised by
        ``requests.post`` (including timeouts) propagate to the caller and leave
        the batch untouched.
        """
        logger.debug("Posting %d point(s) to the server", len(self.current_batch))
        response = requests.post(
            HOWFAST_APM_COLLECTOR_URL,
            json={
                'dsn': self.app_id,
                'perf': [self.serialize_point(point) for point in self.current_batch],
            },
            # Never wait indefinitely on the collector
            timeout=self.request_timeout,
        )
        # Batch is now empty
        self.current_batch = []

        if response.status_code != 200:
            logger.warning(
                "Unable to send a point to the server (%s), data was dropped! %s",
                response.status_code,
                response.content,
            )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc