• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alexmojaki / executing / 27476538669

13 Jun 2026 07:21PM UTC coverage: 90.701%. First build
27476538669

Pull #105

github

Pull Request #105: refactor: convert type comments to type annotations

244 of 274 branches covered (89.05%)

Branch coverage included in aggregate %.

73 of 75 new or added lines in 1 file covered. (97.33%)

468 of 511 relevant lines covered (91.59%)

4.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.7
/executing/executing.py
1
"""
2
MIT License
3

4
Copyright (c) 2021 Alex Hall
5

6
Permission is hereby granted, free of charge, to any person obtaining a copy
7
of this software and associated documentation files (the "Software"), to deal
8
in the Software without restriction, including without limitation the rights
9
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10
copies of the Software, and to permit persons to whom the Software is
11
furnished to do so, subject to the following conditions:
12

13
The above copyright notice and this permission notice shall be included in all
14
copies or substantial portions of the Software.
15

16
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
SOFTWARE.
23
"""
24
from __future__ import annotations
7✔
25
import __future__
7✔
26
import ast
7✔
27
import dis
7✔
28
import inspect
7✔
29
import io
7✔
30
import linecache
7✔
31
import re
7✔
32
import sys
7✔
33
import types
7✔
34
from collections import defaultdict
7✔
35
from copy import deepcopy
7✔
36
from functools import lru_cache
7✔
37
from itertools import islice
7✔
38
from itertools import zip_longest
7✔
39
from operator import attrgetter
7✔
40
from pathlib import Path
7✔
41
from threading import RLock
7✔
42
from tokenize import detect_encoding
7✔
43
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast
7✔
44
from ._utils import mangled_name,assert_, EnhancedAST,EnhancedInstruction,Instruction,get_instructions
7✔
45

46

47
if TYPE_CHECKING:
7✔
48
    from asttokens import ASTTokens, ASTText
7✔
49
    from asttokens.asttokens import ASTTextBase
7✔
NEW
50

×
NEW
51

×
52
# AST node classes that define a function (regular and async).
function_node_types: Tuple[Type, ...] = (ast.FunctionDef, ast.AsyncFunctionDef)

# Memoizing decorator with no upper bound on the number of cached results.
cache = lru_cache(maxsize=None)

# Debug switch: when truthy, `Source.executing` re-raises exceptions that
# occur while looking up the executing node instead of ignoring them.
TESTING = 0
7✔
57

58
class NotOneValueFound(Exception):
    """
    Error raised when an iterable was expected to hold exactly one item
    but did not.

    Attributes:
        - values: the items supplied alongside the message
          (an empty tuple unless the raiser passes something else).
    """

    def __init__(self, msg: str, values: Sequence = ()) -> None:
        super().__init__(msg)
        self.values = values
7✔
62

7✔
63
# Generic type variable used in signatures in this module
# (e.g. `only` and `Source._class_local`).
T = TypeVar('T')
7✔
64

65

7✔
66
def only(it: Iterable[T]) -> T:
    """
    Return the single item contained in `it`.

    Raises NotOneValueFound if `it` holds no items or more than one.
    For iterables without a length, at most two items are consumed, and
    when two are found they are attached to the exception as `values`.
    """
    if isinstance(it, Sized):
        # The length is known up front, so check it before touching the items.
        if len(it) != 1:
            raise NotOneValueFound('Expected one value, found %s' % len(it))
        # noinspection PyTypeChecker
        return list(it)[0]

    # Unsized iterable: pulling two items is enough to tell 0 / 1 / many apart.
    head = tuple(islice(it, 2))
    if not head:
        raise NotOneValueFound('Expected one value, found 0')
    if len(head) > 1:
        raise NotOneValueFound('Expected one value, found several', head)
    return head[0]
7✔
79

7✔
80

7✔
81
class Source(object):
    """
    Holds the text of one source file together with data derived from it.

    The entry point most callers want is the classmethod `executing(frame)`.

    Instances should not be constructed directly. Prefer `for_frame(frame)`,
    or `for_filename(filename [, module_globals])` when no frame is available.
    Instances are cached, keyed by the filename together with the file's lines.

    Attributes:
        - filename
        - text
        - lines
        - tree: the AST parsed from `text`, or None if `text` is not valid Python.
            Child nodes in the tree get an additional `parent` attribute.

    Other useful methods:
        - statements_at_line
        - asttokens
        - code_qualname
    """

    def __init__(self, filename: str, lines: Sequence[str]) -> None:
        """
        Not meant to be called directly; see the class docstring.
        """
        self.filename = filename
        self.text = ''.join(lines)
        self.lines = [raw_line.rstrip('\r\n') for raw_line in lines]

        self._nodes_by_line = defaultdict(list)
        self.tree = None
        self._qualnames = {}
        self._asttokens: Optional[ASTTokens] = None
        self._asttext: Optional[ASTText] = None

        try:
            self.tree = ast.parse(self.text, filename=filename)
        except (SyntaxError, ValueError):
            # Not parseable: leave `tree` as None and the indexes empty.
            return

        for parent_node in ast.walk(self.tree):
            # Link every child back to its parent.
            for child_node in ast.iter_child_nodes(parent_node):
                cast(EnhancedAST, child_node).parent = cast(EnhancedAST, parent_node)
            # Index the node under every line number reported for it.
            for line_number in node_linenos(parent_node):
                self._nodes_by_line[line_number].append(parent_node)

        qualname_visitor = QualnameVisitor()
        qualname_visitor.visit(self.tree)
        self._qualnames = qualname_visitor.qualnames

    @classmethod
    def for_frame(cls, frame: types.FrameType, use_cache: bool = True) -> Source:
        """
        Returns the `Source` object for the file whose code `frame` is running.
        """
        filename = frame.f_code.co_filename
        module_globals = frame.f_globals or {}
        return cls.for_filename(filename, module_globals, use_cache)

    @classmethod
    def for_filename(
        cls,
        filename: Union[str, Path],
        module_globals: Optional[Dict[str, Any]] = None,
        use_cache: bool = True,  # noqa no longer used
    ) -> Source:
        """
        Returns the `Source` object for `filename`, reading its lines via `linecache`.
        """
        if isinstance(filename, Path):
            filename = str(filename)

        # Remember the existing linecache entry before refreshing the cache.
        previous_entry = linecache.cache.get(filename) # type: ignore[attr-defined]
        linecache.checkcache(filename)
        lines = linecache.getlines(filename, module_globals)
        if previous_entry is not None and not lines:
            # An entry existed, checkcache dropped it, and nothing took its place.
            # A merely modified file would have produced non-empty `lines`, so the
            # file was found not to exist, probably because `filename` is fake.
            # Put the old entry back so that there is still something to use.
            linecache.cache[filename] = previous_entry # type: ignore[attr-defined]
            lines = linecache.getlines(filename, module_globals)

        return cls._for_filename_and_lines(filename, tuple(lines))

    @classmethod
    def _for_filename_and_lines(cls, filename: str, lines: Sequence[str]) -> Source:
        """
        Returns the cached `Source` for this exact (filename, lines) pair,
        creating and caching it on first use.
        """
        source_cache: Dict[Tuple[str, Sequence[str]], Source] = cls._class_local('__source_cache_with_lines', {})
        cache_key = (filename, lines)
        existing = source_cache.get(cache_key)
        if existing is not None:
            return existing

        created = cls(filename, lines)
        source_cache[cache_key] = created
        return created

    @classmethod
    def lazycache(cls, frame: types.FrameType) -> None:
        """
        Passes the frame's filename and globals on to `linecache.lazycache`.
        """
        code = frame.f_code
        linecache.lazycache(code.co_filename, frame.f_globals)

    @classmethod
    def executing(cls, frame_or_tb: Union[types.TracebackType, types.FrameType]) -> Executing:
        """
        Returns an `Executing` object describing the operation that the given
        frame or traceback object is currently executing.
        """
        if isinstance(frame_or_tb, types.TracebackType):
            # https://docs.python.org/3/reference/datamodel.html#traceback-objects
            # "tb_lineno gives the line number where the exception occurred;
            #  tb_lasti indicates the precise instruction.
            #  The line number and last instruction in the traceback may differ
            #  from the line number of its frame object
            #  if the exception occurred in a try statement with no matching except clause
            #  or with a finally clause."
            frame = frame_or_tb.tb_frame
            lineno = frame_or_tb.tb_lineno
            lasti = frame_or_tb.tb_lasti
        else:
            frame = frame_or_tb
            lineno = frame.f_lineno
            lasti = frame.f_lasti

        code = frame.f_code
        key = (code, id(code), lasti)
        executing_cache: Dict[Tuple[types.CodeType, int, int], Any] = cls._class_local('__executing_cache', {})

        cached_args = executing_cache.get(key)
        if cached_args:
            return Executing(frame, *cached_args)

        node = stmts = decorator = None
        source = cls.for_frame(frame)
        tree = source.tree
        if tree:
            try:
                stmts = source.statements_at_line(lineno)
                if stmts:
                    if is_ipython_cell_code(code):
                        decorator, node = find_node_ipython(frame, lasti, stmts, source)
                    else:
                        node_finder = NodeFinder(frame, stmts, tree, lasti, source)
                        node = node_finder.result
                        decorator = node_finder.decorator

                if node:
                    # Narrow the statements down to the one holding the node.
                    new_stmts = {statement_containing_node(node)}
                    assert_(new_stmts <= stmts)
                    stmts = new_stmts
            except Exception:
                # Best effort: failures are ignored unless TESTING is set.
                if TESTING:
                    raise

        fresh_args = (source, node, stmts, decorator)
        executing_cache[key] = fresh_args
        return Executing(frame, *fresh_args)

    @classmethod
    def _class_local(cls, name: str, default: T) -> T:
        """
        Returns the attribute `name` stored directly on this class
        (not inherited from a parent class), storing `default` if needed.
        """
        # cls.__dict__ is a mappingproxy, so setdefault is not available;
        # read the current value and write it back with setattr instead.
        value = cls.__dict__.get(name, default)
        setattr(cls, name, value)
        return value

    @cache
    def statements_at_line(self, lineno: int) -> Set[EnhancedAST]:
        """
        Returns the statement nodes that overlap the given line.

        At most one statement is returned unless semicolons are present.

        If `text` is not valid Python (so `tree` is None),
        the result is an empty set.

        Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)`
        should return at least one statement.
        """
        statements = set()
        for node in self._nodes_by_line[lineno]:
            statements.add(statement_containing_node(node))
        return statements

    def asttext(self) -> ASTText:
        """
        Returns an ASTText object for getting the source of specific AST nodes.
        The object is created on first use and then reused.

        See http://asttokens.readthedocs.io/en/latest/api-index.html
        """
        from asttokens import ASTText  # must be installed separately

        if self._asttext is not None:
            return self._asttext

        self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename)
        return self._asttext

    def asttokens(self) -> ASTTokens:
        """
        Returns an ASTTokens object for getting the source of specific AST nodes.
        The object is created on first use and then reused.

        See http://asttokens.readthedocs.io/en/latest/api-index.html
        """
        import asttokens  # must be installed separately

        if self._asttokens is not None:
            return self._asttokens

        if hasattr(asttokens, 'ASTText'):
            self._asttokens = self.asttext().asttokens
        else:  # pragma: no cover
            self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename)
        return self._asttokens

    def _asttext_base(self) -> ASTTextBase:
        """
        Returns `asttext()` when the installed asttokens provides `ASTText`,
        otherwise falls back to `asttokens()`.
        """
        import asttokens  # must be installed separately

        if not hasattr(asttokens, 'ASTText'):  # pragma: no cover
            return self.asttokens()
        return self.asttext()

    @staticmethod
    def decode_source(source: Union[str, bytes]) -> str:
        """
        Returns `source` as text: bytes are decoded using the encoding
        detected from their content, strings are returned unchanged.
        """
        if not isinstance(source, bytes):
            return source
        return source.decode(Source.detect_encoding(source))

    @staticmethod
    def detect_encoding(source: bytes) -> str:
        """
        Returns the encoding name that `tokenize.detect_encoding` reports for `source`.
        """
        encoding, _lines_read = detect_encoding(io.BytesIO(source).readline)
        return encoding

    def code_qualname(self, code: types.CodeType) -> str:
        """
        Imitates the __qualname__ attribute of functions for code objects.
        Given:

            - A function `func`
            - A frame `frame` for an execution of `func`, meaning:
                `frame.f_code is func.__code__`

        `Source.for_frame(frame).code_qualname(frame.f_code)`
        will be equal to `func.__qualname__`*.

        Falls back to `code.co_name` if there is no appropriate qualname.

        Based on https://github.com/wbolster/qualname

        (* unless `func` is a lambda
        nested inside another lambda on the same line, in which case
        the outer lambda's qualname will be returned for the codes
        of both lambdas)
        """
        assert_(code.co_filename == self.filename)
        lookup_key = (code.co_name, code.co_firstlineno)
        return self._qualnames.get(lookup_key, code.co_name)
344

7✔
345

7✔
346
class Executing(object):
    """
    Information about the operation a frame is currently executing.

    Generally you will just want `node`, which is the AST node being executed,
    or None if it's unknown.

    If a decorator is currently being called, then:
        - `node` is a function or class definition
        - `decorator` is the expression in `node.decorator_list` being called
        - `statements == {node}`

    Attributes:
        - frame: the frame this information is about
        - source: the `Source` of the file the frame is executing in
        - node: see above; may be None
        - statements: the statements the operation may belong to; may be None
        - decorator: see above; None when no decorator call was identified
    """

    # `node` and `stmts` are annotated as Optional because `Source.executing`
    # passes None for both when nothing could be determined.
    def __init__(
        self,
        frame: types.FrameType,
        source: Source,
        node: Optional[EnhancedAST],
        stmts: Optional[Set[ast.stmt]],
        decorator: Optional[EnhancedAST],
    ) -> None:
        self.frame = frame
        self.source = source
        self.node = node
        self.statements = stmts
        self.decorator = decorator

    def code_qualname(self) -> str:
        """Returns `source.code_qualname(...)` for the frame's code object."""
        return self.source.code_qualname(self.frame.f_code)

    def text(self) -> str:
        """Returns the text for `node` as given by the asttokens helper of `source`."""
        return self.source._asttext_base().get_text(self.node)

    def text_range(self) -> Tuple[int, int]:
        """Returns the text range for `node` as given by the asttokens helper of `source`."""
        return self.source._asttext_base().get_text_range(self.node)
374

7✔
375

7✔
376
class QualnameVisitor(ast.NodeVisitor):
    """
    Walks an AST and records a qualified name for every function, lambda
    and class definition it finds.

    Attributes:
        - stack: name components of the definitions currently being visited
        - qualnames: maps (name, first line number) to the dotted qualified
          name, where the line number is that of the first decorator if the
          definition has any
    """

    def __init__(self) -> None:
        super().__init__()
        self.stack: List[str] = []
        self.qualnames: Dict[Tuple[str, int], str] = {}

    def add_qualname(self, node: ast.AST, name: Optional[str] = None) -> None:
        """
        Pushes the definition's name onto the stack and records its qualname.
        The caller is responsible for popping the name again.
        """
        name = name or node.name # type: ignore[attr-defined]
        self.stack.append(name)

        decorators = getattr(node, 'decorator_list', ())
        # A decorated definition is keyed by the line of its first decorator.
        line_source = decorators[0] if decorators else node
        lineno = line_source.lineno # type: ignore[attr-defined]

        # setdefault: the first definition seen for a given key wins.
        self.qualnames.setdefault((name, lineno), ".".join(self.stack))

    def visit_FunctionDef(self, node: ast.AST, name: Optional[str] = None) -> None:
        """
        Records the function (or lambda) and visits its body with
        '<locals>' pushed, then visits everything outside the body.
        """
        assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node
        self.add_qualname(node, name)
        self.stack.append('<locals>')

        # A lambda's body is a single expression, not a list of statements.
        body_nodes: Sequence[ast.AST] = [node.body] if isinstance(node, ast.Lambda) else node.body
        for body_node in body_nodes:
            self.visit(body_node)

        # Drop '<locals>' and the function's own name.
        del self.stack[-2:]

        # Find lambdas in the function definition outside the body,
        # e.g. decorators or default arguments
        # Based on iter_child_nodes
        for field_name, field_value in ast.iter_fields(node):
            if field_name == 'body':
                continue
            candidates = field_value if isinstance(field_value, list) else [field_value]
            for candidate in candidates:
                if isinstance(candidate, ast.AST):
                    self.visit(candidate)

    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_Lambda(self, node: ast.AST) -> None:
        """Treats a lambda as a function named '<lambda>'."""
        assert isinstance(node, ast.Lambda)
        self.visit_FunctionDef(node, '<lambda>')

    def visit_ClassDef(self, node: ast.AST) -> None:
        """Records the class and visits all of its children under its name."""
        assert isinstance(node, ast.ClassDef)
        self.add_qualname(node)
        self.generic_visit(node)
        self.stack.pop()
7✔
429

7✔
430

7✔
431

432

433

434
# Combined compiler flags of every feature listed in `__future__.all_feature_names`.
future_flags = sum(
    map(
        attrgetter('compiler_flag'),
        (getattr(__future__, feature_name) for feature_name in __future__.all_feature_names),
    )
)
7✔
437

438

439
def compile_similar_to(source: ast.Module, matching_code: types.CodeType) -> Any:
    """
    Compiles `source` in 'exec' mode under the filename of `matching_code`,
    enabling exactly those `__future__` compiler flags that are set on
    `matching_code` and not inheriting any from the calling code.
    """
    enabled_future_flags = future_flags & matching_code.co_flags
    return compile(
        source,
        matching_code.co_filename,
        'exec',
        flags=enabled_future_flags,
        dont_inherit=True,
    )
447

448

449
# Arbitrary marker string. `SentinelNodeFinder.matching_nodes` rewrites a candidate
# expression to `<expr> ** sentinel` and then looks for this value as an
# instruction `argval` in the recompiled bytecode.
sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698'
450

451
def is_rewritten_by_pytest(code: types.CodeType) -> bool:
    """
    Returns True if any instruction of `code` other than a LOAD_CONST has a
    string argument value starting with "@py".
    """
    for bc in get_instructions(code):
        if bc.opname == "LOAD_CONST":
            continue
        argval = bc.argval
        if isinstance(argval, str) and argval.startswith("@py"):
            return True
    return False
456

457

458
class SentinelNodeFinder(object):
    """
    Locates the AST node that corresponds to the instruction a frame is
    currently executing.

    The approach: for each candidate expression, temporarily replace it in
    the tree with `<expr> ** sentinel`, recompile, and check whether the
    sentinel shows up right after the position of the current instruction
    in the frame's own bytecode.

    After construction:
        - `result` is the node found. If a decorator call is executing,
          it is the decorated class/function definition.
        - `decorator` is the decorator expression in that case, else None.
    """

    # Class-level default; overwritten on the instance once a node is found.
    result: Optional[EnhancedAST] = None

    def __init__(self, frame: types.FrameType, stmts: Set[EnhancedAST], tree: ast.Module, lasti: int, source: Source) -> None:
        """
        Runs the whole search; results are stored in `result` and `decorator`.

        `stmts` are the candidate statements to search in (must be non-empty),
        `tree` is the module AST containing them and `lasti` is the bytecode
        offset of the instruction of interest. `source` is not used here.

        Raises RuntimeError for instruction types this finder does not handle.
        Helper failures (e.g. NotOneValueFound from `only`) propagate.
        """
        assert_(stmts)
        self.frame = frame
        self.tree = tree
        self.code = code = frame.f_code
        self.is_pytest = is_rewritten_by_pytest(code)

        # For code flagged by is_rewritten_by_pytest, instructions on the lines
        # returned by assert_linenos are left out (see clean_instructions).
        if self.is_pytest:
            self.ignore_linenos = frozenset(assert_linenos(tree))
        else:
            self.ignore_linenos = frozenset()

        self.decorator = None

        self.instruction = instruction = self.get_actual_current_instruction(lasti)
        op_name = instruction.opname
        # Additional per-node check; accepts everything unless a branch below replaces it.
        extra_filter: Callable[[Any], bool] = lambda e: True
        # NoneType by default: nodes without a `ctx` attribute give None from
        # the getattr in the filter below and therefore pass the ctx check.
        ctx: Type = type(None)

        # Map the instruction to the kind of AST node (and context) that produces it.
        typ: Type = type(None)
        if op_name.startswith('CALL_'):
            typ = ast.Call
        elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')):
            typ = ast.Subscript
            ctx = ast.Load
        elif op_name.startswith('BINARY_'):
            typ = ast.BinOp
            op_type = dict(
                BINARY_POWER=ast.Pow,
                BINARY_MULTIPLY=ast.Mult,
                BINARY_MATRIX_MULTIPLY=getattr(ast, "MatMult", ()),
                BINARY_FLOOR_DIVIDE=ast.FloorDiv,
                BINARY_TRUE_DIVIDE=ast.Div,
                BINARY_MODULO=ast.Mod,
                BINARY_ADD=ast.Add,
                BINARY_SUBTRACT=ast.Sub,
                BINARY_LSHIFT=ast.LShift,
                BINARY_RSHIFT=ast.RShift,
                BINARY_AND=ast.BitAnd,
                BINARY_XOR=ast.BitXor,
                BINARY_OR=ast.BitOr,
            )[op_name]
            extra_filter = lambda e: isinstance(e.op, op_type)
        elif op_name.startswith('UNARY_'):
            typ = ast.UnaryOp
            op_type = dict(
                UNARY_POSITIVE=ast.UAdd,
                UNARY_NEGATIVE=ast.USub,
                UNARY_NOT=ast.Not,
                UNARY_INVERT=ast.Invert,
            )[op_name]
            extra_filter = lambda e: isinstance(e.op, op_type)
        elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'):
            typ = ast.Attribute
            ctx = ast.Load
            extra_filter = lambda e:mangled_name(e) == instruction.argval
        elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'):
            typ = ast.Name
            ctx = ast.Load
            extra_filter = lambda e:mangled_name(e) == instruction.argval
        elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'):
            typ = ast.Compare
            # Only comparisons with a single operator are considered.
            extra_filter = lambda e: len(e.ops) == 1
        elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')):
            ctx = ast.Store
            typ = ast.Subscript
        elif op_name.startswith('STORE_ATTR'):
            ctx = ast.Store
            typ = ast.Attribute
            extra_filter = lambda e:mangled_name(e) == instruction.argval
        else:
            raise RuntimeError(op_name)


        # NOTE(review): `lock` is defined outside this class; presumably it
        # serializes the temporary in-place tree edits made by matching_nodes.
        with lock:
            # Candidate expressions: nodes of the right type and context that pass
            # the extra filter and whose containing statement is the statement
            # being walked (i.e. not nested inside an inner statement).
            exprs = {
                cast(EnhancedAST, node)
                for stmt in stmts
                for node in ast.walk(stmt)
                if isinstance(node, typ)
                if isinstance(getattr(node, "ctx", None), ctx)
                if extra_filter(node)
                if statement_containing_node(node) == stmt
            }

            if ctx == ast.Store:
                # No special bytecode tricks here.
                # We can handle multiple assigned attributes with different names,
                # but only one assigned subscript.
                self.result = only(exprs)
                return

            matching = list(self.matching_nodes(exprs))
            # A call instruction with no matching Call node is treated as a decorator call.
            if not matching and typ == ast.Call:
                self.find_decorator(stmts)
            else:
                self.result = only(matching)

    def find_decorator(self, stmts: Union[List[EnhancedAST], Set[EnhancedAST]]) -> None:
        """
        Treats the current instruction as one of the decorator calls of the
        single class or function definition in `stmts`.

        Sets `decorator` to the decorator expression being called and
        `result` to the definition itself.
        """
        stmt = only(stmts)
        assert_(isinstance(stmt, (ast.ClassDef, function_node_types)))
        decorators = stmt.decorator_list # type: ignore[attr-defined]
        assert_(decorators)
        # Instructions attributed to the line the frame is currently on.
        line_instructions = [
            inst
            for inst in self.clean_instructions(self.code)
            if inst.lineno == self.frame.f_lineno
        ]
        # Position of the last CALL_FUNCTION on that line; the next
        # instruction is asserted to be a store.
        last_decorator_instruction_index = [
            i
            for i, inst in enumerate(line_instructions)
            if inst.opname == "CALL_FUNCTION"
        ][-1]
        assert_(
            line_instructions[last_decorator_instruction_index + 1].opname.startswith(
                "STORE_"
            )
        )
        # The final len(decorators) instructions up to and including that call
        # must all be the decorator calls.
        decorator_instructions = line_instructions[
            last_decorator_instruction_index
            - len(decorators)
            + 1 : last_decorator_instruction_index
            + 1
        ]
        assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"})
        decorator_index = decorator_instructions.index(self.instruction)
        # The list is reversed: the first of these call instructions is mapped
        # to the last decorator in source order.
        decorator = decorators[::-1][decorator_index]
        self.decorator = decorator
        self.result = stmt

    def clean_instructions(self, code: types.CodeType) -> List[EnhancedInstruction]:
        """
        Returns the instructions of `code` without EXTENDED_ARG and NOP
        instructions and without instructions on ignored lines.
        """
        return [
            inst
            for inst in get_instructions(code)
            if inst.opname not in ("EXTENDED_ARG", "NOP")
            if inst.lineno not in self.ignore_linenos
        ]

    def get_original_clean_instructions(self) -> List[EnhancedInstruction]:
        """
        Returns the cleaned instructions of the frame's own code object, adjusted
        so they can be compared with instructions compiled from the tree.
        """
        result = self.clean_instructions(self.code)

        # pypy sometimes (when is not clear)
        # inserts JUMP_IF_NOT_DEBUG instructions in bytecode
        # If they're not present in our compiled instructions,
        # ignore them in the original bytecode
        if not any(
                inst.opname == "JUMP_IF_NOT_DEBUG"
                for inst in self.compile_instructions()
        ):
            result = [
                inst for inst in result
                if inst.opname != "JUMP_IF_NOT_DEBUG"
            ]

        return result

    def matching_nodes(self, exprs: Set[EnhancedAST]) -> Iterator[EnhancedAST]:
        """
        Yields each expression in `exprs` whose sentinel-marked recompilation
        places the sentinel directly after the position of the current
        instruction in the original bytecode.

        The tree is modified in place for each candidate and restored
        before the next one is tried.
        """
        original_instructions = self.get_original_clean_instructions()
        # Position of the current instruction among the cleaned original instructions.
        original_index = only(
            i
            for i, inst in enumerate(original_instructions)
            if inst == self.instruction
        )
        for expr_index, expr in enumerate(exprs):
            setter = get_setter(expr)
            assert setter is not None
            # Swap the candidate for `<candidate> ** sentinel` in the tree.
            # noinspection PyArgumentList
            replacement = ast.BinOp(
                left=cast(ast.expr,expr),
                op=ast.Pow(),
                right=ast.Constant(sentinel),
            )
            ast.fix_missing_locations(replacement)
            setter(replacement)
            try:
                instructions = self.compile_instructions()
            finally:
                # Always put the original expression back, even if compiling failed.
                setter(expr)

            if sys.version_info >= (3, 10):
                try:
                    handle_jumps(instructions, original_instructions)
                except Exception:
                    # Give other candidates a chance
                    if TESTING or expr_index < len(exprs) - 1:
                        continue
                    raise

            # Positions of the instructions carrying the sentinel value.
            indices = [
                i
                for i, instruction in enumerate(instructions)
                if instruction.argval == sentinel
            ]

            # There can be several indices when the bytecode is duplicated,
            # as happens in a finally block in 3.9+
            # First we remove the opcodes caused by our modifications
            for index_num, sentinel_index in enumerate(indices):
                # Adjustment for removing sentinel instructions below
                # in past iterations
                sentinel_index -= index_num * 2

                # Both pops use the same index: after the first removal the
                # BINARY_POWER has moved into the LOAD_CONST's slot.
                assert_(instructions.pop(sentinel_index).opname == 'LOAD_CONST')
                assert_(instructions.pop(sentinel_index).opname == 'BINARY_POWER')

            # Then we see if any of the instruction indices match
            for index_num, sentinel_index in enumerate(indices):
                sentinel_index -= index_num * 2
                # The instruction just before where the sentinel was.
                new_index = sentinel_index - 1

                if new_index != original_index:
                    continue

                original_inst = original_instructions[original_index]
                new_inst = instructions[new_index]

                # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)'
                # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT
                if (
                        original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP')
                        and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
                        and (
                        original_instructions[original_index + 1].opname
                        != instructions[new_index + 1].opname == 'UNARY_NOT'
                )):
                    # Remove the difference for the upcoming assert
                    instructions.pop(new_index + 1)

                # Check that the modified instructions don't have anything unexpected
                # 3.10 is a bit too weird to assert this in all cases but things still work
                if sys.version_info < (3, 10):
                    for inst1, inst2 in zip_longest(
                        original_instructions, instructions
                    ):
                        assert_(inst1 and inst2 and opnames_match(inst1, inst2))

                yield expr

    def compile_instructions(self) -> List[EnhancedInstruction]:
        """
        Compiles the current state of `self.tree` and returns the cleaned
        instructions of the one code object in it that matches `self.code`.
        """
        module_code = compile_similar_to(self.tree, self.code)
        code = only(self.find_codes(module_code))
        return self.clean_instructions(code)

    def find_codes(self, root_code: types.CodeType) -> list:
        """
        Returns every code object among `root_code` and the code objects nested
        in its constants (recursively) that agrees with `self.code` on all checks.
        """
        # Each check maps a code object to a value that must equal the
        # corresponding value for self.code.
        checks: List[Callable] = [
            attrgetter('co_firstlineno'),
            attrgetter('co_freevars'),
            attrgetter('co_cellvars'),
            # Compares co_name, unless is_ipython_cell_code_name gives a truthy result.
            lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name,
        ]
        # co_names and co_varnames are not compared for code flagged by
        # is_rewritten_by_pytest.
        if not self.is_pytest:
            checks += [
                attrgetter('co_names'),
                attrgetter('co_varnames'),
            ]

        def matches(c: types.CodeType) -> bool:
            return all(
                f(c) == f(self.code)
                for f in checks
            )

        code_options = []
        if matches(root_code):
            code_options.append(root_code)

        def finder(code: types.CodeType) -> None:
            # Depth-first search through nested code objects stored in co_consts.
            for const in code.co_consts:
                if not inspect.iscode(const):
                    continue

                if matches(const):
                    code_options.append(const)
                finder(const)

        finder(root_code)
        return code_options

    def get_actual_current_instruction(self, lasti: int) -> EnhancedInstruction:
        """
        Get the instruction corresponding to the current
        frame offset, skipping EXTENDED_ARG instructions
        """
        # Don't use get_original_clean_instructions
        # because we need the actual instructions including
        # EXTENDED_ARG
        instructions = list(get_instructions(self.code))
        index = only(
            i
            for i, inst in enumerate(instructions)
            if inst.offset == lasti
        )

        # Step forward past any EXTENDED_ARG prefixes to the real instruction.
        while True:
            instruction = instructions[index]
            if instruction.opname != "EXTENDED_ARG":
                return instruction
            index += 1
3✔
759

3✔
760

2✔
761

762
def non_sentinel_instructions(instructions: List[EnhancedInstruction], start: int) -> Iterator[Tuple[int, EnhancedInstruction]]:
    """
    Yields (index, instruction) pairs excluding the basic
    instructions introduced by the sentinel transformation

    Iteration begins at position `start`; the yielded indices are positions
    in the full `instructions` list, not offsets relative to `start`.
    """
    skip_power = False
    for i, inst in islice(enumerate(instructions), start, None):
        if inst.argval == sentinel:
            # The instruction that loads the sentinel value itself.
            assert_(inst.opname == "LOAD_CONST")
            skip_power = True
            continue
        elif skip_power:
            # The instruction right after a sentinel load is expected to be
            # the BINARY_POWER that consumes it; drop that one as well.
            assert_(inst.opname == "BINARY_POWER")
            skip_power = False
            continue
        yield i, inst
1✔
778

1✔
779

1✔
780
def walk_both_instructions(original_instructions: List[EnhancedInstruction], original_start: int, instructions: List[EnhancedInstruction], start: int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]]:
    """
    Yields matching indices and instructions from the new and original instructions,
    leaving out changes made by the sentinel transformation.

    Each item is `(original_i, original_inst, new_i, new_inst)`. Iteration
    stops as soon as either side runs out of instructions.
    """
    original_iter = islice(enumerate(original_instructions), original_start, None)
    new_iter = non_sentinel_instructions(instructions, start)
    inverted_comparison = False
    while True:
        # Every `next` call lives inside this guard: a StopIteration escaping
        # a generator body is turned into RuntimeError (PEP 479), so running
        # out of instructions at any point must end the walk explicitly.
        try:
            original_i, original_inst = next(original_iter)
            new_i, new_inst = next(new_iter)
            if (
                inverted_comparison
                and original_inst.opname != new_inst.opname == "UNARY_NOT"
            ):
                # The previous pair was the same comparison with a different
                # argument, and the new code now has a UNARY_NOT that the
                # original lacks: skip it so both sides line up again.
                new_i, new_inst = next(new_iter)
        except StopIteration:
            return
        inverted_comparison = (
            original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP")
            and original_inst.arg != new_inst.arg  # type: ignore[attr-defined]
        )
        yield original_i, original_inst, new_i, new_inst
804

805

1✔
806
def handle_jumps(instructions: List[EnhancedInstruction], original_instructions: List[EnhancedInstruction]) -> None:
    """
    Transforms instructions in place until it looks more like original_instructions.
    This is only needed in 3.10+ where optimisations lead to more drastic changes
    after the sentinel transformation.
    Replaces JUMP instructions that aren't also present in original_instructions
    with the sections that they jump to until a raise or return.
    In some other cases duplication found in `original_instructions`
    is replicated in `instructions`.

    Raises AssertionError when a mismatch is found that is neither an extra
    jump nor followed by a return/raise in `original_instructions`.
    """
    while True:
        for original_i, original_inst, new_i, new_inst in walk_both_instructions(
            original_instructions, 0, instructions, 0
        ):
            if opnames_match(original_inst, new_inst):
                continue

            if "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname:
                # Find where the new instruction is jumping to, ignoring
                # instructions which have been copied in previous iterations
                start = only(
                    i
                    for i, inst in enumerate(instructions)
                    if inst.offset == new_inst.argval
                    and not getattr(inst, "_copied", False)
                )
                # Replace the jump instruction with the jumped to section of instructions
                # That section may also be deleted if it's not similarly duplicated
                # in original_instructions
                # NOTE(review): when handle_jump deletes that section, `new_i`
                # stays valid only if the section lies after `new_i` -
                # presumably these jumps are always forward; confirm.
                new_instructions = handle_jump(
                    original_instructions, original_i, instructions, start
                )
                assert new_instructions is not None
                instructions[new_i : new_i + 1] = new_instructions
            else:
                # Extract a section of original_instructions from original_i to return/raise
                orig_section = []
                for section_inst in original_instructions[original_i:]:
                    orig_section.append(section_inst)
                    if section_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
                        break
                else:
                    # No return/raise - this is just a mismatch we can't handle
                    raise AssertionError

                # Insert (without removing anything) the single section of
                # `instructions` that matches the extracted original section.
                instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions))

            # instructions has been modified, the for loop can't sensibly continue
            # Restart it from the beginning, checking for other issues
            break

        else:  # No mismatched jumps found, we're done
            return
859

860

1✔
861
def find_new_matching(orig_section: List[EnhancedInstruction], instructions: List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]]:
    """
    Yields sections of `instructions` which match `orig_section`.
    The yielded sections include sentinel instructions, but these
    are ignored when checking for matches.

    An empty `orig_section` yields nothing.
    """
    wanted = len(orig_section)
    if not wanted:
        # Nothing to match; unpacking an empty zip below would fail instead.
        return

    # NOTE(review): this range excludes start == len(instructions) - wanted,
    # so a sentinel-free match occupying the very tail of `instructions` is
    # never yielded. Left unchanged; confirm whether that is intended.
    for start in range(len(instructions) - wanted):
        pairs = list(
            islice(
                non_sentinel_instructions(instructions, start),
                wanted,
            )
        )
        if len(pairs) < wanted:
            # Too few non-sentinel instructions remain from here on, so no
            # later start can produce a full-length section either.
            return
        dup_section = [inst for _, inst in pairs]
        if sections_match(orig_section, dup_section):
            # Slice the raw list so sentinel instructions inside the span
            # are part of the yielded section.
            last_index = pairs[-1][0]
            yield instructions[start:last_index + 1]
×
878

×
879

×
880
def handle_jump(original_instructions: List[EnhancedInstruction], original_start: int, instructions: List[EnhancedInstruction], start: int) -> Optional[List[EnhancedInstruction]]:
    """
    Returns the section of instructions starting at `start` and ending
    with a RETURN_VALUE or RAISE_VARARGS instruction.
    There should be a matching section in original_instructions starting at original_start.
    If that section doesn't appear elsewhere in original_instructions,
    then also delete the returned section of instructions.

    The returned list holds deep copies flagged with `_copied = True`.
    Returns None if no RETURN_VALUE/RAISE_VARARGS is reached before either
    instruction list runs out.
    """
    for original_j, original_inst, new_j, new_inst in walk_both_instructions(
        original_instructions, original_start, instructions, start
    ):
        assert_(opnames_match(original_inst, new_inst))
        if original_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
            # Copy before any deletion below so the caller gets independent
            # objects, and flag them so they can be told apart from the
            # instructions they were copied from.
            inlined = deepcopy(instructions[start : new_j + 1])
            for inl in inlined:
                inl._copied = True
            orig_section = original_instructions[original_start : original_j + 1]
            if not check_duplicates(
                original_start, orig_section, original_instructions
            ):
                # The original has this section only once, so drop it from
                # its old position in `instructions`.
                instructions[start : new_j + 1] = []
            return inlined

    return None
1✔
904

905

×
906
def check_duplicates(original_i: int, orig_section: List[EnhancedInstruction], original_instructions: List[EnhancedInstruction]) -> bool:
    """
    Returns True if a section of original_instructions starting somewhere other
    than original_i and matching orig_section is found, i.e. orig_section is duplicated.
    """
    section_len = len(orig_section)
    for dup_start in range(len(original_instructions)):
        if dup_start == original_i:
            # That is the section itself, not a duplicate.
            continue
        dup_section = original_instructions[dup_start : dup_start + section_len]
        if len(dup_section) < section_len:
            # Ran into the end of the list; every later start is shorter too.
            return False
        if sections_match(orig_section, dup_section):
            return True

    return False
1✔
921

922
def sections_match(orig_section: List[EnhancedInstruction], dup_section: List[EnhancedInstruction]) -> bool:
    """
    Returns True if the given lists of instructions have matching linenos and opnames.

    Instructions are compared pairwise with `zip`, so only the common prefix
    is checked when the two sections differ in length.
    """
    return all(
        (
            orig_inst.lineno == dup_inst.lineno
            # POP_BLOCKs have been found to have differing linenos in innocent cases
            or "POP_BLOCK" == orig_inst.opname == dup_inst.opname
        )
        and opnames_match(orig_inst, dup_inst)
        for orig_inst, dup_inst in zip(orig_section, dup_section)
    )
935

936

937
def opnames_match(inst1: Instruction, inst2: Instruction) -> bool:
    """
    Returns True if the two instructions have equivalent opnames.

    Besides identical opnames, the following pairs count as a match:
    any two opnames that both contain "JUMP", and the ordered pairs
    PRINT_EXPR -> POP_TOP, LOAD_METHOD/LOOKUP_METHOD -> LOAD_ATTR and
    CALL_METHOD -> CALL_FUNCTION (first name from `inst1`, second from `inst2`).
    """
    return (
        inst1.opname == inst2.opname
        or ("JUMP" in inst1.opname and "JUMP" in inst2.opname)
        or (inst1.opname == "PRINT_EXPR" and inst2.opname == "POP_TOP")
        or (
            inst1.opname in ("LOAD_METHOD", "LOOKUP_METHOD")
            and inst2.opname == "LOAD_ATTR"
        )
        or (inst1.opname == "CALL_METHOD" and inst2.opname == "CALL_FUNCTION")
    )
949

950

951
def get_setter(node: EnhancedAST) -> Optional[Callable[[ast.AST], None]]:
    """
    Returns a function which replaces `node` within `node.parent`.

    The fields of the parent are searched by identity, both directly and
    inside list-valued fields. The returned callable takes the replacement
    node and stores it where `node` was found. Returns None if `node` is
    not found among the parent's fields.
    """
    parent = node.parent
    for name, field in ast.iter_fields(parent):
        if field is node:
            # `node` is the value of a plain attribute of the parent.
            def setter(new_node: ast.AST) -> None:
                return setattr(parent, name, new_node)
            return setter
        elif isinstance(field, list):
            for i, item in enumerate(field):
                if item is node:
                    # `node` is an element of a list-valued field; returning
                    # immediately keeps `field` and `i` bound to this slot.
                    def setter(new_node: ast.AST) -> None:
                        field[i] = new_node

                    return setter
    return None
966

3✔
967
# Module-level re-entrant lock.
# NOTE(review): presumably serialises access to shared state used elsewhere
# in this module - confirm against its users.
lock = RLock()
×
968

969

7✔
970
@cache
def statement_containing_node(node: ast.AST) -> EnhancedAST:
    """
    Returns the nearest `ast.stmt` at or above `node`.

    Follows `.parent` links upwards until a statement is reached; `node`
    itself is returned if it already is a statement. Results are memoised
    per node by the `cache` decorator.
    """
    while not isinstance(node, ast.stmt):
        node = cast(EnhancedAST, node).parent
    return cast(EnhancedAST, node)
7✔
975

7✔
976

7✔
977
def assert_linenos(tree: ast.AST) -> Iterator[int]:
    """
    Yields the line numbers of every node in `tree` that lies within
    an `assert` statement.

    Nodes without a `parent` attribute are skipped. A line number may be
    yielded more than once when several nodes cover it.
    """
    for node in ast.walk(tree):
        if (
                hasattr(node, 'parent') and
                isinstance(statement_containing_node(node), ast.Assert)
        ):
            yield from node_linenos(node)
985

3✔
986

3✔
987
def _extract_ipython_statement(stmt: EnhancedAST) -> ast.Module:
988
    # IPython separates each statement in a cell to be executed separately
989
    # So NodeFinder should only compile one statement at a time or it
7✔
990
    # will find a code mismatch.
991
    while not isinstance(stmt.parent, ast.Module):
992
        stmt = stmt.parent
993
    # use `ast.parse` instead of `ast.Module` for better portability
×
994
    # python3.8 changes the signature of `ast.Module`
×
995
    # Inspired by https://github.com/pallets/werkzeug/pull/1552/files
996
    tree = ast.parse("")
997
    tree.body = [cast(ast.stmt, stmt)]
998
    ast.copy_location(tree, stmt)
×
999
    return tree
×
1000

×
1001

×
1002
def is_ipython_cell_code_name(code_name: str) -> bool:
    """
    Returns True if `code_name` is `<module>` or has the form
    `<cell line: N>` where N is one or more digits.
    """
    return bool(re.match(r"(<module>|<cell line: \d+>)$", code_name))
1004

7✔
1005

7✔
1006
def is_ipython_cell_filename(filename: str) -> bool:
    """
    Returns True if `filename` contains `<ipython-input-` or a path
    component of the form `ipykernel_N` (N digits) delimited by `/` or `\\`
    on both sides.
    """
    return bool(re.search(r"<ipython-input-|[/\\]ipykernel_\d+[/\\]", filename))
1008

7✔
1009

7✔
1010
def is_ipython_cell_code(code_obj: types.CodeType) -> bool:
    """
    Returns True if both the filename and the name of `code_obj`
    pass the IPython cell checks.
    """
    return (
        is_ipython_cell_filename(code_obj.co_filename) and
        is_ipython_cell_code_name(code_obj.co_name)
    )
1015

1016

1017
def find_node_ipython(frame: types.FrameType, lasti: int, stmts: Set[EnhancedAST], source: Source) -> Tuple[Optional[Any], Optional[Any]]:
    """
    Runs a NodeFinder separately for each candidate statement in `stmts`.

    Returns a `(decorator, node)` pair taken from the last finder that ran
    without raising, or `(None, None)` if finders for two different
    statements both produced a result.
    """
    node = decorator = None
    for stmt in stmts:
        tree = _extract_ipython_statement(stmt)
        try:
            node_finder = NodeFinder(frame, stmts, tree, lasti, source)
            if (node or decorator) and (node_finder.result or node_finder.decorator):
                # Found potential nodes in separate statements,
                # cannot resolve ambiguity, give up here
                return None, None

            node = node_finder.result
            decorator = node_finder.decorator
        except Exception:
            # Deliberately best-effort: a finder that fails on this
            # statement simply contributes nothing; try the next one.
            pass
    return decorator, node
×
1033

×
1034

×
1035

1036
def node_linenos(node: ast.AST) -> Iterator[int]:
1037
    if hasattr(node, "lineno"):
1038
        linenos: Sequence[int] = []
7✔
1039
        if hasattr(node, "end_lineno") and isinstance(node, ast.expr):
7✔
1040
            assert node.end_lineno is not None # type: ignore[attr-defined]
7✔
1041
            linenos = range(node.lineno, node.end_lineno + 1) # type: ignore[attr-defined]
7✔
1042
        else:
7✔
1043
            linenos = [node.lineno] # type: ignore[attr-defined]
7✔
1044
        for lineno in linenos:
1045
            yield lineno
7✔
1046

7✔
1047

7✔
1048
# Choose the NodeFinder implementation by interpreter version:
# PositionNodeFinder on Python 3.11+, SentinelNodeFinder on older versions.
if sys.version_info >= (3, 11):
    from ._position_node_finder import PositionNodeFinder as NodeFinder
else:
    NodeFinder = SentinelNodeFinder
4✔
1052

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc