• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 12039300517

26 Nov 2024 09:26PM UTC coverage: 91.968% (-0.03%) from 92.002%
12039300517

Pull #1116

github

web-flow
Merge bcdcfb049 into e28ef0165
Pull Request #1116: Fix Loop Handling and Reporting Issues in Z3 Constraints and `one-iteration-checker`

4 of 4 new or added lines in 2 files covered. (100.0%)

1 existing line in 1 file now uncovered.

3103 of 3374 relevant lines covered (91.97%)

9.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.83
/python_ta/cfg/graph.py
1
from __future__ import annotations
10✔
2

3
from typing import Any, Dict, Generator, List, Optional, Set
10✔
4

5
try:
10✔
6
    from z3 import (
10✔
7
        Z3_OP_UNINTERPRETED,
8
        ExprRef,
9
        Not,
10
        Solver,
11
        Z3Exception,
12
        is_const,
13
        unsat,
14
    )
15

16
    from ..z3.z3_parser import Z3ParseException, Z3Parser
10✔
17

18
    z3_dependency_available = True
10✔
19
except ImportError:
×
20
    ExprRef = Any
×
21
    Z3Parser = Any
×
22
    Not = Any
×
23
    Z3Exception = Any
×
24
    is_const = Any
×
25
    Z3_OP_UNINTERPRETED = Any
×
26
    Z3ParseException = Any
×
27
    Solver = Any
×
28
    unsat = Any
×
29
    z3_dependency_available = False
×
30

31
from astroid import (
10✔
32
    AnnAssign,
33
    Arguments,
34
    Assign,
35
    AssignName,
36
    AugAssign,
37
    Break,
38
    Continue,
39
    NodeNG,
40
    Raise,
41
    Return,
42
)
43

44

45
class ControlFlowGraph:
10✔
46
    """A graph representing the control flow of a Python program."""
47

48
    start: CFGBlock
10✔
49
    end: CFGBlock
10✔
50
    # The unique id of this cfg. Defaults to 0 if not initialized in a CFGVisitor instance.
51
    cfg_id: int
10✔
52
    # block_count is used as an "autoincrement" to ensure the block ids are unique.
53
    block_count: int
10✔
54
    # blocks (with at least one statement) that will never be executed in runtime.
55
    unreachable_blocks: Set[CFGBlock]
10✔
56
    # z3 constraints of preconditions
57
    precondition_constraints: List[ExprRef]
10✔
58
    # map from variable names to z3 variables
59
    _z3_vars: Dict[str, ExprRef]
10✔
60

61
    def __init__(self, cfg_id: int = 0) -> None:
10✔
62
        self.block_count = 0
10✔
63
        self.cfg_id = cfg_id
10✔
64
        self.unreachable_blocks = set()
10✔
65
        self.start = self.create_block()
10✔
66
        self.end = self.create_block()
10✔
67
        self._z3_vars = {}
10✔
68
        self.precondition_constraints = []
10✔
69

70
    def add_arguments(self, args: Arguments) -> None:
10✔
71
        self.start.add_statement(args)
10✔
72
        args.parent.cfg = self
10✔
73
        args.parent.cfg_block = self.start
10✔
74

75
        if ExprRef is not Any:
10✔
76
            # Parse types
77
            parser = Z3Parser()
10✔
78
            z3_vars = parser.parse_arguments(args)
10✔
79
            self._z3_vars.update(z3_vars)
10✔
80

81
    def create_block(
10✔
82
        self,
83
        pred: Optional[CFGBlock] = None,
84
        edge_label: Optional[Any] = None,
85
        edge_condition: Optional[NodeNG] = None,
86
        edge_negate: Optional[bool] = None,
87
    ) -> CFGBlock:
88
        """Create a new CFGBlock for this graph.
89

90
        If pred is specified, set that block as a predecessor of the new block.
91

92
        If edge_label is specified, set the corresponding edge in the CFG with that label.
93

94
        If edge_condition is specified, store the condition node in the corresponding edge.
95

96
        edge_negate is not None only if edge_condition is specified
97
        """
98
        new_block = CFGBlock(self.block_count)
10✔
99
        self.unreachable_blocks.add(new_block)
10✔
100

101
        self.block_count += 1
10✔
102
        if pred:
10✔
103
            self.link_or_merge(pred, new_block, edge_label, edge_condition, edge_negate)
10✔
104
        return new_block
10✔
105

106
    def link(self, source: CFGBlock, target: CFGBlock) -> None:
10✔
107
        """Link source to target."""
108
        if not source.is_jump():
10✔
109
            CFGEdge(source, target)
10✔
110

111
    def link_or_merge(
10✔
112
        self,
113
        source: CFGBlock,
114
        target: CFGBlock,
115
        edge_label: Optional[Any] = None,
116
        edge_condition: Optional[NodeNG] = None,
117
        edge_negate: Optional[bool] = None,
118
    ) -> None:
119
        """Link source to target, or merge source into target if source is empty.
120

121
        An "empty" node for this purpose is when source has no statements.
122

123
        source with a jump statement cannot be further linked or merged to
124
        another target.
125

126
        If edge_label is specified, set the corresponding edge in the CFG with that label.
127

128
        If edge_condition is specified, store the condition node in the corresponding edge.
129

130
        edge_negate is not None only if edge_condition is specified
131
        """
132
        if source.is_jump():
10✔
133
            return
10✔
134
        if source.statements == []:
10✔
135
            if source is self.start:
10✔
136
                self.start = target
10✔
137
            else:
138
                for edge in source.predecessors:
10✔
139
                    edge.target = target
10✔
140
                    target.predecessors.append(edge)
10✔
141
            # source is a utility block that helps build the cfg that does not
142
            # represent any part of the program so it is redundant.
143
            self.unreachable_blocks.remove(source)
10✔
144
        else:
145
            CFGEdge(source, target, edge_label, edge_condition, edge_negate)
10✔
146

147
    def multiple_link_or_merge(self, source: CFGBlock, targets: List[CFGBlock]) -> None:
10✔
148
        """Link source to multiple target, or merge source into targets if source is empty.
149

150
        An "empty" node for this purpose is when source has no statements.
151

152
        source with a jump statement cannot be further linked or merged to
153
        another target.
154

155
        Precondition:
156
            - source != cfg.start
157
        """
158
        if source.statements == []:
10✔
159
            for edge in source.predecessors:
10✔
160
                for t in targets:
10✔
161
                    CFGEdge(edge.source, t)
10✔
162
                edge.source.successors.remove(edge)
10✔
163
            source.predecessors = []
10✔
164
            self.unreachable_blocks.remove(source)
10✔
165
        else:
166
            for target in targets:
10✔
167
                self.link(source, target)
10✔
168

169
    def get_blocks(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
10✔
170
        """Generate a sequence of all blocks in this graph.
171

172
        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
173
        """
174
        yield from self._get_blocks(self.start, set(), only_feasible)
10✔
175

176
    def _get_blocks(
10✔
177
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
178
    ) -> Generator[CFGBlock, None, None]:
179
        if block.id in visited:
10✔
180
            return
10✔
181

182
        yield block
10✔
183
        visited.add(block.id)
10✔
184

185
        for edge in block.successors:
10✔
186
            if not only_feasible or edge.is_feasible:
10✔
187
                yield from self._get_blocks(edge.target, visited, only_feasible)
10✔
188

189
    def get_blocks_postorder(self, only_feasible: bool = False) -> Generator[CFGBlock, None, None]:
10✔
190
        """Return the sequence of all blocks in this graph in the order of
191
        a post-order traversal.
192

193
        When only_feasible is True, only generate blocks feasible from start based on edge z3 constraints.
194
        """
195
        yield from self._get_blocks_postorder(self.start, set(), only_feasible)
10✔
196

197
    def _get_blocks_postorder(
10✔
198
        self, block: CFGBlock, visited: Set[int], only_feasible: bool
199
    ) -> Generator[CFGBlock, None, None]:
200
        if block.id in visited:
10✔
201
            return
10✔
202

203
        visited.add(block.id)
10✔
204
        for succ in block.successors:
10✔
205
            if not only_feasible or succ.is_feasible:
10✔
206
                yield from self._get_blocks_postorder(succ.target, visited, only_feasible)
10✔
207

208
        yield block
10✔
209

210
    def get_edges(self) -> Generator[CFGEdge, None, None]:
10✔
211
        """Generate a sequence of all edges in this graph."""
212
        yield from self._get_edges(self.start, set())
10✔
213

214
    def _get_edges(self, block: CFGBlock, visited: Set[int]) -> Generator[CFGEdge, None, None]:
10✔
215
        if block.id in visited:
10✔
216
            return
10✔
217

218
        visited.add(block.id)
10✔
219

220
        for edge in block.successors:
10✔
221
            yield edge
10✔
222
            yield from self._get_edges(edge.target, visited)
10✔
223

224
    def get_paths(self) -> List[List[CFGEdge]]:
10✔
225
        """Get edges that represent paths from start to end node in depth-first order."""
226
        paths = []
10✔
227

228
        def _dfs(
10✔
229
            current_edge: CFGEdge,
230
            current_path: List[CFGEdge],
231
            visited_edges: Set[CFGEdge],
232
            visited_nodes: Set[CFGBlock],
233
        ):
234
            if current_edge in visited_edges:
10✔
UNCOV
235
                return
×
236

237
            visited_edges.add(current_edge)
10✔
238
            current_path.append(current_edge)
10✔
239
            visited_nodes.add(current_edge.source)
10✔
240

241
            if (
10✔
242
                current_edge.target == self.end
243
                or current_edge.target in visited_nodes
244
                or set(current_edge.target.successors).issubset(visited_edges)
245
            ):
246
                paths.append(current_path.copy())
10✔
247
            else:
248
                for edge in current_edge.target.successors:
10✔
249
                    _dfs(edge, current_path, visited_edges, visited_nodes)
10✔
250

251
            current_path.pop()
10✔
252
            visited_edges.remove(current_edge)
10✔
253
            visited_nodes.remove(current_edge.source)
10✔
254

255
        _dfs(self.start.successors[0], [], set(), set())
10✔
256
        return paths
10✔
257

258
    def update_block_reachability(self) -> None:
10✔
259
        for block in self.get_blocks():
10✔
260
            block.reachable = True
10✔
261
            if block in self.unreachable_blocks:
10✔
262
                self.unreachable_blocks.remove(block)
10✔
263

264
    def update_edge_z3_constraints(self) -> None:
10✔
265
        """Traverse through edges and add Z3 constraints on each edge.
266

267
        Constraints are generated from:
268
        - Function preconditions
269
        - If conditions
270
        - While conditions
271

272
        Constraints with reassigned variables are not included in subsequent edges.
273
        """
274
        if not z3_dependency_available:
10✔
275
            return
×
276

277
        for path_id, path in enumerate(self.get_paths()):
10✔
278
            # starting a new path
279
            z3_environment = Z3Environment(self._z3_vars, self.precondition_constraints)
10✔
280
            for edge in path:
10✔
281
                # traverse through edge
282
                if edge.condition is not None:
10✔
283
                    condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
10✔
284
                    if condition_z3_constraint is not None:
10✔
285
                        if edge.negate is not None:
10✔
286
                            z3_environment.add_constraint(
10✔
287
                                Not(condition_z3_constraint)
288
                                if edge.negate
289
                                else condition_z3_constraint
290
                            )
291

292
                edge.z3_constraints[path_id] = z3_environment.update_constraints()
10✔
293

294
                # traverse into target node
295
                for node in edge.target.statements:
10✔
296
                    if isinstance(node, (Assign, AugAssign, AnnAssign)):
10✔
297
                        self._handle_variable_reassignment(node, z3_environment)
10✔
298

299
    def _handle_variable_reassignment(self, node: NodeNG, env: Z3Environment) -> None:
10✔
300
        """Check for reassignment statements and invoke Z3 environment"""
301
        if isinstance(node, Assign):
10✔
302
            for target in node.targets:
10✔
303
                if isinstance(target, AssignName):
10✔
304
                    env.assign(target.name)
10✔
305
        elif isinstance(node, (AugAssign, AnnAssign)):
10✔
306
            env.assign(node.target.name)
10✔
307

308
    def update_edge_feasibility(self) -> None:
10✔
309
        """Traverse through edges in DFS order and update is_feasible
310
        attribute of each edge. Edges that are unreachable with the given
311
        set of Z3 constraints will have is_feasible set to False
312
        """
313
        if not z3_dependency_available:
10✔
314
            return
×
315

316
        def _check_unsat(constraints: List[ExprRef]) -> bool:
10✔
317
            solver = Solver()
10✔
318
            solver.add(constraints)
10✔
319
            return solver.check() == unsat
10✔
320

321
        for edge in self.get_edges():
10✔
322
            if len(edge.z3_constraints) > 0:
10✔
323
                edge.is_feasible = not all(
10✔
324
                    _check_unsat(constraints) for constraints in edge.z3_constraints.values()
325
                )
326

327

328
class CFGBlock:
10✔
329
    """A node in a control flow graph.
330

331
    Represents a maximal block of code whose statements are guaranteed to execute in sequence.
332
    """
333

334
    # A unique identifier
335
    id: int
10✔
336
    # The statements in this block.
337
    statements: List[NodeNG]
10✔
338
    # This block's in-edges (from blocks that can execute immediately before this one).
339
    predecessors: List[CFGEdge]
10✔
340
    # This block's out-edges (to blocks that can execute immediately after this one).
341
    successors: List[CFGEdge]
10✔
342
    # Whether there exists a path from the start block to this block.
343
    reachable: bool
10✔
344

345
    def __init__(self, id_: int) -> None:
10✔
346
        """Initialize a new CFGBlock."""
347
        self.id = id_
10✔
348
        self.statements = []
10✔
349
        self.predecessors = []
10✔
350
        self.successors = []
10✔
351
        self.reachable = False
10✔
352

353
    def add_statement(self, statement: NodeNG) -> None:
10✔
354
        if not self.is_jump():
10✔
355
            self.statements.append(statement)
10✔
356
            statement.cfg_block = self
10✔
357

358
    @property
10✔
359
    def jump(self) -> Optional[NodeNG]:
10✔
360
        if len(self.statements) > 0:
10✔
361
            return self.statements[-1]
10✔
362

363
    @property
10✔
364
    def is_feasible(self) -> bool:
10✔
365
        return any(edge.is_feasible for edge in self.predecessors)
10✔
366

367
    def is_jump(self) -> bool:
10✔
368
        """Returns True if the block has a statement that branches
369
        the control flow (ex: `break`)"""
370
        return isinstance(self.jump, (Break, Continue, Return, Raise))
10✔
371

372

373
class CFGEdge:
10✔
374
    """An edge in a control flow graph.
375

376
    Edges are directed, and in the future may be augmented with auxiliary metadata about the control flow.
377

378
    The condition attribute stores the AST node representing the condition tested in If and While statements.
379
    The negate attribute stores the condition should be False (when `negate` is True) or condition should be true
380
    (when `negate` is False)
381
    """
382

383
    source: CFGBlock
10✔
384
    target: CFGBlock
10✔
385
    label: Optional[str]
10✔
386
    condition: Optional[NodeNG]
10✔
387
    negate: Optional[bool]
10✔
388
    z3_constraints: Dict[int, List[ExprRef]]
10✔
389
    is_feasible: bool
10✔
390

391
    def __init__(
10✔
392
        self,
393
        source: CFGBlock,
394
        target: CFGBlock,
395
        edge_label: Optional[str] = None,
396
        condition: Optional[NodeNG] = None,
397
        negate: Optional[bool] = None,
398
    ) -> None:
399
        self.source = source
10✔
400
        self.target = target
10✔
401
        self.label = edge_label
10✔
402
        self.condition = condition
10✔
403
        self.negate = negate
10✔
404
        self.source.successors.append(self)
10✔
405
        self.target.predecessors.append(self)
10✔
406
        self.z3_constraints = {}
10✔
407
        self.is_feasible = True
10✔
408

409
    def get_label(self) -> Optional[str]:
10✔
410
        """Return the edge label if specified.
411
        If `edge.label` is None, return the edge condition determined by the negation of `edge.negate`.
412
        Return None if both `edge.label` and `edge.negate` are None.
413
        """
414
        if self.label is not None:
10✔
415
            return self.label
×
416
        elif self.negate is not None:
10✔
417
            return str(not self.negate)
10✔
418
        return None
10✔
419

420

421
class Z3Environment:
10✔
422
    """Z3 Environment stores the Z3 variables and constraints in the current CFG path
423

424
    variable_unassigned:
425
        A dictionary mapping each variable in the current environment to a boolean indicating
426
        whether it has been reassigned (False) or remains unassigned (True).
427

428
    variables:
429
        A dictionary mapping each variable in the current environment to its z3 variable.
430

431
    constraints:
432
        A list of Z3 constraints in the current environment.
433
    """
434

435
    variable_unassigned: Dict[str, bool]
10✔
436
    variables: Dict[str, ExprRef]
10✔
437
    constraints: List[ExprRef]
10✔
438

439
    def __init__(self, variables: Dict[str, ExprRef], constraints: List[ExprRef]) -> None:
10✔
440
        """Initialize the environment with function parameters and preconditions"""
441
        self.variable_unassigned = {var: True for var in variables}
10✔
442
        self.variables = variables
10✔
443
        self.constraints = constraints.copy()
10✔
444

445
    def assign(self, name: str) -> None:
10✔
446
        """Handle a variable assignment statement"""
447
        if name in self.variable_unassigned:
10✔
448
            self.variable_unassigned[name] = False
10✔
449

450
    def update_constraints(self) -> List[ExprRef]:
10✔
451
        """Returns all z3 constraints in the environments
452
        Removes constraints with reassigned variables
453
        """
454
        updated_constraints = []
10✔
455
        for constraint in self.constraints:
10✔
456
            # discard expressions with reassigned variables
457
            variables = _get_vars(constraint)
10✔
458
            reassigned = any(
10✔
459
                not self.variable_unassigned.get(variable, False) for variable in variables
460
            )
461
            if not reassigned:
10✔
462
                updated_constraints.append(constraint)
10✔
463

464
        self.constraints = updated_constraints
10✔
465
        return updated_constraints.copy()
10✔
466

467
    def add_constraint(self, constraint: ExprRef) -> None:
10✔
468
        """Add a new z3 constraint to environment"""
469
        self.constraints.append(constraint)
10✔
470

471
    def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
10✔
472
        """Parse an Astroid node to a Z3 constraint
473
        Return the resulting expression
474
        """
475
        parser = Z3Parser(self.variables)
10✔
476
        try:
10✔
477
            return parser.parse(node)
10✔
478
        except (Z3Exception, Z3ParseException):
10✔
479
            return None
10✔
480

481

482
def _get_vars(expr: ExprRef) -> Set[str]:
10✔
483
    """Retrieve all z3 variables from a z3 expression"""
484
    variables = set()
10✔
485

486
    def traverse(e: ExprRef) -> None:
10✔
487
        if is_const(e) and e.decl().kind() == Z3_OP_UNINTERPRETED:
10✔
488
            variables.add(e.decl().name())
10✔
489
        elif hasattr(e, "children"):
10✔
490
            for child in e.children():
10✔
491
                traverse(child)
10✔
492

493
    traverse(expr)
10✔
494
    return variables
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc