• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

beartype / plum / 6423029888

05 Oct 2023 06:29PM UTC coverage: 99.009%. First build
6423029888

Pull #112

github

PhilipVinc
Add function name to resolver

This is needed because to generate docstrings we use this wrapping trick, but the wrapping trick breaks inspect so we cannot really use it in the higher level logic
Pull Request #112: [2/3] Split Signature into Signature and Method classes (preparatory work for pretty PR)

124 of 134 new or added lines in 4 files covered. (92.54%)

999 of 1009 relevant lines covered (99.01%)

3.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.71
/plum/function.py
1
import os
4✔
2
import textwrap
4✔
3
from copy import copy
4✔
4
from functools import wraps
4✔
5
from types import MethodType
4✔
6
from typing import Any, Callable, List, Optional, Tuple, TypeVar, Union
4✔
7

8
from .method import Method
4✔
9
from .resolver import AmbiguousLookupError, NotFoundLookupError, Resolver
4✔
10
from .signature import Signature, append_default_args
4✔
11
from .type import resolve_type_hint
4✔
12
from .util import TypeHint, repr_short
4✔
13

14
__all__ = ["Function"]
4✔
15

16

17
_promised_convert = None
4✔
18
"""function or None: This will be set to :func:`.parametric.convert`."""
2✔
19

20
# `typing.Self` is available for Python 3.11 and higher.
21
try:  # pragma: specific no cover 3.11
22
    from typing import Self
23
except ImportError:  # pragma: specific no cover 3.8 3.9 3.10
24
    Self = TypeVar("Self", bound="Function")
25

26
SomeExceptionType = TypeVar("SomeExceptionType", bound=Exception)
4✔
27

28

29
def _convert(obj: Any, target_type: TypeHint) -> Any:
    """Convert `obj` to `target_type`, unless `target_type` is `Any`.

    Args:
        obj (object): Object to convert.
        target_type (type): Type to convert to. When this is `typing.Any`, no
            conversion is attempted.

    Returns:
        object: `obj` itself if `target_type` is `Any`; otherwise the result of
            calling `_promised_convert(obj, target_type)`.
    """
    # `_promised_convert` is a module-level hook that starts out as `None` and is
    # filled in later, so it is only looked up when a conversion is really needed.
    if target_type is not Any:
        return _promised_convert(obj, target_type)
    return obj
4✔
43

44

45
_owner_transfer = {}
4✔
46
"""dict[type, type]: When the keys of this dictionary are detected as the owner of
2✔
47
a function (see :meth:`Function.owner`), make the corresponding value the owner."""
48

49

50
class _FunctionMeta(type):
4✔
51
    """:class:`Function` implements `__doc__`, which overrides the docstring of the
52
    class. This simple metaclass ensures that `Function.__doc__` still prints as the
53
    docstring of the class."""
54

55
    @property
4✔
56
    def __doc__(self):
4✔
57
        return self._class_doc
4✔
58

59

60
class Function(metaclass=_FunctionMeta):
    """A function.

    Args:
        f (function): Function that is wrapped.
        owner (str, optional): Name of the class that owns the function.
    """

    # When we set `__doc__`, we will lose the docstring of the class, so we save it now.
    # Correctly printing the docstring is handled by :class:`_FunctionMeta`.
    _class_doc = __doc__

    # Every instance that is ever constructed is recorded here (see `__init__`).
    _instances = []

    def __init__(self, f: Callable, owner: Optional[str] = None) -> None:
        Function._instances.append(self)

        self._f: Callable = f
        self._cache = {}
        # Copies `__name__`, `__qualname__`, etc. from `f`. Assigning `__doc__` goes
        # through the property setter below, which sets `self._doc`.
        wraps(f)(self)

        # `owner` is the name of the owner. We will later attempt to resolve to
        # which class it actually points.
        self._owner_name: Optional[str] = owner
        self._owner: Optional[type] = None

        # Initialise pending and resolved methods.
        self._pending: List[Tuple[Callable, Optional[Signature], int]] = []
        self._resolver = Resolver(function_name=self.__name__)
        self._resolved: List[Tuple[Callable, Signature, int]] = []

    @property
    def owner(self):
        """object or None: Owner of the function. If `None`, then there is no owner."""
        if self._owner is None and self._owner_name is not None:
            # Only the last component of a dotted name is looked up, and it is
            # looked up in the globals of the wrapped function.
            name = self._owner_name.split(".")[-1]
            self._owner = self._f.__globals__[name]
            # Check if the ownership needs to be transferred to another class. This
            # can be very important for preventing infinite loops.
            while self._owner in _owner_transfer:
                self._owner = _owner_transfer[self._owner]
        return self._owner

    @property
    def __doc__(self) -> Optional[str]:
        """str or None: Documentation of the function. This consists of the
        documentation of the function given at initialisation with the documentation
        of all other registered methods appended.

        Upon instantiation, this property is available through `obj.__doc__`.
        """
        try:
            self._resolve_pending_registrations()
        except NameError:  # pragma: specific no cover 3.7 3.8 3.9
            # When `staticmethod` is combined with
            # `from __future__ import annotations`, in Python 3.10 and higher
            # `staticmethod` will attempt to inherit `__doc__` (see
            # https://docs.python.org/3/library/functions.html#staticmethod). Since
            # we are still in class construction, forward references are not yet
            # defined, so attempting to resolve all pending methods might fail with a
            # `NameError`. This is fine, because later calling `__doc__` on the
            # `staticmethod` will again call this `__doc__`, at which point all methods
            # will resolve properly. For now, we just ignore the error and undo the
            # partially completed :meth:`Function._resolve_pending_registrations` by
            # clearing the cache.
            self.clear_cache(reregister=False)

        # Don't do any fancy appending of docstrings when the environment variable
        # `PLUM_SIMPLE_DOC` is set to `1`.
        if os.environ.get("PLUM_SIMPLE_DOC") == "1":
            return self._doc

        # Derive the basis of the docstring from `self._f`, removing any indentation.
        doc = self._doc.strip()
        if doc:
            # Do not include the first line when removing the indentation.
            lines = doc.splitlines()
            doc = lines[0]
            # There might not be more than one line.
            if len(lines) > 1:
                doc += "\n" + textwrap.dedent("\n".join(lines[1:]))

        # Append the docstrings of all other implementations to it. Exclude the
        # docstring from `self._f`, because that one forms the basis (see above).
        resolver_doc = self._resolver.doc(exclude=self._f)
        if resolver_doc:
            # Add a newline if the documentation is non-empty.
            if doc:
                doc = doc + "\n\n"
            doc += resolver_doc
            # Replace separators with horizontal lines of the right length.
            separator_length = max(map(len, doc.splitlines()))
            doc = doc.replace("<separator>", "-" * separator_length)

        # If the docstring is empty, return `None`, which is consistent with omitting
        # the docstring.
        return doc if doc else None

    @__doc__.setter
    def __doc__(self, value: str) -> None:
        # Ensure that `self._doc` remains a string.
        self._doc = value if value else ""

    @property
    def methods(self) -> List[Signature]:
        """list[:class:`.signature.Signature`]: All available methods."""
        self._resolve_pending_registrations()
        return self._resolver.methods

    def dispatch(
        self: Self, method: Optional[Callable] = None, precedence=0
    ) -> Union[Self, Callable[[Callable], Self]]:
        """Decorator to extend the function with another signature.

        Args:
            method (function, optional): Implementation to register. If it is not
                given, a decorator is returned that registers the function it is
                applied to with the given `precedence`.
            precedence (int, optional): Precedence of the signature. Defaults to `0`.

        Returns:
            function: This function itself if `method` is given, and otherwise a
                decorator.
        """
        if method is None:
            return lambda m: self.dispatch(m, precedence=precedence)

        self.register(method, precedence=precedence)
        return self

    def dispatch_multi(
        self: Self, *signatures: Union[Signature, Tuple[TypeHint, ...]]
    ) -> Callable[[Callable], Self]:
        """Decorator to extend the function with multiple signatures at once.

        Args:
            *signatures (tuple or :class:`.signature.Signature`): Signatures to
                register.

        Returns:
            function: Decorator.

        Raises:
            ValueError: If one of `signatures` is neither a tuple nor a
                :class:`.signature.Signature`.
        """
        resolved_signatures = []
        for signature in signatures:
            if isinstance(signature, Signature):
                resolved_signatures.append(signature)
            elif isinstance(signature, tuple):
                resolved_signatures.append(Signature(*signature))
            else:
                raise ValueError(
                    f"Signature `{signature}` must be a tuple or of type "
                    f"`plum.signature.Signature`."
                )

        def decorator(method):
            # The precedence will not be used, so we can safely set it to `None`.
            for signature in resolved_signatures:
                self.register(method, signature=signature, precedence=None)
            return self

        return decorator

    def clear_cache(self, reregister: bool = True) -> None:
        """Clear cache.

        Args:
            reregister (bool, optional): Also reregister all methods. Defaults to
                `True`.
        """
        self._cache.clear()

        if reregister:
            # Add all resolved to pending.
            self._pending.extend(self._resolved)

            # Clear resolved. The fresh resolver is constructed exactly like the one
            # in `__init__`, so that it keeps the name of this function.
            self._resolved = []
            self._resolver = Resolver(function_name=self.__name__)

    def register(
        self, f: Callable, signature: Optional[Signature] = None, precedence=0
    ) -> None:
        """Register a method.

        Either `signature` or `precedence` must be given.

        The registration is only queued here; it is processed the next time the
        pending registrations are resolved.

        Args:
            f (function): Function that implements the method.
            signature (:class:`.signature.Signature`, optional): Signature. If it is
                not given, it will be derived from `f`.
            precedence (int, optional): Precedence of the function. If `signature` is
                given, then this argument will not be used. Defaults to `0`.
        """
        self._pending.append((f, signature, precedence))

    def _resolve_pending_registrations(self) -> None:
        """Process all queued registrations and register them with the resolver.

        If at least one method was registered, the queue is emptied and the cache is
        cleared.
        """
        # Keep track of whether anything registered.
        registered = False

        # Perform any pending registrations.
        for f, signature, precedence in self._pending:
            # Add to resolved registrations.
            self._resolved.append((f, signature, precedence))

            # Obtain the signature if it is not available.
            if signature is None:
                signature = Signature.from_callable(f, precedence=precedence)
            else:
                # Ensure that the implementation is `f`, but make a copy before
                # mutating.
                signature = copy(signature)

            # Process default values.
            for subsignature in append_default_args(signature, f):
                submethod = Method(f, subsignature, function_name=self.__name__)
                self._resolver.register(submethod)
                registered = True

        if registered:
            self._pending = []

            # Clear cache.
            self.clear_cache(reregister=False)

    def _enhance_exception(self, e: SomeExceptionType) -> SomeExceptionType:
        """Enhance an exception by prepending a prefix to the message of the exception
        which specifies that the message is for this function.

        Args:
            e (:class:`Exception`): Exception.

        Returns:
            :class:`Exception`: A new exception of the same type as `e`, but with a
                prefix prepended to the message.
        """
        # Specify to which function the message pertains.
        prefix = f"For function `{self.__name__}`"
        if self.owner:
            prefix += f" of `{repr_short(self.owner)}`"
        prefix = prefix + ", "
        # Return a new exception of the same type which incorporates the prefix. The
        # first character of the message is lowercased so that it reads on from the
        # prefix. Slicing rather than indexing keeps this safe for an empty message.
        message = str(e)
        return type(e)(prefix + message[:1].lower() + message[1:])

    def resolve_method(
        self, target: Union[Tuple[object, ...], Signature]
    ) -> Tuple[Callable, TypeHint]:
        """Find the method and return type for arguments.

        Args:
            target (object): Target.

        Returns:
            function: Method.
            type: Return type.

        Raises:
            AmbiguousLookupError: If the resolver reports an ambiguous lookup.
            NotFoundLookupError: If no method is found by the resolver and no
                fallback is found among the classes in the MRO of the owner.
        """
        self._resolve_pending_registrations()

        try:
            # Attempt to find the method using the resolver.
            method = self._resolver.resolve(target)
            impl = method.implementation
            return_type = method.return_type

        except AmbiguousLookupError as e:
            __tracebackhide__ = True
            raise self._enhance_exception(e) from None  # Specify this function.

        except NotFoundLookupError as e:
            __tracebackhide__ = True

            e = self._enhance_exception(e)  # Specify this function.
            impl, return_type = self._handle_not_found_lookup_error(e)

        return impl, return_type

    def _handle_not_found_lookup_error(
        self, ex: NotFoundLookupError
    ) -> Tuple[Callable, TypeHint]:
        """Fall back to a method of the same name in the MRO of the owner.

        Args:
            ex (:class:`.resolver.NotFoundLookupError`): Exception to raise if no
                fallback can be found.

        Returns:
            function: Attribute with the name of the wrapped function taken from the
                first suitable class in the MRO of the owner.
            type: Return type, which is always `object` here.

        Raises:
            NotFoundLookupError: `ex`, if there is no owner or no suitable class.
        """
        if not self.owner:
            # Not in a class. Nothing we can do.
            raise ex from None

        # In a class. Walk through the classes in the class's MRO, except for this
        # class, and try to get the method.
        method = None
        return_type = object

        for c in self.owner.__mro__[1:]:
            # Skip the top of the type hierarchy given by `object` and `type`. We do
            # not suddenly want to fall back to any unexpected default behaviour.
            if c in {object, type}:
                continue

            # We need to check `c.__dict__` here instead of using `hasattr`, because
            # `hasattr` also finds inherited attributes: e.g. `c.__le__` exists even
            # if `c` does not implement `__le__` itself!
            if self._f.__name__ in c.__dict__:
                method = getattr(c, self._f.__name__)
            else:
                # For some reason, coverage fails to catch the `continue` below.
                # Adding the do-nothing `_ = None` fixes this.
                # TODO: Remove this once coverage properly catches this.
                _ = None
                continue

            # Ignore abstract methods.
            if getattr(method, "__isabstractmethod__", False):
                method = None
                continue

            # We found a good candidate. Break.
            break

        if not method:
            # If no method has been found after walking through the MRO, raise the
            # original exception.
            raise ex from None
        return method, return_type

    def __call__(self, *args, **kw_args):
        """Resolve the method for the positional arguments `args`, call it with
        `args` and `kw_args`, and convert the result to the return type of the
        method. Keyword arguments are not used to select the method."""
        __tracebackhide__ = True
        method, return_type = self._resolve_method_with_cache(args=args)
        return _convert(method(*args, **kw_args), return_type)

    def _resolve_method_with_cache(
        self,
        args: Union[Tuple[object, ...], Signature, None] = None,
        types: Optional[Tuple[TypeHint, ...]] = None,
    ) -> Tuple[Callable, TypeHint]:
        """Find the method and return type, consulting the cache first.

        Args:
            args (tuple or :class:`.signature.Signature`, optional): Arguments to
                resolve for.
            types (tuple, optional): Types to resolve for. If it is not given, the
                types of `args` are used as the cache key.

        Returns:
            function: Method.
            type: Return type.

        Raises:
            ValueError: If both `args` and `types` are `None`.
        """
        if args is None and types is None:
            raise ValueError(
                "Arguments `args` and `types` cannot both be `None`. "
                "This should never happen!"
            )

        # Before attempting to use the cache, resolve any unresolved registrations. Use
        # an `if`-statement to speed up the common case.
        if self._pending:
            self._resolve_pending_registrations()

        if types is None:
            # Attempt to use the cache based on the types of the arguments.
            types = tuple(map(type, args))
        try:
            return self._cache[types]
        except KeyError:
            __tracebackhide__ = True

            if args is None:
                args = Signature(*(resolve_type_hint(t) for t in types))

            # Cache miss. Run the resolver based on the arguments.
            method, return_type = self.resolve_method(args)
            # If the resolver is faithful, then we can perform caching using the types
            # of the arguments. If the resolver is not faithful, then we cannot.
            if self._resolver.is_faithful:
                self._cache[types] = method, return_type
            return method, return_type

    def invoke(self, *types: TypeHint) -> Callable:
        """Invoke a particular method.

        Args:
            *types: Types to resolve.

        Returns:
            function: Method.
        """
        method, return_type = self._resolve_method_with_cache(types=types)

        @wraps(self._f)
        def wrapped_method(*args, **kw_args):
            return _convert(method(*args, **kw_args), return_type)

        return wrapped_method

    def __get__(self, instance, owner):
        # Accessed through an instance: return a bound method that passes `instance`
        # along. Accessed through the class: return this function itself.
        if instance is not None:
            return MethodType(_BoundFunction(self, instance), instance)
        else:
            return self

    def __repr__(self) -> str:
        return (
            f"<function {self._f} with {len(self._resolver)} registered and"
            f" {len(self._pending)} pending method(s)>"
        )
442

443

444
def _generate_qualname(f: Callable) -> str:
4✔
445
    # modname = getattr(f, "__module__", "")
446
    # if modname is not None and len(modname) > 0:
447
    #     modname = f"{modname}."
448
    # Todo: if we ever want to scope functions, we can
449
    # just uncomment the code above.
NEW
450
    modname = ""
×
451

NEW
452
    qualname = getattr(f, "__qualname__", None)
×
NEW
453
    if qualname is not None and len(modname) > 0:
×
NEW
454
        qualname = f"{modname}{qualname}"
×
NEW
455
    qualname = qualname.replace("__main__.", "")
×
456

NEW
457
    name = getattr(f, "__name__", "")
×
NEW
458
    return name, qualname
×
459

460

461
class _BoundFunction:
4✔
462
    """A bound instance of `.function.Function`.
463

464
    Args:
465
        f (:class:`.function.Function`): Bound function.
466
        instance (object): Instance to which the function is bound.
467
    """
468

469
    def __init__(self, f, instance):
4✔
470
        self._f = f
4✔
471
        wraps(f._f)(self)  # This will call the setter for `__doc__`.
4✔
472
        self._instance = instance
4✔
473

474
    @property
4✔
475
    def __doc__(self):
4✔
476
        return self._f.__doc__
4✔
477

478
    @__doc__.setter
4✔
479
    def __doc__(self, value):
4✔
480
        # Don't need to do anything here. The docstring will be derived from `self._f`.
481
        # We, however, do need to implement this method, because :func:`wraps` calls
482
        # it.
483
        pass
4✔
484

485
    def __call__(self, _, *args, **kw_args):
4✔
486
        return self._f(self._instance, *args, **kw_args)
4✔
487

488
    def invoke(self, *types):
4✔
489
        """See :meth:`.Function.invoke`."""
490

491
        @wraps(self._f._f)
4✔
492
        def wrapped_method(*args, **kw_args):
4✔
493
            # TODO: Can we do this without `type` here?
494
            method = self._f.invoke(type(self._instance), *types)
4✔
495
            return method(self._instance, *args, **kw_args)
4✔
496

497
        return wrapped_method
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc