• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

cle-b / httpdbg / 12528626572

28 Dec 2024 05:55PM UTC coverage: 89.433% (+0.1%) from 89.308%
12528626572

Pull #164

github

cle-b
format
Pull Request #164: Initiator / Group by - refactoring

80 of 83 new or added lines in 7 files covered. (96.39%)

1 existing line in 1 file now uncovered.

1498 of 1675 relevant lines covered (89.43%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.59
/httpdbg/initiator.py
1
# -*- coding: utf-8 -*-
2
from contextlib import contextmanager
1✔
3
import os
1✔
4
import platform
1✔
5
import traceback
1✔
6
from typing import Generator
1✔
7
from typing import List
1✔
8
from typing import Tuple
1✔
9
from typing import Union
1✔
10
from typing import TYPE_CHECKING
1✔
11

12
if TYPE_CHECKING:
1✔
NEW
13
    from httpdbg.records import HTTPRecords
×
14

15
from httpdbg.env import HTTPDBG_CURRENT_GROUP
1✔
16
from httpdbg.env import HTTPDBG_CURRENT_INITIATOR
1✔
17
from httpdbg.env import HTTPDBG_CURRENT_TAG
1✔
18
from httpdbg.hooks.utils import getcallargs
1✔
19
from httpdbg.utils import get_new_uuid
1✔
20
from httpdbg.log import logger
1✔
21

22

23
class Initiator:
1✔
24
    def __init__(
1✔
25
        self,
26
        id: str,
27
        label: str,
28
        short_stack: str,
29
        stack: List[str],
30
    ):
31
        self.id = id
1✔
32
        self.label = label
1✔
33
        self.short_stack = short_stack
1✔
34
        self.stack = stack
1✔
35

36
    def __eq__(self, other) -> bool:
1✔
37
        if type(other) is Initiator:
1✔
38
            return (
1✔
39
                self.label == other.label
40
                and self.short_stack == other.short_stack
41
                and self.stack == other.stack
42
            )
43
        else:
44
            return False
×
45

46
    def to_json(self, full: bool = True) -> dict:
1✔
47
        if full:
1✔
48
            json = {
1✔
49
                "id": self.id,
50
                "label": self.label,
51
                "short_stack": self.short_stack,
52
                "stack": "\n".join(self.stack),
53
            }
54
        else:
UNCOV
55
            json = {
×
56
                "id": self.id,
57
                "label": self.label,
58
                "short_stack": self.short_stack,
59
            }
60
        return json
1✔
61

62

63
class Group:
1✔
64
    def __init__(self, id: str, label: str, full_label: str):
1✔
65
        self.id = id
1✔
66
        self.label = label
1✔
67
        self.full_label = full_label
1✔
68

69
    def to_json(self) -> dict:
1✔
70
        return {"id": self.id, "label": self.label, "full_label": self.full_label}
1✔
71

72

73
def compatible_path(path: str) -> str:
1✔
74
    p = path
1✔
75
    if platform.system().lower() == "windows":
1✔
76
        p = path.replace("/", "\\")
×
77
    return p
1✔
78

79

80
def in_lib(line: str, packages: List[str] = None):
1✔
81
    if not packages:
1✔
82
        packages = ["requests", "httpx", "aiohttp", "urllib3"]
1✔
83
    return any(
1✔
84
        [
85
            (compatible_path(f"/site-packages/{package}/") in line)
86
            for package in packages
87
        ]
88
    )
89

90

91
def get_current_instruction(
1✔
92
    extracted_stack: traceback.StackSummary,
93
) -> Tuple[str, str, List[str]]:
94
    instruction = ""
1✔
95
    short_stack = ""
1✔
96
    stack = []
1✔
97

98
    try:
1✔
99
        n_stack = -1
1✔
100
        framesummary = extracted_stack[-2]
1✔
101
        while "asyncio" in framesummary.filename:
1✔
102
            n_stack -= 1
1✔
103
            framesummary = extracted_stack[n_stack - 1]
1✔
104

105
        while "httpdbg/hooks" in framesummary.filename:
1✔
106
            n_stack -= 1
1✔
107
            framesummary = extracted_stack[n_stack - 1]
1✔
108

109
        short_stack = f'File "{framesummary.filename}", line {framesummary.lineno}, in {framesummary.name}\n'
1✔
110
        if os.path.exists(framesummary.filename) and framesummary.lineno is not None:
1✔
111
            instruction, s_stack = extract_short_stack_from_file(
1✔
112
                framesummary.filename, framesummary.lineno, 0, 8
113
            )
114
            short_stack += s_stack
1✔
115

116
            # stack
117
            to_include = False
1✔
118
            for i_stack in range(6, len(extracted_stack) + n_stack):
1✔
119
                last_stack = i_stack == len(extracted_stack) + n_stack - 1
1✔
120
                fs = extracted_stack[i_stack]
1✔
121
                to_include = to_include or (
1✔
122
                    ("/site-packages/" not in fs.filename)
123
                    and ("importlib" not in fs.filename)
124
                )  # remove the stack before to start the user part
125
                if to_include:
1✔
126
                    _, s_stack = extract_short_stack_from_file(
1✔
127
                        fs.filename,
128
                        fs.lineno if fs.lineno else 0,
129
                        4 if last_stack else 0,
130
                        8,
131
                        not last_stack,
132
                    )
133
                    stack.append(f'File "{fs.filename}", line {fs.lineno}, \n{s_stack}')
1✔
134

135
        else:
136
            instruction = framesummary.line if framesummary.line else "console"
1✔
137
            short_stack += f"{instruction}\n"
1✔
138
            stack = []
1✔
139
    except Exception as ex:
×
140
        logger().info(
×
141
            f"GET_CURRENT_INSTRUCTION [{str(extracted_stack)}] - error - {str(ex)}"
142
        )
143

144
    return instruction.replace("\n", " "), short_stack, stack
1✔
145

146

147
def extract_short_stack_from_file(
1✔
148
    filename: str,
149
    lineno: int,
150
    before: int,
151
    after: int,
152
    stop_if_instruction_ends: bool = True,
153
) -> Tuple[str, str]:
154
    instruction = ""
1✔
155
    short_stack = ""
1✔
156

157
    try:
1✔
158
        if os.path.exists(filename):
1✔
159
            with open(filename, "r") as src:
1✔
160
                lines = src.read().splitlines()
1✔
161

162
                # copy the lines before
163
                for i in range(
1✔
164
                    max(0, lineno - 1 - before), min(lineno - 1, len(lines))
165
                ):
166
                    line = lines[i]
1✔
167
                    short_stack += f" {i+1}. {line}\n"
1✔
168

169
                # try to recompose the instruction if on multi-lines
170
                end_of_instruction_found = False
1✔
171
                for i in range(max(0, lineno - 1), min(lineno - 1 + after, len(lines))):
1✔
172
                    line = lines[i]
1✔
173
                    if not end_of_instruction_found:
1✔
174
                        instruction += line.strip()
1✔
175
                    short_stack += f" {i+1}. {line}{' <====' if (before > 0 and i + 1 == lineno) else ''}\n"
1✔
176
                    nb_parenthesis = 0
1✔
177
                    for c in instruction[instruction.find("(") :]:
1✔
178
                        if c == "(":
1✔
179
                            nb_parenthesis += 1
1✔
180
                        if c == ")":
1✔
181
                            nb_parenthesis -= 1
1✔
182
                        if nb_parenthesis == 0:
1✔
183
                            end_of_instruction_found = True
1✔
184
                            break
1✔
185
                    if end_of_instruction_found and stop_if_instruction_ends:
1✔
186
                        break
1✔
187
    except Exception as ex:
×
188
        logger().info(
×
189
            f"EXTRACT_SHORT_STACK_FROM_FILE {filename} lineno={lineno} before={before} after={after}- error - {str(ex)}"
190
        )
191

192
    return instruction, short_stack
1✔
193

194

195
@contextmanager
1✔
196
def httpdbg_initiator(
1✔
197
    records, extracted_stack: traceback.StackSummary, original_method, *args, **kwargs
198
) -> Generator[Union[Initiator, None], None, None]:
199
    envname = f"{HTTPDBG_CURRENT_INITIATOR}_{records.id}"
1✔
200

201
    if not os.environ.get(envname):
1✔
202
        # temporary set a fake initiator env variable to avoid a recursion error
203
        #  RecursionError: maximum recursion depth exceeded while calling a Python object
204
        # TL;DR When we construct the short_stack string, a recursion error occurs if there
205
        # is an object from a class where a hooked method is called in __repr__ or __str__.
206
        os.environ[envname] = "blabla"
1✔
207

208
        callargs = getcallargs(original_method, *args, **kwargs)
1✔
209
        instruction, short_stack, stack = get_current_instruction(extracted_stack)
1✔
210

211
        short_stack += (
1✔
212
            f"----------\n{original_method.__module__}.{original_method.__name__}(\n"
213
        )
214
        for k, v in callargs.items():
1✔
215
            short_stack += f"    {k}={v}\n"
1✔
216
        short_stack += ")"
1✔
217

218
        current_initiator = Initiator(get_new_uuid(), instruction, short_stack, stack)
1✔
219
        records.initiators[current_initiator.id] = current_initiator
1✔
220

221
        os.environ[envname] = current_initiator.id
1✔
222

223
        try:
1✔
224
            with httpdbg_group(
1✔
225
                records, instruction, short_stack
226
            ):  # by default we group the requests by initiator
227
                yield current_initiator
1✔
228
        except Exception:
1✔
229
            del os.environ[envname]
1✔
230
            raise
1✔
231

232
        del os.environ[envname]
1✔
233

234
    else:
235
        yield None
1✔
236

237

238
@contextmanager
1✔
239
def httpdbg_tag(tag: str) -> Generator[None, None, None]:
1✔
240

241
    tag_already_set = HTTPDBG_CURRENT_TAG in os.environ
1✔
242

243
    if not tag_already_set:
1✔
244
        os.environ[HTTPDBG_CURRENT_TAG] = tag
1✔
245

246
    try:
1✔
247
        logger().info(f"httpdbg_tag {tag}")
1✔
248
        yield
1✔
249
    except Exception:
×
250
        if (not tag_already_set) and (HTTPDBG_CURRENT_TAG in os.environ):
×
251
            del os.environ[HTTPDBG_CURRENT_TAG]
×
252
        raise
×
253

254
    if (not tag_already_set) and (HTTPDBG_CURRENT_TAG in os.environ):
1✔
255
        del os.environ[HTTPDBG_CURRENT_TAG]
1✔
256

257

258
@contextmanager
1✔
259
def httpdbg_group(
1✔
260
    records: "HTTPRecords", label: str, full_label: str
261
) -> Generator[None, None, None]:
262

263
    group_already_set = HTTPDBG_CURRENT_GROUP in os.environ
1✔
264

265
    if not group_already_set:
1✔
266
        logger().info("httpdbg_group (new)")
1✔
267
        group_id = get_new_uuid()
1✔
268
        records.groups[group_id] = Group(group_id, label, full_label)
1✔
269
        os.environ[HTTPDBG_CURRENT_GROUP] = group_id
1✔
270

271
    try:
1✔
272
        logger().info(
1✔
273
            f"httpdbg_group {records.groups} group_id={os.environ[HTTPDBG_CURRENT_GROUP]} label={label} full_label={full_label}"
274
        )
275
        yield
1✔
276
    except Exception:
1✔
277
        if (not group_already_set) and (HTTPDBG_CURRENT_GROUP in os.environ):
1✔
278
            del os.environ[HTTPDBG_CURRENT_GROUP]
1✔
279
        raise
1✔
280

281
    if (not group_already_set) and (HTTPDBG_CURRENT_GROUP in os.environ):
1✔
282
        del os.environ[HTTPDBG_CURRENT_GROUP]
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc