• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

synchronizing / mitm / 23581258074

26 Mar 2026 06:45AM UTC coverage: 86.264% (-7.7%) from 93.96%
23581258074

Pull #41

github

web-flow
Merge 78cf0ad5c into a189234b3
Pull Request #41: Modernize project: CLI, dependency removal, restructure

401 of 457 new or added lines in 12 files covered. (87.75%)

22 existing lines in 1 file now uncovered.

628 of 728 relevant lines covered (86.26%)

4.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.95
/mitm/utils/http/parser.py
1
"""HTTP request and response parsing."""
2

3
from __future__ import annotations
5✔
4

5
import enum
5✔
6
from abc import ABC, abstractmethod
5✔
7
from typing import Any, Optional, Union
5✔
8

9
from mitm.utils.http.item import (
5✔
10
    Item,
11
    ItemDict,
12
    ItemType,
13
    MultiEntryDict,
14
    ObjectDict,
15
    OverloadedDict,
16
    UnderscoreAccessDict,
17
)
18

19

20
class state(enum.Enum):
    """
    Parsing stages of an HTTP message.

    ``TOP`` is the start line, ``HEADER`` the header section, and ``BODY``
    the payload that follows the blank line.
    """

    # NOTE: the lowercase class name is exported through ``__all__`` and is
    # therefore kept as-is.
    TOP = 0
    HEADER = 1
    BODY = 2
5✔
28

29

30
class Headers(ItemDict, ObjectDict, OverloadedDict, UnderscoreAccessDict, MultiEntryDict):
    """
    Mapping of HTTP header names to values that can serialize itself to bytes.
    """

    def _compile(self) -> bytes:
        """
        Serialize every header as a ``Name: value`` line.

        Returns:
            The CRLF-terminated header lines followed by one extra CRLF,
            i.e. the blank line that closes the header section.
        """
        rendered = []
        for name, entry in self.items():
            if not isinstance(entry, list):
                rendered.append(b"%s: %s\r\n" % (name.raw, entry.raw))
                continue

            # A header holding several entries is written as a single
            # comma-separated line.
            prefix = b"%s: " % name.raw
            joined = b", ".join([part.raw for part in self[name]])
            rendered.append(prefix + joined + b"\r\n")

        return b"%s\r\n" % b"".join(rendered)

    def __setitem__(self, key: Any, value: Any):
        """
        Replace whatever is stored under ``key`` with ``value``.

        Args:
            key: The key of the item.
            value: The value of the item.
        """
        already_present = key in self
        if already_present:
            # Drop the old entry first so the new value does not accumulate
            # next to it.
            del self[key]

        super().__setitem__(key, value)

    def __defaultsetitem__(self, key: Any, value: Any):
        """
        Store ``value`` under ``key`` while keeping any existing entry.

        Args:
            key: The key of the item.
            value: The value of the item.
        """
        super().__setitem__(key, value)

    @property
    def raw(self) -> bytes:
        """
        The headers serialized to bytes (see :py:meth:`_compile`).
        """
        return self._compile()
5✔
79

80

81
# Type of a single message field (protocol, method, body, ...); ``None`` means unset.
InpType = Optional[ItemType]
5✔
82
# Headers may be supplied either as a ``Headers`` instance or as a plain dict.
HeadersType = Union[Headers, dict]
5✔
83

84

85
class Message(ABC):
    """
    Common parsing and serialization logic for HTTP messages.

    Incoming bytes are accumulated in ``buffer``; the start line, headers and
    body are extracted from it on demand by :py:attr:`state`.
    """

    __slots__ = ("body", "buffer", "headers", "protocol")

    def __init__(
        self,
        protocol: InpType = None,
        headers: Optional[HeadersType] = None,
        body: InpType = None,
    ):
        """
        Initializes an HTTP message.

        Args:
            protocol: The protocol of the HTTP message.
            headers: The headers of the HTTP message. ``None`` (the default)
                means no headers.
            body: The body of the HTTP message.

        Note:
            :py:class:`Message` is the base class for :py:class:`Request` and
            :py:class:`Response`, and is not intended to be used directly.
        """
        self.protocol = protocol
        # Build a fresh dict per call instead of sharing one mutable default.
        self.headers = {} if headers is None else headers
        self.body = body
        self.buffer = b""

    def __setattr__(self, name: str, value: Any):
        """
        Sets the value of the attribute.

        ``headers`` is wrapped in :py:class:`Headers`, ``buffer`` is stored
        untouched, and every other attribute is wrapped in ``Item``.

        Args:
            name: The name of the attribute.
            value: The value of the attribute.
        """
        if name == "headers":
            super().__setattr__(name, Headers(value))
        elif name == "buffer":
            super().__setattr__(name, value)
        else:
            super().__setattr__(name, Item(value))

    def feed(self, msg: bytes) -> state:
        """
        Adds chunks of the message to the internal buffer.

        Args:
            msg: The message to add to the internal buffer.

        Returns:
            The parsing state after the new data has been appended.

        Raises:
            TypeError: If ``msg`` is not ``bytes``.
        """
        if not isinstance(msg, bytes):
            raise TypeError("Message must be bytes.")

        self.buffer += msg
        return self.state

    @property
    def state(self) -> state:
        """
        Parses the buffer and reports how far the message has progressed.

        Once the blank line that ends the header section is present, the
        start line, the headers and the body are (re-)extracted from the
        buffer and stored on the instance.

        Returns:
            ``state.TOP`` while no CRLF has been received, ``state.HEADER``
            while the header section is still open, ``state.BODY`` otherwise.
        """
        head, separator, body = self.buffer.partition(b"\r\n\r\n")
        if not separator:
            return state.HEADER if b"\r\n" in self.buffer else state.TOP

        # Only the head section is split into lines; the body may be large
        # and is never inspected line by line.
        top_line, *header_lines = head.split(b"\r\n")
        self._parse_top(top_line)

        for line in header_lines:
            if b":" not in line:
                # A line without a colon ends header parsing; anything after
                # it in the head section is ignored.
                break

            key, value = line.split(b":", 1)
            # Comma-separated values are stored as separate entries.
            for part in value.split(b","):
                stripped = part.strip()
                # The buffer is re-parsed on every access, so skip values
                # that were already recorded.
                if key not in self.headers or stripped not in self.headers[key]:
                    self.headers.__defaultsetitem__(key, stripped)

        self.body = body
        return state.BODY

    @abstractmethod
    def _parse_top(self, line: bytes):  # pragma: no cover
        """
        Parses the first line of the HTTP message.
        """
        raise NotImplementedError

    @classmethod
    def parse(cls, msg: bytes) -> "Message":
        """
        Parses a complete HTTP message.

        Args:
            msg: The message to parse.

        Returns:
            A new instance of ``cls`` that has been fed ``msg``.
        """
        obj = cls()
        obj.feed(msg)
        return obj

    @abstractmethod
    def _compile_top(self) -> bytes:  # pragma: no cover
        """
        Compiles the first line of the HTTP message.
        """
        raise NotImplementedError

    def _compile(self) -> bytes:
        """
        Compiles a complete HTTP message: start line, headers, then body.
        """
        return b"%s%s%s" % (self._compile_top(), self.headers.raw, self.body.raw)

    @property
    def raw(self) -> bytes:
        """
        Returns the raw (bytes) HTTP message.
        """
        return self._compile()

    def __eq__(self, other: object) -> bool:
        """
        Compares two HTTP messages by their raw bytes.

        Returns ``NotImplemented`` when ``other`` exposes no ``raw``
        attribute, so comparing against unrelated objects yields ``False``
        instead of raising ``AttributeError``.
        """
        try:
            other_raw = other.raw
        except AttributeError:
            return NotImplemented

        return self.raw == other_raw

    def __str__(self) -> str:
        """
        Pretty-print of the HTTP message, one arrow-prefixed line per line.
        """
        # isinstance keeps the arrow for subclasses of Request / Response.
        if isinstance(self, Request):
            arrow = "→ "
        elif isinstance(self, Response):
            arrow = "← "
        else:  # pragma: no cover
            arrow = "? "

        # Bodies may hold arbitrary bytes; undecodable sequences are replaced
        # rather than making str() raise.
        text = self._compile().decode("utf-8", errors="replace").rstrip("\r\n")
        return arrow + arrow.join(text.splitlines(True))
5✔
237

238

239
class Request(Message):
    """
    HTTP request message: ``method target protocol`` plus headers and body.
    """

    __slots__ = Message.__slots__ + ("method", "target")

    def __init__(
        self,
        method: InpType = None,
        target: InpType = None,
        protocol: InpType = None,
        headers: Optional[HeadersType] = None,
        body: InpType = None,
    ):
        """
        Initializes an HTTP request.

        Args:
            method: The method of the HTTP request.
            target: The target of the HTTP request.
            protocol: The protocol of the HTTP request.
            headers: The headers of the HTTP request. ``None`` (the default)
                means no headers.
            body: The body of the HTTP request.

        Raises:
            ValueError: If only some of ``method``, ``target`` and
                ``protocol`` are provided.
        """
        # Build a fresh dict per call instead of sharing one mutable default.
        super().__init__(protocol, {} if headers is None else headers, body)
        self.method = method
        self.target = target

        objs = [self.method, self.target, self.protocol]
        # The attributes are wrapped by ``Message.__setattr__``, so equality
        # (not identity) is used to detect an unset value -- presumably the
        # wrapper compares equal to ``None`` when it wraps ``None``.
        if all(obj == None for obj in objs):  # noqa: E711
            self.buffer = b""
        elif all(objs):
            self.buffer = self._compile_top()
        else:
            raise ValueError("Request must have method, target, and protocol.")

        # ``Headers.raw`` already ends with the blank line that closes the
        # header section (a lone CRLF when there are no headers), so it is
        # appended as-is. It is also emitted when only a body is present,
        # otherwise the body would not be separated from the start line.
        if self.headers or self.body:
            self.buffer += self.headers.raw

        if self.body:
            self.buffer += self.body.raw

    def _parse_top(self, line: bytes):
        """
        Parses the first line of the HTTP request.

        Args:
            line: The request line, without the trailing CRLF.

        Raises:
            ValueError: If the line does not consist of exactly three
                space-separated parts.
        """
        self.method, self.target, self.protocol = line.split(b" ")

    def _compile_top(self) -> bytes:
        """
        Compiles the first line of the HTTP request.
        """
        return b"%s %s %s\r\n" % (self.method.raw, self.target.raw, self.protocol.raw)
5✔
293

294

295
class Response(Message):
    """
    HTTP response message: ``protocol status reason`` plus headers and body.
    """

    __slots__ = Message.__slots__ + ("status", "reason")

    def __init__(
        self,
        protocol: InpType = None,
        status: InpType = None,
        reason: InpType = None,
        headers: Optional[HeadersType] = None,
        body: InpType = None,
    ):
        """
        Initializes an HTTP response.

        Args:
            protocol: The protocol of the HTTP response.
            status: The status of the HTTP response.
            reason: The reason of the HTTP response.
            headers: The headers of the HTTP response. ``None`` (the default)
                means no headers.
            body: The body of the HTTP response.

        Raises:
            ValueError: If only some of ``protocol``, ``status`` and
                ``reason`` are provided.
        """
        # Build a fresh dict per call instead of sharing one mutable default.
        super().__init__(protocol, {} if headers is None else headers, body)
        self.status = status
        self.reason = reason

        objs = [self.protocol, self.status, self.reason]
        # The attributes are wrapped by ``Message.__setattr__``, so equality
        # (not identity) is used to detect an unset value -- presumably the
        # wrapper compares equal to ``None`` when it wraps ``None``.
        if all(obj == None for obj in objs):  # noqa: E711
            self.buffer = b""
        elif all(objs):
            self.buffer = self._compile_top()
        else:
            raise ValueError("Response must have protocol, status, and reason.")

        # ``Headers.raw`` already ends with the blank line that closes the
        # header section (a lone CRLF when there are no headers), so it is
        # appended as-is. It is also emitted when only a body is present,
        # otherwise the body would not be separated from the status line.
        if self.headers or self.body:
            self.buffer += self.headers.raw

        if self.body:
            self.buffer += self.body.raw

    def _parse_top(self, line: bytes):
        """
        Parses the first line of the HTTP response.

        Args:
            line: The status line, without the trailing CRLF.

        Raises:
            ValueError: If the line has fewer than three space-separated
                parts.
        """
        # The reason phrase may itself contain spaces ("Not Found"), so only
        # the first two spaces act as separators.
        self.protocol, self.status, self.reason = line.split(b" ", 2)

    def _compile_top(self) -> bytes:
        """
        Compiles the first line of the HTTP response.
        """
        return b"%s %s %s\r\n" % (self.protocol.raw, self.status.raw, self.reason.raw)
5✔
349

350

351
# Names exported by a star-import of this module.
__all__ = ["Request", "Response", "Headers", "state"]
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc