• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 15840622935

24 Jun 2025 03:39AM UTC coverage: 92.819% (-0.03%) from 92.844%
15840622935

Pull #1188

github

web-flow
Merge 92c70f202 into 02ddeab02
Pull Request #1188: Refactoring of the codebase to delay the import of z3 module

65 of 77 new or added lines in 7 files covered. (84.42%)

1 existing line in 1 file now uncovered.

3438 of 3704 relevant lines covered (92.82%)

17.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.62
/python_ta/cfg/graph.py
1
from __future__ import annotations
20✔
2

3
import logging
20✔
4
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set
20✔
5

6
if TYPE_CHECKING:
20✔
NEW
7
    try:
×
NEW
8
        from z3 import ExprRef
×
NEW
9
    except ImportError:
×
NEW
10
        ExprRef = Any
×
11

12
from astroid import (
20✔
13
    AnnAssign,
14
    Arguments,
15
    Assign,
16
    AssignName,
17
    AugAssign,
18
    Break,
19
    Continue,
20
    NodeNG,
21
    Raise,
22
    Return,
23
)
24

25
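# Module-level placeholder for the z3 module. It is rebound to the real module by
# ControlFlowGraph.__init__ the first time a graph is created with z3_enabled=True.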
26
z3 = Any
20✔
27

28

29
class ControlFlowGraph:
    """A graph representing the control flow of a Python program."""

    start: CFGBlock
    end: CFGBlock
    # The unique id of this cfg. Defaults to 0 if not initialized in a CFGVisitor instance.
    cfg_id: int
    # block_count is used as an "autoincrement" to ensure the block ids are unique.
    block_count: int
    # blocks (with at least one statement) that will never be executed in runtime.
    unreachable_blocks: Set[CFGBlock]
    # z3 constraints of preconditions
    precondition_constraints: List[ExprRef]
    # map from variable names to z3 variables
    z3_vars: Dict[str, ExprRef]
    # z3 enabled option
    z3_enabled: bool

    def __init__(self, cfg_id: int = 0, z3_enabled: bool = False) -> None:
        """Initialize a graph made of a fresh start block and a fresh end block.

        When z3_enabled is True, the z3 module is imported here (not at module
        import time) and bound to the module-level name ``z3``. If that import
        fails, an error is logged and z3_enabled is reset to False.
        """
        global z3
        # block_count and unreachable_blocks must exist before create_block is called.
        self.block_count = 0
        self.cfg_id = cfg_id
        self.unreachable_blocks = set()
        self.start = self.create_block()
        self.end = self.create_block()
        self.z3_vars = {}
        self.precondition_constraints = []
        self.z3_enabled = z3_enabled
        if self.z3_enabled:
            try:
                import z3 as z3_module

                z3 = z3_module
            except ImportError:
                # Reset z3_enabled to prevent further use of z3 module
                logging.error("Failed to import z3 module. Disabling z3 features.")
                self.z3_enabled = False

    def add_arguments(self, args: Arguments) -> None:
        """Add the arguments node to the start block and attach this graph to its parent.

        If z3 is enabled, the arguments are also parsed with Z3Parser and the
        resulting variables are merged into z3_vars. If Z3Parser cannot be
        imported, an error is logged and no variables are added.
        """
        self.start.add_statement(args)
        args.parent.cfg = self
        args.parent.cfg_block = self.start

        if not self.z3_enabled:
            return

        try:
            from ..z3.z3_parser import Z3Parser

        except ImportError:
            logging.error("Failed to import Z3Parser despite Z3 being enabled.")
            return
        # Parse types
        parser = Z3Parser()
        self.z3_vars.update(parser.parse_arguments(args))

    def create_block(
        self,
        pred: Optional[CFGBlock] = None,
        edge_label: Optional[Any] = None,
        edge_condition: Optional[NodeNG] = None,
        edge_negate: Optional[bool] = None,
    ) -> CFGBlock:
        """Create a new CFGBlock for this graph.

        If pred is specified, set that block as a predecessor of the new block.

        If edge_label is specified, set the corresponding edge in the CFG with that label.

        If edge_condition is specified, store the condition node in the corresponding edge.

        edge_negate is not None only if edge_condition is specified
        """
        new_block = CFGBlock(self.block_count)
        # Every new block starts out in unreachable_blocks; it is removed later
        # when it is merged away or found by update_block_reachability.
        self.unreachable_blocks.add(new_block)

        self.block_count += 1
        if pred:
            self.link_or_merge(pred, new_block, edge_label, edge_condition, edge_negate)
        return new_block

    def link(self, source: CFGBlock, target: CFGBlock) -> None:
        """Link source to target, unless source ends in a jump statement."""
        if not source.is_jump():
            CFGEdge(source, target)

    def link_or_merge(
        self,
        source: CFGBlock,
        target: CFGBlock,
        edge_label: Optional[Any] = None,
        edge_condition: Optional[NodeNG] = None,
        edge_negate: Optional[bool] = None,
    ) -> None:
        """Link source to target, or merge source into target if source is empty.

        An "empty" node for this purpose is when source has no statements.

        source with a jump statement cannot be further linked or merged to
        another target.

        If edge_label is specified, set the corresponding edge in the CFG with that label.

        If edge_condition is specified, store the condition node in the corresponding edge.

        edge_negate is not None only if edge_condition is specified
        """
        if source.is_jump():
            return

        if source.statements:
            CFGEdge(source, target, edge_label, edge_condition, edge_negate)
            return

        if source is self.start:
            self.start = target
        else:
            # Redirect every edge that pointed at the empty source to target instead.
            for edge in source.predecessors:
                edge.target = target
                target.predecessors.append(edge)
        # source is a utility block that helps build the cfg that does not
        # represent any part of the program so it is redundant.
        self.unreachable_blocks.remove(source)

    def multiple_link_or_merge(self, source: CFGBlock, targets: List[CFGBlock]) -> None:
        """Link source to multiple targets, or merge source into targets if source is empty.

        An "empty" node for this purpose is when source has no statements.

        source with a jump statement cannot be further linked or merged to
        another target.

        Precondition:
            - source != cfg.start
        """
        if source.statements:
            for target in targets:
                self.link(source, target)
            return

        # Replace each edge into the empty source by one edge per target,
        # starting from the same predecessor block.
        for edge in source.predecessors:
            for t in targets:
                CFGEdge(edge.source, t)
            edge.source.successors.remove(edge)
        source.predecessors = []
        self.unreachable_blocks.remove(source)

    def get_blocks(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
        """Generate a sequence of all blocks in this graph.

        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
        """
        yield from self._get_blocks(self.start, set(), only_feasible)

    def _get_blocks(
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
    ) -> Generator[CFGBlock, None, None]:
        """Yield block, then recursively the blocks reachable from it, skipping visited ids."""
        if block.id in visited:
            return

        yield block
        visited.add(block.id)

        for edge in block.successors:
            if not only_feasible or edge.is_feasible:
                yield from self._get_blocks(edge.target, visited, only_feasible)

    def get_blocks_postorder(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
        """Return the sequence of all blocks in this graph in the order of
        a post-order traversal.

        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
        """
        yield from self._get_blocks_postorder(self.start, set(), only_feasible)

    def _get_blocks_postorder(
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
    ) -> Generator[CFGBlock, None, None]:
        """Yield the blocks reachable from block first, then block itself, skipping visited ids."""
        if block.id in visited:
            return

        visited.add(block.id)
        for succ in block.successors:
            if not only_feasible or succ.is_feasible:
                yield from self._get_blocks_postorder(succ.target, visited, only_feasible)

        yield block

    def get_edges(self) -> Generator[CFGEdge, None, None]:
        """Generate a sequence of all edges in this graph."""
        yield from self._get_edges(self.start, set())

    def _get_edges(self, block: CFGBlock, visited: Set[int]) -> Generator[CFGEdge, None, None]:
        """Yield each out-edge of block followed by the edges found from that edge's target."""
        if block.id in visited:
            return

        visited.add(block.id)

        for edge in block.successors:
            yield edge
            yield from self._get_edges(edge.target, visited)

    def get_paths(self) -> List[List[CFGEdge]]:
        """Get edges that represent paths from start to end node in depth-first order.

        A path is closed when its last edge reaches the end block, reaches a
        block already on the path, or reaches a block whose out-edges are all
        already on the path. Paths are explored from every out-edge of the
        start block; an empty list is returned if the start block has none.
        """
        paths: List[List[CFGEdge]] = []

        def _dfs(
            current_edge: CFGEdge,
            current_path: List[CFGEdge],
            visited_edges: Set[CFGEdge],
            visited_nodes: Set[CFGBlock],
        ) -> None:
            if current_edge in visited_edges:
                return

            visited_edges.add(current_edge)
            current_path.append(current_edge)
            visited_nodes.add(current_edge.source)

            if (
                current_edge.target == self.end
                or current_edge.target in visited_nodes
                or set(current_edge.target.successors).issubset(visited_edges)
            ):
                paths.append(current_path.copy())
            else:
                for edge in current_edge.target.successors:
                    _dfs(edge, current_path, visited_edges, visited_nodes)

            # Backtrack so sibling branches start from the same state.
            current_path.pop()
            visited_edges.remove(current_edge)
            visited_nodes.remove(current_edge.source)

        # Explore every out-edge of start, not only the first one; this also
        # avoids an IndexError when the start block has no successors.
        for first_edge in self.start.successors:
            _dfs(first_edge, [], set(), set())
        return paths

    def update_block_reachability(self) -> None:
        """Mark every block found from start as reachable and drop it from unreachable_blocks."""
        for block in self.get_blocks():
            block.reachable = True
            self.unreachable_blocks.discard(block)

    def update_edge_z3_constraints(self) -> None:
        """Traverse through edges and add Z3 constraints on each edge.

        Constraints are generated from:
        - Function preconditions
        - If conditions
        - While conditions

        Constraints with reassigned variables are not included in subsequent edges.

        Does nothing when z3 is not enabled.
        """
        if not self.z3_enabled:
            return

        for path_id, path in enumerate(self.get_paths()):
            # starting a new path
            z3_environment = Z3Environment(self.z3_vars, self.precondition_constraints)
            for edge in path:
                # traverse through edge
                if edge.condition is not None:
                    condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
                    # The condition is only recorded when it could be parsed and the
                    # edge states which branch (negated or not) it represents.
                    if condition_z3_constraint is not None and edge.negate is not None:
                        z3_environment.add_constraint(
                            z3.Not(condition_z3_constraint)
                            if edge.negate
                            else condition_z3_constraint
                        )

                edge.z3_constraints[path_id] = z3_environment.update_constraints()

                # traverse into target node
                for node in edge.target.statements:
                    if isinstance(node, (Assign, AugAssign, AnnAssign)):
                        self._handle_variable_reassignment(node, z3_environment)

    def _handle_variable_reassignment(self, node: NodeNG, env: Z3Environment) -> None:
        """Check for reassignment statements and invoke Z3 environment.

        Only targets that are plain names (AssignName) are reported to env.
        Other target kinds, such as attributes or subscripts, are skipped
        because they do not carry a ``name`` attribute.
        """
        if isinstance(node, Assign):
            targets = node.targets
        elif isinstance(node, (AugAssign, AnnAssign)):
            targets = [node.target]
        else:
            return

        for target in targets:
            if isinstance(target, AssignName):
                env.assign(target.name)

    def update_edge_feasibility(self) -> None:
        """Traverse through edges in DFS order and update is_feasible
        attribute of each edge. Edges that are unreachable with the given
        set of Z3 constraints will have is_feasible set to False

        Does nothing when z3 is not enabled. Edges with no recorded
        constraints are left unchanged.
        """
        if not self.z3_enabled:
            return

        def _check_unsat(constraints: List[ExprRef]) -> bool:
            solver = z3.Solver()
            solver.add(constraints)
            return solver.check() == z3.unsat

        for edge in self.get_edges():
            if len(edge.z3_constraints) > 0:
                # An edge is infeasible only if every path through it is unsatisfiable.
                edge.is_feasible = not all(
                    _check_unsat(constraints) for constraints in edge.z3_constraints.values()
                )
328

329

330
class CFGBlock:
    """A single node of a control flow graph.

    A block holds a maximal run of code whose statements are guaranteed to execute in sequence.
    """

    # Identifier of this block.
    id: int
    # Statements contained in this block.
    statements: List[NodeNG]
    # In-edges: edges from blocks that can execute immediately before this one.
    predecessors: List[CFGEdge]
    # Out-edges: edges to blocks that can execute immediately after this one.
    successors: List[CFGEdge]
    # Whether there exists a path from the start block to this block.
    reachable: bool

    def __init__(self, id_: int) -> None:
        """Create an empty block with the given id; it starts out marked as not reachable."""
        self.id = id_
        self.reachable = False
        self.statements = []
        self.predecessors = []
        self.successors = []

    def add_statement(self, statement: NodeNG) -> None:
        """Append statement to this block and point its cfg_block at this block.

        Nothing is added once the block already ends in a jump statement.
        """
        if self.is_jump():
            return
        self.statements.append(statement)
        statement.cfg_block = self

    @property
    def jump(self) -> Optional[NodeNG]:
        """The last statement of this block, or None if the block is empty."""
        return self.statements[-1] if len(self.statements) > 0 else None

    @property
    def is_feasible(self) -> bool:
        """True if at least one in-edge of this block is feasible."""
        for edge in self.predecessors:
            if edge.is_feasible:
                return True
        return False

    def is_jump(self) -> bool:
        """Returns True if the block has a statement that branches
        the control flow (ex: `break`)"""
        last_statement = self.jump
        return isinstance(last_statement, (Break, Continue, Return, Raise))
373

374

375
class CFGEdge:
    """A directed edge in a control flow graph.

    In the future, edges may be augmented with auxiliary metadata about the control flow.

    The condition attribute stores the AST node representing the condition tested in If and While statements.
    The negate attribute records which outcome of that condition the edge stands for: the condition
    should be False when `negate` is True, and should be True when `negate` is False.
    """

    source: CFGBlock
    target: CFGBlock
    label: Optional[str]
    condition: Optional[NodeNG]
    negate: Optional[bool]
    z3_constraints: Dict[int, List[ExprRef]]
    is_feasible: bool

    def __init__(
        self,
        source: CFGBlock,
        target: CFGBlock,
        edge_label: Optional[str] = None,
        condition: Optional[NodeNG] = None,
        negate: Optional[bool] = None,
    ) -> None:
        """Create the edge and register it with both of its endpoints.

        The edge is appended to source.successors and to target.predecessors.
        It starts with no z3 constraints and is considered feasible.
        """
        self.source, self.target = source, target
        self.label = edge_label
        self.condition = condition
        self.negate = negate
        self.z3_constraints = {}
        self.is_feasible = True
        source.successors.append(self)
        target.predecessors.append(self)

    def get_label(self) -> Optional[str]:
        """Return the edge label if specified.
        If `edge.label` is None, return the edge condition determined by the negation of `edge.negate`.
        Return None if both `edge.label` and `edge.negate` are None.
        """
        if self.label is not None:
            return self.label
        if self.negate is None:
            return None
        return str(not self.negate)
421

422

423
class Z3Environment:
    """Holds the Z3 variables and constraints that apply along the current CFG path.

    variable_unassigned:
        A dictionary mapping each variable in the current environment to a boolean indicating
        whether it has been reassigned (False) or remains unassigned (True).

    variables:
        A dictionary mapping each variable in the current environment to its z3 variable.

    constraints:
        A list of Z3 constraints in the current environment.
    """

    variable_unassigned: Dict[str, bool]
    variables: Dict[str, ExprRef]
    constraints: List[ExprRef]

    def __init__(self, variables: Dict[str, ExprRef], constraints: List[ExprRef]) -> None:
        """Set up the environment from function parameters and preconditions.

        The constraints list is copied; the variables mapping is stored as given.
        """
        self.variables = variables
        self.constraints = constraints.copy()
        # Every known variable starts out as not yet reassigned.
        self.variable_unassigned = dict.fromkeys(variables, True)

    def assign(self, name: str) -> None:
        """Record that the variable called name was assigned; unknown names are ignored."""
        if name not in self.variable_unassigned:
            return
        self.variable_unassigned[name] = False

    def update_constraints(self) -> List[ExprRef]:
        """Return a copy of the z3 constraints that are still valid in this environment.

        A constraint is dropped from the environment as soon as it mentions a
        variable that was reassigned or that the environment does not know.
        """
        still_valid = [
            constraint
            for constraint in self.constraints
            if all(
                self.variable_unassigned.get(var_name, False)
                for var_name in self._get_vars(constraint)
            )
        ]
        self.constraints = still_valid
        return list(still_valid)

    def add_constraint(self, constraint: ExprRef) -> None:
        """Append a new z3 constraint to the environment."""
        self.constraints.append(constraint)

    def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
        """Translate an Astroid node into a Z3 constraint.

        Return the resulting expression, or None when parsing raises
        Z3Exception or Z3ParseException.
        """
        from z3 import Z3Exception

        from ..z3.z3_parser import Z3ParseException, Z3Parser

        try:
            result = Z3Parser(self.variables).parse(node)
        except (Z3Exception, Z3ParseException):
            return None
        return result

    def _get_vars(self, expr: ExprRef) -> Set[str]:
        """Collect the names of all z3 variables that occur in a z3 expression."""
        from z3 import Z3_OP_UNINTERPRETED, is_const

        names = set()
        # Walk the expression tree with an explicit stack instead of recursion.
        pending = [expr]
        while pending:
            current = pending.pop()
            if is_const(current) and current.decl().kind() == Z3_OP_UNINTERPRETED:
                names.add(current.decl().name())
            elif hasattr(current, "children"):
                pending.extend(current.children())
        return names
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc