• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alexmojaki / executing / 27478550855

13 Jun 2026 08:44PM UTC coverage: 90.585% (-0.3%) from 90.886%
27478550855

push

github

246 of 277 branches covered (88.81%)

Branch coverage included in aggregate %.

466 of 509 relevant lines covered (91.55%)

4.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.59
/executing/executing.py
1
"""
2
MIT License
3

4
Copyright (c) 2021 Alex Hall
5

6
Permission is hereby granted, free of charge, to any person obtaining a copy
7
of this software and associated documentation files (the "Software"), to deal
8
in the Software without restriction, including without limitation the rights
9
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10
copies of the Software, and to permit persons to whom the Software is
11
furnished to do so, subject to the following conditions:
12

13
The above copyright notice and this permission notice shall be included in all
14
copies or substantial portions of the Software.
15

16
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
SOFTWARE.
23
"""
24
from __future__ import annotations
7✔
25
import __future__
7✔
26
import ast
7✔
27
import dis
7✔
28
import inspect
7✔
29
import io
7✔
30
import linecache
7✔
31
import re
7✔
32
import sys
7✔
33
import types
7✔
34
from collections import defaultdict
7✔
35
from copy import deepcopy
7✔
36
from functools import lru_cache
7✔
37
from itertools import islice
7✔
38
from itertools import zip_longest
7✔
39
from operator import attrgetter
7✔
40
from pathlib import Path
7✔
41
from threading import RLock
7✔
42
from tokenize import detect_encoding
7✔
43
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast
7✔
44
from ._utils import mangled_name,assert_, EnhancedAST,EnhancedInstruction,Instruction,get_instructions
7✔
45

46

47
if TYPE_CHECKING:
7!
48
    from asttokens import ASTTokens, ASTText
×
49
    from asttokens.asttokens import ASTTextBase
×
50

51

52
# AST node classes for function definitions (sync and async),
# kept as a tuple so it can be used in isinstance() checks.
function_node_types: Tuple[Type, ...] = (ast.FunctionDef, ast.AsyncFunctionDef)
7✔
53

54
# Unbounded memoizing decorator (lru_cache with no size limit).
cache = lru_cache(maxsize=None)
7✔
55

56
# When truthy, exceptions raised while locating the executing node
# are re-raised by `Source.executing` instead of being swallowed.
TESTING = 0
7✔
57

58
class NotOneValueFound(Exception):
    """
    Raised when exactly one value was expected but a different number was found.

    The values that were found (when the raiser supplies them) are available
    in the `values` attribute; it defaults to an empty tuple.
    """

    def __init__(self, msg: str, values: Sequence = ()) -> None:
        super().__init__(msg)
        self.values = values
62

63
T = TypeVar('T')
7✔
64

65

66
def only(it: Iterable[T]) -> T:
    """
    Return the single value contained in `it`.

    Sized collections are checked with `len()`; any other iterable is
    consumed for at most two items.

    Raises:
        NotOneValueFound: if `it` is empty or contains more than one value.
            When a non-sized iterable yields several values, the first two
            are attached to the exception as `values`.
    """
    if isinstance(it, Sized):
        if len(it) != 1:
            raise NotOneValueFound('Expected one value, found %s' % len(it))
        # Read the first item directly rather than copying the
        # whole collection into a list just to index it.
        # noinspection PyTypeChecker
        return next(iter(it))

    lst = tuple(islice(it, 2))
    if not lst:
        raise NotOneValueFound('Expected one value, found 0')
    if len(lst) > 1:
        raise NotOneValueFound('Expected one value, found several', lst)
    return lst[0]
79

80

81
class Source(object):
7✔
82
    """
83
    The source code of a single file and associated metadata.
84

85
    The main method of interest is the classmethod `executing(frame)`.
86

87
    If you want an instance of this class, don't construct it.
88
    Ideally use the classmethod `for_frame(frame)`.
89
    If you don't have a frame, use `for_filename(filename [, module_globals])`.
90
    These methods cache instances by filename, so at most one instance exists per filename.
91

92
    Attributes:
93
        - filename
94
        - text
95
        - lines
96
        - tree: AST parsed from text, or None if text is not valid Python
97
            All nodes in the tree have an extra `parent` attribute
98

99
    Other methods of interest:
100
        - statements_at_line
101
        - asttokens
102
        - code_qualname
103
    """
104

105
    def __init__(self, filename: str, lines: Sequence[str]) -> None:
7✔
106
        """
107
        Don't call this constructor, see the class docstring.
108
        """
109

110
        self.filename = filename
7✔
111
        self.text = ''.join(lines)
7✔
112
        self.lines = [line.rstrip('\r\n') for line in lines]
7✔
113

114
        self._nodes_by_line = defaultdict(list)
7✔
115
        self.tree = None
7✔
116
        self._qualnames = {}
7✔
117
        self._asttokens: Optional[ASTTokens] = None
7✔
118
        self._asttext: Optional[ASTText] = None
7✔
119

120
        try:
7✔
121
            self.tree = ast.parse(self.text, filename=filename)
7✔
122
        except (SyntaxError, ValueError):
7✔
123
            pass
7✔
124
        else:
125
            for node in ast.walk(self.tree):
7✔
126
                for child in ast.iter_child_nodes(node):
7✔
127
                    cast(EnhancedAST, child).parent = cast(EnhancedAST, node)
7✔
128
                for lineno in node_linenos(node):
7✔
129
                    self._nodes_by_line[lineno].append(node)
7✔
130

131
            visitor = QualnameVisitor()
7✔
132
            visitor.visit(self.tree)
7✔
133
            self._qualnames = visitor.qualnames
7✔
134

135
    @classmethod
    def for_frame(cls, frame: types.FrameType, use_cache: bool = True) -> Source:
        """
        Returns the `Source` object corresponding to the file the frame is executing in.

        The filename comes from the frame's code object and the frame's
        globals (or an empty dict if they are falsy) are passed along
        to `for_filename`.
        """
        module_globals = frame.f_globals or {}
        return cls.for_filename(frame.f_code.co_filename, module_globals, use_cache)
141

142
    @classmethod
7✔
143
    def for_filename(
7✔
144
        cls,
145
        filename: Union[str, Path],
146
        module_globals: Optional[Dict[str, Any]] = None,
147
        use_cache: bool = True,  # noqa no longer used
148
    ) -> Source:
149
        if isinstance(filename, Path):
7!
150
            filename = str(filename)
×
151

152
        def get_lines() -> List[str]:
7✔
153
            return linecache.getlines(filename, module_globals)
7✔
154

155
        # Save the current linecache entry, then ensure the cache is up to date.
156
        entry = linecache.cache.get(filename) # type: ignore[attr-defined]
7✔
157
        linecache.checkcache(filename)
7✔
158
        lines = get_lines()
7✔
159
        if entry is not None and not lines:
7✔
160
            # There was an entry, checkcache removed it, and nothing replaced it.
161
            # This means the file wasn't simply changed (because the `lines` wouldn't be empty)
162
            # but rather the file was found not to exist, probably because `filename` was fake.
163
            # Restore the original entry so that we still have something.
164
            linecache.cache[filename] = entry # type: ignore[attr-defined]
7✔
165
            lines = get_lines()
7✔
166

167
        return cls._for_filename_and_lines(filename, tuple(lines))
7✔
168

169
    @classmethod
7✔
170
    def _for_filename_and_lines(cls, filename: str, lines: Sequence[str]) -> Source:
7✔
171
        source_cache: Dict[Tuple[str, Sequence[str]], Source] = cls._class_local('__source_cache_with_lines', {})
7✔
172
        try:
7✔
173
            return source_cache[(filename, lines)]
7✔
174
        except KeyError:
7✔
175
            pass
7✔
176

177
        result = source_cache[(filename, lines)] = cls(filename, lines)
7✔
178
        return result
7✔
179

180
    @classmethod
7✔
181
    def lazycache(cls, frame: types.FrameType) -> None:
7✔
182
        linecache.lazycache(frame.f_code.co_filename, frame.f_globals)
7✔
183

184
    @classmethod
    def executing(cls, frame_or_tb: Union[types.TracebackType, types.FrameType]) -> Executing:
        """
        Returns an `Executing` object representing the operation
        currently executing in the given frame or traceback object.

        Results are cached on the class per (code object, instruction offset).
        If the node cannot be determined, the returned object has `node` set
        to None; errors during the search are swallowed unless `TESTING` is truthy.
        """
        if isinstance(frame_or_tb, types.TracebackType):
            # https://docs.python.org/3/reference/datamodel.html#traceback-objects
            # "tb_lineno gives the line number where the exception occurred;
            #  tb_lasti indicates the precise instruction.
            #  The line number and last instruction in the traceback may differ
            #  from the line number of its frame object
            #  if the exception occurred in a try statement with no matching except clause
            #  or with a finally clause."
            frame = frame_or_tb.tb_frame
            lineno = frame_or_tb.tb_lineno
            lasti = frame_or_tb.tb_lasti
        else:
            frame = frame_or_tb
            lineno = frame.f_lineno
            lasti = frame.f_lasti

        code = frame.f_code
        key = (code, id(code), lasti)
        executing_cache: Dict[Tuple[types.CodeType, int, int], Any] = cls._class_local('__executing_cache', {})

        cached = executing_cache.get(key)
        if not cached:
            node = stmts = decorator = None
            source = cls.for_frame(frame)
            tree = source.tree
            if tree:
                try:
                    stmts = source.statements_at_line(lineno)
                    if stmts:
                        if is_ipython_cell_code(code):
                            decorator, node = find_node_ipython(frame, lasti, stmts, source)
                        else:
                            finder = NodeFinder(frame, stmts, tree, lasti, source)
                            node = finder.result
                            decorator = finder.decorator

                    if node:
                        # Narrow the statements down to the one holding the node.
                        containing = {statement_containing_node(node)}
                        assert_(containing <= stmts)
                        stmts = containing
                except Exception:
                    if TESTING:
                        raise

            cached = (source, node, stmts, decorator)
            executing_cache[key] = cached

        return Executing(frame, *cached)
240

241
    @classmethod
7✔
242
    def _class_local(cls, name: str, default: T) -> T:
7✔
243
        """
244
        Returns an attribute directly associated with this class
245
        (as opposed to subclasses), setting default if necessary
246
        """
247
        # classes have a mappingproxy preventing us from using setdefault
248
        result = cls.__dict__.get(name, default)
7✔
249
        setattr(cls, name, result)
7✔
250
        return result
7✔
251

252
    @cache
    def statements_at_line(self, lineno: int) -> Set[EnhancedAST]:
        """
        Returns the statement nodes overlapping the given line.

        Returns at most one statement unless semicolons are present.

        If the `text` attribute is not valid python, meaning
        `tree` is None, returns an empty set.

        Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)`
        should return at least one statement.
        """
        nodes_on_line = self._nodes_by_line[lineno]
        return set(map(statement_containing_node, nodes_on_line))
271

272
    def asttext(self) -> ASTText:
        """
        Returns an ASTText object for getting the source of specific AST nodes.
        The object is created on first use and reused afterwards.

        See http://asttokens.readthedocs.io/en/latest/api-index.html
        """
        from asttokens import ASTText  # must be installed separately

        existing = self._asttext
        if existing is not None:
            return existing

        self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename)
        return self._asttext
284

285
    def asttokens(self) -> ASTTokens:
        """
        Returns an ASTTokens object for getting the source of specific AST nodes.
        The object is created on first use and reused afterwards.

        See http://asttokens.readthedocs.io/en/latest/api-index.html
        """
        import asttokens  # must be installed separately

        if self._asttokens is not None:
            return self._asttokens

        if hasattr(asttokens, 'ASTText'):
            # Derive the tokens from the shared ASTText object when the
            # installed asttokens provides it.
            self._asttokens = self.asttext().asttokens
        else:  # pragma: no cover
            self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename)
        return self._asttokens
299

300
    def _asttext_base(self) -> ASTTextBase:
        """
        Returns `asttext()` when the installed asttokens provides `ASTText`,
        otherwise falls back to `asttokens()`.
        """
        import asttokens  # must be installed separately

        if not hasattr(asttokens, 'ASTText'):  # pragma: no cover
            return self.asttokens()
        return self.asttext()
307

308
    @staticmethod
7✔
309
    def decode_source(source: Union[str, bytes]) -> str:
7✔
310
        if isinstance(source, bytes):
7!
311
            encoding = Source.detect_encoding(source)
7✔
312
            return source.decode(encoding)
7✔
313
        else:
314
            return source
×
315

316
    @staticmethod
7✔
317
    def detect_encoding(source: bytes) -> str:
7✔
318
        return detect_encoding(io.BytesIO(source).readline)[0]
7✔
319

320
    def code_qualname(self, code: types.CodeType) -> str:
        """
        Imitates the __qualname__ attribute of functions for code objects.
        Given:

            - A function `func`
            - A frame `frame` for an execution of `func`, meaning:
                `frame.f_code is func.__code__`

        `Source.for_frame(frame).code_qualname(frame.f_code)`
        will be equal to `func.__qualname__`*.

        The qualname is looked up by the code's name and first line number.
        Falls back to `code.co_name` if there is no appropriate qualname.

        Based on https://github.com/wbolster/qualname

        (* unless `func` is a lambda
        nested inside another lambda on the same line, in which case
        the outer lambda's qualname will be returned for the codes
        of both lambdas)
        """
        assert_(code.co_filename == self.filename)
        lookup_key = (code.co_name, code.co_firstlineno)
        return self._qualnames.get(lookup_key, code.co_name)
344

345

346
class Executing(object):
    """
    Information about the operation a frame is currently executing.

    Generally you will just want `node`, which is the AST node being executed,
    or None if it's unknown.

    If a decorator is currently being called, then:
        - `node` is a function or class definition
        - `decorator` is the expression in `node.decorator_list` being called
        - `statements == {node}`
    """

    def __init__(self, frame: types.FrameType, source: Source, node: EnhancedAST, stmts: Set[ast.stmt], decorator: Optional[EnhancedAST]) -> None:
        self.frame, self.source = frame, source
        self.node, self.statements = node, stmts
        self.decorator = decorator

    def code_qualname(self) -> str:
        """Returns the qualified name the source reports for the frame's code object."""
        frame_code = self.frame.f_code
        return self.source.code_qualname(frame_code)

    def text(self) -> str:
        """Returns the source text of `node`."""
        text_provider = self.source._asttext_base()
        return text_provider.get_text(self.node)

    def text_range(self) -> Tuple[int, int]:
        """Returns the text range of `node` as reported by `get_text_range`."""
        text_provider = self.source._asttext_base()
        return text_provider.get_text_range(self.node)
374

375

376
class QualnameVisitor(ast.NodeVisitor):
    """
    Walks a tree and records a qualified name for every function,
    lambda and class definition.

    `qualnames` maps `(name, lineno)` to the dotted qualified name, where
    `lineno` is the line of the first decorator if there is one and the line
    of the definition otherwise. `stack` holds the name parts of the
    definitions currently being visited.
    """

    def __init__(self) -> None:
        super().__init__()
        self.stack: List[str] = []
        self.qualnames: Dict[Tuple[str, int], str] = {}

    def add_qualname(self, node: ast.AST, name: Optional[str] = None) -> None:
        """
        Pushes the name onto the stack and records the resulting qualname,
        keeping any entry already stored for the same `(name, lineno)`.
        """
        name = name or node.name  # type: ignore[attr-defined]
        self.stack.append(name)
        decorators = getattr(node, 'decorator_list', ())
        if decorators:
            lineno = decorators[0].lineno
        else:
            lineno = node.lineno  # type: ignore[attr-defined]
        qualname = ".".join(self.stack)
        self.qualnames.setdefault((name, lineno), qualname)

    def visit_FunctionDef(self, node: ast.AST, name: Optional[str] = None) -> None:
        assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node
        self.add_qualname(node, name)
        self.stack.append('<locals>')

        # A lambda has a single expression as its body, functions have a list of statements.
        body_nodes: Sequence[ast.AST] = [node.body] if isinstance(node, ast.Lambda) else node.body
        for body_node in body_nodes:
            self.visit(body_node)

        # Drop '<locals>' and the function's own name again.
        self.stack.pop()
        self.stack.pop()

        # Find lambdas in the function definition outside the body,
        # e.g. decorators or default arguments
        # Based on iter_child_nodes
        for field, value in ast.iter_fields(node):
            if field == 'body':
                continue
            if isinstance(value, ast.AST):
                self.visit(value)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, ast.AST):
                        self.visit(item)

    visit_AsyncFunctionDef = visit_FunctionDef

    def visit_Lambda(self, node: ast.AST) -> None:
        assert isinstance(node, ast.Lambda)
        self.visit_FunctionDef(node, '<lambda>')

    def visit_ClassDef(self, node: ast.AST) -> None:
        assert isinstance(node, ast.ClassDef)
        self.add_qualname(node)
        self.generic_visit(node)
        self.stack.pop()
429

430

431

432

433

434
future_flags = sum(
7✔
435
    getattr(__future__, fname).compiler_flag for fname in __future__.all_feature_names
436
)
437

438

439
def compile_similar_to(source: ast.Module, matching_code: types.CodeType) -> Any:
    """
    Compiles `source` in 'exec' mode with the filename of `matching_code`
    and with those `__future__` compiler flags that `matching_code` has set.
    Flags of the calling code are not inherited.
    """
    shared_future_flags = future_flags & matching_code.co_flags
    return compile(
        source,
        matching_code.co_filename,
        'exec',
        flags=shared_future_flags,
        dont_inherit=True,
    )
447

448

449
# Marker string unlikely to occur in real code: `matching_nodes` rewrites a candidate
# expression to `expr ** sentinel` and then searches the recompiled
# instructions for an argval equal to this value.
sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698'
7✔
450

451
def is_rewritten_by_pytest(code: types.CodeType) -> bool:
    """
    Returns True if any instruction of `code` other than LOAD_CONST
    has a string argval starting with "@py".
    """
    for bc in get_instructions(code):
        if bc.opname == "LOAD_CONST":
            continue
        if isinstance(bc.argval, str) and bc.argval.startswith("@py"):
            return True
    return False
456

457

458
class SentinelNodeFinder(object):
7✔
459
    result: Optional[EnhancedAST] = None
7✔
460

461
    def __init__(self, frame: types.FrameType, stmts: Set[EnhancedAST], tree: ast.Module, lasti: int, source: Source) -> None:
        """
        Finds the node among `stmts` that corresponds to the instruction at offset `lasti`.

        The opname of the current instruction decides which kind of AST node
        (and which expression context) to look for. Candidate expressions of
        that kind are collected from the statements and narrowed down; the
        outcome is stored in `self.result`, and in `self.decorator` when a
        decorator call is detected.

        Raises RuntimeError for opnames that are not handled here.
        """
        assert_(stmts)
        self.frame = frame
        self.tree = tree
        self.code = code = frame.f_code
        self.is_pytest = is_rewritten_by_pytest(code)

        # Assert line numbers are only collected when the code was rewritten by pytest.
        self.ignore_linenos = (
            frozenset(assert_linenos(tree)) if self.is_pytest else frozenset()
        )

        self.decorator = None

        self.instruction = instruction = self.get_actual_current_instruction(lasti)
        op_name = instruction.opname

        def name_matches(e: Any) -> bool:
            # The (mangled) name of the node must equal the instruction's argument.
            return mangled_name(e) == instruction.argval

        extra_filter: Callable[[Any], bool] = lambda e: True
        ctx: Type = type(None)
        typ: Type = type(None)

        if op_name.startswith('CALL_'):
            typ = ast.Call
        elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')):
            typ = ast.Subscript
            ctx = ast.Load
        elif op_name.startswith('BINARY_'):
            typ = ast.BinOp
            op_type = {
                'BINARY_POWER': ast.Pow,
                'BINARY_MULTIPLY': ast.Mult,
                'BINARY_MATRIX_MULTIPLY': getattr(ast, "MatMult", ()),
                'BINARY_FLOOR_DIVIDE': ast.FloorDiv,
                'BINARY_TRUE_DIVIDE': ast.Div,
                'BINARY_MODULO': ast.Mod,
                'BINARY_ADD': ast.Add,
                'BINARY_SUBTRACT': ast.Sub,
                'BINARY_LSHIFT': ast.LShift,
                'BINARY_RSHIFT': ast.RShift,
                'BINARY_AND': ast.BitAnd,
                'BINARY_XOR': ast.BitXor,
                'BINARY_OR': ast.BitOr,
            }[op_name]
            extra_filter = lambda e: isinstance(e.op, op_type)
        elif op_name.startswith('UNARY_'):
            typ = ast.UnaryOp
            op_type = {
                'UNARY_POSITIVE': ast.UAdd,
                'UNARY_NEGATIVE': ast.USub,
                'UNARY_NOT': ast.Not,
                'UNARY_INVERT': ast.Invert,
            }[op_name]
            extra_filter = lambda e: isinstance(e.op, op_type)
        elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'):
            typ = ast.Attribute
            ctx = ast.Load
            extra_filter = name_matches
        elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'):
            typ = ast.Name
            ctx = ast.Load
            extra_filter = name_matches
        elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'):
            typ = ast.Compare
            extra_filter = lambda e: len(e.ops) == 1
        elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')):
            ctx = ast.Store
            typ = ast.Subscript
        elif op_name.startswith('STORE_ATTR'):
            ctx = ast.Store
            typ = ast.Attribute
            extra_filter = name_matches
        else:
            raise RuntimeError(op_name)

        with lock:
            exprs = {
                cast(EnhancedAST, candidate)
                for stmt in stmts
                for candidate in ast.walk(stmt)
                if isinstance(candidate, typ)
                if isinstance(getattr(candidate, "ctx", None), ctx)
                if extra_filter(candidate)
                if statement_containing_node(candidate) == stmt
            }

            if ctx == ast.Store:
                # No special bytecode tricks here.
                # We can handle multiple assigned attributes with different names,
                # but only one assigned subscript.
                self.result = only(exprs)
                return

            matching = list(self.matching_nodes(exprs))
            if not matching and typ == ast.Call:
                self.find_decorator(stmts)
            else:
                self.result = only(matching)
558

559
    def find_decorator(self, stmts: Union[List[EnhancedAST], Set[EnhancedAST]]) -> None:
        """
        Works out which decorator of the single class/function definition in
        `stmts` is being called by the current instruction.

        Sets `self.decorator` to that decorator expression and
        `self.result` to the definition statement.
        """
        stmt = only(stmts)
        assert_(isinstance(stmt, (ast.ClassDef, function_node_types)))
        decorators = stmt.decorator_list  # type: ignore[attr-defined]
        assert_(decorators)

        line_instructions = [
            inst
            for inst in self.clean_instructions(self.code)
            if inst.lineno == self.frame.f_lineno
        ]

        # Position of the last CALL_FUNCTION on the line; it must be followed by a store.
        call_positions = [
            position
            for position, inst in enumerate(line_instructions)
            if inst.opname == "CALL_FUNCTION"
        ]
        last_call = call_positions[-1]
        assert_(line_instructions[last_call + 1].opname.startswith("STORE_"))

        # The trailing calls, one for each decorator.
        first_call = last_call - len(decorators) + 1
        decorator_instructions = line_instructions[first_call:last_call + 1]
        assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"})

        # The calls are matched against the decorators in reverse source order.
        decorator_index = decorator_instructions.index(self.instruction)
        self.decorator = decorators[::-1][decorator_index]
        self.result = stmt
590

591
    def clean_instructions(self, code: types.CodeType) -> List[EnhancedInstruction]:
        """
        Returns the instructions of `code` without EXTENDED_ARG and NOP
        instructions and without instructions on ignored lines.
        """
        kept = []
        for inst in get_instructions(code):
            if inst.opname in ("EXTENDED_ARG", "NOP"):
                continue
            if inst.lineno in self.ignore_linenos:
                continue
            kept.append(inst)
        return kept
598

599
    def get_original_clean_instructions(self) -> List[EnhancedInstruction]:
        """
        Returns the cleaned instructions of the frame's own code object,
        made comparable to the instructions we compile ourselves.
        """
        result = self.clean_instructions(self.code)

        # pypy sometimes (when is not clear)
        # inserts JUMP_IF_NOT_DEBUG instructions in bytecode
        # If they're not present in our compiled instructions,
        # ignore them in the original bytecode
        compiled_has_debug_jump = any(
            inst.opname == "JUMP_IF_NOT_DEBUG"
            for inst in self.compile_instructions()
        )
        if compiled_has_debug_jump:
            return result

        return [inst for inst in result if inst.opname != "JUMP_IF_NOT_DEBUG"]
616

617
    def matching_nodes(self, exprs: Set[EnhancedAST]) -> Iterator[EnhancedAST]:
        """
        Yields the expressions in `exprs` whose bytecode position matches
        the current instruction.

        Each candidate is temporarily replaced in the tree by
        `candidate ** sentinel`, the tree is recompiled, and the position of
        the sentinel in the new instructions is compared with the position
        of the current instruction in the original ones. The tree is
        restored after each compilation.
        """
        original_instructions = self.get_original_clean_instructions()
        original_index = only(
            position
            for position, inst in enumerate(original_instructions)
            if inst == self.instruction
        )
        for expr_index, expr in enumerate(exprs):
            setter = get_setter(expr)
            assert setter is not None
            # noinspection PyArgumentList
            replacement = ast.BinOp(
                left=cast(ast.expr, expr),
                op=ast.Pow(),
                right=ast.Constant(sentinel),
            )
            ast.fix_missing_locations(replacement)
            setter(replacement)
            try:
                instructions = self.compile_instructions()
            finally:
                # Always put the original expression back into the tree.
                setter(expr)

            if sys.version_info >= (3, 10):
                try:
                    handle_jumps(instructions, original_instructions)
                except Exception:
                    # Give other candidates a chance
                    if TESTING or expr_index < len(exprs) - 1:
                        continue
                    raise

            indices = [
                position
                for position, instruction in enumerate(instructions)
                if instruction.argval == sentinel
            ]

            # There can be several indices when the bytecode is duplicated,
            # as happens in a finally block in 3.9+
            # First we remove the opcodes caused by our modifications
            for removed_pairs, sentinel_index in enumerate(indices):
                # Adjustment for removing sentinel instructions below
                # in past iterations
                position = sentinel_index - removed_pairs * 2

                assert_(instructions.pop(position).opname == 'LOAD_CONST')
                assert_(instructions.pop(position).opname == 'BINARY_POWER')

            # Then we see if any of the instruction indices match
            for removed_pairs, sentinel_index in enumerate(indices):
                new_index = sentinel_index - removed_pairs * 2 - 1

                if new_index != original_index:
                    continue

                original_inst = original_instructions[original_index]
                new_inst = instructions[new_index]

                # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)'
                # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT
                if (
                    original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP')
                    and original_inst.arg != new_inst.arg  # type: ignore[attr-defined]
                ):
                    if (
                        original_instructions[original_index + 1].opname
                        != instructions[new_index + 1].opname == 'UNARY_NOT'
                    ):
                        # Remove the difference for the upcoming assert
                        instructions.pop(new_index + 1)

                # Check that the modified instructions don't have anything unexpected
                # 3.10 is a bit too weird to assert this in all cases but things still work
                if sys.version_info < (3, 10):
                    for inst1, inst2 in zip_longest(original_instructions, instructions):
                        assert_(inst1 and inst2 and opnames_match(inst1, inst2))

                yield expr
698

699
    def compile_instructions(self) -> List[EnhancedInstruction]:
        """
        Compiles the (possibly modified) tree and returns the cleaned
        instructions of the one compiled code object that matches the frame's code.
        """
        compiled_module = compile_similar_to(self.tree, self.code)
        matching_code = only(self.find_codes(compiled_module))
        return self.clean_instructions(matching_code)
703

704
    def find_codes(self, root_code: types.CodeType) -> list:
        """
        Returns `root_code` and/or the code objects nested in its constants
        (at any depth) that look like the frame's code object.

        Code objects are compared on first line number, free and cell
        variables and name; unless the code was rewritten by pytest, also on
        `co_names` and `co_varnames`.
        """
        checks: List[Callable] = [
            attrgetter('co_firstlineno'),
            attrgetter('co_freevars'),
            attrgetter('co_cellvars'),
            lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name,
        ]
        if not self.is_pytest:
            checks.extend((attrgetter('co_names'), attrgetter('co_varnames')))

        def matches(candidate: types.CodeType) -> bool:
            for check in checks:
                if check(candidate) != check(self.code):
                    return False
            return True

        def nested_codes(code: types.CodeType) -> Iterator[types.CodeType]:
            # Depth first: each nested code object is followed by its own nested code objects.
            for const in code.co_consts:
                if inspect.iscode(const):
                    yield const
                    yield from nested_codes(const)

        candidates = [root_code, *nested_codes(root_code)]
        return [candidate for candidate in candidates if matches(candidate)]
738

739
    def get_actual_current_instruction(self, lasti: int) -> EnhancedInstruction:
        """
        Get the instruction corresponding to the current
        frame offset, skipping EXTENDED_ARG instructions
        """
        # Don't use get_original_clean_instructions
        # because we need the actual instructions including
        # EXTENDED_ARG
        instructions = list(get_instructions(self.code))
        index = only(
            position
            for position, inst in enumerate(instructions)
            if inst.offset == lasti
        )

        # Move forward past any EXTENDED_ARG prefixes.
        while instructions[index].opname == "EXTENDED_ARG":
            index += 1
        return instructions[index]
759

760

761

762
def non_sentinel_instructions(instructions: List[EnhancedInstruction], start: int) -> Iterator[Tuple[int, EnhancedInstruction]]:
    """
    Iterate over `instructions` from index `start` onwards and yield
    (index, instruction) pairs, leaving out the instructions introduced
    by the sentinel transformation: every instruction whose argval is the
    sentinel, plus the single instruction that directly follows it.
    """
    after_sentinel = False
    for index, instruction in islice(enumerate(instructions), start, None):
        if instruction.argval == sentinel:
            # The sentinel value is expected to be loaded as a constant...
            assert_(instruction.opname == "LOAD_CONST")
            after_sentinel = True
        elif after_sentinel:
            # ...and consumed by the power operation that comes right after.
            assert_(instruction.opname == "BINARY_POWER")
            after_sentinel = False
        else:
            yield index, instruction
778

779

780
def walk_both_instructions(original_instructions: List[EnhancedInstruction], original_start: int, instructions: List[EnhancedInstruction], start: int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]]:
    """
    Yields matching indices and instructions from the new and original instructions,
    leaving out changes made by the sentinel transformation.

    Each yielded item is (original_i, original_inst, new_i, new_inst).
    `original_instructions` is walked from `original_start`, `instructions`
    from `start` (through `non_sentinel_instructions`). The walk ends as soon
    as either side runs out of instructions.

    If the previous pair was the same CONTAINS_OP / IS_OP opcode with a
    different `arg` on each side, and the new side now has a UNARY_NOT that
    the original side does not, that UNARY_NOT is skipped so the two streams
    line up again.
    """
    original_iter = islice(enumerate(original_instructions), original_start, None)
    new_iter = non_sentinel_instructions(instructions, start)
    inverted_comparison = False
    while True:
        try:
            original_i, original_inst = next(original_iter)
            new_i, new_inst = next(new_iter)
            if (
                inverted_comparison
                and original_inst.opname != new_inst.opname == "UNARY_NOT"
            ):
                # This extra advance must stay inside the try block: a
                # StopIteration escaping from a generator body is turned
                # into a RuntimeError (PEP 479) instead of ending the walk.
                new_i, new_inst = next(new_iter)
        except StopIteration:
            return
        inverted_comparison = (
            original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP")
            and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
        )
        yield original_i, original_inst, new_i, new_inst
804

805

806
def handle_jumps(instructions: List[EnhancedInstruction], original_instructions: List[EnhancedInstruction]) -> None:
    """
    Rewrites `instructions` in place so that it resembles `original_instructions`.
    This is only needed in 3.10+ where optimisations lead to more drastic changes
    after the sentinel transformation.

    Two kinds of mismatch are repaired, one at a time:
    - a JUMP in `instructions` with no JUMP at the same place in
      `original_instructions` is replaced by the section it jumps to
      (running until a return or raise);
    - any other mismatch is treated as a section that is duplicated in
      `original_instructions`, and the matching section found in
      `instructions` is inserted at the mismatch.

    Returns once a full walk over both lists finds no mismatch.
    """
    while True:
        # Locate the first pair of instructions whose opnames disagree.
        mismatch = next(
            (
                step
                for step in walk_both_instructions(
                    original_instructions, 0, instructions, 0
                )
                if not opnames_match(step[1], step[3])
            ),
            None,
        )
        if mismatch is None:
            # No mismatched jumps found, we're done
            return

        original_i, original_inst, new_i, new_inst = mismatch

        if "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname:
            # Index of the jump target, ignoring instructions
            # which were copied in by earlier iterations
            target = only(
                i
                for i, inst in enumerate(instructions)
                if inst.offset == new_inst.argval
                and not getattr(inst, "_copied", False)
            )
            # The jump is swapped for the section it jumps to. handle_jump
            # may also delete that section from its old place when
            # original_instructions doesn't contain it twice.
            replacement = handle_jump(
                original_instructions, original_i, instructions, target
            )
            assert replacement is not None
            instructions[new_i : new_i + 1] = replacement
            # instructions has changed, so start a fresh walk from the beginning
            continue

        # Collect the part of original_instructions from original_i
        # up to and including the first return/raise
        orig_section = []
        for section_inst in original_instructions[original_i:]:
            orig_section.append(section_inst)
            if section_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
                break
        else:
            # No return/raise - this is just a mismatch we can't handle
            raise AssertionError

        # Insert the one matching section of instructions at the mismatch;
        # the next loop iteration re-walks the modified list from the start.
        instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions))
859

860

861
def find_new_matching(orig_section: List[EnhancedInstruction], instructions: List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]]:
    """
    Yields sections of `instructions` which match `orig_section`.
    The yielded sections include sentinel instructions, but these
    are ignored when checking for matches.

    For each candidate start index, the next len(orig_section) non-sentinel
    instructions are compared with `orig_section` using `sections_match`.
    The search stops as soon as a start index leaves fewer non-sentinel
    instructions than `orig_section` has, including none at all.
    An empty `orig_section` yields nothing.
    """
    wanted = len(orig_section)
    # NOTE(review): this range never tries start == len(instructions) - wanted,
    # i.e. a match made of exactly the last `wanted` instructions - confirm
    # whether that is intentional before changing it.
    for start in range(len(instructions) - wanted):
        candidates = list(
            islice(
                non_sentinel_instructions(instructions, start),
                wanted,
            )
        )
        # Check the length before unpacking: zip(*[]) produces nothing, so
        # unpacking it into two names would raise ValueError instead of
        # simply ending the search.
        if not candidates or len(candidates) < wanted:
            return
        indices, dup_section = zip(*candidates)
        if sections_match(orig_section, dup_section):
            # Slice by real indices so skipped sentinel instructions are included
            yield instructions[start:indices[-1] + 1]
878

879

880
def handle_jump(original_instructions: List[EnhancedInstruction], original_start: int, instructions: List[EnhancedInstruction], start: int) -> Optional[List[EnhancedInstruction]]:
    """
    Walks `original_instructions` from `original_start` alongside
    `instructions` from `start` until the original side reaches a
    RETURN_VALUE or RAISE_VARARGS, asserting that opnames match on the way.

    Returns a deep copy of that section of `instructions`, with every copied
    instruction marked `_copied = True`. If the corresponding section of
    `original_instructions` is not duplicated elsewhere in it, the section
    is also deleted from `instructions`.

    Returns None if the walk ends without reaching a return/raise.
    """
    paired = walk_both_instructions(
        original_instructions, original_start, instructions, start
    )
    for original_j, original_inst, new_j, new_inst in paired:
        assert_(opnames_match(original_inst, new_inst))
        if original_inst.opname not in ("RETURN_VALUE", "RAISE_VARARGS"):
            continue

        section_end = new_j + 1
        inlined = deepcopy(instructions[start:section_end])
        for copied_inst in inlined:
            # Marks the copy so that later jump-target lookups can ignore it
            copied_inst._copied = True

        orig_section = original_instructions[original_start : original_j + 1]
        is_duplicated = check_duplicates(
            original_start, orig_section, original_instructions
        )
        if not is_duplicated:
            del instructions[start:section_end]
        return inlined

    return None
904

905

906
def check_duplicates(original_i: int, orig_section: List[EnhancedInstruction], original_instructions: List[EnhancedInstruction]) -> bool:
    """
    Tells whether `orig_section` is duplicated: returns True when some slice
    of `original_instructions` with the same length as `orig_section`,
    starting at an index other than `original_i`, matches it according to
    `sections_match`. Returns False otherwise.
    """
    section_length = len(orig_section)
    total = len(original_instructions)
    # Only start indices that leave room for a full-length slice are worth
    # checking; the bound never exceeds the number of instructions.
    candidate_starts = range(min(total, total - section_length + 1))
    return any(
        sections_match(
            orig_section,
            original_instructions[dup_start : dup_start + section_length],
        )
        for dup_start in candidate_starts
        if dup_start != original_i
    )
921

922
def sections_match(orig_section: List[EnhancedInstruction], dup_section: List[EnhancedInstruction]) -> bool:
    """
    Compares the two sections pairwise and returns True if every pair has
    matching linenos and matching opnames (as decided by `opnames_match`).

    Pairs are formed with zip, so only as many instructions as the shorter
    section has are compared.
    """
    for orig_inst, dup_inst in zip(orig_section, dup_section):
        if orig_inst.lineno != dup_inst.lineno:
            # POP_BLOCKs have been found to have differing linenos in innocent cases
            both_pop_block = "POP_BLOCK" == orig_inst.opname == dup_inst.opname
            if not both_pop_block:
                return False
        if not opnames_match(orig_inst, dup_inst):
            return False
    return True
935

936

937
def opnames_match(inst1: Instruction, inst2: Instruction) -> bool:
    """
    Returns True if the opnames of the two instructions are considered
    equivalent: they are equal, they both contain "JUMP", or `inst1` /
    `inst2` form one of these ordered pairs:

    - PRINT_EXPR / POP_TOP
    - LOAD_METHOD or LOOKUP_METHOD / LOAD_ATTR
    - CALL_METHOD / CALL_FUNCTION

    The pairs are not symmetric: swapping the arguments does not match.
    """
    first, second = inst1.opname, inst2.opname
    if first == second:
        return True
    if "JUMP" in first and "JUMP" in second:
        return True
    if second == "POP_TOP":
        return first == "PRINT_EXPR"
    if second == "LOAD_ATTR":
        return first in ("LOAD_METHOD", "LOOKUP_METHOD")
    if second == "CALL_FUNCTION":
        return first == "CALL_METHOD"
    return False
949

950

951
def get_setter(node: EnhancedAST) -> Optional[Callable[[ast.AST], None]]:
    """
    Returns a function which, given a new node, puts it in the place that
    `node` occupies in `node.parent`: either directly as a field value, or
    as an item inside a list field.

    Returns None if `node` is not found among the fields of its parent.
    """
    parent = node.parent
    for name, field in ast.iter_fields(parent):
        if field is node:
            # node is the value of the field itself
            def replace_attribute(new_node: ast.AST) -> None:
                setattr(parent, name, new_node)

            return replace_attribute

        if not isinstance(field, list):
            continue

        for i, item in enumerate(field):
            if item is node:
                # node is one element of a list field
                def replace_item(new_node: ast.AST) -> None:
                    field[i] = new_node

                return replace_item

    return None
966

967
# Module-level re-entrant lock.
# NOTE(review): the code that acquires it lies outside this section - confirm what it protects.
lock = RLock()
7✔
968

969

970
@cache
def statement_containing_node(node: ast.AST) -> EnhancedAST:
    """
    Returns `node` itself if it is a statement, otherwise the closest
    ancestor (reached through `.parent` links) that is an `ast.stmt`.
    """
    current = node
    while True:
        if isinstance(current, ast.stmt):
            return cast(EnhancedAST, current)
        current = cast(EnhancedAST, current).parent
975

976

977
def assert_linenos(tree: ast.AST) -> Iterator[int]:
    """
    Yields the line numbers (as given by `node_linenos`) of every node in
    `tree` that has a `parent` attribute and whose containing statement is
    an `assert`.
    """
    for node in ast.walk(tree):
        # Nodes without a parent link can't be traced to a statement
        if not hasattr(node, 'parent'):
            continue
        if isinstance(statement_containing_node(node), ast.Assert):
            yield from node_linenos(node)
985

986

987
def _extract_ipython_statement(stmt: EnhancedAST) -> ast.Module:
7✔
988
    # IPython separates each statement in a cell to be executed separately
989
    # So NodeFinder should only compile one statement at a time or it
990
    # will find a code mismatch.
991
    while not isinstance(stmt.parent, ast.Module):
×
992
        stmt = stmt.parent
×
993
    # use `ast.parse` instead of `ast.Module` for better portability
994
    # python3.8 changes the signature of `ast.Module`
995
    # Inspired by https://github.com/pallets/werkzeug/pull/1552/files
996
    tree = ast.parse("")
×
997
    tree.body = [cast(ast.stmt, stmt)]
×
998
    ast.copy_location(tree, stmt)
×
999
    return tree
×
1000

1001

1002
def is_ipython_cell_code_name(code_name: str) -> bool:
    """
    Returns True if `code_name` is exactly "<module>" or has the form
    "<cell line: N>" where N is one or more digits.
    """
    # fullmatch is used instead of match with a trailing `$`, because `$`
    # also matches just before a final newline, which would accept
    # names such as "<module>\n".
    return re.fullmatch(r"<module>|<cell line: \d+>", code_name) is not None
1004

1005

1006
def is_ipython_cell_filename(filename: str) -> bool:
    """
    Returns True if `filename` contains "<ipython-input-" or a path
    component of the form "ipykernel_N" (N being one or more digits)
    with a slash or backslash on both sides.
    """
    pattern = r"<ipython-input-|[/\\]ipykernel_\d+[/\\]"
    return re.search(pattern, filename) is not None
1008

1009

1010
def is_ipython_cell_code(code_obj: types.CodeType) -> bool:
    """
    Returns True if both the filename and the name of `code_obj`
    pass the IPython cell checks (`is_ipython_cell_filename` and
    `is_ipython_cell_code_name`).
    """
    if not is_ipython_cell_filename(code_obj.co_filename):
        return False
    return is_ipython_cell_code_name(code_obj.co_name)
1015

1016

1017
def find_node_ipython(frame: types.FrameType, lasti: int, stmts: Set[EnhancedAST], source: Source) -> Tuple[Optional[Any], Optional[Any]]:
    """
    Runs a NodeFinder for each statement in `stmts`, each extracted into
    its own module with `_extract_ipython_statement`.

    Returns a `(decorator, node)` tuple taken from the finder that produced
    a result. If a second statement also produces a result after one was
    already found, the ambiguity can't be resolved and `(None, None)` is
    returned. Both items are None when nothing is found.
    """
    node = decorator = None
    for stmt in stmts:
        tree = _extract_ipython_statement(stmt)
        try:
            finder = NodeFinder(frame, stmts, tree, lasti, source)
            already_found = node or decorator
            if already_found and (finder.result or finder.decorator):
                # Potential nodes were found in separate statements,
                # the ambiguity can't be resolved, so give up here
                return None, None

            node = finder.result
            decorator = finder.decorator
        except Exception:
            # Best effort: a statement for which the finder fails is skipped
            continue
    return decorator, node
1033

1034

1035

1036
def node_linenos(node: ast.AST) -> Iterator[int]:
    """
    Yields the line numbers covered by `node`.

    Nothing is yielded for a node without a `lineno`. For an expression
    that has an `end_lineno`, every line from `lineno` to `end_lineno`
    inclusive is yielded. For any other node only `lineno` is yielded.
    """
    if not hasattr(node, "lineno"):
        return

    if hasattr(node, "end_lineno") and isinstance(node, ast.expr):
        assert node.end_lineno is not None # type: ignore[attr-defined]
        yield from range(node.lineno, node.end_lineno + 1) # type: ignore[attr-defined]
    else:
        yield node.lineno # type: ignore[attr-defined]
1046

1047

1048
# Choose the NodeFinder implementation for the running interpreter:
# Python 3.11+ uses PositionNodeFinder from the sibling module,
# older versions use SentinelNodeFinder.
if sys.version_info >= (3, 11):
    from ._position_node_finder import PositionNodeFinder as NodeFinder
else:
    NodeFinder = SentinelNodeFinder
1052

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc