• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindflayer / python-mocket / 4217033595

pending completion
4217033595

push

github-actions

GitHub
Support for Python 3.11 (#181)

85 of 85 new or added lines in 4 files covered. (100.0%)

759 of 767 relevant lines covered (98.96%)

7.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.3
/mocket/mockhttp.py
1
import re
8✔
2
import time
8✔
3
from http.server import BaseHTTPRequestHandler
8✔
4
from urllib.parse import parse_qs, unquote, urlsplit
8✔
5

6
from httptools.parser import HttpRequestParser
8✔
7

8
from .compat import ENCODING, decode_from_bytes, do_the_magic, encode_to_bytes
8✔
9
from .mocket import Mocket, MocketEntry
8✔
10

11
try:
8✔
12
    import magic
8✔
13
except ImportError:
14
    magic = None
15

16

17
# Map each HTTP status code known to ``http.server`` to the first element of
# its ``responses`` entry (the short reason phrase, e.g. 200 -> "OK").
STATUS = {
    code: details[0] for code, details in BaseHTTPRequestHandler.responses.items()
}
# Line terminator used when building and splitting HTTP messages.
CRLF = "\r\n"
19

20

21
class Protocol:
    """Callback sink recording the pieces of a parsed HTTP request.

    An instance is passed to ``HttpRequestParser`` (see ``Request``), which
    invokes the ``on_*`` methods below while it is fed request data.

    Attributes:
        url: Request target as ``str``, or ``None`` until ``on_url`` is called.
        body: Request body; ``str`` when it decodes with ``ENCODING``, raw
            ``bytes`` otherwise, ``None`` until ``on_body`` is called.
        headers: Mapping of header name to header value (both ``str``).
    """

    def __init__(self):
        self.url = None
        self.body = None
        self.headers = {}
        # Raw body bytes received so far; the parser may deliver the body in
        # several ``on_body`` calls (e.g. chunked transfer encoding).
        self._raw_body = b""

    def on_header(self, name: bytes, value: bytes):
        """Store one header; a repeated name overwrites the earlier value."""
        # Header names are plain ASCII tokens, but values may legally carry
        # non-ASCII bytes (Python's own http.client encodes them as latin-1).
        # latin-1 maps every byte to a code point, so decoding never fails and
        # is identical to the ASCII result for pure-ASCII values.
        self.headers[name.decode("ascii")] = value.decode("latin-1")

    def on_body(self, body: bytes):
        """Append a piece of the body and refresh ``self.body``."""
        self._raw_body += body
        # Decode the whole accumulated payload so that multi-byte characters
        # split across two pieces are still decoded correctly.
        try:
            self.body = self._raw_body.decode(ENCODING)
        except UnicodeDecodeError:
            # Binary payload: expose the raw bytes unchanged.
            self.body = self._raw_body

    def on_url(self, url: bytes):
        """Store the request target (path plus optional query string)."""
        self.url = url.decode("ascii")
38

39

40
class Request:
    """Read-only view over an HTTP request parsed with ``HttpRequestParser``.

    The raw bytes given to the constructor (and any later ``add_data`` call)
    are fed to the parser, which fills a ``Protocol`` instance; the properties
    below expose what was collected.
    """

    _protocol = None
    _parser = None

    def __init__(self, data):
        """Parse ``data`` (raw request bytes) immediately."""
        self._protocol = Protocol()
        self._parser = HttpRequestParser(self._protocol)
        self.add_data(data)

    def add_data(self, data):
        """Feed additional raw request bytes to the parser."""
        self._parser.feed_data(data)

    @property
    def method(self):
        """HTTP method reported by the parser, as ``str``."""
        return self._parser.get_method().decode("ascii")

    @property
    def path(self):
        """Request target exactly as received (includes any query string)."""
        return self._protocol.url

    @property
    def headers(self):
        """Mapping of header names to values collected by the parser."""
        return self._protocol.headers

    @property
    def querystring(self):
        """Query parameters as a ``dict`` of name -> list of values.

        Blank values are kept. Returns an empty ``dict`` when the request
        target has no ``?``.
        """
        _, separator, query = self._protocol.url.partition("?")
        if not separator:
            return {}
        # parse_qs() already percent-decodes names and values. Decoding the
        # query beforehand would decode twice and turn an encoded "&", "=",
        # "+" or "%" inside a value into a delimiter or a different character.
        return parse_qs(query, keep_blank_values=True)

    @property
    def body(self):
        """Request body as stored by the protocol (``None`` if absent)."""
        return self._protocol.body

    def __str__(self):
        return "{} - {} - {}".format(self.method, self.path, self.headers)
77

78

79
class Response:
    """Canned HTTP response: status line, headers and body as raw bytes.

    After construction ``self.data`` holds the full serialized response
    (protocol data followed by the body).
    """

    headers = None
    is_file_object = False

    def __init__(self, body="", status=200, headers=None, lib_magic=magic):
        """Build the response.

        Args:
            body: Either an object exposing ``read()`` (its result is used as
                the body) or a value passed through ``encode_to_bytes``.
            status: HTTP status code; must be a key of ``STATUS``.
            headers: Optional mapping of extra headers; these are applied
                after the base headers and therefore override them.
            lib_magic: Module used to sniff the content type of file bodies.

        Raises:
            KeyError: If ``status`` is not present in ``STATUS``.
        """
        # needed for testing libmagic import failure
        self.magic = lib_magic

        # Only the attribute lookup is guarded, so an AttributeError raised
        # *inside* read() is not mistaken for "this is not a file object".
        try:
            read = body.read
        except AttributeError:
            self.body = encode_to_bytes(body)
        else:
            #  File Objects
            self.body = read()
            self.is_file_object = True
        self.status = status

        self.set_base_headers()
        self.set_extra_headers(headers or {})

        self.data = self.get_protocol_data() + self.body

    def get_protocol_data(self, str_format_fun_name="capitalize"):
        """Return the status line and headers, UTF-8 encoded.

        Args:
            str_format_fun_name: Name of the ``str`` method applied to every
                header name before it is written (default ``"capitalize"``).
        """
        status_line = "HTTP/1.1 {status_code} {status}".format(
            status_code=self.status, status=STATUS[self.status]
        )
        header_lines = CRLF.join(
            (
                "{0}: {1}".format(getattr(k, str_format_fun_name)(), v)
                for k, v in self.headers.items()
            )
        )
        return "{0}\r\n{1}\r\n\r\n".format(status_line, header_lines).encode("utf-8")

    def set_base_headers(self):
        """Reset ``self.headers`` to the defaults derived from status and body."""
        self.headers = {
            "Status": str(self.status),
            "Date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
            "Server": "Python/Mocket",
            "Connection": "close",
            "Content-Length": str(len(self.body)),
        }
        if not self.is_file_object:
            self.headers["Content-Type"] = "text/plain; charset=utf-8"
        elif self.magic:
            # File body: let the magic library guess the content type. When
            # no magic module is available, no Content-Type header is set.
            self.headers["Content-Type"] = do_the_magic(self.magic, self.body)

    def set_extra_headers(self, headers):
        r"""
        Merge ``headers`` into ``self.headers``, capitalizing every
        dash-separated token of each name (``foo-bar`` becomes ``Foo-Bar``).

        >>> r = Response(body="<html />")
        >>> len(r.headers.keys())
        6
        >>> r.set_extra_headers({"foo-bar": "Foobar"})
        >>> len(r.headers.keys())
        7
        >>> encode_to_bytes(r.headers.get("Foo-Bar")) == encode_to_bytes("Foobar")
        True
        """
        for k, v in headers.items():
            self.headers["-".join((token.capitalize() for token in k.split("-")))] = v
141

142

143
class Entry(MocketEntry):
    """Mock entry matching HTTP requests by method, path and query string."""

    CONNECT = "CONNECT"
    DELETE = "DELETE"
    GET = "GET"
    HEAD = "HEAD"
    OPTIONS = "OPTIONS"
    PATCH = "PATCH"
    POST = "POST"
    PUT = "PUT"
    TRACE = "TRACE"

    METHODS = (CONNECT, DELETE, GET, HEAD, OPTIONS, PATCH, POST, PUT, TRACE)

    request_cls = Request
    response_cls = Response

    def __init__(self, uri, method, responses, match_querystring=True):
        """Create an entry for ``method`` requests to ``uri``.

        Args:
            uri: Full URL to mock; host, port, path and query are taken from it.
            method: HTTP method name (stored upper-cased).
            responses: Passed through to ``MocketEntry.__init__``.
            match_querystring: When true, ``can_handle`` also compares the
                query parameters of the request with those of ``uri``.
        """
        uri = urlsplit(uri)

        # No explicit port in the URL: use the default for the scheme.
        port = uri.port or (443 if uri.scheme == "https" else 80)

        super().__init__((uri.hostname, port), responses)
        self.schema = uri.scheme
        self.path = uri.path
        self.query = uri.query
        self.method = method.upper()
        self._sent_data = b""
        self._match_querystring = match_querystring

    def collect(self, data):
        """Record ``data`` as sent to this entry.

        Data that does not start with an HTTP method name is appended to the
        bytes already stored and the last request recorded on ``Mocket`` is
        removed before collecting again; otherwise the stored bytes are
        replaced by ``data``.

        Returns:
            ``True`` when ``data`` starts with an HTTP method name, ``False``
            otherwise.
        """
        consume_response = True

        decoded_data = decode_from_bytes(data)
        if not decoded_data.startswith(Entry.METHODS):
            Mocket.remove_last_request()
            self._sent_data += data
            consume_response = False
        else:
            self._sent_data = data

        super().collect(self._sent_data)

        return consume_response

    def can_handle(self, data):
        r"""
        Tell whether this entry matches the request contained in ``data``.

        When ``data`` has no parsable request line, the entry matches only if
        it is the one that matched last (``Mocket._last_entry``).

        >>> e = Entry('http://www.github.com/?bar=foo&foobar', Entry.GET, (Response(b'<html/>'),))
        >>> e.can_handle(b'GET /?bar=foo HTTP/1.1\r\nHost: github.com\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\nUser-Agent: python-requests/2.7.0 CPython/3.4.3 Linux/3.19.0-16-generic\r\nAccept: */*\r\n\r\n')
        False
        >>> e = Entry('http://www.github.com/?bar=foo&foobar', Entry.GET, (Response(b'<html/>'),))
        >>> e.can_handle(b'GET /?bar=foo&foobar HTTP/1.1\r\nHost: github.com\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\nUser-Agent: python-requests/2.7.0 CPython/3.4.3 Linux/3.19.0-16-generic\r\nAccept: */*\r\n\r\n')
        True
        """
        try:
            requestline, _ = decode_from_bytes(data).split(CRLF, 1)
            method, path, version = self._parse_requestline(requestline)
        except ValueError:
            # Either there is no CRLF to split on or the first line is not a
            # request line.
            return self is getattr(Mocket, "_last_entry", None)

        uri = urlsplit(path)
        can_handle = uri.path == self.path and method == self.method
        if self._match_querystring:
            kw = dict(keep_blank_values=True)
            can_handle = can_handle and parse_qs(uri.query, **kw) == parse_qs(
                self.query, **kw
            )
        if can_handle:
            # Remember the match for data arriving without a request line.
            Mocket._last_entry = self
        return can_handle

    @staticmethod
    def _parse_requestline(line):
        """
        Split a Request-Line into ``(METHOD, target, version)``.

        http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5

        >>> Entry._parse_requestline('GET / HTTP/1.0') == ('GET', '/', '1.0')
        True
        >>> Entry._parse_requestline('post /testurl htTP/1.1') == ('POST', '/testurl', '1.1')
        True
        >>> Entry._parse_requestline('Im not a RequestLine')
        Traceback (most recent call last):
            ...
        ValueError: Not a Request-Line
        >>> Entry._parse_requestline('GET / HTTP/1x0')
        Traceback (most recent call last):
            ...
        ValueError: Not a Request-Line
        """
        # The version must be exactly "1.0" or "1.1": the dot is escaped and
        # the character class holds only the two valid minor digits.
        m = re.match(
            r"({})\s+(.*)\s+HTTP/(1\.[01])".format("|".join(Entry.METHODS)), line, re.I
        )
        if m:
            return m.group(1).upper(), m.group(2), m.group(3)
        raise ValueError("Not a Request-Line")

    @classmethod
    def register(cls, method, uri, *responses, **config):
        """Register an entry for ``method``/``uri`` on ``Mocket``.

        Args:
            method: HTTP method name.
            uri: URL to mock.
            *responses: Responses handed to the new entry.
            **config: ``match_querystring`` (default ``True``) and
                ``add_trailing_slash`` (default ``True``; appends ``/`` to a
                URI whose path is empty).

        Raises:
            AttributeError: If ``body`` or ``status`` is passed, which are
                arguments of ``single_register`` instead.
        """

        if "body" in config or "status" in config:
            raise AttributeError("Did you mean `Entry.single_register(...)`?")

        default_config = dict(match_querystring=True, add_trailing_slash=True)
        default_config.update(config)
        config = default_config

        if config["add_trailing_slash"] and not urlsplit(uri).path:
            uri += "/"

        Mocket.register(
            cls(uri, method, responses, match_querystring=config["match_querystring"])
        )

    @classmethod
    def single_register(
        cls,
        method,
        uri,
        body="",
        status=200,
        headers=None,
        match_querystring=True,
        exception=None,
    ):
        """Register an entry with a single response.

        The response is ``exception`` when one is given; otherwise it is a
        ``response_cls`` instance built from ``body``, ``status`` and
        ``headers``.
        """

        response = (
            exception
            if exception
            else cls.response_cls(body=body, status=status, headers=headers)
        )

        cls.register(
            method,
            uri,
            response,
            match_querystring=match_querystring,
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc