• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

DataBiosphere / azul / 23174016491

17 Mar 2026 01:25AM UTC coverage: 85.09% (-0.04%) from 85.126%
23174016491

push

github

dsotirho-ucsc
[R] Expand mypy coverage (#6821, #2743, #6779, PR #7870)

20032 of 23542 relevant lines covered (85.09%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.77
src/azul/http.py
1
import logging
1✔
2
import sys
1✔
3
import time
1✔
4
from typing import (
1✔
5
    Any,
6
    ClassVar,
7
    Self,
8
)
9

10
import certifi
1✔
11
from furl import (
1✔
12
    furl,
13
)
14
import urllib3
1✔
15
import urllib3._request_methods
1✔
16
import urllib3.connection
1✔
17
import urllib3.connectionpool
1✔
18
import urllib3.exceptions
1✔
19

20
from azul import (
1✔
21
    config,
22
)
23
from azul.lib import (
1✔
24
    R,
25
    cache,
26
    cached_property,
27
)
28
from azul.logging import (
1✔
29
    http_body_log_message,
30
)
31

32
# Alias for urllib3's RequestMethods interface. The clients and decorators in
# this module implement it, and it is the declared type of the clients they
# wrap and return.
HttpClient = urllib3._request_methods.RequestMethods
1✔
33

34

35
class HttpClientDecorator(HttpClient):
1✔
36
    """
37
    A convenience base class for implementations of the RequestMethods interface
38
    that decorate some other instance of an implementation of that interface.
39
    """
40

41
    def __init__(self,
1✔
42
                 inner: HttpClient,
43
                 headers: dict | None = None):
44
        # We'd use attrs but for some unknown reason that doesn't play well
45
        # with the superclass constructor.
46
        super().__init__(headers)
1✔
47
        self._inner = inner
1✔
48

49
    def urlopen(self, *args, **kwargs) -> urllib3.BaseHTTPResponse:
1✔
50
        return self._inner.urlopen(*args, **kwargs)
1✔
51

52
    def delegate[T: HttpClient](self, cls: type[T]) -> T | None:
1✔
53
        inner = self._inner
1✔
54
        while True:
1✔
55
            if isinstance(inner, cls):
1✔
56
                return inner
1✔
57
            elif isinstance(inner, HttpClientDecorator):
1✔
58
                inner = inner._inner
1✔
59
            else:
60
                return None
×
61

62

63
class LoggingHttpClient(HttpClientDecorator):
    """
    An HTTP client decorator that logs every request and response to the
    given logger. It logs the following:

    - the method and URL of the request, and the request body;
    - the status of the response and how long it took to arrive;
    - the response headers;
    - the response body, unless the response is streamed.

    Bodies are rendered by :func:`http_body_log_message`, which presumably
    limits how much of a body ends up in the log.

    The final request headers are logged by the connection pools of the
    wrapped ``urllib3.PoolManager``, with the value of any ``Authorization``
    header redacted (see :class:`_LoggingConnectionPool`).

    NOTE(review): every message here is emitted via ``Logger.info``. Any
    further gating of body or header content by log level or ``AZUL_DEBUG``
    would have to happen in ``http_body_log_message`` or in the logging
    configuration. Confirm.
    """

    def __init__(self,
                 inner: HttpClient,
                 log: logging.Logger,
                 *,
                 headers: dict | None = None):
        super().__init__(inner, headers)
        self._log = log

        # While a request travels through the layers of urllib3, headers may
        # be added to the ones the client supplied. The only place to log the
        # complete set is therefore the innermost layer, the connection pool.
        #
        # No pool exists yet at this point, so instead the wrapped pool
        # manager is told to create its pools from dynamic subclasses of the
        # stock pool classes, one per scheme. Each subclass mixes in
        # _LoggingConnectionPool, which does the actual logging. Because there
        # is no pool instance to hold the logger, it lives in a class
        # attribute. That makes one subclass per scheme and logger. Since
        # loggers typically exist once per module, this amounts to two classes
        # per client module.
        #
        # Monkey-patching the pool manager's ``_new_pool`` factory method
        # would also have worked, but subclassing is more transparent. The
        # name of each subclass starts with the name of the logger.
        #
        pool_manager = self.delegate(urllib3.PoolManager)
        attribute_name = 'pool_classes_by_scheme'
        # getattr/setattr appease mypy: the stubs don't declare the attribute
        existing_classes = getattr(pool_manager, attribute_name)
        logging_classes = {
            scheme: self._pool_cls(log, scheme)
            for scheme in ['http', 'https']
        }
        setattr(pool_manager, attribute_name, existing_classes | logging_classes)

    @classmethod
    @cache
    def _pool_cls(cls, log: logging.Logger, scheme: str) -> type:
        """
        Return a connection pool class for the given scheme that logs request
        headers to the given logger. The class is created once per logger
        and scheme; subsequent calls return the cached class.
        """
        proto = scheme.upper()
        class_name = f'{log.name}.Logging{proto}ConnectionPool'
        stock_class = getattr(urllib3, f'{proto}ConnectionPool')
        return type(class_name, (_LoggingConnectionPool, stock_class), {'_log': log})

    def urlopen(self, method, url, *args, body=None, **kwargs) -> urllib3.HTTPResponse:
        logger = self._log
        logger.info('Making %s request to %r', method, url)
        logger.info(http_body_log_message('request', body))
        started = time.monotonic()
        response = super().urlopen(method, url, *args, body=body, **kwargs)
        elapsed = time.monotonic() - started
        assert isinstance(response, urllib3.HTTPResponse), type(response)
        logger.info('Got %s response after %.3fs from %s to %s',
                    response.status, elapsed, method, url)
        logger.info('… with response headers %r', response.headers)
        # Only a response whose body has already been read in full can be
        # logged without consuming the body on behalf of the caller.
        body_message = (
            http_body_log_message('response', response.data)
            if response.isclosed() else
            '… with a streamed response body'
        )
        logger.info(body_message)
        return response

    def log(self, message: str, *args):
        """
        Log a %-style message with the given arguments at INFO level, using
        the logger this client was created with.
        """
        self._log.info(message, *args)
135

136

137
class _LoggingConnectionPool(urllib3.connectionpool.HTTPConnectionPool):
    """
    Mixin for urllib3 connection pool classes that logs the headers of each
    request right before handing the request to the superclass. Concrete
    subclasses are created dynamically by ``LoggingHttpClient._pool_cls``,
    which also sets the ``_log`` class attribute.
    """

    # The logger to log to, set on each dynamically created subclass
    _log: ClassVar[logging.Logger]

    def _make_request(self, *args, **kwargs) -> Any:
        logger = self._log
        request_headers = kwargs.get('headers')
        if request_headers is not None and len(request_headers) > 0:
            # urllib3's HTTPHeaderDict.items() can yield multiple entries for
            # the same key, or for keys differing only in case. Hence the
            # case-insensitive comparison, and a list of pairs, not a dict.
            redacted = [
                (key, 'REDACTED' if key.lower() == 'authorization' else value)
                for key, value in request_headers.items()
            ]
            logger.info('… with request headers %r', redacted)
        else:
            logger.info('… without request headers')
        # The stubs for urllib3 v1.x don't declare any protected methods
        return super()._make_request(*args, **kwargs)  # type: ignore[misc]
155

156

157
class DisableCrossHostRedirectClient(HttpClientDecorator):
    """
    A client that, by default, turns off the "custom cross-host redirect
    logic" (as the docstring of :meth:`urllib3.PoolManager.urlopen` calls it)
    by passing ``redirect=False``. Callers who want that logic can still pass
    ``redirect=True`` to urlopen() explicitly.
    """

    def urlopen(self, method, url, *args, **kwargs) -> urllib3.BaseHTTPResponse:
        if 'redirect' not in kwargs:
            kwargs['redirect'] = False
        return super().urlopen(method, url, *args, **kwargs)
167

168

169
def http_client(log: logging.Logger | None = None) -> HttpClient:
    """
    Create the standard HTTP client. From the outside in, a request passes
    through these layers:

    1. a :class:`StatusRetryHttpClient` with its default retry settings,
    2. if ``log`` is given, a :class:`LoggingHttpClient` logging to it,
    3. a :class:`DisableCrossHostRedirectClient`, which passes
       ``redirect=False`` unless the caller specifies otherwise,
    4. a ``urllib3.PoolManager`` that verifies certificates against certifi's
       CA bundle.

    Because the logging layer sits inside the retry layer, every repeated
    request is logged.
    """
    # A separate name keeps the variable holding the outermost client typed
    # as HttpClient from its first assignment.
    pool_manager = urllib3.PoolManager(ca_certs=certifi.where())
    client: HttpClient = DisableCrossHostRedirectClient(pool_manager)
    if log is not None:
        client = LoggingHttpClient(client, log)
    return StatusRetryHttpClient(client)
175

176

177
class LimitedTimeoutException(Exception):
    """
    Raised by :class:`LimitedRetryHttpClient` when a request does not yield
    an acceptable response within the client's time budget.
    """

    def __init__(self, url: furl, timeout: float):
        message = f'No response from {url} within {timeout} seconds'
        super().__init__(message)
181

182

183
class TooManyRequestsException(Exception):
    """
    Raised by :class:`Propagate429HttpClient` when a request is answered with
    status 429 (Too Many Requests).
    """

    def __init__(self, url: furl):
        message = f'Maximum request rate exceeded for {url}'
        super().__init__(message)
187

188

189
class _LimitedRetry(urllib3.Retry):
    """
    The urllib3 retry strategy used by LimitedRetryHttpClient. Besides the
    usual per-category retry counters, it tracks the time elapsed since it was
    created and considers itself exhausted once the time budget is used up.

    First, set up the fixtures:

    >>> from urllib3.exceptions import ReadTimeoutError
    >>> from urllib3.connectionpool import ConnectionPool
    >>> from typing import cast
    >>> pool = cast(ConnectionPool, None)
    >>> error = ReadTimeoutError(pool=pool, url='', message='')

    With zero retries …

    >>> r = _LimitedRetry.create(retries=0, timeout=5)

    … there still is one tentative retry on read:

    >>> r.connect, r.read, r.redirect, r.status, r.other
    (0, 1, 0, 0, 0)

    A fresh instance is not exhausted:

    >>> r.is_exhausted()
    False

    After a read error, that tentative retry is consumed …

    >>> r = r.increment(method='GET', error=error)
    >>> r.connect, r.read, r.redirect, r.status, r.other
    (0, 0, 0, 0, 0)

    … but since less than 10 ms have passed, the instance is not yet exhausted:

    >>> r.is_exhausted()
    False

    Exhaustion sets in only after a longer delay:

    >>> time.sleep(.02)
    >>> r.is_exhausted()
    True
    """
    # Value of time.monotonic() when create() was called
    start: float
    # The number of retries passed to create()
    retries: int
    # The overall time budget in seconds, as passed to create()
    timeout: float

    @classmethod
    def create(cls, *, retries: int, timeout: float) -> Self:
        # Redirects are never retried. Server failures and I/O errors, such as
        # refused or dropped connections, are retried a limited number of
        # times. Dropped connections are in fact quite likely when a pooled
        # connection is reused after a long idle period. That's why there is
        # always at least one retry on read …
        instance = cls(total=None,
                       connect=retries,
                       read=retries + 1,
                       redirect=0,
                       raise_on_redirect=True,
                       status=retries,
                       other=retries,
                       status_forcelist={500, 502, 503},
                       raise_on_status=True)
        instance.start = time.monotonic()
        instance.retries = retries
        instance.timeout = timeout
        return instance

    def is_exhausted(self):
        # … but with retries disabled, that extra read retry only counts if
        # the first read attempt failed quickly, within 10ms. Otherwise, read
        # errors unrelated to a stale pooled connection could overshoot the
        # overall timeout by as much as 100%, and the whole point of zero
        # retries is to stay within the timeout.
        if super().is_exhausted():
            return True
        elapsed = time.monotonic() - self.start
        if elapsed >= self.timeout:
            return True
        return self.retries == 0 and elapsed > .01

    def new(self, **kwargs) -> Self:
        # urllib3 calls this copy constructor to derive an instance with
        # decremented retry counters. It then calls `is_exhausted` on the copy
        # to decide whether another attempt should be made. The superclass
        # knows nothing about the attributes set by create(), so carry them
        # over here.
        successor = super().new(**kwargs)
        successor.start = self.start
        successor.retries = self.retries
        successor.timeout = self.timeout
        return successor
276

277

278
class LimitedRetryHttpClient(HttpClientDecorator):
    """
    An HTTP client that bounds the total time spent on a request, including
    retries.

    While ``config.lambda_is_handling_api_gateway_request`` is true, the
    budget is 5 seconds with no retries. Otherwise it is 20 seconds with 2
    retries. If the budget is exceeded, or the final response has a status
    that would have been retried, :class:`LimitedTimeoutException` is raised.
    """

    @property
    def _timing_is_restricted(self) -> bool:
        return config.lambda_is_handling_api_gateway_request

    @property
    def timeout(self) -> float:
        """
        The overall time budget for a request in seconds.
        """
        if self._timing_is_restricted:
            return 5
        return 20

    @property
    def retries(self) -> int:
        """
        The number of retries on connection, read and server errors.
        """
        if self._timing_is_restricted:
            return 0
        return 2

    def urlopen(self, method, url, *args, **kwargs) -> urllib3.BaseHTTPResponse:
        timeout = self.timeout
        retries = self.retries
        assert 'retries' not in kwargs, R("Argument 'retries' is disallowed")
        retry = _LimitedRetry.create(retries=retries, timeout=timeout)
        # Each planned attempt gets an equal share of the overall budget
        attempt_timeout = timeout / (1 + retries)
        try:
            response = super().urlopen(method,
                                       url,
                                       *args,
                                       retries=retry,
                                       timeout=attempt_timeout,
                                       **kwargs)
        except (urllib3.exceptions.TimeoutError, urllib3.exceptions.MaxRetryError):
            # Any wrapped LoggingHttpClient may not have had a chance to log
            # anything about the response, so we hope that the exception
            # captures enough information about the cause.
            logging.warning('Exception during request or response', exc_info=True)
            raise LimitedTimeoutException(url, timeout)
        if response.status in retry.status_forcelist:
            raise LimitedTimeoutException(url, timeout)
        return response
314

315

316
class Propagate429HttpClient(HttpClientDecorator):
    """
    An HTTP client that raises :class:`TooManyRequestsException` for a
    response with status 429 (Too Many Requests), instead of returning it.
    Responses with any other status are returned as is.
    """

    def urlopen(self, method, url, *args, **kwargs) -> urllib3.BaseHTTPResponse:
        response = super().urlopen(method, url, *args, **kwargs)
        if response.status != 429:
            return response
        raise TooManyRequestsException(url)
324

325

326
class HasCachedHttpClient:
    """
    Mixin providing a lazily created HTTP client, cached in an instance
    property. By default, the client is the one returned by
    :func:`http_client`, logging to the logger of the module that defines the
    concrete class of the instance. That module must have a variable called
    ``log`` that refers to a ``logging.Logger``.
    """

    @cached_property
    def _http_client(self) -> HttpClient:
        return self._create_http_client()

    def _create_http_client(self) -> HttpClient:
        """
        Create the client that :attr:`_http_client` caches. Override this to
        replace, wrap or otherwise modify that client.
        """
        defining_module = sys.modules[type(self).__module__]
        log = getattr(defining_module, 'log')
        assert isinstance(log, logging.Logger), type(log)
        return http_client(log)
346

347

348
class StatusRetryHttpClient(HttpClientDecorator):
    """
    An HTTP client that repeats the request until either 1) the response
    status is not one of a specified set of statuses that represent an error,
    or 2) the number of repeat requests, aka *retries*, reaches a specified
    value.

    This class attempts to emulate urllib3's built-in retry logic to the extent
    that the author understood it (it is rather complex).

    This class imposes additional restrictions on the arguments to the
    :py:meth:`urlopen` method, and the convenience methods that call it. See
    :py:meth:`urlopen` for details.
    """

    redirect_statuses = frozenset(urllib3.HTTPResponse.REDIRECT_STATUSES)

    retry_after_statuses = frozenset(urllib3.Retry.RETRY_AFTER_STATUS_CODES)

    @property
    def default_retries(self) -> urllib3.Retry:
        # Despite the class docstring claiming that Retry instances "can be
        # safely reused", all their attributes are mutable, so that claim
        # describes a convention and is not explicitly enforced. We therefore
        # defensively create a new instance each time one is requested.
        return urllib3.Retry(total=None,
                             connect=2,
                             read=2,
                             redirect=0,
                             raise_on_redirect=False,
                             status=5,
                             raise_on_status=True,
                             status_forcelist={429, 500, 502, 503, 504})

    def urlopen(self,
                method: str,
                url: str,
                *args,
                retries: urllib3.Retry | None = None,
                **kwargs
                ) -> urllib3.BaseHTTPResponse:
        """
        The ``retries`` argument, if specified, must be ``None`` or an instance
        of ``urllib3.Retry`` whose ``status`` attribute is a non-negative
        integer. That integer is the maximum number of times the request is
        repeated because of its response status.

        The statuses that cause a repeat are those in
        ``retries.status_forcelist`` or, if that is empty, those in
        ``urllib3.Retry.RETRY_AFTER_STATUS_CODES``. Unless redirects are
        disabled in ``retries``, these statuses must not intersect with the
        statuses that urllib3 treats as redirects
        (``urllib3.HTTPResponse.REDIRECT_STATUSES``).

        If ``retries`` is ``None``, the return value of :meth:`default_retries`
        is used instead. That value satisfies the above constraints but it is
        notably different from the default value for the ``retries`` argument to
        urllib3's ``urlopen()`` method.

        A request is only repeated if ``retries.allowed_methods`` permits its
        method, using the same rule as urllib3: a falsy value (``None`` or
        ``False``) permits any method. Each attempt is made with a copy of
        ``retries`` in which status-based retries are disabled, because this
        client handles those itself.

        If ``retries.respect_retry_after_header`` is true and a response that
        is about to be retried has a ``Retry-After`` header, this method
        sleeps for the indicated time before repeating the request. Both
        forms of the header are supported: a number of seconds, and an HTTP
        date. An invalid header is ignored. If a LoggingHttpClient is among
        the wrapped clients, the invalid header is also logged.

        :raises urllib3.exceptions.MaxRetryError: if the response status still
                calls for a retry after all retries are used up, and
                ``retries.raise_on_status`` is true.
        """
        if retries is None:
            retries = self.default_retries

        assert isinstance(retries, urllib3.Retry), R(
            "Argument 'retries' must be an instance of urllib3.Retry",
            type(retries))

        assert isinstance(retries.status, int) and retries.status >= 0, R(
            "Argument 'retries.status' must be an non-negative integer",
            retries.status)
        num_retries = retries.status

        statuses = frozenset(retries.status_forcelist) or self.retry_after_statuses
        assert bool(statuses), R(
            "Argument 'retries.status_forcelist' must not be empty",
            statuses)
        if statuses & self.redirect_statuses:
            assert not bool(retries.redirect), R(
                "Redirects must be disabled if 'retries.status_forcelist' "
                "contains one or more redirect status codes.",
                statuses, self.redirect_statuses)

        logging_client = self.delegate(LoggingHttpClient)
        # Mirror urllib3's own check: a falsy value (None in urllib3 v2,
        # False in v1.26) means that any method may be retried, and the
        # configured methods are upper case.
        methods = retries.allowed_methods
        retryable = not methods or method.upper() in methods
        inner_retries = retries.new(status=0,
                                    status_forcelist=None,
                                    respect_retry_after_header=False)
        while True:
            response = super().urlopen(method, url, *args, retries=inner_retries, **kwargs)
            if retryable and response.status in statuses:
                if 0 < num_retries:
                    num_retries -= 1
                    # This response is being discarded. Consume what is left of
                    # its body so that the underlying connection can go back
                    # to the pool, as urllib3 does before it retries.
                    response.drain_conn()
                    if retries.respect_retry_after_header:
                        try:
                            # Handles both the delay-seconds and the HTTP-date
                            # form, clamps at zero, and returns None if the
                            # header is absent
                            retry_after = retries.get_retry_after(response)
                        except urllib3.exceptions.InvalidHeader:
                            if logging_client is not None:
                                logging_client.log('Ignoring invalid Retry-After header %r',
                                                   response.headers.get('Retry-After'))
                        else:
                            if retry_after is not None:
                                if logging_client is not None:
                                    logging_client.log('Sleeping %ds to honor Retry-After header', retry_after)
                                time.sleep(retry_after)
                else:
                    if retries.raise_on_status:
                        response.drain_conn()
                        pool = getattr(response, '_pool')
                        raise urllib3.exceptions.MaxRetryError(pool, url)
                    else:
                        return response
            else:
                return response
451

452

453
def parse_header(name: str, value: str) -> tuple[str, dict[str, str]]:
    """
    Split the value of a MIME-style HTTP header, such as ``content-type`` or
    ``content-disposition``, into its mandatory leading part and a dictionary
    with one entry per optional parameter. Quoted parameter values are
    unquoted. Empty arguments, a malformed leading part and repeated
    parameters are rejected.

    >>> parse_header('content-type', 'text/html; charset=utf-8')
    ('text/html', {'charset': 'utf-8'})

    >>> parse_header('content-type', 'application/json; charset=utf-8; foo=bar')
    ('application/json', {'charset': 'utf-8', 'foo': 'bar'})

    >>> parse_header('content-type', 'text/html')
    ('text/html', {})

    >>> parse_header('content-disposition', 'attachment; filename="document.pdf"')
    ('attachment', {'filename': 'document.pdf'})

    >>> parse_header('content-disposition', 'attachment; name="foo.pdf"; name="bar.pdf"')
    Traceback (most recent call last):
    ...
    AssertionError: R('Duplicate parameters', [('name', 'foo.pdf'), ('name', 'bar.pdf')])

    >>> parse_header('content-disposition', '')
    Traceback (most recent call last):
    ...
    AssertionError: R('Empty arguments are disallowed', 'content-disposition', '')

    >>> parse_header('content-type', 'text:charset=utf-8')
    Traceback (most recent call last):
    ...
    AssertionError: R('Unparsable header format', 'text:charset=utf-8')
    """
    assert '' not in (name, value), R(
        'Empty arguments are disallowed', name, value)
    from email.message import (
        Message,
    )
    message = Message()
    message[name] = value
    parsed = message.get_params(header=name)
    assert isinstance(parsed, list)
    # The leading part comes back as a pair with an empty second element. A
    # non-empty one means the leading part itself looked like a parameter.
    leading, separator = parsed[0]
    parameters = parsed[1:]
    assert separator == '', R('Unparsable header format', value)
    by_name = dict(parameters)
    assert len(by_name) == len(parameters), R('Duplicate parameters', parameters)
    return leading, by_name
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc