• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 15834615922

23 Jun 2025 08:38PM UTC coverage: 92.63% (-0.2%) from 92.844%
15834615922

Pull #1188

github

web-flow
Merge 179dad789 into 02ddeab02
Pull Request #1188: Refactoring of the codebase to delay the import of z3 module

60 of 77 new or added lines in 7 files covered. (77.92%)

3 existing lines in 1 file now uncovered.

3431 of 3704 relevant lines covered (92.63%)

17.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.43
/python_ta/cfg/graph.py
1
from __future__ import annotations
20✔
2

3
import logging
20✔
4
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Set
20✔
5

6
if TYPE_CHECKING:
20✔
NEW
7
    try:
×
NEW
8
        from z3 import ExprRef
×
NEW
9
    except ImportError:
×
NEW
10
        ExprRef = Any
×
11

12
from astroid import (
20✔
13
    AnnAssign,
14
    Arguments,
15
    Assign,
16
    AssignName,
17
    AugAssign,
18
    Break,
19
    Continue,
20
    NodeNG,
21
    Raise,
22
    Return,
23
)
24

25
# Global z3
26
z3 = Any
20✔
27

28

29
class ControlFlowGraph:
20✔
30
    """A graph representing the control flow of a Python program."""
31

32
    start: CFGBlock
20✔
33
    end: CFGBlock
20✔
34
    # The unique id of this cfg. Defaults to 0 if not initialized in a CFGVisitor instance.
35
    cfg_id: int
20✔
36
    # block_count is used as an "autoincrement" to ensure the block ids are unique.
37
    block_count: int
20✔
38
    # blocks (with at least one statement) that will never be executed in runtime.
39
    unreachable_blocks: Set[CFGBlock]
20✔
40
    # z3 constraints of preconditions
41
    precondition_constraints: List[ExprRef]
20✔
42
    # map from variable names to z3 variables
43
    z3_vars: Dict[str, ExprRef]
20✔
44
    # z3 enabled option
45
    z3_enabled: bool
20✔
46

47
    def __init__(self, cfg_id: int = 0, z3_enabled: bool = False) -> None:
20✔
48
        self.block_count = 0
20✔
49
        self.cfg_id = cfg_id
20✔
50
        self.unreachable_blocks = set()
20✔
51
        self.start = self.create_block()
20✔
52
        self.end = self.create_block()
20✔
53
        self.z3_vars = {}
20✔
54
        self.precondition_constraints = []
20✔
55
        self.z3_enabled = z3_enabled
20✔
56
        global z3
57
        if self.z3_enabled:
20✔
58
            try:
20✔
59
                import z3 as z3_module
20✔
60

61
                z3 = z3_module
10✔
62
            except ImportError:
20✔
63
                # Reset z3_enabled to prevent further use of z3 module
64
                logging.error("Failed to import z3 module. Disabling z3 features.")
20✔
65
                self.z3_enabled = False
20✔
66

67
    def add_arguments(self, args: Arguments) -> None:
20✔
68
        self.start.add_statement(args)
20✔
69
        args.parent.cfg = self
20✔
70
        args.parent.cfg_block = self.start
20✔
71

72
        if self.z3_enabled:
20✔
73
            try:
10✔
74
                from ..z3.z3_parser import Z3Parser
10✔
75

NEW
76
            except ImportError:
×
NEW
77
                logging.error("Failed to import Z3Parser despite Z3 being enabled.")
×
NEW
78
                return
×
79
            # Parse types
80
            parser = Z3Parser()
10✔
81
            z3_vars = parser.parse_arguments(args)
10✔
82
            self.z3_vars.update(z3_vars)
10✔
83

84
    def create_block(
20✔
85
        self,
86
        pred: Optional[CFGBlock] = None,
87
        edge_label: Optional[Any] = None,
88
        edge_condition: Optional[NodeNG] = None,
89
        edge_negate: Optional[bool] = None,
90
    ) -> CFGBlock:
91
        """Create a new CFGBlock for this graph.
92

93
        If pred is specified, set that block as a predecessor of the new block.
94

95
        If edge_label is specified, set the corresponding edge in the CFG with that label.
96

97
        If edge_condition is specified, store the condition node in the corresponding edge.
98

99
        edge_negate is not None only if edge_condition is specified
100
        """
101
        new_block = CFGBlock(self.block_count)
20✔
102
        self.unreachable_blocks.add(new_block)
20✔
103

104
        self.block_count += 1
20✔
105
        if pred:
20✔
106
            self.link_or_merge(pred, new_block, edge_label, edge_condition, edge_negate)
20✔
107
        return new_block
20✔
108

109
    def link(self, source: CFGBlock, target: CFGBlock) -> None:
20✔
110
        """Link source to target."""
111
        if not source.is_jump():
20✔
112
            CFGEdge(source, target)
20✔
113

114
    def link_or_merge(
20✔
115
        self,
116
        source: CFGBlock,
117
        target: CFGBlock,
118
        edge_label: Optional[Any] = None,
119
        edge_condition: Optional[NodeNG] = None,
120
        edge_negate: Optional[bool] = None,
121
    ) -> None:
122
        """Link source to target, or merge source into target if source is empty.
123

124
        An "empty" node for this purpose is when source has no statements.
125

126
        source with a jump statement cannot be further linked or merged to
127
        another target.
128

129
        If edge_label is specified, set the corresponding edge in the CFG with that label.
130

131
        If edge_condition is specified, store the condition node in the corresponding edge.
132

133
        edge_negate is not None only if edge_condition is specified
134
        """
135
        if source.is_jump():
20✔
136
            return
20✔
137
        if source.statements == []:
20✔
138
            if source is self.start:
20✔
139
                self.start = target
20✔
140
            else:
141
                for edge in source.predecessors:
20✔
142
                    edge.target = target
20✔
143
                    target.predecessors.append(edge)
20✔
144
            # source is a utility block that helps build the cfg that does not
145
            # represent any part of the program so it is redundant.
146
            self.unreachable_blocks.remove(source)
20✔
147
        else:
148
            CFGEdge(source, target, edge_label, edge_condition, edge_negate)
20✔
149

150
    def multiple_link_or_merge(self, source: CFGBlock, targets: List[CFGBlock]) -> None:
20✔
151
        """Link source to multiple target, or merge source into targets if source is empty.
152

153
        An "empty" node for this purpose is when source has no statements.
154

155
        source with a jump statement cannot be further linked or merged to
156
        another target.
157

158
        Precondition:
159
            - source != cfg.start
160
        """
161
        if source.statements == []:
20✔
162
            for edge in source.predecessors:
20✔
163
                for t in targets:
20✔
164
                    CFGEdge(edge.source, t)
20✔
165
                edge.source.successors.remove(edge)
20✔
166
            source.predecessors = []
20✔
167
            self.unreachable_blocks.remove(source)
20✔
168
        else:
169
            for target in targets:
20✔
170
                self.link(source, target)
20✔
171

172
    def get_blocks(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
20✔
173
        """Generate a sequence of all blocks in this graph.
174

175
        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
176
        """
177
        yield from self._get_blocks(self.start, set(), only_feasible)
20✔
178

179
    def _get_blocks(
20✔
180
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
181
    ) -> Generator[CFGBlock, None, None]:
182
        if block.id in visited:
20✔
183
            return
20✔
184

185
        yield block
20✔
186
        visited.add(block.id)
20✔
187

188
        for edge in block.successors:
20✔
189
            if not only_feasible or edge.is_feasible:
20✔
190
                yield from self._get_blocks(edge.target, visited, only_feasible)
20✔
191

192
    def get_blocks_postorder(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
20✔
193
        """Return the sequence of all blocks in this graph in the order of
194
        a post-order traversal.
195

196
        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
197
        """
198
        yield from self._get_blocks_postorder(self.start, set(), only_feasible)
20✔
199

200
    def _get_blocks_postorder(
20✔
201
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
202
    ) -> Generator[CFGBlock, None, None]:
203
        if block.id in visited:
20✔
204
            return
20✔
205

206
        visited.add(block.id)
20✔
207
        for succ in block.successors:
20✔
208
            if not only_feasible or succ.is_feasible:
20✔
209
                yield from self._get_blocks_postorder(succ.target, visited, only_feasible)
20✔
210

211
        yield block
20✔
212

213
    def get_edges(self) -> Generator[CFGEdge, None, None]:
20✔
214
        """Generate a sequence of all edges in this graph."""
215
        yield from self._get_edges(self.start, set())
20✔
216

217
    def _get_edges(self, block: CFGBlock, visited: Set[int]) -> Generator[CFGEdge, None, None]:
20✔
218
        if block.id in visited:
20✔
219
            return
20✔
220

221
        visited.add(block.id)
20✔
222

223
        for edge in block.successors:
20✔
224
            yield edge
20✔
225
            yield from self._get_edges(edge.target, visited)
20✔
226

227
    def get_paths(self) -> List[List[CFGEdge]]:
20✔
228
        """Get edges that represent paths from start to end node in depth-first order."""
229
        paths = []
10✔
230

231
        def _dfs(
10✔
232
            current_edge: CFGEdge,
233
            current_path: List[CFGEdge],
234
            visited_edges: Set[CFGEdge],
235
            visited_nodes: Set[CFGBlock],
236
        ):
237
            if current_edge in visited_edges:
10✔
238
                return
×
239

240
            visited_edges.add(current_edge)
10✔
241
            current_path.append(current_edge)
10✔
242
            visited_nodes.add(current_edge.source)
10✔
243

244
            if (
10✔
245
                current_edge.target == self.end
246
                or current_edge.target in visited_nodes
247
                or set(current_edge.target.successors).issubset(visited_edges)
248
            ):
249
                paths.append(current_path.copy())
10✔
250
            else:
251
                for edge in current_edge.target.successors:
10✔
252
                    _dfs(edge, current_path, visited_edges, visited_nodes)
10✔
253

254
            current_path.pop()
10✔
255
            visited_edges.remove(current_edge)
10✔
256
            visited_nodes.remove(current_edge.source)
10✔
257

258
        _dfs(self.start.successors[0], [], set(), set())
10✔
259
        return paths
10✔
260

261
    def update_block_reachability(self) -> None:
20✔
262
        for block in self.get_blocks():
20✔
263
            block.reachable = True
20✔
264
            if block in self.unreachable_blocks:
20✔
265
                self.unreachable_blocks.remove(block)
20✔
266

267
    def update_edge_z3_constraints(self) -> None:
20✔
268
        """Traverse through edges and add Z3 constraints on each edge.
269

270
        Constraints are generated from:
271
        - Function preconditions
272
        - If conditions
273
        - While conditions
274

275
        Constraints with reassigned variables are not included in subsequent edges.
276
        """
277
        if not self.z3_enabled:
10✔
278
            return
10✔
279
        for path_id, path in enumerate(self.get_paths()):
10✔
280
            # starting a new path
281
            # Note: z3_enabled is False by default. It will only be set to True if self.z3_enabled was enabled earlier
282
            z3_environment = Z3Environment(self.z3_vars, self.precondition_constraints)
10✔
283
            for edge in path:
10✔
284
                # traverse through edge
285
                if edge.condition is not None:
10✔
286
                    condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
10✔
287
                    if condition_z3_constraint is not None:
10✔
288
                        if edge.negate is not None:
10✔
289
                            z3_environment.add_constraint(
10✔
290
                                z3.Not(condition_z3_constraint)
291
                                if edge.negate
292
                                else condition_z3_constraint
293
                            )
294

295
                edge.z3_constraints[path_id] = z3_environment.update_constraints()
10✔
296

297
                # traverse into target node
298
                for node in edge.target.statements:
10✔
299
                    if isinstance(node, (Assign, AugAssign, AnnAssign)):
10✔
300
                        self._handle_variable_reassignment(node, z3_environment)
10✔
301

302
    def _handle_variable_reassignment(self, node: NodeNG, env: Z3Environment) -> None:
20✔
303
        """Check for reassignment statements and invoke Z3 environment"""
304
        if isinstance(node, Assign):
10✔
305
            for target in node.targets:
10✔
306
                if isinstance(target, AssignName):
10✔
307
                    env.assign(target.name)
10✔
308
        elif isinstance(node, (AugAssign, AnnAssign)):
10✔
309
            env.assign(node.target.name)
10✔
310

311
    def update_edge_feasibility(self) -> None:
20✔
312
        """Traverse through edges in DFS order and update is_feasible
313
        attribute of each edge. Edges that are unreachable with the given
314
        set of Z3 constraints will have is_feasible set to False
315
        """
316
        if not self.z3_enabled:
10✔
317
            return
10✔
318

319
        def _check_unsat(constraints: List[ExprRef]) -> bool:
10✔
320
            solver = z3.Solver()
10✔
321
            solver.add(constraints)
10✔
322
            return solver.check() == z3.unsat
10✔
323

324
        for edge in self.get_edges():
10✔
325
            if len(edge.z3_constraints) > 0:
10✔
326
                edge.is_feasible = not all(
10✔
327
                    _check_unsat(constraints) for constraints in edge.z3_constraints.values()
328
                )
329

330

331
class CFGBlock:
20✔
332
    """A node in a control flow graph.
333

334
    Represents a maximal block of code whose statements are guaranteed to execute in sequence.
335
    """
336

337
    # A unique identifier
338
    id: int
20✔
339
    # The statements in this block.
340
    statements: List[NodeNG]
20✔
341
    # This block's in-edges (from blocks that can execute immediately before this one).
342
    predecessors: List[CFGEdge]
20✔
343
    # This block's out-edges (to blocks that can execute immediately after this one).
344
    successors: List[CFGEdge]
20✔
345
    # Whether there exists a path from the start block to this block.
346
    reachable: bool
20✔
347

348
    def __init__(self, id_: int) -> None:
20✔
349
        """Initialize a new CFGBlock."""
350
        self.id = id_
20✔
351
        self.statements = []
20✔
352
        self.predecessors = []
20✔
353
        self.successors = []
20✔
354
        self.reachable = False
20✔
355

356
    def add_statement(self, statement: NodeNG) -> None:
20✔
357
        if not self.is_jump():
20✔
358
            self.statements.append(statement)
20✔
359
            statement.cfg_block = self
20✔
360

361
    @property
20✔
362
    def jump(self) -> Optional[NodeNG]:
20✔
363
        if len(self.statements) > 0:
20✔
364
            return self.statements[-1]
20✔
365

366
    @property
20✔
367
    def is_feasible(self) -> bool:
20✔
368
        return any(edge.is_feasible for edge in self.predecessors)
20✔
369

370
    def is_jump(self) -> bool:
20✔
371
        """Returns True if the block has a statement that branches
372
        the control flow (ex: `break`)"""
373
        return isinstance(self.jump, (Break, Continue, Return, Raise))
20✔
374

375

376
class CFGEdge:
20✔
377
    """An edge in a control flow graph.
378

379
    Edges are directed, and in the future may be augmented with auxiliary metadata about the control flow.
380

381
    The condition attribute stores the AST node representing the condition tested in If and While statements.
382
    The negate attribute stores the condition should be False (when `negate` is True) or condition should be true
383
    (when `negate` is False)
384
    """
385

386
    source: CFGBlock
20✔
387
    target: CFGBlock
20✔
388
    label: Optional[str]
20✔
389
    condition: Optional[NodeNG]
20✔
390
    negate: Optional[bool]
20✔
391
    z3_constraints: Dict[int, List[ExprRef]]
20✔
392
    is_feasible: bool
20✔
393

394
    def __init__(
20✔
395
        self,
396
        source: CFGBlock,
397
        target: CFGBlock,
398
        edge_label: Optional[str] = None,
399
        condition: Optional[NodeNG] = None,
400
        negate: Optional[bool] = None,
401
    ) -> None:
402
        self.source = source
20✔
403
        self.target = target
20✔
404
        self.label = edge_label
20✔
405
        self.condition = condition
20✔
406
        self.negate = negate
20✔
407
        self.source.successors.append(self)
20✔
408
        self.target.predecessors.append(self)
20✔
409
        self.z3_constraints = {}
20✔
410
        self.is_feasible = True
20✔
411

412
    def get_label(self) -> Optional[str]:
20✔
413
        """Return the edge label if specified.
414
        If `edge.label` is None, return the edge condition determined by the negation of `edge.negate`.
415
        Return None if both `edge.label` and `edge.negate` are None.
416
        """
417
        if self.label is not None:
20✔
418
            return self.label
×
419
        elif self.negate is not None:
20✔
420
            return str(not self.negate)
20✔
421
        return None
20✔
422

423

424
class Z3Environment:
20✔
425
    """Z3 Environment stores the Z3 variables and constraints in the current CFG path
426

427
    variable_unassigned:
428
        A dictionary mapping each variable in the current environment to a boolean indicating
429
        whether it has been reassigned (False) or remains unassigned (True).
430

431
    variables:
432
        A dictionary mapping each variable in the current environment to its z3 variable.
433

434
    constraints:
435
        A list of Z3 constraints in the current environment.
436
    """
437

438
    variable_unassigned: Dict[str, bool]
20✔
439
    variables: Dict[str, ExprRef]
20✔
440
    constraints: List[ExprRef]
20✔
441

442
    def __init__(self, variables: Dict[str, ExprRef], constraints: List[ExprRef]) -> None:
20✔
443
        """Initialize the environment with function parameters and preconditions"""
444
        self.variable_unassigned = {var: True for var in variables}
10✔
445
        self.variables = variables
10✔
446
        self.constraints = constraints.copy()
10✔
447

448
    def assign(self, name: str) -> None:
20✔
449
        """Handle a variable assignment statement"""
450
        if name in self.variable_unassigned:
10✔
451
            self.variable_unassigned[name] = False
10✔
452

453
    def update_constraints(self) -> List[ExprRef]:
20✔
454
        """Returns all z3 constraints in the environments
455
        Removes constraints with reassigned variables
456
        """
457
        updated_constraints = []
10✔
458
        for constraint in self.constraints:
10✔
459
            # discard expressions with reassigned variables
460
            variables = self._get_vars(constraint)
10✔
461
            reassigned = any(
10✔
462
                not self.variable_unassigned.get(variable, False) for variable in variables
463
            )
464
            if not reassigned:
10✔
465
                updated_constraints.append(constraint)
10✔
466

467
        self.constraints = updated_constraints
10✔
468
        return updated_constraints.copy()
10✔
469

470
    def add_constraint(self, constraint: ExprRef) -> None:
20✔
471
        """Add a new z3 constraint to environment"""
472
        self.constraints.append(constraint)
10✔
473

474
    def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
20✔
475
        """Parse an Astroid node to a Z3 constraint
476
        Return the resulting expression
477
        """
478
        from z3 import Z3Exception
10✔
479

480
        from ..z3.z3_parser import Z3ParseException, Z3Parser
10✔
481

482
        parser = Z3Parser(self.variables)
10✔
483
        try:
10✔
484
            return parser.parse(node)
10✔
485
        except (Z3Exception, Z3ParseException):
10✔
486
            return None
10✔
487

488
    def _get_vars(self, expr: ExprRef) -> Set[str]:
20✔
489
        """Retrieve all z3 variables from a z3 expression"""
490
        from z3 import Z3_OP_UNINTERPRETED, is_const
10✔
491

492
        variables = set()
10✔
493

494
        def traverse(e: ExprRef) -> None:
10✔
495
            if is_const(e) and e.decl().kind() == Z3_OP_UNINTERPRETED:
10✔
496
                variables.add(e.decl().name())
10✔
497
            elif hasattr(e, "children"):
10✔
498
                for child in e.children():
10✔
499
                    traverse(child)
10✔
500

501
        traverse(expr)
10✔
502
        return variables
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc