• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

alexmojaki / executing / 17110460267

20 Aug 2025 09:16PM UTC coverage: 97.189% (-0.1%) from 97.297%
17110460267

push

github

231 of 242 branches covered (95.45%)

Branch coverage included in aggregate %.

495 of 505 relevant lines covered (98.02%)

4.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.19
/executing/executing.py
1
"""
2
MIT License
3

4
Copyright (c) 2021 Alex Hall
5

6
Permission is hereby granted, free of charge, to any person obtaining a copy
7
of this software and associated documentation files (the "Software"), to deal
8
in the Software without restriction, including without limitation the rights
9
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10
copies of the Software, and to permit persons to whom the Software is
11
furnished to do so, subject to the following conditions:
12

13
The above copyright notice and this permission notice shall be included in all
14
copies or substantial portions of the Software.
15

16
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22
SOFTWARE.
23
"""
24

25
import __future__
6✔
26
import ast
6✔
27
import dis
6✔
28
import inspect
6✔
29
import io
6✔
30
import linecache
6✔
31
import re
6✔
32
import sys
6✔
33
import types
6✔
34
from collections import defaultdict
6✔
35
from copy import deepcopy
6✔
36
from functools import lru_cache
6✔
37
from itertools import islice
6✔
38
from itertools import zip_longest
6✔
39
from operator import attrgetter
6✔
40
from pathlib import Path
6✔
41
from threading import RLock
6✔
42
from tokenize import detect_encoding
6✔
43
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, Iterator, List, Optional, Sequence, Set, Sized, Tuple, Type, TypeVar, Union, cast
6✔
44
from ._utils import mangled_name,assert_, EnhancedAST,EnhancedInstruction,Instruction,get_instructions
6✔
45

46
if TYPE_CHECKING:  # pragma: no cover
47
    from asttokens import ASTTokens, ASTText
48
    from asttokens.asttokens import ASTTextBase
49

50

51
function_node_types = (ast.FunctionDef, ast.AsyncFunctionDef) # type: Tuple[Type, ...]
6✔
52

53
cache = lru_cache(maxsize=None)
6✔
54

55
TESTING = 0
6✔
56

57
class NotOneValueFound(Exception):
6✔
58
    def __init__(self,msg,values=[]):
6✔
59
        # type: (str, Sequence) -> None
60
        self.values=values
6✔
61
        super(NotOneValueFound,self).__init__(msg)
6✔
62

63
T = TypeVar('T')
6✔
64

65

66
def only(it):
6✔
67
    # type: (Iterable[T]) -> T
68
    if isinstance(it, Sized):
6✔
69
        if len(it) != 1:
3✔
70
            raise NotOneValueFound('Expected one value, found %s' % len(it))
3✔
71
        # noinspection PyTypeChecker
72
        return list(it)[0]
3✔
73

74
    lst = tuple(islice(it, 2))
6✔
75
    if len(lst) == 0:
6✔
76
        raise NotOneValueFound('Expected one value, found 0')
6✔
77
    if len(lst) > 1:
6✔
78
        raise NotOneValueFound('Expected one value, found several',lst)
6✔
79
    return lst[0]
6✔
80

81

82
class Source(object):
6✔
83
    """
84
    The source code of a single file and associated metadata.
85

86
    The main method of interest is the classmethod `executing(frame)`.
87

88
    If you want an instance of this class, don't construct it.
89
    Ideally use the classmethod `for_frame(frame)`.
90
    If you don't have a frame, use `for_filename(filename [, module_globals])`.
91
    These methods cache instances by filename, so at most one instance exists per filename.
92

93
    Attributes:
94
        - filename
95
        - text
96
        - lines
97
        - tree: AST parsed from text, or None if text is not valid Python
98
            All nodes in the tree have an extra `parent` attribute
99

100
    Other methods of interest:
101
        - statements_at_line
102
        - asttokens
103
        - code_qualname
104
    """
105

106
    def __init__(self, filename, lines):
6✔
107
        # type: (str, Sequence[str]) -> None
108
        """
109
        Don't call this constructor, see the class docstring.
110
        """
111

112
        self.filename = filename
6✔
113
        self.text = ''.join(lines)
6✔
114
        self.lines = [line.rstrip('\r\n') for line in lines]
6✔
115

116
        self._nodes_by_line = defaultdict(list)
6✔
117
        self.tree = None
6✔
118
        self._qualnames = {}
6✔
119
        self._asttokens = None  # type: Optional[ASTTokens]
6✔
120
        self._asttext = None  # type: Optional[ASTText]
6✔
121

122
        try:
6✔
123
            self.tree = ast.parse(self.text, filename=filename)
6✔
124
        except (SyntaxError, ValueError):
6✔
125
            pass
6✔
126
        else:
127
            for node in ast.walk(self.tree):
6✔
128
                for child in ast.iter_child_nodes(node):
6✔
129
                    cast(EnhancedAST, child).parent = cast(EnhancedAST, node)
6✔
130
                for lineno in node_linenos(node):
6✔
131
                    self._nodes_by_line[lineno].append(node)
6✔
132

133
            visitor = QualnameVisitor()
6✔
134
            visitor.visit(self.tree)
6✔
135
            self._qualnames = visitor.qualnames
6✔
136

137
    @classmethod
6✔
138
    def for_frame(cls, frame, use_cache=True):
6✔
139
        # type: (types.FrameType, bool) -> "Source"
140
        """
141
        Returns the `Source` object corresponding to the file the frame is executing in.
142
        """
143
        return cls.for_filename(frame.f_code.co_filename, frame.f_globals or {}, use_cache)
6✔
144

145
    @classmethod
6✔
146
    def for_filename(
6✔
147
        cls,
148
        filename,
149
        module_globals=None,
150
        use_cache=True,  # noqa no longer used
151
    ):
152
        # type: (Union[str, Path], Optional[Dict[str, Any]], bool) -> "Source"
153
        if isinstance(filename, Path):
6!
154
            filename = str(filename)
×
155

156
        def get_lines():
6✔
157
            # type: () -> List[str]
158
            return linecache.getlines(cast(str, filename), module_globals)
6✔
159

160
        # Save the current linecache entry, then ensure the cache is up to date.
161
        entry = linecache.cache.get(filename) # type: ignore[attr-defined]
6✔
162
        linecache.checkcache(filename)
6✔
163
        lines = get_lines()
6✔
164
        if entry is not None and not lines:
6✔
165
            # There was an entry, checkcache removed it, and nothing replaced it.
166
            # This means the file wasn't simply changed (because the `lines` wouldn't be empty)
167
            # but rather the file was found not to exist, probably because `filename` was fake.
168
            # Restore the original entry so that we still have something.
169
            linecache.cache[filename] = entry # type: ignore[attr-defined]
6✔
170
            lines = get_lines()
6✔
171

172
        return cls._for_filename_and_lines(filename, tuple(lines))
6✔
173

174
    @classmethod
6✔
175
    def _for_filename_and_lines(cls, filename, lines):
6✔
176
        # type: (str, Sequence[str]) -> "Source"
177
        source_cache = cls._class_local('__source_cache_with_lines', {}) # type: Dict[Tuple[str, Sequence[str]], Source]
6✔
178
        try:
6✔
179
            return source_cache[(filename, lines)]
6✔
180
        except KeyError:
6✔
181
            pass
6✔
182

183
        result = source_cache[(filename, lines)] = cls(filename, lines)
6✔
184
        return result
6✔
185

186
    @classmethod
6✔
187
    def lazycache(cls, frame):
6✔
188
        # type: (types.FrameType) -> None
189
        linecache.lazycache(frame.f_code.co_filename, frame.f_globals)
6✔
190

191
    @classmethod
6✔
192
    def executing(cls, frame_or_tb):
6✔
193
        # type: (Union[types.TracebackType, types.FrameType]) -> "Executing"
194
        """
195
        Returns an `Executing` object representing the operation
196
        currently executing in the given frame or traceback object.
197
        """
198
        if isinstance(frame_or_tb, types.TracebackType):
6✔
199
            # https://docs.python.org/3/reference/datamodel.html#traceback-objects
200
            # "tb_lineno gives the line number where the exception occurred;
201
            #  tb_lasti indicates the precise instruction.
202
            #  The line number and last instruction in the traceback may differ
203
            #  from the line number of its frame object
204
            #  if the exception occurred in a try statement with no matching except clause
205
            #  or with a finally clause."
206
            tb = frame_or_tb
6✔
207
            frame = tb.tb_frame
6✔
208
            lineno = tb.tb_lineno
6✔
209
            lasti = tb.tb_lasti
6✔
210
        else:
211
            frame = frame_or_tb
6✔
212
            lineno = frame.f_lineno
6✔
213
            lasti = frame.f_lasti
6✔
214

215

216

217
        code = frame.f_code
6✔
218
        key = (code, id(code), lasti)
6✔
219
        executing_cache = cls._class_local('__executing_cache', {}) # type: Dict[Tuple[types.CodeType, int, int], Any]
6✔
220

221
        args = executing_cache.get(key)
6✔
222
        if not args:
6✔
223
            node = stmts = decorator = None
6✔
224
            source = cls.for_frame(frame)
6✔
225
            tree = source.tree
6✔
226
            if tree:
6✔
227
                try:
6✔
228
                    stmts = source.statements_at_line(lineno)
6✔
229
                    if stmts:
6!
230
                        if is_ipython_cell_code(code):
6✔
231
                            decorator, node = find_node_ipython(frame, lasti, stmts, source)
6✔
232
                        else:
233
                            node_finder = NodeFinder(frame, stmts, tree, lasti, source)
6✔
234
                            node = node_finder.result
6✔
235
                            decorator = node_finder.decorator
6✔
236

237
                    if node:
6✔
238
                        new_stmts = {statement_containing_node(node)}
6✔
239
                        assert_(new_stmts <= stmts)
6✔
240
                        stmts = new_stmts
6✔
241
                except Exception:
6✔
242
                    if TESTING:
6✔
243
                        raise
6✔
244

245
            executing_cache[key] = args = source, node, stmts, decorator
6✔
246

247
        return Executing(frame, *args)
6✔
248

249
    @classmethod
6✔
250
    def _class_local(cls, name, default):
6✔
251
        # type: (str, T) -> T
252
        """
253
        Returns an attribute directly associated with this class
254
        (as opposed to subclasses), setting default if necessary
255
        """
256
        # classes have a mappingproxy preventing us from using setdefault
257
        result = cls.__dict__.get(name, default)
6✔
258
        setattr(cls, name, result)
6✔
259
        return result
6✔
260

261
    @cache
6✔
262
    def statements_at_line(self, lineno):
6✔
263
        # type: (int) -> Set[EnhancedAST]
264
        """
265
        Returns the statement nodes overlapping the given line.
266

267
        Returns at most one statement unless semicolons are present.
268

269
        If the `text` attribute is not valid python, meaning
270
        `tree` is None, returns an empty set.
271

272
        Otherwise, `Source.for_frame(frame).statements_at_line(frame.f_lineno)`
273
        should return at least one statement.
274
        """
275

276
        return {
6✔
277
            statement_containing_node(node)
278
            for node in
279
            self._nodes_by_line[lineno]
280
        }
281

282
    def asttext(self):
6✔
283
        # type: () -> ASTText
284
        """
285
        Returns an ASTText object for getting the source of specific AST nodes.
286

287
        See http://asttokens.readthedocs.io/en/latest/api-index.html
288
        """
289
        from asttokens import ASTText  # must be installed separately
6✔
290

291
        if self._asttext is None:
6✔
292
            self._asttext = ASTText(self.text, tree=self.tree, filename=self.filename)
6✔
293

294
        return self._asttext
6✔
295

296
    def asttokens(self):
6✔
297
        # type: () -> ASTTokens
298
        """
299
        Returns an ASTTokens object for getting the source of specific AST nodes.
300

301
        See http://asttokens.readthedocs.io/en/latest/api-index.html
302
        """
303
        import asttokens  # must be installed separately
6✔
304

305
        if self._asttokens is None:
6✔
306
            if hasattr(asttokens, 'ASTText'):
6✔
307
                self._asttokens = self.asttext().asttokens
6✔
308
            else:  # pragma: no cover
309
                self._asttokens = asttokens.ASTTokens(self.text, tree=self.tree, filename=self.filename)
310
        return self._asttokens
6✔
311

312
    def _asttext_base(self):
6✔
313
        # type: () -> ASTTextBase
314
        import asttokens  # must be installed separately
6✔
315

316
        if hasattr(asttokens, 'ASTText'):
6✔
317
            return self.asttext()
6✔
318
        else:  # pragma: no cover
319
            return self.asttokens()
320

321
    @staticmethod
6✔
322
    def decode_source(source):
6✔
323
        # type: (Union[str, bytes]) -> str
324
        if isinstance(source, bytes):
6!
325
            encoding = Source.detect_encoding(source)
6✔
326
            return source.decode(encoding)
6✔
327
        else:
328
            return source
×
329

330
    @staticmethod
6✔
331
    def detect_encoding(source):
6✔
332
        # type: (bytes) -> str
333
        return detect_encoding(io.BytesIO(source).readline)[0]
6✔
334

335
    def code_qualname(self, code):
6✔
336
        # type: (types.CodeType) -> str
337
        """
338
        Imitates the __qualname__ attribute of functions for code objects.
339
        Given:
340

341
            - A function `func`
342
            - A frame `frame` for an execution of `func`, meaning:
343
                `frame.f_code is func.__code__`
344

345
        `Source.for_frame(frame).code_qualname(frame.f_code)`
346
        will be equal to `func.__qualname__`*. Works for Python 2 as well,
347
        where of course no `__qualname__` attribute exists.
348

349
        Falls back to `code.co_name` if there is no appropriate qualname.
350

351
        Based on https://github.com/wbolster/qualname
352

353
        (* unless `func` is a lambda
354
        nested inside another lambda on the same line, in which case
355
        the outer lambda's qualname will be returned for the codes
356
        of both lambdas)
357
        """
358
        assert_(code.co_filename == self.filename)
6✔
359
        return self._qualnames.get((code.co_name, code.co_firstlineno), code.co_name)
6✔
360

361

362
class Executing(object):
6✔
363
    """
364
    Information about the operation a frame is currently executing.
365

366
    Generally you will just want `node`, which is the AST node being executed,
367
    or None if it's unknown.
368

369
    If a decorator is currently being called, then:
370
        - `node` is a function or class definition
371
        - `decorator` is the expression in `node.decorator_list` being called
372
        - `statements == {node}`
373
    """
374

375
    def __init__(self, frame, source, node, stmts, decorator):
6✔
376
        # type: (types.FrameType, Source, EnhancedAST, Set[ast.stmt], Optional[EnhancedAST]) -> None
377
        self.frame = frame
6✔
378
        self.source = source
6✔
379
        self.node = node
6✔
380
        self.statements = stmts
6✔
381
        self.decorator = decorator
6✔
382

383
    def code_qualname(self):
6✔
384
        # type: () -> str
385
        return self.source.code_qualname(self.frame.f_code)
6✔
386

387
    def text(self):
6✔
388
        # type: () -> str
389
        return self.source._asttext_base().get_text(self.node)
6✔
390

391
    def text_range(self):
6✔
392
        # type: () -> Tuple[int, int]
393
        return self.source._asttext_base().get_text_range(self.node)
6✔
394

395

396
class QualnameVisitor(ast.NodeVisitor):
6✔
397
    def __init__(self):
6✔
398
        # type: () -> None
399
        super(QualnameVisitor, self).__init__()
6✔
400
        self.stack = [] # type: List[str]
6✔
401
        self.qualnames = {} # type: Dict[Tuple[str, int], str]
6✔
402

403
    def add_qualname(self, node, name=None):
6✔
404
        # type: (ast.AST, Optional[str]) -> None
405
        name = name or node.name # type: ignore[attr-defined]
6✔
406
        self.stack.append(name)
6✔
407
        if getattr(node, 'decorator_list', ()):
6✔
408
            lineno = node.decorator_list[0].lineno # type: ignore[attr-defined]
6✔
409
        else:
410
            lineno = node.lineno # type: ignore[attr-defined]
6✔
411
        self.qualnames.setdefault((name, lineno), ".".join(self.stack))
6✔
412

413
    def visit_FunctionDef(self, node, name=None):
6✔
414
        # type: (ast.AST, Optional[str]) -> None
415
        assert isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.Lambda)), node
6✔
416
        self.add_qualname(node, name)
6✔
417
        self.stack.append('<locals>')
6✔
418
        children = [] # type: Sequence[ast.AST]
6✔
419
        if isinstance(node, ast.Lambda):
6✔
420
            children = [node.body]
6✔
421
        else:
422
            children = node.body
6✔
423
        for child in children:
6✔
424
            self.visit(child)
6✔
425
        self.stack.pop()
6✔
426
        self.stack.pop()
6✔
427

428
        # Find lambdas in the function definition outside the body,
429
        # e.g. decorators or default arguments
430
        # Based on iter_child_nodes
431
        for field, child in ast.iter_fields(node):
6✔
432
            if field == 'body':
6✔
433
                continue
6✔
434
            if isinstance(child, ast.AST):
6✔
435
                self.visit(child)
6✔
436
            elif isinstance(child, list):
6✔
437
                for grandchild in child:
6✔
438
                    if isinstance(grandchild, ast.AST):
6!
439
                        self.visit(grandchild)
6✔
440

441
    visit_AsyncFunctionDef = visit_FunctionDef
6✔
442

443
    def visit_Lambda(self, node):
6✔
444
        # type: (ast.AST) -> None
445
        assert isinstance(node, ast.Lambda)
6✔
446
        self.visit_FunctionDef(node, '<lambda>')
6✔
447

448
    def visit_ClassDef(self, node):
6✔
449
        # type: (ast.AST) -> None
450
        assert isinstance(node, ast.ClassDef)
6✔
451
        self.add_qualname(node)
6✔
452
        self.generic_visit(node)
6✔
453
        self.stack.pop()
6✔
454

455

456

457

458

459
future_flags = sum(
6✔
460
    getattr(__future__, fname).compiler_flag for fname in __future__.all_feature_names
461
)
462

463

464
def compile_similar_to(source, matching_code):
6✔
465
    # type: (ast.Module, types.CodeType) -> Any
466
    return compile(
3✔
467
        source,
468
        matching_code.co_filename,
469
        'exec',
470
        flags=future_flags & matching_code.co_flags,
471
        dont_inherit=True,
472
    )
473

474

475
sentinel = 'io8urthglkjdghvljusketgIYRFYUVGHFRTBGVHKGF78678957647698'
6✔
476

477
def is_rewritten_by_pytest(code):
6✔
478
    # type: (types.CodeType) -> bool
479
    return any(
6✔
480
        bc.opname != "LOAD_CONST" and isinstance(bc.argval,str) and bc.argval.startswith("@py")
481
        for bc in get_instructions(code)
482
    )
483

484

485
class SentinelNodeFinder(object):
6✔
486
    result = None # type: EnhancedAST
6✔
487

488
    def __init__(self, frame, stmts, tree, lasti, source):
6✔
489
        # type: (types.FrameType, Set[EnhancedAST], ast.Module, int, Source) -> None
490
        assert_(stmts)
3✔
491
        self.frame = frame
3✔
492
        self.tree = tree
3✔
493
        self.code = code = frame.f_code
3✔
494
        self.is_pytest = is_rewritten_by_pytest(code)
3✔
495

496
        if self.is_pytest:
3✔
497
            self.ignore_linenos = frozenset(assert_linenos(tree))
3✔
498
        else:
499
            self.ignore_linenos = frozenset()
3✔
500

501
        self.decorator = None
3✔
502

503
        self.instruction = instruction = self.get_actual_current_instruction(lasti)
3✔
504
        op_name = instruction.opname
3✔
505
        extra_filter = lambda e: True # type: Callable[[Any], bool]
3✔
506
        ctx = type(None) # type: Type
3✔
507

508
        typ = type(None) # type: Type
3✔
509
        if op_name.startswith('CALL_'):
3✔
510
            typ = ast.Call
3✔
511
        elif op_name.startswith(('BINARY_SUBSCR', 'SLICE+')):
3✔
512
            typ = ast.Subscript
3✔
513
            ctx = ast.Load
3✔
514
        elif op_name.startswith('BINARY_'):
3✔
515
            typ = ast.BinOp
3✔
516
            op_type = dict(
3✔
517
                BINARY_POWER=ast.Pow,
518
                BINARY_MULTIPLY=ast.Mult,
519
                BINARY_MATRIX_MULTIPLY=getattr(ast, "MatMult", ()),
520
                BINARY_FLOOR_DIVIDE=ast.FloorDiv,
521
                BINARY_TRUE_DIVIDE=ast.Div,
522
                BINARY_MODULO=ast.Mod,
523
                BINARY_ADD=ast.Add,
524
                BINARY_SUBTRACT=ast.Sub,
525
                BINARY_LSHIFT=ast.LShift,
526
                BINARY_RSHIFT=ast.RShift,
527
                BINARY_AND=ast.BitAnd,
528
                BINARY_XOR=ast.BitXor,
529
                BINARY_OR=ast.BitOr,
530
            )[op_name]
531
            extra_filter = lambda e: isinstance(e.op, op_type)
3✔
532
        elif op_name.startswith('UNARY_'):
3✔
533
            typ = ast.UnaryOp
3✔
534
            op_type = dict(
3✔
535
                UNARY_POSITIVE=ast.UAdd,
536
                UNARY_NEGATIVE=ast.USub,
537
                UNARY_NOT=ast.Not,
538
                UNARY_INVERT=ast.Invert,
539
            )[op_name]
540
            extra_filter = lambda e: isinstance(e.op, op_type)
3✔
541
        elif op_name in ('LOAD_ATTR', 'LOAD_METHOD', 'LOOKUP_METHOD'):
3✔
542
            typ = ast.Attribute
3✔
543
            ctx = ast.Load
3✔
544
            extra_filter = lambda e:mangled_name(e) == instruction.argval 
3✔
545
        elif op_name in ('LOAD_NAME', 'LOAD_GLOBAL', 'LOAD_FAST', 'LOAD_DEREF', 'LOAD_CLASSDEREF'):
3✔
546
            typ = ast.Name
3✔
547
            ctx = ast.Load
3✔
548
            extra_filter = lambda e:mangled_name(e) == instruction.argval 
3✔
549
        elif op_name in ('COMPARE_OP', 'IS_OP', 'CONTAINS_OP'):
3✔
550
            typ = ast.Compare
3✔
551
            extra_filter = lambda e: len(e.ops) == 1
3✔
552
        elif op_name.startswith(('STORE_SLICE', 'STORE_SUBSCR')):
3✔
553
            ctx = ast.Store
3✔
554
            typ = ast.Subscript
3✔
555
        elif op_name.startswith('STORE_ATTR'):
3✔
556
            ctx = ast.Store
3✔
557
            typ = ast.Attribute
3✔
558
            extra_filter = lambda e:mangled_name(e) == instruction.argval 
3✔
559
        else:
560
            raise RuntimeError(op_name)
3✔
561

562

563
        with lock:
3✔
564
            exprs = {
3✔
565
                cast(EnhancedAST, node)
566
                for stmt in stmts
567
                for node in ast.walk(stmt)
568
                if isinstance(node, typ)
569
                if isinstance(getattr(node, "ctx", None), ctx)
570
                if extra_filter(node)
571
                if statement_containing_node(node) == stmt
572
            }
573

574
            if ctx == ast.Store:
3✔
575
                # No special bytecode tricks here.
576
                # We can handle multiple assigned attributes with different names,
577
                # but only one assigned subscript.
578
                self.result = only(exprs)
3✔
579
                return
3✔
580

581
            matching = list(self.matching_nodes(exprs))
3✔
582
            if not matching and typ == ast.Call:
3✔
583
                self.find_decorator(stmts)
3✔
584
            else:
585
                self.result = only(matching)
3✔
586

587
    def find_decorator(self, stmts):
6✔
588
        # type: (Union[List[EnhancedAST], Set[EnhancedAST]]) -> None
589
        stmt = only(stmts)
3✔
590
        assert_(isinstance(stmt, (ast.ClassDef, function_node_types)))
3✔
591
        decorators = stmt.decorator_list # type: ignore[attr-defined]
3✔
592
        assert_(decorators)
3✔
593
        line_instructions = [
3✔
594
            inst
595
            for inst in self.clean_instructions(self.code)
596
            if inst.lineno == self.frame.f_lineno
597
        ]
598
        last_decorator_instruction_index = [
3✔
599
            i
600
            for i, inst in enumerate(line_instructions)
601
            if inst.opname == "CALL_FUNCTION"
602
        ][-1]
603
        assert_(
3✔
604
            line_instructions[last_decorator_instruction_index + 1].opname.startswith(
605
                "STORE_"
606
            )
607
        )
608
        decorator_instructions = line_instructions[
3✔
609
            last_decorator_instruction_index
610
            - len(decorators)
611
            + 1 : last_decorator_instruction_index
612
            + 1
613
        ]
614
        assert_({inst.opname for inst in decorator_instructions} == {"CALL_FUNCTION"})
3✔
615
        decorator_index = decorator_instructions.index(self.instruction)
3✔
616
        decorator = decorators[::-1][decorator_index]
3✔
617
        self.decorator = decorator
3✔
618
        self.result = stmt
3✔
619

620
    def clean_instructions(self, code):
6✔
621
        # type: (types.CodeType) -> List[EnhancedInstruction]
622
        return [
3✔
623
            inst
624
            for inst in get_instructions(code)
625
            if inst.opname not in ("EXTENDED_ARG", "NOP")
626
            if inst.lineno not in self.ignore_linenos
627
        ]
628

629
    def get_original_clean_instructions(self):
6✔
630
        # type: () -> List[EnhancedInstruction]
631
        result = self.clean_instructions(self.code)
3✔
632

633
        # pypy sometimes (when is not clear)
634
        # inserts JUMP_IF_NOT_DEBUG instructions in bytecode
635
        # If they're not present in our compiled instructions,
636
        # ignore them in the original bytecode
637
        if not any(
3!
638
                inst.opname == "JUMP_IF_NOT_DEBUG"
639
                for inst in self.compile_instructions()
640
        ):
641
            result = [
3✔
642
                inst for inst in result
643
                if inst.opname != "JUMP_IF_NOT_DEBUG"
644
            ]
645

646
        return result
3✔
647

648
    def matching_nodes(self, exprs):
6✔
649
        # type: (Set[EnhancedAST]) -> Iterator[EnhancedAST]
650
        original_instructions = self.get_original_clean_instructions()
3✔
651
        original_index = only(
3✔
652
            i
653
            for i, inst in enumerate(original_instructions)
654
            if inst == self.instruction
655
        )
656
        for expr_index, expr in enumerate(exprs):
3✔
657
            setter = get_setter(expr)
3✔
658
            assert setter is not None
3✔
659
            # noinspection PyArgumentList
660
            replacement = ast.BinOp(
3✔
661
                left=expr,
662
                op=ast.Pow(),
663
                right=ast.Str(s=sentinel),
664
            )
665
            ast.fix_missing_locations(replacement)
3✔
666
            setter(replacement)
3✔
667
            try:
3✔
668
                instructions = self.compile_instructions()
3✔
669
            finally:
670
                setter(expr)
3✔
671

672
            if sys.version_info >= (3, 10):
3✔
673
                try:
1✔
674
                    handle_jumps(instructions, original_instructions)
1✔
675
                except Exception:
1✔
676
                    # Give other candidates a chance
677
                    if TESTING or expr_index < len(exprs) - 1:
1!
678
                        continue
1✔
679
                    raise
×
680

681
            indices = [
3✔
682
                i
683
                for i, instruction in enumerate(instructions)
684
                if instruction.argval == sentinel
685
            ]
686

687
            # There can be several indices when the bytecode is duplicated,
688
            # as happens in a finally block in 3.9+
689
            # First we remove the opcodes caused by our modifications
690
            for index_num, sentinel_index in enumerate(indices):
3✔
691
                # Adjustment for removing sentinel instructions below
692
                # in past iterations
693
                sentinel_index -= index_num * 2
3✔
694

695
                assert_(instructions.pop(sentinel_index).opname == 'LOAD_CONST')
3✔
696
                assert_(instructions.pop(sentinel_index).opname == 'BINARY_POWER')
3✔
697

698
            # Then we see if any of the instruction indices match
699
            for index_num, sentinel_index in enumerate(indices):
3✔
700
                sentinel_index -= index_num * 2
3✔
701
                new_index = sentinel_index - 1
3✔
702

703
                if new_index != original_index:
3✔
704
                    continue
3✔
705

706
                original_inst = original_instructions[original_index]
3✔
707
                new_inst = instructions[new_index]
3✔
708

709
                # In Python 3.9+, changing 'not x in y' to 'not sentinel_transformation(x in y)'
710
                # changes a CONTAINS_OP(invert=1) to CONTAINS_OP(invert=0),<sentinel stuff>,UNARY_NOT
711
                if (
3✔
712
                        original_inst.opname == new_inst.opname in ('CONTAINS_OP', 'IS_OP')
713
                        and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
714
                        and (
715
                        original_instructions[original_index + 1].opname
716
                        != instructions[new_index + 1].opname == 'UNARY_NOT'
717
                )):
718
                    # Remove the difference for the upcoming assert
719
                    instructions.pop(new_index + 1)
2✔
720

721
                # Check that the modified instructions don't have anything unexpected
722
                # 3.10 is a bit too weird to assert this in all cases but things still work
723
                if sys.version_info < (3, 10):
3✔
724
                    for inst1, inst2 in zip_longest(
2✔
725
                        original_instructions, instructions
726
                    ):
727
                        assert_(inst1 and inst2 and opnames_match(inst1, inst2))
2✔
728

729
                yield expr
3✔
730

731
    def compile_instructions(self):
6✔
732
        # type: () -> List[EnhancedInstruction]
733
        module_code = compile_similar_to(self.tree, self.code)
3✔
734
        code = only(self.find_codes(module_code))
3✔
735
        return self.clean_instructions(code)
3✔
736

737
    def find_codes(self, root_code):
6✔
738
        # type: (types.CodeType) -> list
739
        checks = [
3✔
740
            attrgetter('co_firstlineno'),
741
            attrgetter('co_freevars'),
742
            attrgetter('co_cellvars'),
743
            lambda c: is_ipython_cell_code_name(c.co_name) or c.co_name,
744
        ] # type: List[Callable]
745
        if not self.is_pytest:
3✔
746
            checks += [
3✔
747
                attrgetter('co_names'),
748
                attrgetter('co_varnames'),
749
            ]
750

751
        def matches(c):
3✔
752
            # type: (types.CodeType) -> bool
753
            return all(
3✔
754
                f(c) == f(self.code)
755
                for f in checks
756
            )
757

758
        code_options = []
3✔
759
        if matches(root_code):
3✔
760
            code_options.append(root_code)
3✔
761

762
        def finder(code):
3✔
763
            # type: (types.CodeType) -> None
764
            for const in code.co_consts:
3✔
765
                if not inspect.iscode(const):
3✔
766
                    continue
3✔
767

768
                if matches(const):
3✔
769
                    code_options.append(const)
3✔
770
                finder(const)
3✔
771

772
        finder(root_code)
3✔
773
        return code_options
3✔
774

775
    def get_actual_current_instruction(self, lasti):
6✔
776
        # type: (int) -> EnhancedInstruction
777
        """
778
        Get the instruction corresponding to the current
779
        frame offset, skipping EXTENDED_ARG instructions
780
        """
781
        # Don't use get_original_clean_instructions
782
        # because we need the actual instructions including
783
        # EXTENDED_ARG
784
        instructions = list(get_instructions(self.code))
3✔
785
        index = only(
3✔
786
            i
787
            for i, inst in enumerate(instructions)
788
            if inst.offset == lasti
789
        )
790

791
        while True:
792
            instruction = instructions[index]
3✔
793
            if instruction.opname != "EXTENDED_ARG":
3✔
794
                return instruction
3✔
795
            index += 1
2✔
796

797

798

799
def non_sentinel_instructions(instructions, start):
6✔
800
    # type: (List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction]]
801
    """
802
    Yields (index, instruction) pairs excluding the basic
803
    instructions introduced by the sentinel transformation
804
    """
805
    skip_power = False
1✔
806
    for i, inst in islice(enumerate(instructions), start, None):
1✔
807
        if inst.argval == sentinel:
1✔
808
            assert_(inst.opname == "LOAD_CONST")
1✔
809
            skip_power = True
1✔
810
            continue
1✔
811
        elif skip_power:
1✔
812
            assert_(inst.opname == "BINARY_POWER")
1✔
813
            skip_power = False
1✔
814
            continue
1✔
815
        yield i, inst
1✔
816

817

818
def walk_both_instructions(original_instructions, original_start, instructions, start):
6✔
819
    # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Iterator[Tuple[int, EnhancedInstruction, int, EnhancedInstruction]]
820
    """
821
    Yields matching indices and instructions from the new and original instructions,
822
    leaving out changes made by the sentinel transformation.
823
    """
824
    original_iter = islice(enumerate(original_instructions), original_start, None)
1✔
825
    new_iter = non_sentinel_instructions(instructions, start)
1✔
826
    inverted_comparison = False
1✔
827
    while True:
828
        try:
1✔
829
            original_i, original_inst = next(original_iter)
1✔
830
            new_i, new_inst = next(new_iter)
1✔
831
        except StopIteration:
1✔
832
            return
1✔
833
        if (
1✔
834
            inverted_comparison
835
            and original_inst.opname != new_inst.opname == "UNARY_NOT"
836
        ):
837
            new_i, new_inst = next(new_iter)
1✔
838
        inverted_comparison = (
1✔
839
            original_inst.opname == new_inst.opname in ("CONTAINS_OP", "IS_OP")
840
            and original_inst.arg != new_inst.arg # type: ignore[attr-defined]
841
        )
842
        yield original_i, original_inst, new_i, new_inst
1✔
843

844

845
def handle_jumps(instructions, original_instructions):
6✔
846
    # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> None
847
    """
848
    Transforms instructions in place until it looks more like original_instructions.
849
    This is only needed in 3.10+ where optimisations lead to more drastic changes
850
    after the sentinel transformation.
851
    Replaces JUMP instructions that aren't also present in original_instructions
852
    with the sections that they jump to until a raise or return.
853
    In some other cases duplication found in `original_instructions`
854
    is replicated in `instructions`.
855
    """
856
    while True:
857
        for original_i, original_inst, new_i, new_inst in walk_both_instructions(
1✔
858
            original_instructions, 0, instructions, 0
859
        ):
860
            if opnames_match(original_inst, new_inst):
1✔
861
                continue
1✔
862

863
            if "JUMP" in new_inst.opname and "JUMP" not in original_inst.opname:
1✔
864
                # Find where the new instruction is jumping to, ignoring
865
                # instructions which have been copied in previous iterations
866
                start = only(
1✔
867
                    i
868
                    for i, inst in enumerate(instructions)
869
                    if inst.offset == new_inst.argval
870
                    and not getattr(inst, "_copied", False)
871
                )
872
                # Replace the jump instruction with the jumped to section of instructions
873
                # That section may also be deleted if it's not similarly duplicated
874
                # in original_instructions
875
                new_instructions = handle_jump(
1✔
876
                    original_instructions, original_i, instructions, start
877
                )
878
                assert new_instructions is not None
1✔
879
                instructions[new_i : new_i + 1] = new_instructions            
1✔
880
            else:
881
                # Extract a section of original_instructions from original_i to return/raise
882
                orig_section = []
1✔
883
                for section_inst in original_instructions[original_i:]:
1!
884
                    orig_section.append(section_inst)
1✔
885
                    if section_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
1✔
886
                        break
1✔
887
                else:
888
                    # No return/raise - this is just a mismatch we can't handle
889
                    raise AssertionError
×
890

891
                instructions[new_i:new_i] = only(find_new_matching(orig_section, instructions))
1✔
892

893
            # instructions has been modified, the for loop can't sensibly continue
894
            # Restart it from the beginning, checking for other issues
895
            break
1✔
896

897
        else:  # No mismatched jumps found, we're done
898
            return
1✔
899

900

901
def find_new_matching(orig_section, instructions):
6✔
902
    # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> Iterator[List[EnhancedInstruction]]
903
    """
904
    Yields sections of `instructions` which match `orig_section`.
905
    The yielded sections include sentinel instructions, but these
906
    are ignored when checking for matches.
907
    """
908
    for start in range(len(instructions) - len(orig_section)):
1✔
909
        indices, dup_section = zip(
1✔
910
            *islice(
911
                non_sentinel_instructions(instructions, start),
912
                len(orig_section),
913
            )
914
        )
915
        if len(dup_section) < len(orig_section):
1✔
916
            return
1✔
917
        if sections_match(orig_section, dup_section):
1✔
918
            yield instructions[start:indices[-1] + 1]
1✔
919

920

921
def handle_jump(original_instructions, original_start, instructions, start):
6✔
922
    # type: (List[EnhancedInstruction], int, List[EnhancedInstruction], int) -> Optional[List[EnhancedInstruction]]
923
    """
924
    Returns the section of instructions starting at `start` and ending
925
    with a RETURN_VALUE or RAISE_VARARGS instruction.
926
    There should be a matching section in original_instructions starting at original_start.
927
    If that section doesn't appear elsewhere in original_instructions,
928
    then also delete the returned section of instructions.
929
    """
930
    for original_j, original_inst, new_j, new_inst in walk_both_instructions(
1!
931
        original_instructions, original_start, instructions, start
932
    ):
933
        assert_(opnames_match(original_inst, new_inst))
1✔
934
        if original_inst.opname in ("RETURN_VALUE", "RAISE_VARARGS"):
1✔
935
            inlined = deepcopy(instructions[start : new_j + 1])
1✔
936
            for inl in inlined:
1✔
937
                inl._copied = True
1✔
938
            orig_section = original_instructions[original_start : original_j + 1]
1✔
939
            if not check_duplicates(
1✔
940
                original_start, orig_section, original_instructions
941
            ):
942
                instructions[start : new_j + 1] = []
1✔
943
            return inlined
1✔
944
    
945
    return None
×
946

947

948
def check_duplicates(original_i, orig_section, original_instructions):
6✔
949
    # type: (int, List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
950
    """
951
    Returns True if a section of original_instructions starting somewhere other
952
    than original_i and matching orig_section is found, i.e. orig_section is duplicated.
953
    """
954
    for dup_start in range(len(original_instructions)):
1!
955
        if dup_start == original_i:
1✔
956
            continue
1✔
957
        dup_section = original_instructions[dup_start : dup_start + len(orig_section)]
1✔
958
        if len(dup_section) < len(orig_section):
1✔
959
            return False
1✔
960
        if sections_match(orig_section, dup_section):
1✔
961
            return True
1✔
962
    
963
    return False
×
964

965
def sections_match(orig_section, dup_section):
6✔
966
    # type: (List[EnhancedInstruction], List[EnhancedInstruction]) -> bool
967
    """
968
    Returns True if the given lists of instructions have matching linenos and opnames.
969
    """
970
    return all(
1✔
971
        (
972
            orig_inst.lineno == dup_inst.lineno
973
            # POP_BLOCKs have been found to have differing linenos in innocent cases
974
            or "POP_BLOCK" == orig_inst.opname == dup_inst.opname
975
        )
976
        and opnames_match(orig_inst, dup_inst)
977
        for orig_inst, dup_inst in zip(orig_section, dup_section)
978
    )
979

980

981
def opnames_match(inst1, inst2):
6✔
982
    # type: (Instruction, Instruction) -> bool
983
    return (
3✔
984
        inst1.opname == inst2.opname
985
        or "JUMP" in inst1.opname
986
        and "JUMP" in inst2.opname
987
        or (inst1.opname == "PRINT_EXPR" and inst2.opname == "POP_TOP")
988
        or (
989
            inst1.opname in ("LOAD_METHOD", "LOOKUP_METHOD")
990
            and inst2.opname == "LOAD_ATTR"
991
        )
992
        or (inst1.opname == "CALL_METHOD" and inst2.opname == "CALL_FUNCTION")
993
    )
994

995

996
def get_setter(node):
6✔
997
    # type: (EnhancedAST) -> Optional[Callable[[ast.AST], None]]
998
    parent = node.parent
3✔
999
    for name, field in ast.iter_fields(parent):
3!
1000
        if field is node:
3✔
1001
            def setter(new_node):
3✔
1002
                # type: (ast.AST) -> None
1003
                return setattr(parent, name, new_node)
3✔
1004
            return setter
3✔
1005
        elif isinstance(field, list):
3✔
1006
            for i, item in enumerate(field):
3✔
1007
                if item is node:
3✔
1008
                    def setter(new_node):
3✔
1009
                        # type: (ast.AST) -> None
1010
                        field[i] = new_node
3✔
1011

1012
                    return setter
3✔
1013
    return None
×
1014

1015
lock = RLock()
6✔
1016

1017

1018
@cache
6✔
1019
def statement_containing_node(node):
6✔
1020
    # type: (ast.AST) -> EnhancedAST
1021
    while not isinstance(node, ast.stmt):
6✔
1022
        node = cast(EnhancedAST, node).parent
6✔
1023
    return cast(EnhancedAST, node)
6✔
1024

1025

1026
def assert_linenos(tree):
6✔
1027
    # type: (ast.AST) -> Iterator[int]
1028
    for node in ast.walk(tree):
3✔
1029
        if (
3✔
1030
                hasattr(node, 'parent') and
1031
                isinstance(statement_containing_node(node), ast.Assert)
1032
        ):
1033
            for lineno in node_linenos(node):
3✔
1034
                yield lineno
3✔
1035

1036

1037
def _extract_ipython_statement(stmt):
6✔
1038
    # type: (EnhancedAST) -> ast.Module
1039
    # IPython separates each statement in a cell to be executed separately
1040
    # So NodeFinder should only compile one statement at a time or it
1041
    # will find a code mismatch.
1042
    while not isinstance(stmt.parent, ast.Module):
6!
1043
        stmt = stmt.parent
×
1044
    # use `ast.parse` instead of `ast.Module` for better portability
1045
    # python3.8 changes the signature of `ast.Module`
1046
    # Inspired by https://github.com/pallets/werkzeug/pull/1552/files
1047
    tree = ast.parse("")
6✔
1048
    tree.body = [cast(ast.stmt, stmt)]
6✔
1049
    ast.copy_location(tree, stmt)
6✔
1050
    return tree
6✔
1051

1052

1053
def is_ipython_cell_code_name(code_name):
6✔
1054
    # type: (str) -> bool
1055
    return bool(re.match(r"(<module>|<cell line: \d+>)$", code_name))
6✔
1056

1057

1058
def is_ipython_cell_filename(filename):
6✔
1059
    # type: (str) -> bool
1060
    return bool(re.search(r"<ipython-input-|[/\\]ipykernel_\d+[/\\]", filename))
6✔
1061

1062

1063
def is_ipython_cell_code(code_obj):
6✔
1064
    # type: (types.CodeType) -> bool
1065
    return (
6✔
1066
        is_ipython_cell_filename(code_obj.co_filename) and
1067
        is_ipython_cell_code_name(code_obj.co_name)
1068
    )
1069

1070

1071
def find_node_ipython(frame, lasti, stmts, source):
6✔
1072
    # type: (types.FrameType, int, Set[EnhancedAST], Source) -> Tuple[Optional[Any], Optional[Any]]
1073
    node = decorator = None
6✔
1074
    for stmt in stmts:
6✔
1075
        tree = _extract_ipython_statement(stmt)
6✔
1076
        try:
6✔
1077
            node_finder = NodeFinder(frame, stmts, tree, lasti, source)
6✔
1078
            if (node or decorator) and (node_finder.result or node_finder.decorator):
6✔
1079
                # Found potential nodes in separate statements,
1080
                # cannot resolve ambiguity, give up here
1081
                return None, None
6✔
1082

1083
            node = node_finder.result
6✔
1084
            decorator = node_finder.decorator
6✔
1085
        except Exception:
×
1086
            pass
×
1087
    return decorator, node
6✔
1088

1089

1090

1091
def node_linenos(node):
6✔
1092
    # type: (ast.AST) -> Iterator[int]
1093
    if hasattr(node, "lineno"):
6✔
1094
        linenos = [] # type: Sequence[int]
6✔
1095
        if hasattr(node, "end_lineno") and isinstance(node, ast.expr):
6✔
1096
            assert node.end_lineno is not None # type: ignore[attr-defined]
6✔
1097
            linenos = range(node.lineno, node.end_lineno + 1) # type: ignore[attr-defined]
6✔
1098
        else:
1099
            linenos = [node.lineno] # type: ignore[attr-defined]
6✔
1100
        for lineno in linenos:
6✔
1101
            yield lineno
6✔
1102

1103

1104
if sys.version_info >= (3, 11):
6✔
1105
    from ._position_node_finder import PositionNodeFinder as NodeFinder
3✔
1106
else:
1107
    NodeFinder = SentinelNodeFinder
3✔
1108

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc