• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 15866914814

25 Jun 2025 03:51AM UTC coverage: 92.821% (-0.02%) from 92.844%
15866914814

Pull #1188

github

web-flow
Merge a06536429 into 428cbb1f3
Pull Request #1188: Refactoring of the codebase to delay the import of z3 module

55 of 67 new or added lines in 7 files covered. (82.09%)

1 existing line in 1 file now uncovered.

3439 of 3705 relevant lines covered (92.82%)

17.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.63
/python_ta/cfg/graph.py
1
from __future__ import annotations
20✔
2

3
import logging
20✔
4
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set
20✔
5

6
if TYPE_CHECKING:
20✔
NEW
7
    try:
×
NEW
8
        from z3 import ExprRef
×
NEW
9
    except ImportError:
×
NEW
10
        ExprRef = Any
×
11

12
from astroid import (
20✔
13
    AnnAssign,
14
    Arguments,
15
    Assign,
16
    AssignName,
17
    AugAssign,
18
    Break,
19
    Continue,
20
    NodeNG,
21
    Raise,
22
    Return,
23
)
24

25
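# Module-level placeholder for the z3 package. ControlFlowGraph.__init__ rebinds
# this name to the real module when it is constructed with z3_enabled=True and
# the import succeeds; until then it is just `Any`.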
26
z3 = Any
20✔
27

28

29
class ControlFlowGraph:
    """A graph representing the control flow of a Python program.

    The graph owns a ``start`` and an ``end`` block and creates further blocks
    through ``create_block``. When z3 support is enabled it can additionally
    attach z3 constraints to edges and flag edges as infeasible.
    """

    start: CFGBlock
    end: CFGBlock
    # The unique id of this cfg. Defaults to 0 if not initialized in a CFGVisitor instance.
    cfg_id: int
    # block_count is used as an "autoincrement" to ensure the block ids are unique.
    block_count: int
    # blocks (with at least one statement) that will never be executed in runtime.
    unreachable_blocks: Set[CFGBlock]
    # z3 constraints of preconditions
    precondition_constraints: List[ExprRef]
    # map from variable names to z3 variables
    z3_vars: Dict[str, ExprRef]
    # z3 enabled option
    z3_enabled: bool

    def __init__(self, cfg_id: int = 0, z3_enabled: bool = False) -> None:
        """Create a graph holding only a start block and an end block.

        If ``z3_enabled`` is True, try to import z3 and bind it to the
        module-level ``z3`` name. When the import fails, an error is logged
        and ``self.z3_enabled`` is set to False.
        """
        global z3

        # create_block() reads block_count and unreachable_blocks, so these
        # must exist before the start/end blocks are created.
        self.block_count = 0
        self.cfg_id = cfg_id
        self.unreachable_blocks = set()
        self.start = self.create_block()
        self.end = self.create_block()
        self.z3_vars = {}
        self.precondition_constraints = []

        if z3_enabled:
            try:
                import z3 as z3_module

                z3 = z3_module
            except ImportError:
                # Reset z3_enabled to prevent further use of z3 module
                logging.error("Failed to import z3 module. Disabling z3 features.")
                self.z3_enabled = False
            else:
                self.z3_enabled = True
        else:
            self.z3_enabled = False

    def add_arguments(self, args: Arguments) -> None:
        """Add the Arguments node to the start block and register it with this graph.

        ``args.parent`` gets its ``cfg`` and ``cfg_block`` attributes set to
        this graph and its start block. When z3 is enabled, the z3 variables
        parsed from the arguments are merged into ``self.z3_vars``.
        """
        self.start.add_statement(args)
        args.parent.cfg = self
        args.parent.cfg_block = self.start

        if self.z3_enabled:
            try:
                from ..z3.z3_parser import Z3Parser

            except ImportError:
                logging.error("Failed to import Z3Parser despite Z3 being enabled.")
                return
            # Parse types
            parser = Z3Parser()
            z3_vars = parser.parse_arguments(args)
            self.z3_vars.update(z3_vars)

    def create_block(
        self,
        pred: Optional[CFGBlock] = None,
        edge_label: Optional[Any] = None,
        edge_condition: Optional[NodeNG] = None,
        edge_negate: Optional[bool] = None,
    ) -> CFGBlock:
        """Create a new CFGBlock for this graph.

        If pred is specified, set that block as a predecessor of the new block.

        If edge_label is specified, set the corresponding edge in the CFG with that label.

        If edge_condition is specified, store the condition node in the corresponding edge.

        edge_negate is not None only if edge_condition is specified
        """
        new_block = CFGBlock(self.block_count)
        # Every block starts out recorded as unreachable; it is taken out of
        # the set when merged away or by update_block_reachability().
        self.unreachable_blocks.add(new_block)

        self.block_count += 1
        if pred:
            self.link_or_merge(pred, new_block, edge_label, edge_condition, edge_negate)
        return new_block

    def link(self, source: CFGBlock, target: CFGBlock) -> None:
        """Link source to target, unless source ends in a jump statement."""
        if not source.is_jump():
            CFGEdge(source, target)

    def link_or_merge(
        self,
        source: CFGBlock,
        target: CFGBlock,
        edge_label: Optional[Any] = None,
        edge_condition: Optional[NodeNG] = None,
        edge_negate: Optional[bool] = None,
    ) -> None:
        """Link source to target, or merge source into target if source is empty.

        An "empty" node for this purpose is when source has no statements.

        source with a jump statement cannot be further linked or merged to
        another target.

        If edge_label is specified, set the corresponding edge in the CFG with that label.

        If edge_condition is specified, store the condition node in the corresponding edge.

        edge_negate is not None only if edge_condition is specified
        """
        if source.is_jump():
            return

        if source.statements:
            CFGEdge(source, target, edge_label, edge_condition, edge_negate)
            return

        if source is self.start:
            self.start = target
        else:
            # Redirect every in-edge of the empty block to the target.
            for edge in source.predecessors:
                edge.target = target
                target.predecessors.append(edge)
        # source is a utility block that helps build the cfg that does not
        # represent any part of the program so it is redundant.
        self.unreachable_blocks.remove(source)

    def multiple_link_or_merge(self, source: CFGBlock, targets: List[CFGBlock]) -> None:
        """Link source to multiple targets, or merge source into targets if source is empty.

        An "empty" node for this purpose is when source has no statements.

        source with a jump statement cannot be further linked or merged to
        another target.

        Precondition:
            - source != cfg.start
        """
        if source.statements:
            for target in targets:
                self.link(source, target)
            return

        for edge in source.predecessors:
            # Replace the in-edge of the empty block by one edge per target.
            for t in targets:
                CFGEdge(edge.source, t)
            edge.source.successors.remove(edge)
        source.predecessors = []
        self.unreachable_blocks.remove(source)

    def get_blocks(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
        """Generate a sequence of all blocks in this graph.

        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
        """
        yield from self._get_blocks(self.start, set(), only_feasible)

    def _get_blocks(
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
    ) -> Generator[CFGBlock, None, None]:
        """Yield block, then recurse into its successors; visited holds block ids already yielded."""
        if block.id in visited:
            return

        yield block
        visited.add(block.id)

        for edge in block.successors:
            if not only_feasible or edge.is_feasible:
                yield from self._get_blocks(edge.target, visited, only_feasible)

    def get_blocks_postorder(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
        """Return the sequence of all blocks in this graph in the order of
        a post-order traversal.

        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
        """
        yield from self._get_blocks_postorder(self.start, set(), only_feasible)

    def _get_blocks_postorder(
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
    ) -> Generator[CFGBlock, None, None]:
        """Recurse into the successors of block first, then yield block itself."""
        if block.id in visited:
            return

        visited.add(block.id)
        for succ in block.successors:
            if not only_feasible or succ.is_feasible:
                yield from self._get_blocks_postorder(succ.target, visited, only_feasible)

        yield block

    def get_edges(self) -> Generator[CFGEdge, None, None]:
        """Generate a sequence of all edges in this graph."""
        yield from self._get_edges(self.start, set())

    def _get_edges(self, block: CFGBlock, visited: Set[int]) -> Generator[CFGEdge, None, None]:
        """Yield each out-edge of block, recursing into the edge's target after yielding it."""
        if block.id in visited:
            return

        visited.add(block.id)

        for edge in block.successors:
            yield edge
            yield from self._get_edges(edge.target, visited)

    def get_paths(self) -> List[List[CFGEdge]]:
        """Get edges that represent paths from start to end node in depth-first order.

        A path is recorded when the edge just taken leads to the end block, to
        a block already on the current path, or to a block whose out-edges are
        all already on the current path (this includes blocks without
        out-edges). Every out-edge of the start block is explored; a start
        block without out-edges yields an empty list.
        """
        paths = []

        def _dfs(
            current_edge: CFGEdge,
            current_path: List[CFGEdge],
            visited_edges: Set[CFGEdge],
            visited_nodes: Set[CFGBlock],
        ):
            if current_edge in visited_edges:
                return

            visited_edges.add(current_edge)
            current_path.append(current_edge)
            visited_nodes.add(current_edge.source)

            if (
                current_edge.target == self.end
                or current_edge.target in visited_nodes
                or set(current_edge.target.successors).issubset(visited_edges)
            ):
                paths.append(current_path.copy())
            else:
                for edge in current_edge.target.successors:
                    _dfs(edge, current_path, visited_edges, visited_nodes)

            # Backtrack so that sibling branches start from the same state.
            current_path.pop()
            visited_edges.remove(current_edge)
            visited_nodes.remove(current_edge.source)

        # Iterate instead of indexing successors[0]: this avoids an IndexError
        # on a start block without out-edges and does not drop any branch when
        # the start block has several.
        for first_edge in self.start.successors:
            _dfs(first_edge, [], set(), set())
        return paths

    def update_block_reachability(self) -> None:
        """Mark every block found by get_blocks() as reachable and drop it from unreachable_blocks."""
        for block in self.get_blocks():
            block.reachable = True
            self.unreachable_blocks.discard(block)

    def update_edge_z3_constraints(self) -> None:
        """Traverse through edges and add Z3 constraints on each edge.

        Constraints are generated from:
        - Function preconditions
        - If conditions
        - While conditions

        Constraints with reassigned variables are not included in subsequent edges.

        Does nothing when z3 is not enabled.
        """
        if not self.z3_enabled:
            return

        for path_id, path in enumerate(self.get_paths()):
            # starting a new path
            z3_environment = Z3Environment(self.z3_vars, self.precondition_constraints)
            for edge in path:
                # traverse through edge
                if edge.condition is not None:
                    condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
                    # A condition that could not be parsed, or an edge without
                    # a negate flag, contributes no constraint.
                    if condition_z3_constraint is not None and edge.negate is not None:
                        z3_environment.add_constraint(
                            z3.Not(condition_z3_constraint)
                            if edge.negate
                            else condition_z3_constraint
                        )

                edge.z3_constraints[path_id] = z3_environment.update_constraints()

                # traverse into target node
                for node in edge.target.statements:
                    if isinstance(node, (Assign, AugAssign, AnnAssign)):
                        self._handle_variable_reassignment(node, z3_environment)

    def _handle_variable_reassignment(self, node: NodeNG, env: Z3Environment) -> None:
        """Check for reassignment statements and invoke Z3 environment.

        Only targets that are plain names (AssignName) are reported to env.
        Other target kinds are skipped; in particular the target of an
        AugAssign/AnnAssign is no longer assumed to have a ``name`` attribute.
        """
        if isinstance(node, Assign):
            targets = node.targets
        elif isinstance(node, (AugAssign, AnnAssign)):
            targets = [node.target]
        else:
            return

        for target in targets:
            if isinstance(target, AssignName):
                env.assign(target.name)

    def update_edge_feasibility(self) -> None:
        """Traverse through edges in DFS order and update is_feasible
        attribute of each edge. Edges that are unreachable with the given
        set of Z3 constraints will have is_feasible set to False

        An edge is marked infeasible only when every constraint list recorded
        on it is unsatisfiable; edges without recorded constraints are left
        untouched. Does nothing when z3 is not enabled.
        """
        if not self.z3_enabled:
            return

        def _check_unsat(constraints: List[ExprRef]) -> bool:
            solver = z3.Solver()
            solver.add(constraints)
            return solver.check() == z3.unsat

        for edge in self.get_edges():
            if len(edge.z3_constraints) > 0:
                edge.is_feasible = not all(
                    _check_unsat(constraints) for constraints in edge.z3_constraints.values()
                )
332

333

334
class CFGBlock:
    """A node in a control flow graph.

    Represents a maximal block of code whose statements are guaranteed to execute in sequence.
    """

    # Identifier of this block, as passed to the constructor.
    id: int
    # Statements held by this block, in the order they were added.
    statements: List[NodeNG]
    # In-edges: edges coming from blocks that can execute immediately before this one.
    predecessors: List[CFGEdge]
    # Out-edges: edges going to blocks that can execute immediately after this one.
    successors: List[CFGEdge]
    # Whether there exists a path from the start block to this block.
    reachable: bool

    def __init__(self, id_: int) -> None:
        """Create an empty block with no edges, initially flagged as not reachable."""
        self.id = id_
        self.statements = []
        self.predecessors = []
        self.successors = []
        self.reachable = False

    def add_statement(self, statement: NodeNG) -> None:
        """Append statement to this block and point its ``cfg_block`` at this block.

        Nothing happens when the block already ends in a jump statement.
        """
        if self.is_jump():
            return
        self.statements.append(statement)
        statement.cfg_block = self

    @property
    def jump(self) -> Optional[NodeNG]:
        """The last statement of this block, or None when the block is empty."""
        return self.statements[-1] if self.statements else None

    @property
    def is_feasible(self) -> bool:
        """True if at least one in-edge of this block is feasible."""
        for edge in self.predecessors:
            if edge.is_feasible:
                return True
        return False

    def is_jump(self) -> bool:
        """Returns True if the block has a statement that branches
        the control flow (ex: `break`)"""
        last_statement = self.jump
        return isinstance(last_statement, (Break, Continue, Return, Raise))
20✔
377

378

379
class CFGEdge:
    """An edge in a control flow graph.

    Edges are directed, and in the future may be augmented with auxiliary metadata about the control flow.

    The condition attribute stores the AST node representing the condition tested in If and While statements.
    The negate attribute records whether the condition should be False (when `negate` is True) or True
    (when `negate` is False).
    """

    source: CFGBlock
    target: CFGBlock
    label: Optional[str]
    condition: Optional[NodeNG]
    negate: Optional[bool]
    # Constraint lists stored on this edge, keyed by an integer id.
    z3_constraints: Dict[int, List[ExprRef]]
    is_feasible: bool

    def __init__(
        self,
        source: CFGBlock,
        target: CFGBlock,
        edge_label: Optional[str] = None,
        condition: Optional[NodeNG] = None,
        negate: Optional[bool] = None,
    ) -> None:
        """Create the edge and register it on both of its endpoints.

        The new edge is appended to ``source.successors`` and to
        ``target.predecessors``. It starts with no z3 constraints and is
        considered feasible.
        """
        self.source, self.target = source, target
        self.label = edge_label
        self.condition = condition
        self.negate = negate
        source.successors.append(self)
        target.predecessors.append(self)
        self.z3_constraints = {}
        self.is_feasible = True

    def get_label(self) -> Optional[str]:
        """Return the edge label if specified.
        If `edge.label` is None, return the edge condition determined by the negation of `edge.negate`.
        Return None if both `edge.label` and `edge.negate` are None.
        """
        if self.label is not None:
            return self.label
        if self.negate is None:
            return None
        return str(not self.negate)
20✔
425

426

427
class Z3Environment:
    """Z3 Environment stores the Z3 variables and constraints in the current CFG path

    variable_unassigned:
        A dictionary mapping each variable in the current environment to a boolean indicating
        whether it has been reassigned (False) or remains unassigned (True).

    variables:
        A dictionary mapping each variable in the current environment to its z3 variable.

    constraints:
        A list of Z3 constraints in the current environment.
    """

    variable_unassigned: Dict[str, bool]
    variables: Dict[str, ExprRef]
    constraints: List[ExprRef]

    def __init__(self, variables: Dict[str, ExprRef], constraints: List[ExprRef]) -> None:
        """Set up the environment from the given variables and constraints.

        Every variable starts out as unassigned. The constraint list is
        copied, so later changes do not affect the list passed in.
        """
        self.variables = variables
        self.variable_unassigned = dict.fromkeys(variables, True)
        self.constraints = constraints.copy()

    def assign(self, name: str) -> None:
        """Record that the variable called name was assigned; unknown names are ignored."""
        if name not in self.variable_unassigned:
            return
        self.variable_unassigned[name] = False

    def update_constraints(self) -> List[ExprRef]:
        """Return a copy of the constraints that are still valid in this environment.

        A constraint is kept only if every variable it mentions is tracked by
        this environment and has not been reassigned. The constraints that
        were dropped are also removed from the environment itself.
        """
        still_valid = [
            constraint
            for constraint in self.constraints
            if all(self.variable_unassigned.get(name, False) for name in _get_vars(constraint))
        ]
        self.constraints = still_valid
        return still_valid.copy()

    def add_constraint(self, constraint: ExprRef) -> None:
        """Append constraint to the constraints of this environment."""
        self.constraints.append(constraint)

    def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
        """Parse an Astroid node into a Z3 constraint using this environment's variables.

        Return the parsed expression, or None when parsing raises a
        Z3Exception or a Z3ParseException.
        """
        # Imported here rather than at module level so that z3 is only loaded
        # when a constraint is actually parsed.
        from z3 import Z3Exception

        from ..z3.z3_parser import Z3ParseException, Z3Parser

        parser = Z3Parser(self.variables)
        try:
            parsed = parser.parse(node)
        except (Z3Exception, Z3ParseException):
            parsed = None
        return parsed
10✔
490

491

492
def _get_vars(expr: ExprRef) -> Set[str]:
    """Collect the names of all z3 variables that occur in a z3 expression.

    A variable is a sub-expression for which ``is_const`` holds and whose
    declaration kind is ``Z3_OP_UNINTERPRETED``. Any other sub-expression that
    has a ``children`` attribute is searched through its children.
    """
    from z3 import Z3_OP_UNINTERPRETED, is_const

    names = set()
    pending = [expr]

    # Explicit work list in place of a recursive helper; the visiting order
    # differs, but the result is a set so the order does not matter.
    while pending:
        current = pending.pop()
        if is_const(current) and current.decl().kind() == Z3_OP_UNINTERPRETED:
            names.add(current.decl().name())
        elif hasattr(current, "children"):
            pending.extend(current.children())

    return names
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc