• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 15791974148

21 Jun 2025 04:11AM UTC coverage: 92.849% (+0.03%) from 92.821%
15791974148

Pull #1190

github

web-flow
Merge b652ed720 into 02ddeab02
Pull Request #1190: Simplify Combined Z3 Preconditions in z3_visitor.py Using z3.simplify

4 of 4 new or added lines in 1 file covered. (100.0%)

26 existing lines in 5 files now uncovered.

3428 of 3692 relevant lines covered (92.85%)

17.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.45
/python_ta/cfg/visitor.py
1
import logging
20✔
2
from typing import Any, Dict, List, Optional, Tuple, Union
20✔
3

4
from astroid import extract_node, nodes
20✔
5
from astroid.exceptions import AstroidSyntaxError
20✔
6

7
from python_ta.contracts import parse_assertions
20✔
8

9
from .graph import CFGBlock, ControlFlowGraph
20✔
10

11

12
class CFGVisitor:
    """An astroid visitor that creates a control flow graph for a given Python module.

    Initialize with an options dict to configure the visitor.

    Supported Options:
      - "separate-condition-blocks": bool
            This option specifies whether the test condition of an if statement gets merged with any
            preceding statements or placed in a new block. By default, it will merge them.
      - "functions": list
            This option specifies whether to restrict the creation of cfgs to just top-level
            function definitions or methods provided in this list. By default, it will create the
            cfg for the file as it normally would.

    Private Attributes:
    _control_boundaries: A stack of the boundaries the visitor is currently in.
        The top of the stack corresponds to the end of the list.
        Each entry is a pair (compound statement, {exit key: CFGBlock to link to}).
        The entries pushed by this class are:
          - function definitions: 'Return' and 'Raise' map to the end block of the function's cfg
          - while/for loops: 'Break' maps to the block after the loop, 'Continue' to the test block
          - try statements: 'Raise' and 'Raise <exception name>' map to the matching handler block
            (or to the block after the try statement when nothing handles the raise)
    """

    cfgs: Dict[Union[nodes.FunctionDef, nodes.Module], ControlFlowGraph]
    options: Dict[str, Any]
    # cfg_count is used as an "auto-increment" to ensure cfg ids are unique.
    cfg_count: int
    _current_cfg: Optional[ControlFlowGraph]
    _current_block: Optional[CFGBlock]
    _control_boundaries: List[Tuple[nodes.NodeNG, Dict[str, CFGBlock]]]

    def __init__(self, options: Optional[Dict[str, Any]] = None) -> None:
        """Initialize the visitor, overriding the default options with the given ones."""
        super().__init__()
        self.cfgs = {}
        self.options = {
            "separate-condition-blocks": False,
            "functions": [],
        }
        if options is not None:
            self.options.update(options)
        self.cfg_count = 0
        self._current_cfg = None
        self._current_block = None
        self._control_boundaries = []

    def __getattr__(self, attr: str):
        """Fall back to visit_generic for any visit_* method not defined on this class.

        Python only calls __getattr__ when normal attribute lookup fails, so the explicitly
        defined visit_* methods are never routed through here.

        Raises AttributeError for any other missing attribute.
        """
        if attr.startswith("visit_"):
            return self.visit_generic
        raise AttributeError(f"'CFGVisitor' object has no attribute '{attr}'")

    @staticmethod
    def _warn_misplaced_jump(node: nodes.NodeNG) -> None:
        """Log a warning that the given jump statement has no enclosing construct to exit."""
        # NOTE(review): a Raise that reaches this point is also reported as "outside loop";
        # the wording is kept as is so that existing log output does not change.
        enclosing = "function" if isinstance(node, nodes.Return) else "loop"
        logging.warning(f"'{type(node).__name__}' outside {enclosing}")

    def visit_generic(self, node: nodes.NodeNG) -> None:
        """By default, add the expression to the end of the current block."""
        if self._current_block is not None:
            self._current_block.add_statement(node)

    def visit_module(self, module: nodes.Module) -> None:
        """Create the cfg for the module, or only visit its functions and classes when the
        "functions" option is non-empty.
        """
        # Modules shouldn't be considered if user specifies functions to render
        functions_to_render = self.options.get("functions", [])
        if functions_to_render:
            for child in module.body:
                # Check any classes or function definitions for the target function/method
                if isinstance(child, (nodes.FunctionDef, nodes.ClassDef)):
                    child.accept(self)
            return

        self.cfgs[module] = ControlFlowGraph(self.cfg_count)
        self.cfg_count += 1
        self._current_cfg = self.cfgs[module]
        self._current_block = self._current_cfg.start
        module.cfg_block = self._current_cfg.start
        module.cfg = self._current_cfg

        for child in module.body:
            child.accept(self)

        self._current_cfg.link_or_merge(self._current_block, self._current_cfg.end)
        self._current_cfg.update_block_reachability()

    def visit_classdef(self, node: nodes.ClassDef) -> None:
        """Visit the children of the class body.

        When the "functions" option is non-empty, only function definitions are visited.
        """
        functions_to_render = self.options.get("functions", [])
        for child in node.body:
            if not functions_to_render or isinstance(child, nodes.FunctionDef):
                child.accept(self)

    def visit_functiondef(self, func: nodes.FunctionDef) -> None:
        """Create a separate cfg for the function and record the definition in the current block.

        When the "functions" option is non-empty, top-level functions are skipped unless their
        name is listed, and methods are skipped unless "<class name>.<method name>" is listed.
        """
        # If user specifies to only render functions, check if the function/method name is listed
        functions_to_render = self.options.get("functions", [])
        scope_parent = func.scope().parent

        if functions_to_render:
            if (
                isinstance(scope_parent, nodes.ClassDef)
                and (scope_parent.name + "." + func.name) not in functions_to_render
            ) or (isinstance(scope_parent, nodes.Module) and func.name not in functions_to_render):
                return

        if self._current_block is not None:
            self._current_block.add_statement(func)

        # Remember the enclosing cfg/block so they can be restored once the function is done
        previous_cfg = self._current_cfg
        previous_block = self._current_block

        self.cfgs[func] = ControlFlowGraph(self.cfg_count)
        self.cfg_count += 1
        self._current_cfg = self.cfgs[func]

        # Return and Raise statements inside the function link to the end of its cfg
        self._control_boundaries.append(
            (
                func,
                {
                    nodes.Return.__name__: self._current_cfg.end,
                    nodes.Raise.__name__: self._current_cfg.end,
                },
            )
        )

        # Current CFG block is self._current_cfg.start while initially creating the function cfg
        self._current_cfg.add_arguments(func.args)

        preconditions_node = _get_preconditions_node(func)

        # The edge from the start block into the body is labelled with the preconditions (if any)
        self._current_block = self._current_cfg.create_block(
            self._current_cfg.start, edge_condition=preconditions_node
        )

        for child in func.body:
            child.accept(self)

        self._control_boundaries.pop()

        self._current_cfg.link_or_merge(self._current_block, self._current_cfg.end)
        self._current_cfg.update_block_reachability()

        if hasattr(func, "z3_constraints"):
            self._current_cfg.precondition_constraints = func.z3_constraints
            self._current_cfg.update_edge_z3_constraints()
            self._current_cfg.update_edge_feasibility()

        self._current_block = previous_block
        self._current_cfg = previous_cfg

    def visit_if(self, node: nodes.If) -> None:
        """Create the "then" and (optional) "else" branches and join them in a new block."""
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
        if self._current_cfg is None:
            return

        separate_conditions = self.options.get("separate-condition-blocks", False)
        if separate_conditions:
            # Create a block for the test condition
            test_block = self._current_cfg.create_block(self._current_block)
            test_block.add_statement(node.test)
            self._current_block = test_block
        else:
            # If the options doesn't specify to separate the test condition blocks, just add it to
            # the current block.
            self._current_block.add_statement(node.test)
        node.cfg_block = self._current_block
        old_curr = self._current_block
        has_else = bool(node.orelse)

        # Handle "then" branch and label it.
        then_block = self._current_cfg.create_block(
            old_curr, edge_condition=node.test, edge_negate=False
        )
        self._current_block = then_block
        for child in node.body:
            child.accept(self)
        end_if = self._current_block

        # Handle "else" branch.
        if has_else:
            # Label the edge to the else block.
            else_block = self._current_cfg.create_block(
                old_curr, edge_condition=node.test, edge_negate=True
            )
            self._current_block = else_block
            for child in node.orelse:
                child.accept(self)
            end_else = self._current_block
        else:
            end_else = old_curr

        after_if_block = self._current_cfg.create_block()
        self._current_cfg.link_or_merge(end_if, after_if_block)
        if has_else:
            self._current_cfg.link_or_merge(end_else, after_if_block)
        else:
            # Label the edge if there was no "else" branch
            self._current_cfg.link_or_merge(
                end_else,
                after_if_block,
                edge_condition=node.test,
                edge_negate=True,
            )

        self._current_block = after_if_block

    def visit_while(self, node: nodes.While) -> None:
        """Create the test, body and else blocks of a while loop."""
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
        if self._current_cfg is None:
            return

        old_curr = self._current_block

        # Handle "test" block
        test_block = self._current_cfg.create_block()
        test_block.add_statement(node.test)
        node.cfg_block = test_block
        self._current_cfg.link_or_merge(old_curr, test_block)

        after_while_block = self._current_cfg.create_block()

        # step into while
        self._control_boundaries.append(
            (
                node,
                {nodes.Break.__name__: after_while_block, nodes.Continue.__name__: test_block},
            )
        )

        # Handle "body" branch
        body_block = self._current_cfg.create_block(
            test_block, edge_condition=node.test, edge_negate=False
        )
        self._current_block = body_block
        for child in node.body:
            child.accept(self)
        end_body = self._current_block
        self._current_cfg.link_or_merge(end_body, test_block)

        # step out of while
        self._control_boundaries.pop()

        # Handle "else" branch
        else_block = self._current_cfg.create_block(
            test_block, edge_condition=node.test, edge_negate=True
        )
        self._current_block = else_block
        for child in node.orelse:
            child.accept(self)
        end_else = self._current_block

        self._current_cfg.link_or_merge(end_else, after_while_block)
        self._current_block = after_while_block

    def visit_for(self, node: nodes.For) -> None:
        """Create the test, body and else blocks of a for loop.

        The iterable is added to the current block and the loop target to the test block.
        """
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
        if self._current_cfg is None:
            return

        old_curr = self._current_block
        old_curr.add_statement(node.iter)
        node.cfg_block = old_curr

        # Handle "test" block
        test_block = self._current_cfg.create_block()
        test_block.add_statement(node.target)
        self._current_cfg.link_or_merge(old_curr, test_block)

        after_for_block = self._current_cfg.create_block()

        # step into for
        self._control_boundaries.append(
            (node, {nodes.Break.__name__: after_for_block, nodes.Continue.__name__: test_block})
        )

        # Handle "body" branch
        body_block = self._current_cfg.create_block(test_block)
        self._current_block = body_block
        for child in node.body:
            child.accept(self)
        end_body = self._current_block
        self._current_cfg.link_or_merge(end_body, test_block)

        # step out of for
        self._control_boundaries.pop()

        # Handle "else" branch
        else_block = self._current_cfg.create_block(test_block)
        self._current_block = else_block
        for child in node.orelse:
            child.accept(self)
        end_else = self._current_block

        self._current_cfg.link_or_merge(end_else, after_for_block)
        self._current_block = after_for_block

    def visit_break(self, node: nodes.Break) -> None:
        self._visit_jump(node)

    def visit_continue(self, node: nodes.Continue) -> None:
        self._visit_jump(node)

    def visit_return(self, node: nodes.Return) -> None:
        self._visit_jump(node)

    def visit_raise(self, node: nodes.Raise) -> None:
        self._visit_jump(node)

    def _visit_jump(
        self, node: Union[nodes.Break, nodes.Continue, nodes.Return, nodes.Raise]
    ) -> None:
        """Link the current block to the block this jump statement transfers control to.

        The control boundaries are searched from the innermost outwards for an exit matching the
        statement. If none is found (or a break/continue would have to cross a function or class
        boundary), a warning is logged and the current block is linked to a fresh block instead.
        In every case the statements following the jump go into that fresh block.
        """
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
        if self._current_cfg is None:
            return

        old_curr = self._current_block
        unreachable_block = self._current_cfg.create_block()

        jump_name = type(node).__name__
        is_loop_jump = isinstance(node, (nodes.Break, nodes.Continue))
        # For a raise statement, also look for a handler of the specific exception raised
        exc_name = _get_raise_exc(node) if isinstance(node, nodes.Raise) else None

        # Without a matching exit, the jump falls through to the fresh block with a warning
        target = unreachable_block
        misplaced = True
        for boundary, exits in reversed(self._control_boundaries):
            if is_loop_jump and isinstance(boundary, (nodes.FunctionDef, nodes.ClassDef)):
                # break/continue cannot leave the enclosing function or class
                break

            if exc_name is not None and exc_name in exits:
                target = exits[exc_name]
                misplaced = False
                break

            if jump_name in exits:
                target = exits[jump_name]
                misplaced = False
                break

        if misplaced:
            self._warn_misplaced_jump(node)
        self._current_cfg.link(old_curr, target)
        old_curr.add_statement(node)

        self._current_block = unreachable_block

    def visit_try(self, node: nodes.Try) -> None:
        """Create the blocks for the body, the except handlers and the else branch of a try
        statement.

        The handlers and the else branch are built before the body so that the control boundaries
        for the handlers are in place when the raise statements of the body are visited.
        """
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
        if self._current_cfg is None:
            return

        if self._current_block.statements:
            self._current_block = self._current_cfg.create_block(self._current_block)

        node.cfg_block = self._current_block

        # Construct the exception handlers first
        # Initialize a temporary block to later merge with end_body
        self._current_block = self._current_cfg.create_block()
        temp = self._current_block
        end_block = self._current_cfg.create_block()
        # Case where Raise is not handled in try
        self._control_boundaries.append((node, {nodes.Raise.__name__: end_block}))
        cbs_added = 1

        after_body = []
        # Construct blocks in reverse to give precedence to the first block in overlapping except
        # branches
        for handler in reversed(node.handlers):
            h = self._current_cfg.create_block()
            self._current_block = h
            handler.cfg_block = h

            exceptions = _extract_exceptions(handler)
            # Edge case: catch-all except clause (i.e. except: ...)
            if not exceptions:
                self._control_boundaries.append((node, {nodes.Raise.__name__: h}))
                cbs_added += 1

            # General case: specific except clause
            for exception in exceptions:
                self._control_boundaries.append((node, {f"{nodes.Raise.__name__} {exception}": h}))
                cbs_added += 1

            if handler.name is not None:  # The name assigned to the caught exception.
                handler.name.accept(self)
            for child in handler.body:
                child.accept(self)
            end_handler = self._current_block
            self._current_cfg.link_or_merge(end_handler, end_block)
            after_body.append(h)

        if node.orelse:
            self._current_block = self._current_cfg.create_block()
            after_body.append(self._current_block)
            for child in node.orelse:
                child.accept(self)
            self._current_cfg.link_or_merge(self._current_block, end_block)
        else:
            after_body.append(end_block)

        # Construct the try body so reset current block to this node's block
        self._current_block = node.cfg_block

        for child in node.body:
            child.accept(self)
        end_body = self._current_block

        # Remove each control boundary that we added in this method
        for _ in range(cbs_added):
            self._control_boundaries.pop()

        self._current_cfg.link_or_merge(temp, end_body)
        self._current_cfg.multiple_link_or_merge(end_body, after_body)
        self._current_block = end_block

    def visit_with(self, node: nodes.With) -> None:
        """Add the context expressions (and their "as" targets) to the current block, then visit
        the body.
        """
        # When only creating cfgs for functions, _current_cfg will only be None outside of functions
        if self._current_cfg is None:
            return

        for context_node, name in node.items:
            self._current_block.add_statement(context_node)
            if name is not None:
                self._current_block.add_statement(name)

        for child in node.body:
            child.accept(self)
20✔
437

438

439
def _extract_exceptions(node: nodes.ExceptHandler) -> List[str]:
    """Return the names of all the exceptions handled by the given except block.

    The result is empty for a catch-all clause (i.e. ``except:``).
    """
    handled = node.type
    # ExceptHandler.type will either be Tuple, NodeNG, or None.
    if handled is None:
        return []

    # Collect the name of every Name node found in the handled exception(s)
    return [name_node.name for name_node in handled.nodes_of_class(nodes.Name)]
20✔
454

455

456
def _get_raise_exc(node: nodes.Raise) -> str:
    """Return the control boundary key for the exception that this Raise node throws.

    The key is 'Raise <name>', where <name> is the first Name node found in the statement, or
    just 'Raise' when the statement contains no Name node.

    Preconditions:
        - the raise statement is of the form 'raise' or 'raise <exception_class>'
    """
    key = nodes.Raise.__name__
    first_name = next(node.nodes_of_class(nodes.Name), None)
    if first_name is None:
        # A bare 'raise': there is no exception name to add to the key
        return key
    return f"{key} {first_name.name}"
20✔
470

471

472
def _get_preconditions_node(func: nodes.FunctionDef) -> Optional[nodes.NodeNG]:
    """A helper method that takes in a function definition node, retrieves its preconditions, and then parses them
    into a AST node representing all the valid Python preconditions combined in an and statement. Returns None if
    there are no valid Python preconditions."""
    valid_assertions = [
        assertion for assertion in parse_assertions(func) if _is_python_precondition(assertion)
    ]
    if not valid_assertions:
        return None
    if len(valid_assertions) == 1:
        # Nothing to combine, so parse the assertion exactly as written
        return extract_node(valid_assertions[0])

    # Parenthesize each assertion before joining: "and" binds tighter than "or", conditional
    # expressions and lambdas, so joining the raw strings would turn the preconditions "a or b"
    # and "c" into "a or (b and c)" instead of "(a or b) and c".
    try:
        return extract_node(" and ".join(f"({assertion})" for assertion in valid_assertions))
    except (AstroidSyntaxError, ValueError):
        # An assertion that only parses on its own (e.g. one that is a statement or ends with a
        # comment) cannot be parenthesized; fall back to joining the raw strings.
        return extract_node(" and ".join(valid_assertions))
20✔
484

485

486
def _is_python_precondition(precondition: str) -> bool:
    """Return whether the given precondition string can be parsed as Python code."""
    try:
        extract_node(precondition)
    except (AstroidSyntaxError, ValueError):  # ValueError raised when precondition is blank
        return False
    else:
        return True
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc