• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 16159241666

09 Jul 2025 02:59AM UTC coverage: 92.899% (-0.006%) from 92.905%
16159241666

Pull #1198

github

web-flow
Merge 96b08cb69 into d4ea6b063
Pull Request #1198: Added Dark Mode to PythonTA Web Reporter

3480 of 3746 relevant lines covered (92.9%)

17.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.44
/python_ta/cfg/visitor.py
1
import logging
20✔
2
from typing import Any, Dict, List, Optional, Tuple, Union
20✔
3

4
from astroid import extract_node, nodes
20✔
5
from astroid.exceptions import AstroidSyntaxError
20✔
6

7
from python_ta.contracts import parse_assertions
20✔
8

9
from .graph import CFGBlock, ControlFlowGraph
20✔
10

11

12
class CFGVisitor:
20✔
13
    """An astroid visitor that creates a control flow graph for a given Python module.
14

15
    Initialize with an options dict to configure the visitor.
16

17
    Supported Options:
18
      - "separate-condition-blocks": bool
19
            This option specifies whether the test condition of an if statement gets merged with any
20
            preceding statements or placed in a new block. By default, it will merge them.
21
      - "functions": list
22
            This option specifies whether to restrict the creation of cfgs to just top-level
23
            function definitions or methods provided in this list. By default, it will create the
24
            cfg for the file as it normally would.
25

26
    Private Attributes:
27
    _control_boundaries: A stack of the boundaries the visitor is currently in.
28
        The top of the stack corresponds to the end of the list.
29
        (compound statement [while], {'Break'/'Continue': CFGBlock to link to})
30
    """
31

32
    cfgs: Dict[Union[nodes.FunctionDef, nodes.Module], ControlFlowGraph]
20✔
33
    options: Dict[str, Any]
20✔
34
    # cfg_count is used as an "auto-increment" to ensure cfg ids are unique.
35
    cfg_count: int
20✔
36
    _current_cfg: Optional[ControlFlowGraph]
20✔
37
    _current_block: Optional[CFGBlock]
20✔
38
    _control_boundaries: List[Tuple[nodes.NodeNG, Dict[str, CFGBlock]]]
20✔
39
    z3_enabled: bool
20✔
40

41
    def __init__(self, options: Optional[Dict[str, Any]] = None, z3_enabled: bool = False) -> None:
20✔
42
        super().__init__()
20✔
43
        self.cfgs = {}
20✔
44
        self.options = {
20✔
45
            "separate-condition-blocks": False,
46
            "functions": [],
47
        }
48
        if options is not None:
20✔
49
            self.options.update(options)
20✔
50
        self.cfg_count = 0
20✔
51
        self._current_cfg = None
20✔
52
        self._current_block = None
20✔
53
        self._control_boundaries = []
20✔
54
        self.z3_enabled = z3_enabled
20✔
55

56
    def __getattr__(self, attr: str):
20✔
57
        if attr.startswith("visit_"):
20✔
58
            return self.visit_generic
20✔
59
        else:
60
            raise AttributeError(f"'CFGVisitor' object has no attribute '{attr}'")
×
61

62
    def visit_generic(self, node: nodes.NodeNG) -> None:
20✔
63
        """By default, add the expression to the end of the current block."""
64
        if self._current_block is not None:
20✔
65
            self._current_block.add_statement(node)
20✔
66

67
    def visit_module(self, module: nodes.Module) -> None:
20✔
68
        # Modules shouldn't be considered if user specifies functions to render
69
        functions_to_render = self.options.get("functions", [])
20✔
70
        if functions_to_render:
20✔
71
            for child in module.body:
20✔
72
                # Check any classes or function definitions for the target function/method
73
                if isinstance(child, nodes.FunctionDef) or isinstance(child, nodes.ClassDef):
20✔
74
                    child.accept(self)
20✔
75
            return
20✔
76

77
        self.cfgs[module] = ControlFlowGraph(self.cfg_count, z3_enabled=self.z3_enabled)
20✔
78
        self.cfg_count += 1
20✔
79
        self._current_cfg = self.cfgs[module]
20✔
80
        self._current_block = self._current_cfg.start
20✔
81
        module.cfg_block = self._current_cfg.start
20✔
82
        module.cfg = self._current_cfg
20✔
83

84
        for child in module.body:
20✔
85
            child.accept(self)
20✔
86

87
        self._current_cfg.link_or_merge(self._current_block, self._current_cfg.end)
20✔
88
        self._current_cfg.update_block_reachability()
20✔
89

90
    def visit_classdef(self, node: nodes.ClassDef) -> None:
20✔
91
        functions_to_render = self.options.get("functions", [])
20✔
92
        for child in node.body:
20✔
93
            if functions_to_render:
20✔
94
                if isinstance(child, nodes.FunctionDef):
20✔
95
                    child.accept(self)
20✔
96
            else:
97
                child.accept(self)
20✔
98

99
    def visit_functiondef(self, func: nodes.FunctionDef) -> None:
20✔
100
        # If user specifies to only render functions, check if the function/method name is listed
101
        functions_to_render = self.options.get("functions", [])
20✔
102
        scope_parent = func.scope().parent
20✔
103

104
        if functions_to_render:
20✔
105
            if (
20✔
106
                isinstance(scope_parent, nodes.ClassDef)
107
                and (scope_parent.name + "." + func.name) not in functions_to_render
108
            ) or (isinstance(scope_parent, nodes.Module) and func.name not in functions_to_render):
109
                return
20✔
110

111
        if self._current_block is not None:
20✔
112
            self._current_block.add_statement(func)
20✔
113

114
        previous_cfg = self._current_cfg
20✔
115
        previous_block = self._current_block
20✔
116

117
        self.cfgs[func] = ControlFlowGraph(self.cfg_count, z3_enabled=self.z3_enabled)
20✔
118
        self.cfg_count += 1
20✔
119
        self._current_cfg = self.cfgs[func]
20✔
120

121
        self._control_boundaries.append(
20✔
122
            (
123
                func,
124
                {
125
                    nodes.Return.__name__: self._current_cfg.end,
126
                    nodes.Raise.__name__: self._current_cfg.end,
127
                },
128
            )
129
        )
130

131
        # Current CFG block is self._current_cfg.start while initially creating the function cfg
132
        self._current_cfg.add_arguments(func.args)
20✔
133

134
        preconditions_node = _get_preconditions_node(func)
20✔
135

136
        self._current_block = self._current_cfg.create_block(
20✔
137
            self._current_cfg.start, edge_condition=preconditions_node
138
        )
139

140
        for child in func.body:
20✔
141
            child.accept(self)
20✔
142

143
        self._control_boundaries.pop()
20✔
144

145
        self._current_cfg.link_or_merge(self._current_block, self._current_cfg.end)
20✔
146
        self._current_cfg.update_block_reachability()
20✔
147

148
        if hasattr(func, "z3_constraints"):
20✔
149
            self._current_cfg.precondition_constraints = func.z3_constraints
10✔
150
            self._current_cfg.update_edge_z3_constraints()
10✔
151
            self._current_cfg.update_edge_feasibility()
10✔
152

153
        self._current_block = previous_block
20✔
154
        self._current_cfg = previous_cfg
20✔
155

156
    def visit_if(self, node: nodes.If) -> None:
20✔
157
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
158
        if self._current_cfg is None:
20✔
159
            return
×
160

161
        separate_conditions = self.options.get("separate-condition-blocks", False)
20✔
162
        if separate_conditions:
20✔
163
            # Create a block for the test condition
164
            test_block = self._current_cfg.create_block(self._current_block)
20✔
165
            test_block.add_statement(node.test)
20✔
166
            self._current_block = test_block
20✔
167
        else:
168
            # If the options doesn't specify to separate the test condition blocks, just add it to
169
            # the current block.
170
            self._current_block.add_statement(node.test)
20✔
171
        node.cfg_block = self._current_block
20✔
172
        old_curr = self._current_block
20✔
173

174
        # Handle "then" branch and label it.
175
        then_block = self._current_cfg.create_block(
20✔
176
            old_curr, edge_condition=node.test, edge_negate=False
177
        )
178
        self._current_block = then_block
20✔
179
        for child in node.body:
20✔
180
            child.accept(self)
20✔
181
        end_if = self._current_block
20✔
182

183
        # Handle "else" branch.
184
        if node.orelse == []:
20✔
185
            end_else = old_curr
20✔
186
        else:
187
            # Label the edge to the else block.
188
            else_block = self._current_cfg.create_block(
20✔
189
                old_curr, edge_condition=node.test, edge_negate=True
190
            )
191
            self._current_block = else_block
20✔
192
            for child in node.orelse:
20✔
193
                child.accept(self)
20✔
194
            end_else = self._current_block
20✔
195

196
        after_if_block = self._current_cfg.create_block()
20✔
197
        self._current_cfg.link_or_merge(end_if, after_if_block)
20✔
198
        # Label the edge if there was no "else" branch
199
        if node.orelse == []:
20✔
200
            self._current_cfg.link_or_merge(
20✔
201
                end_else,
202
                after_if_block,
203
                edge_condition=node.test,
204
                edge_negate=True,
205
            )
206
        else:
207
            self._current_cfg.link_or_merge(end_else, after_if_block)
20✔
208

209
        self._current_block = after_if_block
20✔
210

211
    def visit_while(self, node: nodes.While) -> None:
20✔
212
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
213
        if self._current_cfg is None:
20✔
214
            return
×
215

216
        old_curr = self._current_block
20✔
217

218
        # Handle "test" block
219
        test_block = self._current_cfg.create_block()
20✔
220
        test_block.add_statement(node.test)
20✔
221
        node.cfg_block = test_block
20✔
222
        self._current_cfg.link_or_merge(old_curr, test_block)
20✔
223

224
        after_while_block = self._current_cfg.create_block()
20✔
225

226
        # step into while
227
        self._control_boundaries.append(
20✔
228
            (
229
                node,
230
                {nodes.Break.__name__: after_while_block, nodes.Continue.__name__: test_block},
231
            )
232
        )
233

234
        # Handle "body" branch
235
        body_block = self._current_cfg.create_block(
20✔
236
            test_block, edge_condition=node.test, edge_negate=False
237
        )
238
        self._current_block = body_block
20✔
239
        for child in node.body:
20✔
240
            child.accept(self)
20✔
241
        end_body = self._current_block
20✔
242
        self._current_cfg.link_or_merge(end_body, test_block)
20✔
243

244
        # step out of while
245
        self._control_boundaries.pop()
20✔
246

247
        # Handle "else" branch
248
        else_block = self._current_cfg.create_block(
20✔
249
            test_block, edge_condition=node.test, edge_negate=True
250
        )
251
        self._current_block = else_block
20✔
252
        for child in node.orelse:
20✔
253
            child.accept(self)
20✔
254
        end_else = self._current_block
20✔
255

256
        self._current_cfg.link_or_merge(end_else, after_while_block)
20✔
257
        self._current_block = after_while_block
20✔
258

259
    def visit_for(self, node: nodes.For) -> None:
20✔
260
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
261
        if self._current_cfg is None:
20✔
262
            return
×
263

264
        old_curr = self._current_block
20✔
265
        old_curr.add_statement(node.iter)
20✔
266
        node.cfg_block = old_curr
20✔
267

268
        # Handle "test" block
269
        test_block = self._current_cfg.create_block()
20✔
270
        test_block.add_statement(node.target)
20✔
271
        self._current_cfg.link_or_merge(old_curr, test_block)
20✔
272

273
        after_for_block = self._current_cfg.create_block()
20✔
274

275
        # step into for
276
        self._control_boundaries.append(
20✔
277
            (node, {nodes.Break.__name__: after_for_block, nodes.Continue.__name__: test_block})
278
        )
279

280
        # Handle "body" branch
281
        body_block = self._current_cfg.create_block(test_block)
20✔
282
        self._current_block = body_block
20✔
283
        for child in node.body:
20✔
284
            child.accept(self)
20✔
285
        end_body = self._current_block
20✔
286
        self._current_cfg.link_or_merge(end_body, test_block)
20✔
287

288
        # step out of for
289
        self._control_boundaries.pop()
20✔
290

291
        # Handle "else" branch
292
        else_block = self._current_cfg.create_block(test_block)
20✔
293
        self._current_block = else_block
20✔
294
        for child in node.orelse:
20✔
295
            child.accept(self)
20✔
296
        end_else = self._current_block
20✔
297

298
        self._current_cfg.link_or_merge(end_else, after_for_block)
20✔
299
        self._current_block = after_for_block
20✔
300

301
    def visit_break(self, node: nodes.Break) -> None:
20✔
302
        self._visit_jump(node)
20✔
303

304
    def visit_continue(self, node: nodes.Continue) -> None:
20✔
305
        self._visit_jump(node)
20✔
306

307
    def visit_return(self, node: nodes.Return) -> None:
20✔
308
        self._visit_jump(node)
20✔
309

310
    def visit_raise(self, node: nodes.Raise) -> None:
20✔
311
        self._visit_jump(node)
20✔
312

313
    def _visit_jump(
20✔
314
        self, node: Union[nodes.Break, nodes.Continue, nodes.Return, nodes.Raise]
315
    ) -> None:
316
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
317
        if self._current_cfg is None:
20✔
318
            return
×
319

320
        old_curr = self._current_block
20✔
321
        unreachable_block = self._current_cfg.create_block()
20✔
322
        for boundary, exits in reversed(self._control_boundaries):
20✔
323
            if (
20✔
324
                isinstance(boundary, nodes.FunctionDef) or isinstance(boundary, nodes.ClassDef)
325
            ) and (isinstance(node, nodes.Break) or isinstance(node, nodes.Continue)):
326
                logging.warning(
20✔
327
                    f"'{type(node).__name__}' outside"
328
                    f' {"function" if isinstance(node, nodes.Return) else "loop"}'
329
                )
330
                self._current_cfg.link(old_curr, unreachable_block)
20✔
331
                old_curr.add_statement(node)
20✔
332
                break
20✔
333

334
            if isinstance(node, nodes.Raise):
20✔
335
                exc_name = _get_raise_exc(node)
20✔
336

337
                if exc_name in exits:
20✔
338
                    self._current_cfg.link(old_curr, exits[exc_name])
20✔
339
                    old_curr.add_statement(node)
20✔
340
                    break
20✔
341

342
            if type(node).__name__ in exits:
20✔
343
                self._current_cfg.link(old_curr, exits[type(node).__name__])
20✔
344
                old_curr.add_statement(node)
20✔
345
                break
20✔
346

347
        else:
348
            logging.warning(
20✔
349
                f"'{type(node).__name__}' outside"
350
                f' {"function" if isinstance(node, nodes.Return) else "loop"}'
351
            )
352
            self._current_cfg.link(old_curr, unreachable_block)
20✔
353
            old_curr.add_statement(node)
20✔
354

355
        self._current_block = unreachable_block
20✔
356

357
    def visit_try(self, node: nodes.Try) -> None:
20✔
358
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
359
        if self._current_cfg is None:
20✔
360
            return
×
361

362
        if self._current_block.statements != []:
20✔
363
            self._current_block = self._current_cfg.create_block(self._current_block)
20✔
364

365
        node.cfg_block = self._current_block
20✔
366

367
        # Construct the exception handlers first
368
        # Initialize a temporary block to later merge with end_body
369
        self._current_block = self._current_cfg.create_block()
20✔
370
        temp = self._current_block
20✔
371
        end_block = self._current_cfg.create_block()
20✔
372
        # Case where Raise is not handled in try
373
        self._control_boundaries.append((node, {nodes.Raise.__name__: end_block}))
20✔
374
        cbs_added = 1
20✔
375

376
        after_body = []
20✔
377
        # Construct blocks in reverse to give precedence to the first block in overlapping except
378
        # branches
379
        for handler in reversed(node.handlers):
20✔
380
            h = self._current_cfg.create_block()
20✔
381
            self._current_block = h
20✔
382
            handler.cfg_block = h
20✔
383

384
            exceptions = _extract_exceptions(handler)
20✔
385
            # Edge case: catch-all except clause (i.e. except: ...)
386
            if exceptions == []:
20✔
387
                self._control_boundaries.append((node, {nodes.Raise.__name__: h}))
20✔
388
                cbs_added += 1
20✔
389

390
            # General case: specific except clause
391
            for exception in exceptions:
20✔
392
                self._control_boundaries.append((node, {f"{nodes.Raise.__name__} {exception}": h}))
20✔
393
                cbs_added += 1
20✔
394

395
            if handler.name is not None:  # The name assigned to the caught exception.
20✔
396
                handler.name.accept(self)
20✔
397
            for child in handler.body:
20✔
398
                child.accept(self)
20✔
399
            end_handler = self._current_block
20✔
400
            self._current_cfg.link_or_merge(end_handler, end_block)
20✔
401
            after_body.append(h)
20✔
402

403
        if node.orelse == []:
20✔
404
            after_body.append(end_block)
20✔
405
        else:
406
            self._current_block = self._current_cfg.create_block()
20✔
407
            after_body.append(self._current_block)
20✔
408
            for child in node.orelse:
20✔
409
                child.accept(self)
20✔
410
            self._current_cfg.link_or_merge(self._current_block, end_block)
20✔
411

412
        # Construct the try body so reset current block to this node's block
413
        self._current_block = node.cfg_block
20✔
414

415
        for child in node.body:
20✔
416
            child.accept(self)
20✔
417
        end_body = self._current_block
20✔
418

419
        # Remove each control boundary that we added in this method
420
        for _ in range(cbs_added):
20✔
421
            self._control_boundaries.pop()
20✔
422

423
        self._current_cfg.link_or_merge(temp, end_body)
20✔
424
        self._current_cfg.multiple_link_or_merge(end_body, after_body)
20✔
425
        self._current_block = end_block
20✔
426

427
    def visit_with(self, node: nodes.With) -> None:
20✔
428
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
429
        if self._current_cfg is None:
20✔
430
            return
×
431

432
        for context_node, name in node.items:
20✔
433
            self._current_block.add_statement(context_node)
20✔
434
            if name is not None:
20✔
435
                self._current_block.add_statement(name)
20✔
436

437
        for child in node.body:
20✔
438
            child.accept(self)
20✔
439

440
    def visit_match(self, node: nodes.Match) -> None:
20✔
441
        """Visit a match statement and create appropriate control flow."""
442
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
443
        if self._current_cfg is None:
16✔
444
            return
×
445

446
        self._current_block.add_statement(node.subject)
16✔
447
        node.cfg_block = self._current_block
16✔
448
        after_match_block = self._current_cfg.create_block()
16✔
449

450
        case_end_blocks = []
16✔
451

452
        prev_case = self._current_block
16✔
453
        connect_guard_block = False
16✔
454

455
        for case in node.cases:
16✔
456
            edge_label = "No Match" if case_end_blocks else ""
16✔
457

458
            separate_conditions = self.options.get("separate-condition-blocks", False)
16✔
459

460
            pattern_block = self._current_cfg.create_block(prev_case, edge_label=edge_label)
16✔
461
            pattern_block.add_statement(case.pattern)
16✔
462

463
            if connect_guard_block:
16✔
464
                self._current_cfg.link_or_merge(guard_block, pattern_block, edge_label="False")
16✔
465
                connect_guard_block = False
16✔
466

467
            if separate_conditions and case.guard is not None:
16✔
468
                guard_block = self._current_cfg.create_block(pattern_block, edge_label="Match")
16✔
469
                guard_block.add_statement(case.guard)
16✔
470
                pattern_body = self._current_cfg.create_block(guard_block, edge_label="True")
16✔
471
                connect_guard_block = True
16✔
472
            else:
473
                if not separate_conditions and case.guard is not None:
16✔
474
                    pattern_block.add_statement(case.guard)
16✔
475
                pattern_body = self._current_cfg.create_block(pattern_block, edge_label="Match")
16✔
476

477
            self._current_block = pattern_body
16✔
478

479
            for child in case.body:
16✔
480
                child.accept(self)
16✔
481

482
            case_end_blocks.append(self._current_block)
16✔
483
            prev_case = pattern_block
16✔
484

485
        # For the final block, create a new block that links to the end of the match
486
        self._current_cfg.link_or_merge(pattern_block, after_match_block, edge_label="No Match")
16✔
487
        if connect_guard_block:
16✔
488
            self._current_cfg.link_or_merge(guard_block, after_match_block, edge_label="False")
16✔
489

490
        for end_block in case_end_blocks:
16✔
491
            self._current_cfg.link_or_merge(end_block, after_match_block)
16✔
492

493
        self._current_block = after_match_block
16✔
494

495

496
def _extract_exceptions(node: nodes.ExceptHandler) -> List[str]:
20✔
497
    """A helper method that returns a list of all the exceptions handled by this except block as a
498
    list of strings.
499
    """
500
    exceptions = node.type
20✔
501
    exceptions_so_far = []
20✔
502
    # ExceptHandler.type will either be Tuple, NodeNG, or None.
503
    if exceptions is None:
20✔
504
        return exceptions_so_far
20✔
505

506
    # Get all the Name nodes for all exceptions this except block is handling
507
    for exception in exceptions.nodes_of_class(nodes.Name):
20✔
508
        exceptions_so_far.append(exception.name)
20✔
509

510
    return exceptions_so_far
20✔
511

512

513
def _get_raise_exc(node: nodes.Raise) -> str:
20✔
514
    """A helper method that returns a string formatted for the control boundary representing the
515
    exception that this Raise node throws.
516

517
    Preconditions:
518
        - the raise statement is of the form 'raise' or 'raise <exception_class>'
519
    """
520
    exceptions = node.nodes_of_class(nodes.Name)
20✔
521

522
    # Return the formatted name of the exception or the just 'Raise' otherwise
523
    try:
20✔
524
        return f"{nodes.Raise.__name__} {next(exceptions).name}"
20✔
525
    except StopIteration:
20✔
526
        return nodes.Raise.__name__
20✔
527

528

529
def _get_preconditions_node(func: nodes.FunctionDef) -> Optional[nodes.NodeNG]:
20✔
530
    """A helper method that takes in a function definition node, retrieves its preconditions, and then parses them
531
    into a AST node representing all the valid Python preconditions combined in an and statement. Returns None if
532
    there are no valid Python preconditions."""
533
    valid_assertions = [
20✔
534
        assertion for assertion in parse_assertions(func) if _is_python_precondition(assertion)
535
    ]
536
    if not valid_assertions:
20✔
537
        return None
20✔
538
    precondition_string = " and ".join(valid_assertions)
20✔
539
    condition = extract_node(precondition_string)
20✔
540
    return condition
20✔
541

542

543
def _is_python_precondition(precondition: str) -> bool:
20✔
544
    """Given a precondition string, determine if it is a valid Python precondition that can be parsed and return
545
    a boolean result."""
546
    try:
20✔
547
        _ = extract_node(precondition)
20✔
548
        return True
20✔
549
    except (AstroidSyntaxError, ValueError):  # ValueError raised when precondition is blank
20✔
550
        return False
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc