• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pyta-uoft / pyta / 11109330666

30 Sep 2024 03:30PM UTC coverage: 92.034% (-0.02%) from 92.051%
11109330666

Pull #1084

github

web-flow
Merge a090257c5 into 667ea92e2
Pull Request #1084: Check cfg edge constraints

19 of 22 new or added lines in 3 files covered. (86.36%)

1 existing line in 1 file now uncovered.

3073 of 3339 relevant lines covered (92.03%)

9.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.26
/python_ta/cfg/graph.py
1
from __future__ import annotations
10✔
2

3
from typing import Any, Dict, Generator, List, Optional, Set
10✔
4

5
try:
10✔
6
    from z3 import (
10✔
7
        Z3_OP_UNINTERPRETED,
8
        ExprRef,
9
        Not,
10
        Solver,
11
        Z3Exception,
12
        is_const,
13
        unsat,
14
    )
15

16
    from ..z3.z3_parser import Z3ParseException, Z3Parser
10✔
17

18
    z3_dependency_available = True
10✔
19
except ImportError:
×
20
    ExprRef = Any
×
21
    Z3Parser = Any
×
22
    Not = Any
×
23
    Z3Exception = Any
×
24
    is_const = Any
×
25
    Z3_OP_UNINTERPRETED = Any
×
26
    Z3ParseException = Any
×
NEW
27
    Solver = Any
×
NEW
28
    unsat = Any
×
UNCOV
29
    z3_dependency_available = False
×
30

31
from astroid import (
10✔
32
    AnnAssign,
33
    Arguments,
34
    Assign,
35
    AssignName,
36
    AugAssign,
37
    Break,
38
    Continue,
39
    NodeNG,
40
    Raise,
41
    Return,
42
)
43

44

45
class ControlFlowGraph:
    """A graph representing the control flow of a Python program.

    The graph owns a ``start`` and an ``end`` block; further blocks are made
    with ``create_block`` and connected through ``CFGEdge`` objects.
    """

    start: CFGBlock
    end: CFGBlock
    # The unique id of this cfg. Defaults to 0 if not initialized in a CFGVisitor instance.
    cfg_id: int
    # block_count is used as an "autoincrement" to ensure the block ids are unique.
    block_count: int
    # blocks (with at least one statement) that will never be executed in runtime.
    unreachable_blocks: Set[CFGBlock]
    # z3 constraints of preconditions
    precondition_constraints: List[ExprRef]
    # map from variable names to z3 variables
    _z3_vars: Dict[str, ExprRef]

    def __init__(self, cfg_id: int = 0) -> None:
        """Initialize an empty graph holding only a start block and an end block."""
        # block_count and unreachable_blocks must exist before create_block is called.
        self.block_count = 0
        self.cfg_id = cfg_id
        self.unreachable_blocks = set()
        self.start = self.create_block()
        self.end = self.create_block()
        self._z3_vars = {}
        self.precondition_constraints = []

    def add_arguments(self, args: Arguments) -> None:
        """Add the arguments node to the start block and record its z3 variables.

        ``args.parent`` (presumably the enclosing function definition) is tagged
        with this graph and with the start block. When the z3 dependency is
        available, the arguments are parsed into z3 variables and merged into
        ``self._z3_vars``.
        """
        self.start.add_statement(args)
        args.parent.cfg = self
        args.parent.cfg_block = self.start

        # Use the same availability flag as the other z3-dependent methods.
        if z3_dependency_available:
            # Parse types
            parser = Z3Parser()
            z3_vars = parser.parse_arguments(args)
            self._z3_vars.update(z3_vars)

    def create_block(
        self,
        pred: Optional[CFGBlock] = None,
        edge_label: Optional[Any] = None,
        edge_condition: Optional[NodeNG] = None,
        edge_negate: Optional[bool] = None,
    ) -> CFGBlock:
        """Create a new CFGBlock for this graph.

        If pred is specified, set that block as a predecessor of the new block.

        If edge_label is specified, set the corresponding edge in the CFG with that label.

        If edge_condition is specified, store the condition node in the corresponding edge.

        edge_negate is not None only if edge_condition is specified
        """
        new_block = CFGBlock(self.block_count)
        # Every block starts out as unreachable; update_block_reachability
        # (or a merge) removes it from this set later.
        self.unreachable_blocks.add(new_block)

        self.block_count += 1
        if pred:
            self.link_or_merge(pred, new_block, edge_label, edge_condition, edge_negate)
        return new_block

    def link(self, source: CFGBlock, target: CFGBlock) -> None:
        """Link source to target, unless source ends in a jump statement."""
        if not source.is_jump():
            # The CFGEdge constructor registers itself on both blocks.
            CFGEdge(source, target)

    def link_or_merge(
        self,
        source: CFGBlock,
        target: CFGBlock,
        edge_label: Optional[Any] = None,
        edge_condition: Optional[NodeNG] = None,
        edge_negate: Optional[bool] = None,
    ) -> None:
        """Link source to target, or merge source into target if source is empty.

        An "empty" node for this purpose is when source has no statements.

        source with a jump statement cannot be further linked or merged to
        another target.

        If edge_label is specified, set the corresponding edge in the CFG with that label.

        If edge_condition is specified, store the condition node in the corresponding edge.

        edge_negate is not None only if edge_condition is specified
        """
        if source.is_jump():
            return
        if not source.statements:
            if source is self.start:
                self.start = target
            else:
                # Redirect every in-edge of the empty source to the target.
                for edge in source.predecessors:
                    edge.target = target
                    target.predecessors.append(edge)
            # source is a utility block that helps build the cfg that does not
            # represent any part of the program so it is redundant.
            self.unreachable_blocks.remove(source)
        else:
            CFGEdge(source, target, edge_label, edge_condition, edge_negate)

    def multiple_link_or_merge(self, source: CFGBlock, targets: List[CFGBlock]) -> None:
        """Link source to multiple target, or merge source into targets if source is empty.

        An "empty" node for this purpose is when source has no statements.

        source with a jump statement cannot be further linked or merged to
        another target.

        Precondition:
            - source != cfg.start
        """
        if not source.statements:
            for edge in source.predecessors:
                # Replace the edge into the empty source by one edge per target.
                for t in targets:
                    CFGEdge(edge.source, t)
                edge.source.successors.remove(edge)
            source.predecessors = []
            self.unreachable_blocks.remove(source)
        else:
            for target in targets:
                self.link(source, target)

    def get_blocks(self) -> Generator[CFGBlock, None, None]:
        """Generate a sequence of all blocks in this graph."""
        yield from self._get_blocks(self.start, set())

    def _get_blocks(self, block: CFGBlock, visited: Set[int]) -> Generator[CFGBlock, None, None]:
        """Yield block and then, depth-first, every block reachable from it.

        visited holds the ids of blocks that have already been yielded.
        """
        if block.id in visited:
            return

        yield block
        visited.add(block.id)

        for edge in block.successors:
            yield from self._get_blocks(edge.target, visited)

    def get_blocks_postorder(self) -> Generator[CFGBlock, None, None]:
        """Return the sequence of all blocks in this graph in the order of
        a post-order traversal."""
        yield from self._get_blocks_postorder(self.start, set())

    def _get_blocks_postorder(
        self, block: CFGBlock, visited: Set[int]
    ) -> Generator[CFGBlock, None, None]:
        """Yield every block reachable from block, successors before block itself.

        visited holds the ids of blocks whose traversal has already started.
        """
        if block.id in visited:
            return

        visited.add(block.id)
        for succ in block.successors:
            yield from self._get_blocks_postorder(succ.target, visited)

        yield block

    def get_edges(self) -> Generator[CFGEdge, None, None]:
        """Generate a sequence of all edges in this graph."""
        yield from self._get_edges(self.start, set())

    def _get_edges(self, block: CFGBlock, visited: Set[int]) -> Generator[CFGEdge, None, None]:
        """Yield, depth-first, the out-edges of every block reachable from block.

        visited holds the ids of blocks whose out-edges have already been handled.
        """
        if block.id in visited:
            return

        visited.add(block.id)

        for edge in block.successors:
            yield edge
            yield from self._get_edges(edge.target, visited)

    def get_paths(self) -> List[List[CFGEdge]]:
        """Get edges that represent paths from start to end node in depth-first order.

        A path is recorded when it reaches the end block, or when every out-edge
        of the current target has already been visited on the current path.
        Every out-edge of the start block is used as a starting edge; a start
        block without out-edges yields an empty list.
        """
        paths: List[List[CFGEdge]] = []

        def _visited(
            edge: CFGEdge, visited_edges: Set[CFGEdge], visited_nodes: Set[CFGBlock]
        ) -> bool:
            return edge in visited_edges or edge.target in visited_nodes

        def _dfs(
            current_edge: CFGEdge,
            current_path: List[CFGEdge],
            visited_edges: Set[CFGEdge],
            visited_nodes: Set[CFGBlock],
        ) -> None:
            # note: both visited edges and visited nodes need to be tracked to correctly handle cycles
            if _visited(current_edge, visited_edges, visited_nodes):
                return

            visited_edges.add(current_edge)
            visited_nodes.add(current_edge.source)
            current_path.append(current_edge)

            if current_edge.target == self.end or all(
                _visited(edge, visited_edges, visited_nodes)
                for edge in current_edge.target.successors
            ):
                paths.append(current_path.copy())
            else:
                for edge in current_edge.target.successors:
                    _dfs(edge, current_path, visited_edges, visited_nodes)

            # Backtrack so that sibling branches start from a clean state.
            current_path.pop()
            visited_edges.remove(current_edge)
            visited_nodes.remove(current_edge.source)

        # Start from every out-edge of the start block: indexing successors[0]
        # fails on a start block without out-edges and skips the other branches
        # when the start block has more than one out-edge.
        for first_edge in self.start.successors:
            _dfs(first_edge, [], set(), set())
        return paths

    def update_block_reachability(self) -> None:
        """Mark every block reachable from the start block as reachable and
        remove it from ``unreachable_blocks``."""
        for block in self.get_blocks():
            block.reachable = True
            self.unreachable_blocks.discard(block)

    def update_edge_z3_constraints(self) -> None:
        """Traverse through edges and add Z3 constraints on each edge.

        Constraints are generated from:
        - Function preconditions
        - If conditions
        - While conditions

        Constraints with reassigned variables are not included in subsequent edges.

        Does nothing when the z3 dependency is not available.
        """
        if not z3_dependency_available:
            return

        for path_id, path in enumerate(self.get_paths()):
            # starting a new path
            z3_environment = Z3Environment(self._z3_vars, self.precondition_constraints)
            for edge in path:
                # traverse through edge
                if edge.condition is not None:
                    condition_z3_constraint = z3_environment.parse_constraint(edge.condition)
                    # A condition is only recorded when it could be parsed and
                    # the edge states which branch (negated or not) it represents.
                    if condition_z3_constraint is not None and edge.negate is not None:
                        z3_environment.add_constraint(
                            Not(condition_z3_constraint)
                            if edge.negate
                            else condition_z3_constraint
                        )

                # Constraints are stored per path, keyed by the path's index.
                edge.z3_constraints[path_id] = z3_environment.update_constraints()

                # traverse into target node
                for node in edge.target.statements:
                    if isinstance(node, (Assign, AugAssign, AnnAssign)):
                        self._handle_variable_reassignment(node, z3_environment)

    def _handle_variable_reassignment(self, node: NodeNG, env: Z3Environment) -> None:
        """Mark every variable name bound by an assignment statement as reassigned in env.

        Handles Assign, AugAssign and AnnAssign nodes; any other node is ignored.
        Names nested inside tuple/list unpacking targets are included. Targets
        that bind no plain name (for example subscripts or attributes) are
        skipped instead of raising AttributeError.
        """
        if isinstance(node, Assign):
            targets = node.targets
        elif isinstance(node, (AugAssign, AnnAssign)):
            targets = [node.target]
        else:
            return

        for target in targets:
            # nodes_of_class yields the target itself when it is an AssignName,
            # as well as any AssignName nested below it.
            for name_node in target.nodes_of_class(AssignName):
                env.assign(name_node.name)

    def update_edge_feasibility(self) -> None:
        """Traverse through edges in DFS order and update is_feasible
        attribute of each edge. Edges that are unreachable with the given
        set of Z3 constraints will have is_feasible set to False

        Edges without any recorded constraints are left unchanged. Does nothing
        when the z3 dependency is not available.
        """
        if not z3_dependency_available:
            return

        def _check_unsat(constraints: List[ExprRef]) -> bool:
            solver = Solver()
            solver.add(constraints)
            return solver.check() == unsat

        for edge in self.get_edges():
            if edge.z3_constraints:
                # An edge is infeasible only if it is unsatisfiable on every path.
                edge.is_feasible = not all(
                    _check_unsat(constraints) for constraints in edge.z3_constraints.values()
                )
319

320

321
class CFGBlock:
    """A single node of a control flow graph.

    A block is a maximal run of statements that are guaranteed to execute in sequence.
    """

    # Unique identifier of this block.
    id: int
    # Statements contained in this block.
    statements: List[NodeNG]
    # In-edges: edges from blocks that can execute immediately before this one.
    predecessors: List[CFGEdge]
    # Out-edges: edges to blocks that can execute immediately after this one.
    successors: List[CFGEdge]
    # True when a path from the start block to this block exists.
    reachable: bool

    def __init__(self, id_: int) -> None:
        """Create an empty block with the given id that is not yet marked reachable."""
        self.reachable = False
        self.successors = []
        self.predecessors = []
        self.statements = []
        self.id = id_

    def add_statement(self, statement: NodeNG) -> None:
        """Append statement to this block and point its ``cfg_block`` at this block.

        Nothing happens when the block already ends in a jump statement.
        """
        if self.is_jump():
            return
        self.statements.append(statement)
        statement.cfg_block = self

    @property
    def jump(self) -> Optional[NodeNG]:
        """The last statement of this block, or None when the block is empty."""
        return self.statements[-1] if len(self.statements) > 0 else None

    def is_jump(self) -> bool:
        """Return True if the last statement of this block branches
        the control flow (ex: `break`)"""
        last_statement = self.jump
        return isinstance(last_statement, (Break, Continue, Return, Raise))
360

361

362
class CFGEdge:
    """A directed edge of a control flow graph.

    In the future edges may be augmented with auxiliary metadata about the control flow.

    ``condition`` holds the AST node of the condition tested in If and While statements.
    ``negate`` tells which outcome of that condition the edge stands for: the condition
    should be False when ``negate`` is True, and True when ``negate`` is False.
    """

    source: CFGBlock
    target: CFGBlock
    label: Optional[str]
    condition: Optional[NodeNG]
    negate: Optional[bool]
    z3_constraints: Dict[int, List[ExprRef]]
    is_feasible: bool

    def __init__(
        self,
        source: CFGBlock,
        target: CFGBlock,
        edge_label: Optional[str] = None,
        condition: Optional[NodeNG] = None,
        negate: Optional[bool] = None,
    ) -> None:
        """Create the edge and register it on both of its endpoints."""
        self.source, self.target = source, target
        self.label, self.condition, self.negate = edge_label, condition, negate
        # No constraints are known yet, and the edge counts as feasible by default.
        self.z3_constraints = {}
        self.is_feasible = True
        # Register as an out-edge of source and an in-edge of target.
        source.successors.append(self)
        target.predecessors.append(self)

    def get_label(self) -> Optional[str]:
        """Return the label to display for this edge.

        An explicit `edge.label` wins. Otherwise the label is the string form of the
        negation of `edge.negate`. None is returned when both `edge.label` and
        `edge.negate` are None.
        """
        if self.label is not None:
            return self.label
        if self.negate is None:
            return None
        return str(not self.negate)
408

409

410
class Z3Environment:
    """Holds the Z3 variables and constraints that apply along the current CFG path.

    variable_unassigned:
        Maps each variable of the environment to a boolean: True while the variable
        remains unassigned, False once it has been reassigned.

    variables:
        Maps each variable of the environment to its z3 variable.

    constraints:
        The Z3 constraints currently held by the environment.
    """

    variable_unassigned: Dict[str, bool]
    variables: Dict[str, ExprRef]
    constraints: List[ExprRef]

    def __init__(self, variables: Dict[str, ExprRef], constraints: List[ExprRef]) -> None:
        """Set up the environment from function parameters and preconditions."""
        self.variables = variables
        # Work on a copy so the caller's list of constraints is never modified.
        self.constraints = constraints.copy()
        self.variable_unassigned = dict.fromkeys(variables, True)

    def assign(self, name: str) -> None:
        """Record that the variable `name` has been assigned to.

        Names that are not tracked by the environment are ignored.
        """
        if name not in self.variable_unassigned:
            return
        self.variable_unassigned[name] = False

    def update_constraints(self) -> List[ExprRef]:
        """Drop the constraints that mention a reassigned variable and
        return a copy of the constraints that remain.
        """
        # A constraint is kept only if every variable in it is tracked and still unassigned.
        remaining = [
            constraint
            for constraint in self.constraints
            if all(self.variable_unassigned.get(name, False) for name in _get_vars(constraint))
        ]
        self.constraints = remaining
        return remaining.copy()

    def add_constraint(self, constraint: ExprRef) -> None:
        """Append a z3 constraint to the environment."""
        self.constraints.append(constraint)

    def parse_constraint(self, node: NodeNG) -> Optional[ExprRef]:
        """Convert an Astroid node into a Z3 constraint.

        Return the parsed expression, or None when parsing raises
        Z3Exception or Z3ParseException.
        """
        parser = Z3Parser(self.variables)
        try:
            expression = parser.parse(node)
        except (Z3Exception, Z3ParseException):
            return None
        return expression
469

470

471
def _get_vars(expr: ExprRef) -> Set[str]:
    """Collect the names of all z3 variables that occur in a z3 expression."""
    names = set()
    # Explicit stack of sub-expressions that still have to be inspected.
    pending = [expr]

    while pending:
        current = pending.pop()
        if is_const(current) and current.decl().kind() == Z3_OP_UNINTERPRETED:
            # An uninterpreted constant is treated as a variable; record its name.
            names.add(current.decl().name())
        elif hasattr(current, "children"):
            pending.extend(current.children())

    return names
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc