• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

hivesolutions / flask-quorum / 1497

pending completion
1497

push

travis-ci-com

web-flow
Merge pull request #2 from hivesolutions/joamag/stack-tune

Fixes removal of `_app_ctx_stack`

4 of 4 new or added lines in 1 file covered. (100.0%)

5996 of 8451 relevant lines covered (70.95%)

5.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.85
/src/quorum/httpc.py
1
#!/usr/bin/python
2
# -*- coding: utf-8 -*-
3

4
# Hive Flask Quorum
5
# Copyright (c) 2008-2022 Hive Solutions Lda.
6
#
7
# This file is part of Hive Flask Quorum.
8
#
9
# Hive Flask Quorum is free software: you can redistribute it and/or modify
10
# it under the terms of the Apache License as published by the Apache
11
# Foundation, either version 2.0 of the License, or (at your option) any
12
# later version.
13
#
14
# Hive Flask Quorum is distributed in the hope that it will be useful,
15
# but WITHOUT ANY WARRANTY; without even the implied warranty of
16
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17
# Apache License for more details.
18
#
19
# You should have received a copy of the Apache License along with
20
# Hive Flask Quorum. If not, see <http://www.apache.org/licenses/>.
21

22
__author__ = "João Magalhães <joamag@hive.pt>"
8✔
23
""" The author(s) of the module """
24

25
__version__ = "1.0.0"
8✔
26
""" The version of the module """
27

28
__revision__ = "$LastChangedRevision$"
8✔
29
""" The revision number of the module """
30

31
__date__ = "$LastChangedDate$"
8✔
32
""" The last change date of the module """
33

34
__copyright__ = "Copyright (c) 2008-2022 Hive Solutions Lda."
8✔
35
""" The copyright for the module """
36

37
__license__ = "Apache License, Version 2.0"
8✔
38
""" The license for the module """
39

40
import os
8✔
41
import json
8✔
42
import base64
8✔
43
import random
8✔
44
import string
8✔
45
import logging
8✔
46

47
from . import util
8✔
48
from . import legacy
8✔
49
from . import typesf
8✔
50
from . import config
8✔
51
from . import common
8✔
52
from . import exceptions
8✔
53
from . import structures
8✔
54

55
TIMEOUT = 60
8✔
56
""" The timeout in seconds to be used for the blocking
57
operations in the HTTP connection """
58

59
RANGE = string.ascii_letters + string.digits
8✔
60
""" The range of characters that are going to be used in
61
the generation of the boundary value for the mime """
62

63
SEQUENCE_TYPES = (list, tuple)
8✔
64
""" The sequence defining the various types that are
65
considered to be sequence based for python """
66

67
AUTH_ERRORS = (401, 403, 440, 499)
8✔
68
""" The sequence that defines the various HTTP errors
69
considered to be authentication related and for which a
70
new authentication try will be performed """
71

72
def file_g(path, chunk = 40960):
8✔
73
    yield os.path.getsize(path)
×
74
    file = open(path, "rb")
×
75
    try:
×
76
        while True:
×
77
            data = file.read(chunk)
×
78
            if not data: break
×
79
            yield data
×
80
    finally:
81
        file.close()
×
82

83
def get_f(*args, **kwargs):
8✔
84
    name = kwargs.pop("name", "default")
8✔
85
    kwargs["handle"] = kwargs.get("handle", True)
8✔
86
    kwargs["redirect"] = kwargs.get("redirect", True)
8✔
87
    data, response = get_json(*args, **kwargs)
8✔
88
    info = response.info()
8✔
89
    mime = info.get("Content-Type", None)
8✔
90
    file_tuple = (name, mime, data)
8✔
91
    return typesf.File(file_tuple)
8✔
92

93
def get(url, auth_callback = None, **kwargs):
8✔
94
    # starts the variable holding the number of
95
    # retrieves to be used
96
    retries = 5
8✔
97

98
    while True:
8✔
99
        try:
8✔
100
            return _get(url, **kwargs)
8✔
101
        except legacy.HTTPError as error:
8✔
102
            if error.code in AUTH_ERRORS and auth_callback:
8✔
103
                _try_auth(auth_callback, kwargs)
×
104
            else:
105
                code = error.getcode()
8✔
106
                raise exceptions.HTTPDataError(error, code)
8✔
107

108
        # decrements the number of retries and checks if the
109
        # number of retries has reached the limit
110
        retries -= 1
×
111
        if retries == 0:
×
112
            raise exceptions.HTTPError("Data retrieval not possible")
×
113

114
def get_json(
8✔
115
    url,
116
    headers = None,
117
    handle = None,
118
    redirect = None,
119
    silent = None,
120
    timeout = None,
121
    auth_callback = None,
122
    **kwargs
123
):
124
    # starts the variable holding the number of
125
    # retrieves to be used
126
    retries = 5
8✔
127

128
    while True:
8✔
129
        try:
8✔
130
            return _get_json(
8✔
131
                url,
132
                headers = headers,
133
                handle = handle,
134
                silent = silent,
135
                redirect = redirect,
136
                timeout = timeout,
137
                **kwargs
138
            )
139
        except legacy.HTTPError as error:
8✔
140
            if error.code in AUTH_ERRORS and auth_callback:
×
141
                _try_auth(auth_callback, kwargs)
×
142
            else:
143
                data_r = error.read()
×
144
                data_r = legacy.str(data_r, encoding = "utf-8")
×
145
                data_s = json.loads(data_r)
×
146
                raise exceptions.JSONError(data_s)
×
147

148
        # decrements the number of retries and checks if the
149
        # number of retries has reached the limit
150
        retries -= 1
×
151
        if retries == 0:
×
152
            raise exceptions.HTTPError("Data retrieval not possible")
×
153

154
def post_json(
8✔
155
    url,
156
    data = None,
157
    data_j = None,
158
    data_m = None,
159
    headers = None,
160
    mime = None,
161
    handle = None,
162
    silent = None,
163
    redirect = None,
164
    timeout = None,
165
    auth_callback = None,
166
    **kwargs
167
):
168
    # starts the variable holding the number of
169
    # retrieves to be used
170
    retries = 5
8✔
171

172
    while True:
8✔
173
        try:
8✔
174
            return _post_json(
8✔
175
                url,
176
                data = data,
177
                data_j = data_j,
178
                data_m = data_m,
179
                headers = headers,
180
                mime = mime,
181
                handle = handle,
182
                silent = silent,
183
                redirect = redirect,
184
                timeout = timeout,
185
                **kwargs
186
            )
187
        except legacy.HTTPError as error:
×
188
            if error.code in AUTH_ERRORS and auth_callback:
×
189
                _try_auth(auth_callback, kwargs)
×
190
            else:
191
                data_r = error.read()
×
192
                data_r = legacy.str(data_r, encoding = "utf-8")
×
193
                data_s = json.loads(data_r)
×
194
                raise exceptions.JSONError(data_s)
×
195

196
        # decrements the number of retries and checks if the
197
        # number of retries has reached the limit
198
        retries -= 1
×
199
        if retries == 0:
×
200
            raise exceptions.HTTPError("Data retrieval not possible")
×
201

202
def put_json(
8✔
203
    url,
204
    data = None,
205
    data_j = None,
206
    data_m = None,
207
    headers = None,
208
    mime = None,
209
    handle = None,
210
    silent = None,
211
    redirect = None,
212
    timeout = None,
213
    auth_callback = None,
214
    **kwargs
215
):
216
    # starts the variable holding the number of
217
    # retrieves to be used
218
    retries = 5
×
219

220
    while True:
×
221
        try:
×
222
            return _put_json(
×
223
                url,
224
                data = data,
225
                data_j = data_j,
226
                data_m = data_m,
227
                headers = headers,
228
                mime = mime,
229
                handle = handle,
230
                silent = silent,
231
                redirect = redirect,
232
                timeout = timeout,
233
                **kwargs
234
            )
235
        except legacy.HTTPError as error:
×
236
            if error.code in AUTH_ERRORS and auth_callback:
×
237
                _try_auth(auth_callback, kwargs)
×
238
            else:
239
                data_r = error.read()
×
240
                data_r = legacy.str(data_r, encoding = "utf-8")
×
241
                data_s = json.loads(data_r)
×
242
                raise exceptions.JSONError(data_s)
×
243

244
        # decrements the number of retries and checks if the
245
        # number of retries has reached the limit
246
        retries -= 1
×
247
        if retries == 0:
×
248
            raise exceptions.HTTPError("Data retrieval not possible")
×
249

250
def delete_json(
8✔
251
    url,
252
    headers = None,
253
    handle = None,
254
    silent = None,
255
    redirect = None,
256
    timeout = None,
257
    auth_callback = None,
258
    **kwargs
259
):
260
    # starts the variable holding the number of
261
    # retrieves to be used
262
    retries = 5
×
263

264
    while True:
×
265
        try:
×
266
            return _delete_json(
×
267
                url,
268
                headers = headers,
269
                handle = handle,
270
                silent = silent,
271
                redirect = redirect,
272
                timeout = timeout,
273
                **kwargs
274
            )
275
        except legacy.HTTPError as error:
×
276
            if error.code in AUTH_ERRORS and auth_callback:
×
277
                _try_auth(auth_callback, kwargs)
×
278
            else:
279
                data_r = error.read()
×
280
                data_r = legacy.str(data_r, encoding = "utf-8")
×
281
                data_s = json.loads(data_r)
×
282
                raise exceptions.JSONError(data_s)
×
283

284
        # decrements the number of retries and checks if the
285
        # number of retries has reached the limit
286
        retries -= 1
×
287
        if retries == 0:
×
288
            raise exceptions.HTTPError("Data retrieval not possible")
×
289

290
def patch_json(
8✔
291
    url,
292
    headers = None,
293
    handle = None,
294
    silent = None,
295
    redirect = None,
296
    timeout = None,
297
    auth_callback = None,
298
    **kwargs
299
):
300
    # starts the variable holding the number of
301
    # retrieves to be used
302
    retries = 5
×
303

304
    while True:
×
305
        try:
×
306
            return _patch_json(
×
307
                url,
308
                headers = headers,
309
                handle = handle,
310
                silent = silent,
311
                redirect = redirect,
312
                timeout = timeout,
313
                **kwargs
314
            )
315
        except legacy.HTTPError as error:
×
316
            if error.code in AUTH_ERRORS and auth_callback:
×
317
                _try_auth(auth_callback, kwargs)
×
318
            else:
319
                data_r = error.read()
×
320
                data_r = legacy.str(data_r, encoding = "utf-8")
×
321
                data_s = json.loads(data_r)
×
322
                raise exceptions.JSONError(data_s)
×
323

324
        # decrements the number of retries and checks if the
325
        # number of retries has reached the limit
326
        retries -= 1
×
327
        if retries == 0:
×
328
            raise exceptions.HTTPError("Data retrieval not possible")
×
329

330
def basic_auth(username, password = None):
8✔
331
    if not password: password = username
8✔
332
    authorization = _authorization(username, password)
8✔
333
    return "Basic %s" % authorization
8✔
334

335
def _try_auth(auth_callback, params, headers = None):
8✔
336
    if not auth_callback: raise
×
337
    if headers == None: headers = dict()
×
338
    auth_callback(params, headers)
×
339

340
def _get(url, **kwargs):
8✔
341
    values = kwargs or dict()
8✔
342
    data = _urlencode(values)
8✔
343
    url = url + "?" + data if data else url
8✔
344
    file = _resolve(url, "GET", {}, None, False, TIMEOUT)
8✔
345
    contents = file.read()
×
346
    return contents
×
347

348
def _get_json(
8✔
349
    url,
350
    headers = None,
351
    handle = None,
352
    silent = None,
353
    redirect = None,
354
    timeout = None,
355
    **kwargs
356
):
357
    return _method_empty(
8✔
358
        "GET",
359
        url,
360
        headers = headers,
361
        handle = handle,
362
        silent = silent,
363
        redirect = redirect,
364
        timeout = timeout,
365
        **kwargs
366
    )
367

368
def _post_json(
8✔
369
    url,
370
    data = None,
371
    data_j = None,
372
    data_m = None,
373
    headers = None,
374
    mime = None,
375
    handle = None,
376
    silent = None,
377
    redirect = None,
378
    timeout = None,
379
    **kwargs
380
):
381
    return _method_payload(
8✔
382
        "POST",
383
        url,
384
        data = data,
385
        data_j = data_j,
386
        data_m = data_m,
387
        headers = headers,
388
        mime = mime,
389
        handle = handle,
390
        silent = silent,
391
        redirect = redirect,
392
        timeout = timeout,
393
        **kwargs
394
    )
395

396
def _put_json(
8✔
397
    url,
398
    data = None,
399
    data_j = None,
400
    data_m = None,
401
    headers = None,
402
    mime = None,
403
    handle = None,
404
    silent = None,
405
    redirect = None,
406
    timeout = None,
407
    **kwargs
408
):
409
    return _method_payload(
×
410
        "PUT",
411
        url,
412
        data = data,
413
        data_j = data_j,
414
        data_m = data_m,
415
        headers = headers,
416
        mime = mime,
417
        handle = handle,
418
        silent = silent,
419
        redirect = redirect,
420
        timeout = timeout,
421
        **kwargs
422
    )
423

424
def _delete_json(
8✔
425
    url,
426
    headers = None,
427
    handle = None,
428
    silent = None,
429
    redirect = None,
430
    timeout = None,
431
    **kwargs
432
):
433
    return _method_empty(
×
434
        "DELETE",
435
        url,
436
        headers = headers,
437
        handle = handle,
438
        silent = silent,
439
        redirect = redirect,
440
        timeout = timeout,
441
        **kwargs
442
    )
443

444
def _patch_json(
8✔
445
    url,
446
    headers = None,
447
    handle = None,
448
    silent = None,
449
    redirect = None,
450
    timeout = None,
451
    **kwargs
452
):
453
    return _method_empty(
×
454
        "PATCH",
455
        url,
456
        headers = headers,
457
        handle = handle,
458
        silent = silent,
459
        redirect = redirect,
460
        timeout = timeout,
461
        **kwargs
462
    )
463

464
def _method_empty(
8✔
465
    name,
466
    url,
467
    headers = None,
468
    handle = None,
469
    silent = None,
470
    redirect = None,
471
    timeout = None,
472
    **kwargs
473
):
474
    if handle == None: handle = False
8✔
475
    if silent == None: silent = False
8✔
476
    if redirect == None: redirect = False
8✔
477
    if timeout == None: timeout = TIMEOUT
8✔
478
    values = kwargs or dict()
8✔
479
    url, scheme, host, authorization, extra = _parse_url(url)
8✔
480
    if extra: values.update(extra)
8✔
481
    data = _urlencode(values)
8✔
482
    headers = dict(headers) if headers else dict()
8✔
483
    if host: headers["host"] = host
8✔
484
    if authorization: headers["Authorization"] = "Basic %s" % authorization
8✔
485
    url = url + "?" + data if data else url
8✔
486
    url = str(url)
8✔
487
    file = _resolve(url, name, headers, None, silent, timeout)
8✔
488
    try: result = file.read()
8✔
489
    finally: file.close()
8✔
490
    info = file.info()
8✔
491
    location = info.get("Location", None) if redirect else None
8✔
492
    if location: return _redirect(
8✔
493
        location,
494
        scheme,
495
        host,
496
        handle = handle,
497
        silent = silent,
498
        redirect = redirect,
499
        timeout = timeout
500
    )
501
    result = _result(result, info)
8✔
502
    return (result, file) if handle else result
8✔
503

504
def _method_payload(
8✔
505
    name,
506
    url,
507
    data = None,
508
    data_j = None,
509
    data_m = None,
510
    headers = None,
511
    mime = None,
512
    handle = None,
513
    silent = None,
514
    redirect = None,
515
    timeout = None,
516
    **kwargs
517
):
518
    if handle == None: handle = False
8✔
519
    if silent == None: silent = False
8✔
520
    if redirect == None: redirect = False
8✔
521
    if timeout == None: timeout = TIMEOUT
8✔
522
    values = kwargs or dict()
8✔
523

524
    url, scheme, host, authorization, extra = _parse_url(url)
8✔
525
    if extra: values.update(extra)
8✔
526
    data_e = _urlencode(values)
8✔
527

528
    if not data == None:
8✔
529
        url = url + "?" + data_e if data_e else url
8✔
530
    elif not data_j == None:
×
531
        data = json.dumps(data_j)
×
532
        url = url + "?" + data_e if data_e else url
×
533
        mime = mime or "application/json"
×
534
    elif not data_m == None:
×
535
        url = url + "?" + data_e if data_e else url
×
536
        content_type, data = _encode_multipart(
×
537
            data_m, mime = mime, doseq = True
538
        )
539
        mime = content_type
×
540
    elif data_e:
×
541
        data = data_e
×
542
        mime = mime or "application/x-www-form-urlencoded"
×
543

544
    if legacy.is_unicode(data): data = legacy.bytes(data, force = True)
8✔
545

546
    if not data: length = 0
8✔
547
    elif legacy.is_bytes(data): length = len(data)
8✔
548
    else: length = -1
8✔
549

550
    headers = dict(headers) if headers else dict()
8✔
551
    if not length == -1: headers["Content-Length"] = length
8✔
552
    if mime: headers["Content-Type"] = mime
8✔
553
    if host: headers["Host"] = host
8✔
554
    if authorization: headers["Authorization"] = "Basic %s" % authorization
8✔
555
    url = str(url)
8✔
556

557
    file = _resolve(url, name, headers, data, silent, timeout)
8✔
558
    try: result = file.read()
8✔
559
    finally: file.close()
8✔
560

561
    info = file.info()
8✔
562

563
    location = info.get("Location", None) if redirect else None
8✔
564
    if location: return _redirect(
8✔
565
        location,
566
        scheme,
567
        host,
568
        handle = handle,
569
        silent = silent,
570
        redirect = redirect,
571
        timeout = timeout
572
    )
573

574
    result = _result(result, info)
8✔
575
    return (result, file) if handle else result
8✔
576

577
def _redirect(
8✔
578
    location,
579
    scheme,
580
    host,
581
    handle = None,
582
    silent = None,
583
    redirect = None,
584
    timeout = None
585
):
586
    is_relative = location.startswith("/")
8✔
587
    if is_relative: location = scheme + "://" + host + location
8✔
588
    return get_json(
8✔
589
        location,
590
        handle = handle,
591
        silent = silent,
592
        redirect = redirect,
593
        timeout = timeout
594
    )
595

596
def _resolve(*args, **kwargs):
8✔
597
    # obtains the reference to the global set of variables, so
598
    # that it's possible to obtain the proper resolver method
599
    # according to the requested client
600
    _global = globals()
8✔
601

602
    # tries to retrieve the global configuration values that
603
    # will condition the way the request is going to be performed
604
    client = config.conf("HTTP_CLIENT", "netius")
8✔
605

606
    # tries to determine the set of configurations requested on
607
    # a request basis (not global) these have priority when
608
    # compared with the global configuration ones
609
    client = kwargs.pop("client", client)
8✔
610

611
    # tries to retrieve the reference to the resolve method for the
612
    # current client and then runs it, retrieve then the final result,
613
    # note that the result structure may be engine dependent
614
    resolver = _global.get("_resolve_" + client, _resolve_legacy)
8✔
615
    try: result = resolver(*args, **kwargs)
8✔
616
    except ImportError: result = _resolve_legacy(*args, **kwargs)
8✔
617
    return result
8✔
618

619
def _resolve_legacy(url, method, headers, data, silent, timeout, **kwargs):
8✔
620
    is_generator = not data == None and legacy.is_generator(data)
×
621
    if is_generator: next(data); data = b"".join(data)
×
622
    is_file = hasattr(data, "tell")
×
623
    if is_file: data = data.read()
×
624
    opener = legacy.build_opener(legacy.HTTPHandler)
×
625
    request = legacy.Request(url, data = data, headers = headers)
×
626
    request.get_method = lambda: method
×
627
    return opener.open(request, timeout = timeout)
×
628

629
def _resolve_requests(url, method, headers, data, silent, timeout, **kwargs):
8✔
630
    import requests
×
631

632
    # verifies if the provided data is a generator, assumes that if the
633
    # data is not invalid and is of type generator then it's a generator
634
    # and then if that's the case encapsulates this size based generator
635
    # into a generator based file-like object so that it can be used inside
636
    # the request infra-structure (as it accepts only file objects)
637
    is_generator = not data == None and legacy.is_generator(data)
×
638
    if is_generator: data = structures.GeneratorFile(data)
×
639

640
    method = method.lower()
×
641
    caller = getattr(requests, method)
×
642
    result = caller(url, headers = headers, data = data, timeout = timeout)
×
643
    response = HTTPResponse(
×
644
        data = result.content,
645
        code = result.status_code,
646
        headers = result.headers
647
    )
648
    code = response.getcode()
×
649
    is_error = _is_error(code)
×
650
    if is_error: raise legacy.HTTPError(
×
651
        url, code, "HTTP retrieval problem", None, response
652
    )
653
    return response
×
654

655
def _resolve_netius(url, method, headers, data, silent, timeout, **kwargs):
8✔
656
    import netius.clients
8✔
657
    silent = silent or False
8✔
658
    silent |= not common.is_devel()
8✔
659
    level = logging.CRITICAL if silent else logging.DEBUG
8✔
660
    level = kwargs.get("level", level)
8✔
661
    result = netius.clients.HTTPClient.method_s(
8✔
662
        method,
663
        url,
664
        headers = headers,
665
        data = data,
666
        asynchronous = False,
667
        timeout = timeout,
668
        level = level
669
    )
670
    response = netius.clients.HTTPClient.to_response(result)
8✔
671
    code = response.getcode()
8✔
672
    is_error = _is_error(code)
8✔
673
    if is_error: raise legacy.HTTPError(
8✔
674
        url, code, "HTTP retrieval problem", None, response
675
    )
676
    return response
8✔
677

678
def _parse_url(url):
8✔
679
    parse = legacy.urlparse(url)
8✔
680
    scheme = parse.scheme
8✔
681
    secure = scheme == "https"
8✔
682
    default = 443 if secure else 80
8✔
683
    port = parse.port or default
8✔
684
    url = parse.scheme + "://" + parse.hostname + ":" + str(port) + parse.path
8✔
685
    if port in (80, 443): host = parse.hostname
8✔
686
    else: host = parse.hostname + ":" + str(port)
×
687
    authorization = _authorization(parse.username, parse.password)
8✔
688
    params = _params(parse.query)
8✔
689
    return (url, scheme, host, authorization, params)
8✔
690

691
def _result(data, info = {}, force = False, strict = False):
8✔
692
    # tries to retrieve the content type value from the headers
693
    # info and verifies if the current data is JSON encoded, so
694
    # that it gets automatically decoded for such cases
695
    content_type = info.get("Content-Type", None) or ""
8✔
696
    is_json = util.is_content_type(
8✔
697
        content_type,
698
        (
699
            "application/json",
700
            "text/json",
701
            "text/javascript"
702
        )
703
    ) or force
704

705
    # verifies if the current result set is JSON encoded and in
706
    # case it's decodes it and loads it as JSON otherwise returns
707
    # the "raw" data to the caller method as expected, note that
708
    # the strict flag is used to determine if the exception should
709
    # be re-raised to the upper level in case of value error
710
    if is_json and legacy.is_bytes(data): data = data.decode("utf-8")
8✔
711
    try: data = json.loads(data) if is_json else data
8✔
712
    except ValueError:
×
713
        if strict: raise
×
714
    return data
8✔
715

716
def _params(query):
8✔
717
    # creates the dictionary that is going to be used to store the
718
    # complete information regarding the parameters in query
719
    params = dict()
8✔
720

721
    # validates that the provided query value is valid and if
722
    # that's not the case returns the created parameters immediately
723
    # (empty parameters are returned)
724
    if not query: return params
8✔
725

726
    # splits the query value around the initial parameter separator
727
    # symbol and iterates over each of them to parse them and create
728
    # the proper parameters dictionary (of lists)
729
    query_s = query.split("&")
8✔
730
    for part in query_s:
8✔
731
        parts = part.split("=", 1)
8✔
732
        if len(parts) == 1: value = ""
8✔
733
        else: value = parts[1]
8✔
734
        key = parts[0]
8✔
735
        key = legacy.unquote_plus(key)
8✔
736
        value = legacy.unquote_plus(value)
8✔
737
        param = params.get(key, [])
8✔
738
        param.append(value)
8✔
739
        params[key] = param
8✔
740

741
    # returns the final parameters dictionary to the caller method
742
    # so that it may be used as a proper structure representation
743
    return params
8✔
744

745
def _urlencode(values, as_string = True):
8✔
746
    # creates the list that will hold the final tuple of values
747
    # (without the unset and invalid values)
748
    final = []
8✔
749

750
    # verifies if the provided value is a sequence and in case it's
751
    # not converts it into a sequence (assuming a map)
752
    is_sequence = isinstance(values, (list, tuple))
8✔
753
    if not is_sequence: values = values.items()
8✔
754

755
    # iterates over all the items in the values sequence to
756
    # try to filter the values that are not valid
757
    for key, value in values:
8✔
758
        # creates the list that will hold the valid values
759
        # of the current key in iteration (sanitized values)
760
        _values = []
8✔
761

762
        # in case the current data type of the key is unicode
763
        # the value must be converted into a string using the
764
        # default utf encoding strategy (as defined)
765
        if type(key) == legacy.UNICODE: key = key.encode("utf-8")
8✔
766

767
        # verifies the type of the current value and in case
768
        # it's sequence based converts it into a list using
769
        # the conversion method otherwise creates a new list
770
        # and includes the value in it
771
        value_t = type(value)
8✔
772
        if value_t in SEQUENCE_TYPES: value = list(value)
8✔
773
        else: value = [value]
8✔
774

775
        # iterates over all the values in the current sequence
776
        # and adds the valid values to the sanitized sequence,
777
        # this includes the conversion from unicode string into
778
        # a simple string using the default utf encoder
779
        for _value in value:
8✔
780
            if _value == None: continue
8✔
781
            is_string = type(_value) in legacy.STRINGS
8✔
782
            if not is_string: _value = str(_value)
8✔
783
            is_unicode = type(_value) == legacy.UNICODE
8✔
784
            if is_unicode: _value = _value.encode("utf-8")
8✔
785
            _values.append(_value)
8✔
786

787
        # sets the sanitized list of values as the new value for
788
        # the key in the final dictionary of values
789
        final.append((key, _values))
8✔
790

791
    # in case the "as string" flag is not set the ended key to value
792
    # dictionary should be returned to the called method and not the
793
    # "default" linear and string based value
794
    if not as_string: return final
8✔
795

796
    # runs the encoding with sequence support on the final map
797
    # of sanitized values and returns the encoded result to the
798
    # caller method as the encoded value
799
    return legacy.urlencode(final, doseq = True)
8✔
800

801
def _quote(values, plus = False, safe = "/"):
8✔
802
    method = legacy.quote_plus if plus else legacy.quote
×
803
    values = _urlencode(values, as_string = False)
×
804

805
    final = dict()
×
806

807
    for key, value in values.items():
×
808
        key = method(key, safe = safe)
×
809
        value = method(value[0], safe = safe)
×
810
        final[key] = value
×
811

812
    return final
×
813

814
def _authorization(username, password):
8✔
815
    if not username: return None
8✔
816
    if not password: return None
8✔
817
    payload = "%s:%s" % (username, password)
8✔
818
    payload = legacy.bytes(payload)
8✔
819
    authorization = base64.b64encode(payload)
8✔
820
    authorization = legacy.str(authorization)
8✔
821
    return authorization
8✔
822

823
def _encode_multipart(fields, mime = None, doseq = False):
8✔
824
    mime = mime or "multipart/form-data"
×
825
    boundary = _create_boundary(fields, doseq = doseq)
×
826
    boundary_b = legacy.bytes(boundary)
×
827

828
    buffer = []
×
829

830
    for key, values in fields.items():
×
831
        is_list = doseq and type(values) == list
×
832
        values = values if is_list else [values]
×
833

834
        for value in values:
×
835
            if value == None: continue
×
836

837
            if isinstance(value, dict):
×
838
                header_l = []
×
839
                data = None
×
840
                for key, item in value.items():
×
841
                    if key == "data": data = item
×
842
                    else: header_l.append("%s: %s" % (key, item))
×
843
                value = data
×
844
                header = "\r\n".join(header_l)
×
845
            elif isinstance(value, tuple):
×
846
                content_type = None
×
847
                if len(value) == 2: name, contents = value
×
848
                else: name, content_type, contents = value
×
849
                header = "Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"" %\
×
850
                    (key, name)
851
                if content_type: header += "\r\nContent-Type: %s" % content_type
×
852
                value = contents
×
853
            else:
854
                header = "Content-Disposition: form-data; name=\"%s\"" % key
×
855
                value = _encode(value)
×
856

857
            header = _encode(header)
×
858
            value = _encode(value)
×
859

860
            buffer.append(b"--" + boundary_b)
×
861
            buffer.append(header)
×
862
            buffer.append(b"")
×
863
            buffer.append(value)
×
864

865
    buffer.append(b"--" + boundary_b + b"--")
×
866
    buffer.append(b"")
×
867
    body = b"\r\n".join(buffer)
×
868
    content_type = "%s; boundary=%s" % (mime, boundary)
×
869

870
    return content_type, body
×
871

872
def _create_boundary(fields, size = 32, doseq = False):
8✔
873
    while True:
×
874
        base = "".join(random.choice(RANGE) for _value in range(size))
×
875
        boundary = "----------" + base
×
876
        result = _try_boundary(fields, boundary, doseq = doseq)
×
877
        if result: break
×
878

879
    return boundary
×
880

881
def _try_boundary(fields, boundary, doseq = False):
8✔
882
    boundary_b = legacy.bytes(boundary)
×
883

884
    for key, values in fields.items():
×
885
        is_list = doseq and type(values) == list
×
886
        values = values if is_list else [values]
×
887

888
        for value in values:
×
889
            if isinstance(value, dict): name = ""; value = value.get("data", b"")
×
890
            elif isinstance(value, tuple):
×
891
                if len(value) == 2: name = value[0] or ""; value = value[1] or b""
×
892
                else: name = value[0] or ""; value = value[2] or b""
×
893
            else: name = ""; value = _encode(value)
×
894

895
            if not key.find(boundary) == -1: return False
×
896
            if not name.find(boundary) == -1: return False
×
897
            if not value.find(boundary_b) == -1: return False
×
898

899
    return True
×
900

901
def _is_error(code):
8✔
902
    return code // 100 in (4, 5) if code else True
8✔
903

904
def _encode(value, encoding = "utf-8"):
8✔
905
    value_t = type(value)
×
906
    if value_t == legacy.BYTES: return value
×
907
    elif value_t == legacy.UNICODE: return value.encode(encoding)
×
908
    return legacy.bytes(str(value))
×
909

910
class HTTPResponse(object):
8✔
911
    """
912
    Compatibility object to be used by HTTP libraries that do
913
    not support the legacy HTTP response object as a return
914
    for any of their structures.
915
    """
916

917
    def __init__(self, data = None, code = 200, status = None, headers = None):
8✔
918
        self.data = data
×
919
        self.code = code
×
920
        self.status = status
×
921
        self.headers = headers
×
922

923
    def read(self):
8✔
924
        return self.data
×
925

926
    def readline(self):
8✔
927
        return self.read()
×
928

929
    def close(self):
8✔
930
        pass
×
931

932
    def getcode(self):
8✔
933
        return self.code
×
934

935
    def info(self):
8✔
936
        return self.headers
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc