• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

collective / sphinxcontrib-httpexample / 10442703330

18 Aug 2024 06:06PM UTC coverage: 94.03% (-1.9%) from 95.94%
10442703330

push

github

datakurre
Back to development: 1.4

441 of 469 relevant lines covered (94.03%)

14.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.56
/src/sphinxcontrib/httpexample/parsers.py
1
# -*- coding: utf-8 -*-
2
from io import BytesIO
15✔
3
from sphinxcontrib.httpexample.utils import add_url_params
15✔
4
from sphinxcontrib.httpexample.utils import capitalize_keys
15✔
5
from sphinxcontrib.httpexample.utils import is_json
15✔
6
from sphinxcontrib.httpexample.utils import ordered
15✔
7

8
import base64
15✔
9
import json
15✔
10
import re
15✔
11

12

13
try:
15✔
14
    from http.server import BaseHTTPRequestHandler
15✔
15
except ImportError:
×
16
    from BaseHTTPServer import BaseHTTPRequestHandler
×
17

18

19
# Names of the special ``:<field> <key>: <value>`` lines that
# ``HTTPRequest.extract_fields`` looks for when the caller does not pass
# its own ``available_fields`` list.
AVAILABLE_FIELDS = ['query']
15✔
20

21

22
class HTTPRequest(BaseHTTPRequestHandler):
    """Parse a raw HTTP request held in memory.

    The request line and headers are parsed with the machinery inherited
    from ``BaseHTTPRequestHandler``; the bytes are read from an in-memory
    buffer, and parse errors are recorded and raised by ``__init__``
    rather than being answered with an error response.
    """
    # http://stackoverflow.com/a/5955949

    scheme = 'http'

    # noinspection PyMissingConstructor
    def __init__(self, request_bytes, scheme):
        """Parse ``request_bytes`` and remember the URL ``scheme``.

        The base class constructor is intentionally not called.

        :param request_bytes: the complete raw request as ``bytes``.
        :param scheme: URL scheme used by :meth:`url` (e.g. ``'http'``).
        :raises Exception: with the parser's message when the request
            could not be parsed.
        """
        assert isinstance(request_bytes, bytes)

        self.scheme = scheme
        self.rfile = BytesIO(request_bytes)
        self.raw_requestline = self.rfile.readline()
        self.error_code = self.error_message = None
        self.parse_request()

        if self.error_message:
            raise Exception(self.error_message)

        # Replace headers with a simple dict to cope with differences
        # between Py2 and Py3
        self.headers = capitalize_keys(dict(getattr(self, 'headers', {})))

    def send_error(self, code, message=None, explain=None):
        """Record a parse error on the instance for ``__init__`` to raise.

        ``explain`` is accepted for signature compatibility and ignored.
        """
        self.error_code = code
        self.error_message = message

    def extract_fields(self, field=None, available_fields=None):
        """Split the unread part of the request into fields and payload.

        Every unread line that looks like ``:<field> <key>: <value>`` is
        collected as a field; all other lines are kept, in order, as the
        remaining payload.  The read position of ``self.rfile`` is left
        where it was, so the method can be called repeatedly.

        :param field: name of the only field to extract, or ``None`` to
            extract every field listed in ``available_fields``.
        :param available_fields: allowed field names; defaults to the
            module-level ``AVAILABLE_FIELDS``.
        :returns: ``(fields, remaining_request)`` where ``fields`` is a
            list of stripped ``(field, key, value)`` tuples and
            ``remaining_request`` is a ``BytesIO`` holding the other
            lines, UTF-8 encoded with surrounding whitespace stripped.
        :raises ValueError: if ``field`` is not in ``available_fields``.
        """
        if available_fields is None:
            available_fields = AVAILABLE_FIELDS

        if (field is not None) and field not in available_fields:
            msg = "Unexpected field '{}'. Expected one of {}."
            msg = msg.format(field, ', '.join(available_fields))
            raise ValueError(msg)

        # Field names are matched literally, so escape them before they
        # are embedded in the pattern.
        wanted = available_fields if field is None else [field]
        names = '|'.join(re.escape(name) for name in wanted)
        is_field = re.compile(r':({}) (.+): (.+)'.format(names))

        # Read everything that is left and immediately restore the read
        # position so later calls see the same data.
        cursor = self.rfile.tell()
        raw_lines = self.rfile.readlines()
        self.rfile.seek(cursor)

        fields = []
        remaining_lines = []
        for raw_line in raw_lines:
            line = raw_line.decode('utf-8')
            match = is_field.match(line)
            if match is None:
                remaining_lines.append(line)
                continue
            name, key, val = match.groups()
            fields.append((name.strip(), key.strip(), val.strip()))

        # The lines still carry their own line terminators, so they are
        # concatenated as-is; joining with '\n' would double every break.
        remaining_request = BytesIO(
            ''.join(remaining_lines).encode('utf-8').strip()
        )

        return (fields, remaining_request)

    def auth(self):
        """Return ``(method, token)`` from the ``Authorization`` header.

        :returns: ``(None, None)`` when the header is missing or does not
            consist of exactly two whitespace-separated parts.  For
            ``Basic`` the token is base64-decoded and returned as text;
            for any other method the token is returned as UTF-8 bytes.
        """
        try:
            method, token = self.headers.get('Authorization').split()
        except (AttributeError, KeyError, ValueError):
            return None, None
        if not isinstance(token, bytes):
            token = token.encode('utf-8')
        if method == 'Basic':
            return method, base64.b64decode(token).decode('utf-8')
        return method, token

    def url(self):
        """Return the request URL, including any ``:query`` fields.

        The URL is built from the scheme, the ``Host`` header
        (``'nohost'`` when absent) and the request path; extracted
        ``query`` fields are added with ``add_url_params``.
        """
        base_url = '{}://{}{}'.format(
            self.scheme, self.headers.get('Host', 'nohost'), self.path
        )

        params, _ = self.extract_fields('query')
        params = [(key, val) for _, key, val in params]

        if not params:
            return base_url
        return add_url_params(base_url, params)

    def data(self):
        """Return the request payload, or ``None`` when there is none.

        When the ``Content-Type`` header is JSON (according to
        ``is_json``) the payload is decoded, parsed and passed through
        ``ordered``; otherwise the raw payload bytes are returned.
        """
        _, payload = self.extract_fields(None)
        payload_bytes = payload.read()
        if not payload_bytes:
            return None
        if is_json(self.headers.get('Content-Type', '')):
            assert isinstance(payload_bytes, bytes)
            payload_str = payload_bytes.decode('utf-8')
            return ordered(json.loads(payload_str))
        return payload_bytes
117

118

119
def parse_request(request_bytes, scheme='http'):
    """Parse raw request bytes into an ``HTTPRequest``.

    :param request_bytes: the complete raw HTTP request as ``bytes``.
    :param scheme: URL scheme handed to ``HTTPRequest``; ``'http'`` by
        default.
    :returns: the ``HTTPRequest`` built from the given bytes and scheme.
    """
    request = HTTPRequest(request_bytes, scheme)
    return request
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc