• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mindflayer / python-mocket / 5083519070

pending completion
5083519070

push

github-actions

GitHub
Update README.rst

757 of 771 relevant lines covered (98.18%)

6.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.3
/mocket/mockhttp.py
1
import re
7✔
2
import time
7✔
3
from http.server import BaseHTTPRequestHandler
7✔
4
from urllib.parse import parse_qs, unquote, urlsplit
7✔
5

6
from httptools.parser import HttpRequestParser
7✔
7

8
from .compat import ENCODING, decode_from_bytes, do_the_magic, encode_to_bytes
7✔
9
from .mocket import Mocket, MocketEntry
7✔
10

11
try:
7✔
12
    import magic
7✔
13
except ImportError:
14
    magic = None
15

16

17
STATUS = {k: v[0] for k, v in BaseHTTPRequestHandler.responses.items()}
7✔
18
CRLF = "\r\n"
7✔
19

20

21
class Protocol:
7✔
22
    def __init__(self):
7✔
23
        self.url = None
7✔
24
        self.body = None
7✔
25
        self.headers = {}
7✔
26

27
    def on_header(self, name: bytes, value: bytes):
7✔
28
        self.headers[name.decode("ascii")] = value.decode("ascii")
7✔
29

30
    def on_body(self, body: bytes):
7✔
31
        try:
7✔
32
            self.body = body.decode(ENCODING)
7✔
33
        except UnicodeDecodeError:
7✔
34
            self.body = body
7✔
35

36
    def on_url(self, url: bytes):
7✔
37
        self.url = url.decode("ascii")
7✔
38

39

40
class Request:
7✔
41
    _protocol = None
7✔
42
    _parser = None
7✔
43

44
    def __init__(self, data):
7✔
45
        self._protocol = Protocol()
7✔
46
        self._parser = HttpRequestParser(self._protocol)
7✔
47
        self.add_data(data)
7✔
48

49
    def add_data(self, data):
7✔
50
        self._parser.feed_data(data)
7✔
51

52
    @property
7✔
53
    def method(self):
4✔
54
        return self._parser.get_method().decode("ascii")
7✔
55

56
    @property
7✔
57
    def path(self):
4✔
58
        return self._protocol.url
7✔
59

60
    @property
7✔
61
    def headers(self):
4✔
62
        return self._protocol.headers
7✔
63

64
    @property
7✔
65
    def querystring(self):
4✔
66
        parts = self._protocol.url.split("?", 1)
7✔
67
        if len(parts) == 2:
7✔
68
            return parse_qs(unquote(parts[1]), keep_blank_values=True)
7✔
69
        return {}
×
70

71
    @property
7✔
72
    def body(self):
4✔
73
        return self._protocol.body
7✔
74

75
    def __str__(self):
76
        return "{} - {} - {}".format(self.method, self.path, self.headers)
77

78

79
class Response:
7✔
80
    headers = None
7✔
81
    is_file_object = False
7✔
82

83
    def __init__(self, body="", status=200, headers=None, lib_magic=magic):
7✔
84
        # needed for testing libmagic import failure
85
        self.magic = lib_magic
7✔
86

87
        headers = headers or {}
7✔
88
        try:
7✔
89
            #  File Objects
90
            self.body = body.read()
7✔
91
            self.is_file_object = True
7✔
92
        except AttributeError:
93
            self.body = encode_to_bytes(body)
94
        self.status = status
7✔
95

96
        self.set_base_headers()
7✔
97

98
        if headers is not None:
7✔
99
            self.set_extra_headers(headers)
7✔
100

101
        self.data = self.get_protocol_data() + self.body
7✔
102

103
    def get_protocol_data(self, str_format_fun_name="capitalize"):
7✔
104
        status_line = "HTTP/1.1 {status_code} {status}".format(
7✔
105
            status_code=self.status, status=STATUS[self.status]
106
        )
107
        header_lines = CRLF.join(
7✔
108
            (
109
                "{0}: {1}".format(getattr(k, str_format_fun_name)(), v)
110
                for k, v in self.headers.items()
111
            )
112
        )
113
        return "{0}\r\n{1}\r\n\r\n".format(status_line, header_lines).encode("utf-8")
7✔
114

115
    def set_base_headers(self):
7✔
116
        self.headers = {
7✔
117
            "Status": str(self.status),
118
            "Date": time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()),
119
            "Server": "Python/Mocket",
120
            "Connection": "close",
121
            "Content-Length": str(len(self.body)),
122
        }
123
        if not self.is_file_object:
7✔
124
            self.headers["Content-Type"] = "text/plain; charset=utf-8"
7✔
125
        elif self.magic:
7✔
126
            self.headers["Content-Type"] = do_the_magic(self.magic, self.body)
7✔
127

128
    def set_extra_headers(self, headers):
7✔
129
        r"""
130
        >>> r = Response(body="<html />")
131
        >>> len(r.headers.keys())
132
        6
133
        >>> r.set_extra_headers({"foo-bar": "Foobar"})
134
        >>> len(r.headers.keys())
135
        7
136
        >>> encode_to_bytes(r.headers.get("Foo-Bar")) == encode_to_bytes("Foobar")
137
        True
138
        """
139
        for k, v in headers.items():
7✔
140
            self.headers["-".join((token.capitalize() for token in k.split("-")))] = v
7✔
141

142

143
class Entry(MocketEntry):
7✔
144
    CONNECT = "CONNECT"
7✔
145
    DELETE = "DELETE"
7✔
146
    GET = "GET"
7✔
147
    HEAD = "HEAD"
7✔
148
    OPTIONS = "OPTIONS"
7✔
149
    PATCH = "PATCH"
7✔
150
    POST = "POST"
7✔
151
    PUT = "PUT"
7✔
152
    TRACE = "TRACE"
7✔
153

154
    METHODS = (CONNECT, DELETE, GET, HEAD, OPTIONS, PATCH, POST, PUT, TRACE)
7✔
155

156
    request_cls = Request
7✔
157
    response_cls = Response
7✔
158

159
    def __init__(self, uri, method, responses, match_querystring=True):
7✔
160
        uri = urlsplit(uri)
7✔
161

162
        if not uri.port:
7✔
163
            if uri.scheme == "https":
7✔
164
                port = 443
7✔
165
            else:
166
                port = 80
7✔
167

168
        super(Entry, self).__init__((uri.hostname, uri.port or port), responses)
7✔
169
        self.schema = uri.scheme
7✔
170
        self.path = uri.path
7✔
171
        self.query = uri.query
7✔
172
        self.method = method.upper()
7✔
173
        self._sent_data = b""
7✔
174
        self._match_querystring = match_querystring
7✔
175

176
    def collect(self, data):
7✔
177
        consume_response = True
7✔
178

179
        decoded_data = decode_from_bytes(data)
7✔
180
        if not decoded_data.startswith(Entry.METHODS):
7✔
181
            Mocket.remove_last_request()
7✔
182
            self._sent_data += data
7✔
183
            consume_response = False
7✔
184
        else:
185
            self._sent_data = data
7✔
186

187
        super(Entry, self).collect(self._sent_data)
7✔
188

189
        return consume_response
7✔
190

191
    def can_handle(self, data):
7✔
192
        r"""
193
        >>> e = Entry('http://www.github.com/?bar=foo&foobar', Entry.GET, (Response(b'<html/>'),))
194
        >>> e.can_handle(b'GET /?bar=foo HTTP/1.1\r\nHost: github.com\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\nUser-Agent: python-requests/2.7.0 CPython/3.4.3 Linux/3.19.0-16-generic\r\nAccept: */*\r\n\r\n')
195
        False
196
        >>> e = Entry('http://www.github.com/?bar=foo&foobar', Entry.GET, (Response(b'<html/>'),))
197
        >>> e.can_handle(b'GET /?bar=foo&foobar HTTP/1.1\r\nHost: github.com\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\nUser-Agent: python-requests/2.7.0 CPython/3.4.3 Linux/3.19.0-16-generic\r\nAccept: */*\r\n\r\n')
198
        True
199
        """
200
        try:
7✔
201
            requestline, _ = decode_from_bytes(data).split(CRLF, 1)
7✔
202
            method, path, version = self._parse_requestline(requestline)
7✔
203
        except ValueError:
7✔
204
            return self is getattr(Mocket, "_last_entry", None)
7✔
205

206
        uri = urlsplit(path)
7✔
207
        can_handle = uri.path == self.path and method == self.method
7✔
208
        if self._match_querystring:
7✔
209
            kw = dict(keep_blank_values=True)
7✔
210
            can_handle = can_handle and parse_qs(uri.query, **kw) == parse_qs(
7✔
211
                self.query, **kw
212
            )
213
        if can_handle:
7✔
214
            Mocket._last_entry = self
7✔
215
        return can_handle
7✔
216

217
    @staticmethod
7✔
218
    def _parse_requestline(line):
4✔
219
        """
220
        http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5
221

222
        >>> Entry._parse_requestline('GET / HTTP/1.0') == ('GET', '/', '1.0')
223
        True
224
        >>> Entry._parse_requestline('post /testurl htTP/1.1') == ('POST', '/testurl', '1.1')
225
        True
226
        >>> Entry._parse_requestline('Im not a RequestLine')
227
        Traceback (most recent call last):
228
            ...
229
        ValueError: Not a Request-Line
230
        """
231
        m = re.match(
7✔
232
            r"({})\s+(.*)\s+HTTP/(1.[0|1])".format("|".join(Entry.METHODS)), line, re.I
233
        )
234
        if m:
7✔
235
            return m.group(1).upper(), m.group(2), m.group(3)
7✔
236
        raise ValueError("Not a Request-Line")
7✔
237

238
    @classmethod
7✔
239
    def register(cls, method, uri, *responses, **config):
4✔
240

241
        if "body" in config or "status" in config:
7✔
242
            raise AttributeError("Did you mean `Entry.single_register(...)`?")
7✔
243

244
        default_config = dict(match_querystring=True, add_trailing_slash=True)
7✔
245
        default_config.update(config)
7✔
246
        config = default_config
7✔
247

248
        if config["add_trailing_slash"] and not urlsplit(uri).path:
7✔
249
            uri += "/"
7✔
250

251
        Mocket.register(
7✔
252
            cls(uri, method, responses, match_querystring=config["match_querystring"])
253
        )
254

255
    @classmethod
7✔
256
    def single_register(
7✔
257
        cls,
258
        method,
259
        uri,
260
        body="",
261
        status=200,
262
        headers=None,
263
        match_querystring=True,
264
        exception=None,
265
    ):
266

267
        response = (
7✔
268
            exception
269
            if exception
270
            else cls.response_cls(body=body, status=status, headers=headers)
271
        )
272

273
        cls.register(
7✔
274
            method,
275
            uri,
276
            response,
277
            match_querystring=match_querystring,
278
        )
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc