• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 15657230024

14 Jun 2025 11:42PM UTC coverage: 92.697% (-0.3%) from 92.959%
15657230024

Pull #1188

github

web-flow
Merge 62a6d3bd7 into 02ddeab02
Pull Request #1188: Refactoring of the codebase to delay the import of z3 module

58 of 68 new or added lines in 3 files covered. (85.29%)

4 existing lines in 4 files now uncovered.

3427 of 3697 relevant lines covered (92.7%)

17.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.59
/python_ta/cfg/graph.py
1
from __future__ import annotations
20✔
2

3
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set
20✔
4

5
if TYPE_CHECKING:
20✔
NEW
6
    try:
×
NEW
7
        from z3 import ExprRef
×
NEW
8
    except ImportError:
×
NEW
9
        ExprRef = Any
×
10

11
from astroid import (
20✔
12
    AnnAssign,
13
    Arguments,
14
    Assign,
15
    AssignName,
16
    AugAssign,
17
    Break,
18
    Continue,
19
    NodeNG,
20
    Raise,
21
    Return,
22
)
23

24

25
class ControlFlowGraph:
20✔
26
    """A graph representing the control flow of a Python program."""
27

28
    start: CFGBlock
20✔
29
    end: CFGBlock
20✔
30
    # The unique id of this cfg. Defaults to 0 if not initialized in a CFGVisitor instance.
31
    cfg_id: int
20✔
32
    # block_count is used as an "autoincrement" to ensure the block ids are unique.
33
    block_count: int
20✔
34
    # blocks (with at least one statement) that will never be executed in runtime.
35
    unreachable_blocks: Set[CFGBlock]
20✔
36
    # z3 constraints of preconditions
37
    precondition_constraints: List[ExprRef]
20✔
38
    # map from variable names to z3 variables
39
    z3_vars: Dict[str, ExprRef]
20✔
40

41
    def __init__(self, cfg_id: int = 0) -> None:
20✔
42
        self.block_count = 0
20✔
43
        self.cfg_id = cfg_id
20✔
44
        self.unreachable_blocks = set()
20✔
45
        self.start = self.create_block()
20✔
46
        self.end = self.create_block()
20✔
47
        self.z3_vars = {}
20✔
48
        self.precondition_constraints = []
20✔
49

50
    def add_arguments(self, args: Arguments) -> None:
20✔
51
        self.start.add_statement(args)
20✔
52
        args.parent.cfg = self
20✔
53
        args.parent.cfg_block = self.start
20✔
54

55
        try:
20✔
56
            from ..z3.z3_parser import Z3Parser
20✔
57

58
            # Parse types
59
            parser = Z3Parser()
10✔
60
            z3_vars = parser.parse_arguments(args)
10✔
61
            self.z3_vars.update(z3_vars)
10✔
62
        except ImportError:
20✔
63
            return
20✔
64

65
    def create_block(
20✔
66
        self,
67
        pred: Optional[CFGBlock] = None,
68
        edge_label: Optional[Any] = None,
69
        edge_condition: Optional[NodeNG] = None,
70
        edge_negate: Optional[bool] = None,
71
    ) -> CFGBlock:
72
        """Create a new CFGBlock for this graph.
73

74
        If pred is specified, set that block as a predecessor of the new block.
75

76
        If edge_label is specified, set the corresponding edge in the CFG with that label.
77

78
        If edge_condition is specified, store the condition node in the corresponding edge.
79

80
        edge_negate is not None only if edge_condition is specified
81
        """
82
        new_block = CFGBlock(self.block_count)
20✔
83
        self.unreachable_blocks.add(new_block)
20✔
84

85
        self.block_count += 1
20✔
86
        if pred:
20✔
87
            self.link_or_merge(pred, new_block, edge_label, edge_condition, edge_negate)
20✔
88
        return new_block
20✔
89

90
    def link(self, source: CFGBlock, target: CFGBlock) -> None:
20✔
91
        """Link source to target."""
92
        if not source.is_jump():
20✔
93
            CFGEdge(source, target)
20✔
94

95
    def link_or_merge(
20✔
96
        self,
97
        source: CFGBlock,
98
        target: CFGBlock,
99
        edge_label: Optional[Any] = None,
100
        edge_condition: Optional[NodeNG] = None,
101
        edge_negate: Optional[bool] = None,
102
    ) -> None:
103
        """Link source to target, or merge source into target if source is empty.
104

105
        An "empty" node for this purpose is when source has no statements.
106

107
        source with a jump statement cannot be further linked or merged to
108
        another target.
109

110
        If edge_label is specified, set the corresponding edge in the CFG with that label.
111

112
        If edge_condition is specified, store the condition node in the corresponding edge.
113

114
        edge_negate is not None only if edge_condition is specified
115
        """
116
        if source.is_jump():
20✔
117
            return
20✔
118
        if source.statements == []:
20✔
119
            if source is self.start:
20✔
120
                self.start = target
20✔
121
            else:
122
                for edge in source.predecessors:
20✔
123
                    edge.target = target
20✔
124
                    target.predecessors.append(edge)
20✔
125
            # source is a utility block that helps build the cfg that does not
126
            # represent any part of the program so it is redundant.
127
            self.unreachable_blocks.remove(source)
20✔
128
        else:
129
            CFGEdge(source, target, edge_label, edge_condition, edge_negate)
20✔
130

131
    def multiple_link_or_merge(self, source: CFGBlock, targets: List[CFGBlock]) -> None:
20✔
132
        """Link source to multiple target, or merge source into targets if source is empty.
133

134
        An "empty" node for this purpose is when source has no statements.
135

136
        source with a jump statement cannot be further linked or merged to
137
        another target.
138

139
        Precondition:
140
            - source != cfg.start
141
        """
142
        if source.statements == []:
20✔
143
            for edge in source.predecessors:
20✔
144
                for t in targets:
20✔
145
                    CFGEdge(edge.source, t)
20✔
146
                edge.source.successors.remove(edge)
20✔
147
            source.predecessors = []
20✔
148
            self.unreachable_blocks.remove(source)
20✔
149
        else:
150
            for target in targets:
20✔
151
                self.link(source, target)
20✔
152

153
    def get_blocks(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
20✔
154
        """Generate a sequence of all blocks in this graph.
155

156
        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
157
        """
158
        yield from self._get_blocks(self.start, set(), only_feasible)
20✔
159

160
    def _get_blocks(
20✔
161
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
162
    ) -> Generator[CFGBlock, None, None]:
163
        if block.id in visited:
20✔
164
            return
20✔
165

166
        yield block
20✔
167
        visited.add(block.id)
20✔
168

169
        for edge in block.successors:
20✔
170
            if not only_feasible or edge.is_feasible:
20✔
171
                yield from self._get_blocks(edge.target, visited, only_feasible)
20✔
172

173
    def get_blocks_postorder(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
20✔
174
        """Return the sequence of all blocks in this graph in the order of
175
        a post-order traversal.
176

177
        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
178
        """
179
        yield from self._get_blocks_postorder(self.start, set(), only_feasible)
20✔
180

181
    def _get_blocks_postorder(
20✔
182
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
183
    ) -> Generator[CFGBlock, None, None]:
184
        if block.id in visited:
20✔
185
            return
20✔
186

187
        visited.add(block.id)
20✔
188
        for succ in block.successors:
20✔
189
            if not only_feasible or succ.is_feasible:
20✔
190
                yield from self._get_blocks_postorder(succ.target, visited, only_feasible)
20✔
191

192
        yield block
20✔
193

194
    def get_edges(self) -> Generator[CFGEdge, None, None]:
20✔
195
        """Generate a sequence of all edges in this graph."""
196
        yield from self._get_edges(self.start, set())
20✔
197

198
    def _get_edges(self, block: CFGBlock, visited: Set[int]) -> Generator[CFGEdge, None, None]:
20✔
199
        if block.id in visited:
20✔
200
            return
20✔
201

202
        visited.add(block.id)
20✔
203

204
        for edge in block.successors:
20✔
205
            yield edge
20✔
206
            yield from self._get_edges(edge.target, visited)
20✔
207

208
    def get_paths(self) -> List[List[CFGEdge]]:
20✔
209
        """Get edges that represent paths from start to end node in depth-first order."""
210
        paths = []
10✔
211

212
        def _dfs(
10✔
213
            current_edge: CFGEdge,
214
            current_path: List[CFGEdge],
215
            visited_edges: Set[CFGEdge],
216
            visited_nodes: Set[CFGBlock],
217
        ):
218
            if current_edge in visited_edges:
10✔
219
                return
×
220

221
            visited_edges.add(current_edge)
10✔
222
            current_path.append(current_edge)
10✔
223
            visited_nodes.add(current_edge.source)
10✔
224

225
            if (
10✔
226
                current_edge.target == self.end
227
                or current_edge.target in visited_nodes
228
                or set(current_edge.target.successors).issubset(visited_edges)
229
            ):
230
                paths.append(current_path.copy())
10✔
231
            else:
232
                for edge in current_edge.target.successors:
10✔
233
                    _dfs(edge, current_path, visited_edges, visited_nodes)
10✔
234

235
            current_path.pop()
10✔
236
            visited_edges.remove(current_edge)
10✔
237
            visited_nodes.remove(current_edge.source)
10✔
238

239
        _dfs(self.start.successors[0], [], set(), set())
10✔
240
        return paths
10✔
241

242
    def update_block_reachability(self) -> None:
20✔
243
        for block in self.get_blocks():
20✔
244
            block.reachable = True
20✔
245
            if block in self.unreachable_blocks:
20✔
246
                self.unreachable_blocks.remove(block)
20✔
247

248
    def update_edge_z3_constraints(self) -> None:
20✔
249
        """Traverse through edges and add Z3 constraints on each edge.
250

251
        Constraints are generated from:
252
        - Function preconditions
253
        - If conditions
254
        - While conditions
255

256
        Constraints with reassigned variables are not included in subsequent edges.
257
        """
258
        try:
10✔
259
            from z3 import Not
10✔
260

261
        except ImportError:
10✔
262
            return
10✔
263

264
        for path_id, path in enumerate(self.get_paths()):
10✔
265
            # starting a new path
266
            z3_environment = Z3Environment(self.z3_vars, self.precondition_constraints)
10✔
267
            for edge in path:
10✔
268
                # traverse through edge
269
                if edge.condition is not None:
10✔
270
                    condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
10✔
271
                    if condition_z3_constraint is not None:
10✔
272
                        if edge.negate is not None:
10✔
273
                            z3_environment.add_constraint(
10✔
274
                                Not(condition_z3_constraint)
275
                                if edge.negate
276
                                else condition_z3_constraint
277
                            )
278

279
                edge.z3_constraints[path_id] = z3_environment.update_constraints()
10✔
280

281
                # traverse into target node
282
                for node in edge.target.statements:
10✔
283
                    if isinstance(node, (Assign, AugAssign, AnnAssign)):
10✔
284
                        self._handle_variable_reassignment(node, z3_environment)
10✔
285

286
    def _handle_variable_reassignment(self, node: NodeNG, env: Z3Environment) -> None:
20✔
287
        """Check for reassignment statements and invoke Z3 environment"""
288
        if isinstance(node, Assign):
10✔
289
            for target in node.targets:
10✔
290
                if isinstance(target, AssignName):
10✔
291
                    env.assign(target.name)
10✔
292
        elif isinstance(node, (AugAssign, AnnAssign)):
10✔
293
            env.assign(node.target.name)
10✔
294

295
    def update_edge_feasibility(self) -> None:
20✔
296
        """Traverse through edges in DFS order and update is_feasible
297
        attribute of each edge. Edges that are unreachable with the given
298
        set of Z3 constraints will have is_feasible set to False
299
        """
300
        try:
10✔
301
            from z3 import Solver, unsat
10✔
302

303
        except ImportError:
10✔
304
            return
10✔
305

306
        def _check_unsat(constraints: List[ExprRef]) -> bool:
10✔
307
            solver = Solver()
10✔
308
            solver.add(constraints)
10✔
309
            return solver.check() == unsat
10✔
310

311
        for edge in self.get_edges():
10✔
312
            if len(edge.z3_constraints) > 0:
10✔
313
                edge.is_feasible = not all(
10✔
314
                    _check_unsat(constraints) for constraints in edge.z3_constraints.values()
315
                )
316

317

318
class CFGBlock:
20✔
319
    """A node in a control flow graph.
320

321
    Represents a maximal block of code whose statements are guaranteed to execute in sequence.
322
    """
323

324
    # A unique identifier
325
    id: int
20✔
326
    # The statements in this block.
327
    statements: List[NodeNG]
20✔
328
    # This block's in-edges (from blocks that can execute immediately before this one).
329
    predecessors: List[CFGEdge]
20✔
330
    # This block's out-edges (to blocks that can execute immediately after this one).
331
    successors: List[CFGEdge]
20✔
332
    # Whether there exists a path from the start block to this block.
333
    reachable: bool
20✔
334

335
    def __init__(self, id_: int) -> None:
20✔
336
        """Initialize a new CFGBlock."""
337
        self.id = id_
20✔
338
        self.statements = []
20✔
339
        self.predecessors = []
20✔
340
        self.successors = []
20✔
341
        self.reachable = False
20✔
342

343
    def add_statement(self, statement: NodeNG) -> None:
20✔
344
        if not self.is_jump():
20✔
345
            self.statements.append(statement)
20✔
346
            statement.cfg_block = self
20✔
347

348
    @property
20✔
349
    def jump(self) -> Optional[NodeNG]:
20✔
350
        if len(self.statements) > 0:
20✔
351
            return self.statements[-1]
20✔
352

353
    @property
20✔
354
    def is_feasible(self) -> bool:
20✔
355
        return any(edge.is_feasible for edge in self.predecessors)
20✔
356

357
    def is_jump(self) -> bool:
20✔
358
        """Returns True if the block has a statement that branches
359
        the control flow (ex: `break`)"""
360
        return isinstance(self.jump, (Break, Continue, Return, Raise))
20✔
361

362

363
class CFGEdge:
20✔
364
    """An edge in a control flow graph.
365

366
    Edges are directed, and in the future may be augmented with auxiliary metadata about the control flow.
367

368
    The condition attribute stores the AST node representing the condition tested in If and While statements.
369
    The negate attribute stores the condition should be False (when `negate` is True) or condition should be true
370
    (when `negate` is False)
371
    """
372

373
    source: CFGBlock
20✔
374
    target: CFGBlock
20✔
375
    label: Optional[str]
20✔
376
    condition: Optional[NodeNG]
20✔
377
    negate: Optional[bool]
20✔
378
    z3_constraints: Dict[int, List[ExprRef]]
20✔
379
    is_feasible: bool
20✔
380

381
    def __init__(
20✔
382
        self,
383
        source: CFGBlock,
384
        target: CFGBlock,
385
        edge_label: Optional[str] = None,
386
        condition: Optional[NodeNG] = None,
387
        negate: Optional[bool] = None,
388
    ) -> None:
389
        self.source = source
20✔
390
        self.target = target
20✔
391
        self.label = edge_label
20✔
392
        self.condition = condition
20✔
393
        self.negate = negate
20✔
394
        self.source.successors.append(self)
20✔
395
        self.target.predecessors.append(self)
20✔
396
        self.z3_constraints = {}
20✔
397
        self.is_feasible = True
20✔
398

399
    def get_label(self) -> Optional[str]:
20✔
400
        """Return the edge label if specified.
401
        If `edge.label` is None, return the edge condition determined by the negation of `edge.negate`.
402
        Return None if both `edge.label` and `edge.negate` are None.
403
        """
404
        if self.label is not None:
20✔
405
            return self.label
×
406
        elif self.negate is not None:
20✔
407
            return str(not self.negate)
20✔
408
        return None
20✔
409

410

411
class Z3Environment:
20✔
412
    """Z3 Environment stores the Z3 variables and constraints in the current CFG path
413

414
    variable_unassigned:
415
        A dictionary mapping each variable in the current environment to a boolean indicating
416
        whether it has been reassigned (False) or remains unassigned (True).
417

418
    variables:
419
        A dictionary mapping each variable in the current environment to its z3 variable.
420

421
    constraints:
422
        A list of Z3 constraints in the current environment.
423
    """
424

425
    variable_unassigned: Dict[str, bool]
20✔
426
    variables: Dict[str, ExprRef]
20✔
427
    constraints: List[ExprRef]
20✔
428

429
    def __init__(self, variables: Dict[str, ExprRef], constraints: List[ExprRef]) -> None:
20✔
430
        """Initialize the environment with function parameters and preconditions"""
431
        self.variable_unassigned = {var: True for var in variables}
10✔
432
        self.variables = variables
10✔
433
        self.constraints = constraints.copy()
10✔
434

435
    def assign(self, name: str) -> None:
20✔
436
        """Handle a variable assignment statement"""
437
        if name in self.variable_unassigned:
10✔
438
            self.variable_unassigned[name] = False
10✔
439

440
    def update_constraints(self) -> List[ExprRef]:
20✔
441
        """Returns all z3 constraints in the environments
442
        Removes constraints with reassigned variables
443
        """
444
        updated_constraints = []
10✔
445
        for constraint in self.constraints:
10✔
446
            # discard expressions with reassigned variables
447
            variables = _get_vars(constraint)
10✔
448
            reassigned = any(
10✔
449
                not self.variable_unassigned.get(variable, False) for variable in variables
450
            )
451
            if not reassigned:
10✔
452
                updated_constraints.append(constraint)
10✔
453

454
        self.constraints = updated_constraints
10✔
455
        return updated_constraints.copy()
10✔
456

457
    def add_constraint(self, constraint: ExprRef) -> None:
20✔
458
        """Add a new z3 constraint to environment"""
459
        self.constraints.append(constraint)
10✔
460

461
    def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
20✔
462
        """Parse an Astroid node to a Z3 constraint
463
        Return the resulting expression
464
        """
465
        try:
10✔
466
            from z3 import Z3Exception
10✔
467

468
            from ..z3.z3_parser import Z3ParseException, Z3Parser
10✔
469

470
        except ImportError:
10✔
471
            return None
10✔
472

473
        parser = Z3Parser(self.variables)
10✔
474
        try:
10✔
475
            return parser.parse(node)
10✔
476
        except (Z3Exception, Z3ParseException):
10✔
477
            return None
10✔
478

479

480
def _get_vars(expr: ExprRef) -> Set[str]:
20✔
481
    """Retrieve all z3 variables from a z3 expression"""
482
    variables = set()
10✔
483

484
    def traverse(e: ExprRef) -> None:
10✔
485
        try:
10✔
486
            from z3 import Z3_OP_UNINTERPRETED, is_const
10✔
487

488
        except ImportError:
10✔
489
            return
10✔
490

491
        if is_const(e) and e.decl().kind() == Z3_OP_UNINTERPRETED:
10✔
492
            variables.add(e.decl().name())
10✔
493
        elif hasattr(e, "children"):
10✔
494
            for child in e.children():
10✔
495
                traverse(child)
10✔
496

497
    traverse(expr)
10✔
498
    return variables
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc