• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cle-b / httpdbg / 12528626572

28 Dec 2024 05:55PM UTC coverage: 89.433% (+0.1%) from 89.308%
12528626572

Pull #164

github

cle-b
format
Pull Request #164: Initiator / Group by - refactoring

80 of 83 new or added lines in 7 files covered. (96.39%)

1 existing line in 1 file now uncovered.

1498 of 1675 relevant lines covered (89.43%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.29
/httpdbg/records.py
1
# -*- coding: utf-8 -*-
2
import asyncio
1✔
3
import asyncio.proactor_events
1✔
4
import datetime
1✔
5
import os
1✔
6
import socket
1✔
7
import ssl
1✔
8
import traceback
1✔
9
from urllib.parse import urlparse
1✔
10
from typing import Dict, List, Tuple, Union
1✔
11

12
from httpdbg.env import HTTPDBG_CURRENT_GROUP
1✔
13
from httpdbg.env import HTTPDBG_CURRENT_INITIATOR
1✔
14
from httpdbg.env import HTTPDBG_CURRENT_TAG
1✔
15
from httpdbg.utils import HTTPDBGCookie
1✔
16
from httpdbg.utils import HTTPDBGHeader
1✔
17
from httpdbg.initiator import in_lib
1✔
18
from httpdbg.initiator import Group
1✔
19
from httpdbg.initiator import Initiator
1✔
20
from httpdbg.preview import generate_preview
1✔
21
from httpdbg.utils import get_new_uuid
1✔
22
from httpdbg.utils import chunked_to_bytes
1✔
23
from httpdbg.utils import list_cookies_headers_request_simple_cookies
1✔
24
from httpdbg.utils import list_cookies_headers_response_simple_cookies
1✔
25
from httpdbg.log import logger
1✔
26

27

28
class SocketRawData(object):
    """Store the request data without encryption, even when using an SSLSocket.

    One instance accumulates the plain bytes seen on a socket-like object,
    together with the address, the SSL flag and the HTTP record (if any)
    associated with them.
    """

    def __init__(self, id: int, address: Tuple[str, int], ssl: bool) -> None:
        self.id: int = id
        self.address: Tuple[str, int] = address
        self.ssl: bool = ssl
        self._rawdata: bytes = b""
        # Set by the caller once the bytes are attached to an HTTP record.
        self.record: Union[HTTPRecord, None] = None
        self.tbegin: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)

    @property
    def rawdata(self) -> bytes:
        """The bytes captured so far."""
        return self._rawdata

    @rawdata.setter
    def rawdata(self, value: bytes) -> None:
        # Only the first 20 bytes are logged, along with the total length.
        logger().info(
            f"SocketRawData id={self.id} newdata={value[:20]!r} len={len(value)}"
        )
        self._rawdata = value

    def http_detected(self) -> Union[bool, None]:
        """Tell whether the captured bytes start with an HTTP/1.x start line.

        Returns:
            True if the first line (searched in the first 2048 bytes) ends
            with ``HTTP/1.1`` or ``HTTP/1.0`` (case-insensitive), False if it
            does not or if no line end shows up within more than 2048 bytes,
            and None when there is not enough data yet to decide.
        """
        data = self.rawdata
        line_end = data[:2048].find(b"\r\n")
        if line_end != -1:
            return data[:line_end].upper().endswith((b"HTTP/1.1", b"HTTP/1.0"))
        # No complete first line: give up once the search window is exceeded.
        return False if len(data) > 2048 else None

    def __repr__(self) -> str:
        return f"SocketRawData id={self.id} {self.address}"
1✔
66

67

68
class HTTPRecordReqResp(object):
    """Common base for the request half and the response half of a record.

    Keeps the raw bytes of one HTTP message and lazily derives from them the
    header block, the parsed headers and the body.
    """

    def __init__(self) -> None:
        self._rawdata: bytes = bytes()
        self._rawheaders: bytes = bytes()
        self._headers: List[HTTPDBGHeader] = []
        self.last_update: datetime.datetime = datetime.datetime.now(
            datetime.timezone.utc
        )

    def get_header(self, name: str, default: str = "") -> str:
        """Return the value of the first header called *name* (case-insensitive).

        Args:
            name: header name to look for.
            default: value returned when no such header exists.
        """
        wanted = name.lower()
        for header in self.headers:
            if header.name.lower() == wanted:
                return header.value
        return default

    @property
    def rawheaders(self) -> bytes:
        """Start line plus header lines, i.e. everything before the first
        blank line. Empty until that blank line has been received; cached
        afterwards."""
        if not self._rawheaders:
            sep = self.rawdata.find(b"\r\n\r\n")
            if sep > -1:
                self._rawheaders = self.rawdata[:sep]
        return self._rawheaders

    @property
    def headers(self) -> List[HTTPDBGHeader]:
        """Parsed header fields (the start line is excluded); cached once
        at least one header has been parsed."""
        if not self._headers and self.rawheaders:
            # partition() yields an empty remainder when the message has a
            # start line only. Slicing from find() == -1 used to turn the
            # last byte of the start line into a bogus header.
            _, _, header_block = self.rawheaders.partition(b"\r\n")
            for line in header_block.split(b"\r\n"):
                # Split on the first colon only: values may contain colons.
                name, _, value = line.partition(b":")
                if name:
                    self._headers.append(
                        HTTPDBGHeader(name.decode().strip(), value.decode().strip())
                    )
        return self._headers

    @property
    def content(self) -> bytes:
        """Body of the message.

        Truncated to ``Content-Length`` when that header is a valid integer,
        de-chunked when ``Transfer-Encoding`` mentions ``chunked``, otherwise
        everything after the blank line. Empty while the blank line has not
        been received.
        """
        sep = self.rawdata.find(b"\r\n\r\n")
        if sep == -1:
            return bytes()

        rawcontent = self.rawdata[sep + 4 :]
        content_length = self.get_header("Content-Length")

        if content_length:
            try:
                length = int(content_length)
            except ValueError:
                # Malformed Content-Length: best effort, expose what we have
                # instead of failing every consumer of the body.
                return rawcontent
            # Slicing already clamps to the available bytes.
            return rawcontent[:length]

        if "chunked" in self.get_header("Transfer-Encoding", "").lower():
            return chunked_to_bytes(rawcontent)

        return rawcontent

    @property
    def preview(self):
        """Result of ``generate_preview`` for the body, given its
        Content-Type and Content-Encoding headers."""
        return generate_preview(
            self.content,
            self.get_header("Content-Type"),
            self.get_header("Content-Encoding"),
        )

    @property
    def rawdata(self) -> bytes:
        """All the bytes captured for this message."""
        return self._rawdata

    @rawdata.setter
    def rawdata(self, value: bytes):
        # Every write refreshes the timestamp exposed as last_update.
        self.last_update = datetime.datetime.now(datetime.timezone.utc)
        self._rawdata = value
1✔
142

143

144
class HTTPRecordRequest(HTTPRecordReqResp):
    """Request half of a record: adds request-line parsing and cookies."""

    def __init__(self) -> None:
        super().__init__()
        self._method = bytes()
        self._uri = bytes()
        self._protocol = bytes()

    @property
    def cookies(self) -> List[HTTPDBGCookie]:
        """Cookies extracted from the request headers."""
        return list_cookies_headers_request_simple_cookies(self.headers)

    def _parse_first_line(self) -> None:
        """Split the request line into method, URI and protocol.

        Does nothing while the header block is not complete.

        Raises:
            ValueError: if the request line is not made of exactly three
                space-separated tokens.
        """
        if self.rawheaders:
            # partition() keeps the whole line when the request carries no
            # header field; slicing with find() == -1 used to drop the last
            # byte ("HTTP/1.0" became "HTTP/1.").
            firstline, _, _ = self.rawheaders.partition(b"\r\n")
            self._method, self._uri, self._protocol = firstline.split(b" ")

    @property
    def method(self) -> str:
        """HTTP method, or an empty string while the request line is unknown."""
        if not self._method:
            self._parse_first_line()
        return self._method.decode()

    @property
    def uri(self) -> str:
        """Request target, or an empty string while the request line is unknown."""
        if not self._uri:
            self._parse_first_line()
        return self._uri.decode()

    @property
    def protocol(self) -> str:
        """Protocol token of the request line, or an empty string while unknown."""
        if not self._protocol:
            self._parse_first_line()
        return self._protocol.decode()
×
177

178

179
class HTTPRecordResponse(HTTPRecordReqResp):
    """Response half of a record: adds status-line parsing and cookies."""

    def __init__(self) -> None:
        super().__init__()
        self._protocol = bytes()
        self._status_code = bytes()
        self._message = bytes()

    @property
    def cookies(self) -> List[HTTPDBGCookie]:
        """Cookies extracted from the response headers."""
        return list_cookies_headers_response_simple_cookies(self.headers)

    def _parse_first_line(self) -> None:
        """Split the status line into protocol, status code and message.

        Does nothing while the header block is not complete. Missing
        trailing tokens (e.g. no reason phrase) are left empty.
        """
        if self.rawheaders:
            # partition() keeps the whole line when the response carries no
            # header field (e.g. "100 Continue"); slicing with find() == -1
            # used to drop the last byte of the reason phrase.
            firstline, _, _ = self.rawheaders.partition(b"\r\n")
            # The reason phrase may contain spaces, hence maxsplit=2.
            parts = firstline.split(b" ", 2)
            # Pad instead of failing when the reason phrase is absent.
            parts += [b""] * (3 - len(parts))
            self._protocol, self._status_code, self._message = parts

    @property
    def protocol(self) -> str:
        """Protocol token of the status line, or an empty string while unknown."""
        if not self._protocol:
            self._parse_first_line()
        return self._protocol.decode()

    @property
    def status_code(self) -> int:
        """Numeric status code, or 0 while the status line is unknown."""
        if not self._status_code:
            self._parse_first_line()
        return int(self._status_code.decode()) if self._status_code else 0

    @property
    def message(self) -> str:
        """Reason phrase, or an empty string when absent or still unknown."""
        if not self._message:
            self._parse_first_line()
        return self._message.decode()
1✔
212

213

214
class HTTPRecord:
    """One HTTP exchange: a request, its response and some metadata.

    The tag and the group id are read from the environment variables named
    by ``HTTPDBG_CURRENT_TAG`` and ``HTTPDBG_CURRENT_GROUP`` at creation time.
    """

    def __init__(self, tbegin: Union[datetime.datetime, None] = None) -> None:
        """
        Args:
            tbegin: start time of the exchange; defaults to the current UTC
                time when omitted.
        """
        self.id = get_new_uuid()
        self.address: Tuple[str, int] = ("", 0)
        self._url: Union[str, None] = None
        self.initiator_id: Union[str, None] = None
        self.exception: Union[Exception, None] = None
        self.request: HTTPRecordRequest = HTTPRecordRequest()
        self.response: HTTPRecordResponse = HTTPRecordResponse()
        self.ssl: Union[bool, None] = None
        self.tbegin: datetime.datetime = datetime.datetime.now(datetime.timezone.utc)
        self.tag = os.environ.get(HTTPDBG_CURRENT_TAG)
        self.group_id = os.environ.get(HTTPDBG_CURRENT_GROUP)
        if tbegin:
            self.tbegin = tbegin

    @property
    def url(self) -> str:
        """Full URL of the request.

        Built (then cached) from the ``Host`` header, or from ``address``
        when that header is missing, unless a value was set explicitly.
        The port is omitted when it is the default one for the scheme.
        """
        if not self._url:
            host_header = self.request.get_header("host")
            if host_header:
                parts = host_header.split(":")
                host = parts[0]
                port = parts[1] if len(parts) == 2 else None
            else:
                # "".split(":") is [""], which is truthy, so the header must
                # be tested itself for the address fallback to be reachable.
                host = self.address[0]
                # The address holds an int port; normalise it to str so the
                # default-port comparison below works for both sources.
                port = str(self.address[1]) if self.address[1] else None

            scheme, default_port = ("https", "443") if self.ssl else ("http", "80")
            sport = f":{port}" if port and port != default_port else ""
            self._url = f"{scheme}://{host}{sport}{self.request.uri}"
        return self._url

    @url.setter
    def url(self, value: str) -> None:
        self._url = value

    @property
    def method(self) -> str:
        """HTTP method of the request."""
        return self.request.method

    @property
    def status_code(self) -> int:
        """-1 if an exception was recorded, else the response status code
        (0 while it is unknown)."""
        if self.exception:
            return -1
        return self.response.status_code or 0

    @property
    def reason(self) -> str:
        """Response message if any, else the recorded exception's class name,
        else ``"in progress"``."""
        desc = "in progress"
        if self.response.message:
            desc = self.response.message
        elif self.exception is not None:
            desc = getattr(type(self.exception), "__name__", str(type(self.exception)))
        return desc

    @property
    def netloc(self) -> str:
        """Scheme and network location of the URL (``scheme://host[:port]``)."""
        url = urlparse(self.url)
        return f"{url.scheme}://{url.netloc}"

    @property
    def urlext(self) -> str:
        """What follows ``netloc`` in the URL."""
        return self.url[len(self.netloc) :]

    @property
    def in_progress(self) -> bool:
        """True while fewer body bytes than the response's Content-Length
        have been received. False without a (non-zero) Content-Length."""
        try:
            length = int(self.response.get_header("Content-Length", "0"))
            if length:
                return len(self.response.content) < length
        except Exception:
            # Deliberate best effort: any parsing problem is reported as
            # "not in progress" rather than propagated.
            pass
        return False

    @property
    def last_update(self) -> datetime.datetime:
        """Most recent update time of the request or the response."""
        return max(self.request.last_update, self.response.last_update)
1✔
294

295

296
class HTTPRecords:
    """Collection of the recorded requests, with their initiators, groups
    and the per-socket raw data used while requests are being captured."""

    def __init__(self) -> None:
        self.reset()

    def reset(self) -> None:
        """Forget everything recorded so far and take a new identifier."""
        logger().info("HTTPRecords.reset")
        self.id = get_new_uuid()
        self.requests: Dict[str, HTTPRecord] = {}
        self.requests_already_loaded = 0
        self.initiators: Dict[str, Initiator] = {}
        self.groups: Dict[str, Group] = {}
        self._sockets: Dict[int, SocketRawData] = {}

    @property
    def unread(self) -> int:
        """Whether more requests are recorded than ``requests_already_loaded``.

        NOTE(review): annotated as int but the comparison yields a bool.
        """
        return len(self.requests) > self.requests_already_loaded

    def __getitem__(self, item: int) -> HTTPRecord:
        """Positional access to the records, in insertion order."""
        records = list(self.requests.values())
        return records[item]

    def __len__(self) -> int:
        return len(self.requests)

    def get_initiator(self) -> str:
        """Return the id of the initiator of the current call.

        When the environment variable ``{HTTPDBG_CURRENT_INITIATOR}_{id}`` is
        set, its value designates an already registered initiator. Otherwise
        a new initiator is built from the call stack and registered.

        Raises:
            KeyError: if the environment designates an unknown initiator.
            IndexError: if no stack frame is retained.
        """
        envname = f"{HTTPDBG_CURRENT_INITIATOR}_{self.id}"

        if envname in os.environ:
            initiator = self.initiators[os.environ[envname]]
        else:
            # format_stack() must be called from this very function: the
            # fixed offset below depends on the depth of the call stack.
            frames = traceback.format_stack()
            stack: List[str] = []
            for frame in frames[6:]:
                # Keep frames until the first one flagged by in_lib
                # (presumably library code; see httpdbg.initiator).
                if in_lib(frame):
                    break
                stack.append(frame)
            long_label = stack[-1]
            # Second line of a formatted frame: the source line, when any.
            label = long_label.split("\n")[1]
            initiator = Initiator(get_new_uuid(), label, long_label, stack)

        self.initiators.setdefault(initiator.id, initiator)
        return initiator.id

    def get_socket_data(
        self, obj, extra_sock=None, force_new=False, request=None
    ) -> Union[SocketRawData, None]:
        """Record a new SocketRawData (or get an existing one) and return it.

        Args:
            obj: the socket-like object; entries are keyed by ``id(obj)``.
            extra_sock: object asked for the address when ``obj`` is neither
                a socket nor a proactor transport.
            force_new: drop any existing entry for ``obj`` first.
            request: when truthy and the existing entry already holds
                response data, start a fresh entry (socket reuse).

        Returns:
            The entry for ``obj``, or None when none could be created.
        """
        key = id(obj)

        if force_new:
            self.del_socket_data(obj)

        if key in self._sockets:
            socketdata = self._sockets[key]
            already_answered = (
                socketdata and socketdata.record and socketdata.record.response.rawdata
            )
            if request and already_answered:
                # the socket is reused for a new request
                socketdata = SocketRawData(key, socketdata.address, socketdata.ssl)
                self._sockets[key] = socketdata
            return socketdata

        if isinstance(obj, socket.socket):
            try:
                created = SocketRawData(
                    key, obj.getsockname(), isinstance(obj, ssl.SSLSocket)
                )
                self._sockets[key] = created
            except OSError:
                # OSError: [WinError 10022] An invalid argument was supplied
                return None
            return created

        if isinstance(obj, asyncio.proactor_events._ProactorSocketTransport):
            # only for async HTTP requests (not HTTPS) on Windows
            created = SocketRawData(key, ("", 0), False)
            self._sockets[key] = created
            return created

        if not extra_sock:
            return None

        try:
            if hasattr(extra_sock, "getsockname"):
                address = extra_sock.getsockname()
            else:
                address = ("", 0)  # wrap_bio
            created = SocketRawData(
                key, address, isinstance(obj, (ssl.SSLObject, ssl.SSLSocket))
            )
            self._sockets[key] = created
        except OSError:
            # OSError: [WinError 10022] An invalid argument was supplied
            return None
        return created

    def move_socket_data(self, dest, ori):
        """Re-key the entry of ``ori`` under ``dest`` (if ``ori`` has one),
        flagging it as SSL when ``dest`` is an SSL socket/object."""
        if id(ori) not in self._sockets:
            return
        socketdata = self.get_socket_data(ori)
        if not socketdata:
            return
        self._sockets[id(dest)] = socketdata
        if isinstance(dest, (ssl.SSLSocket, ssl.SSLObject)):
            socketdata.ssl = True
        self.del_socket_data(ori)

    def del_socket_data(self, obj):
        """Remove the entry of ``obj``, if any."""
        key = id(obj)
        if key in self._sockets:
            logger().info(f"SocketRawData del id={key}")
            del self._sockets[key]

    def add_new_record_exception(
        self, initiator: Initiator, url: str, exception: Exception
    ) -> HTTPRecord:
        """Record a request that ended with an exception.

        Args:
            initiator: initiator of the request; registered if unknown.
            url: URL of the request.
            exception: the exception that was raised.

        Returns:
            The newly created record.
        """
        self.initiators.setdefault(initiator.id, initiator)
        record = HTTPRecord()
        record.url = url
        record.initiator_id = initiator.id
        record.exception = exception
        self.requests[record.id] = record
        return record
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc