• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

beartype / plum / 6424098959

05 Oct 2023 08:24PM UTC coverage: 96.955% (-3.0%) from 100.0%
6424098959

push

github

GitHub
Try to fix coveralls (#116)

1019 of 1051 relevant lines covered (96.96%)

3.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.54
/plum/function.py
1
import os
4✔
2
import textwrap
4✔
3
from copy import copy
4✔
4
from functools import wraps
4✔
5
from types import MethodType
4✔
6
from typing import Any, Callable, List, Optional, Tuple, TypeVar, Union
4✔
7

8
from .resolver import AmbiguousLookupError, NotFoundLookupError, Resolver
4✔
9
from .signature import Signature, append_default_args, extract_signature
4✔
10
from .type import resolve_type_hint
4✔
11
from .util import TypeHint, repr_short
4✔
12

13
__all__ = ["Function"]
4✔
14

15

16
_promised_convert = None
"""function or None: This will be set to :func:`.parametric.convert`."""

# `typing.Self` is available for Python 3.11 and higher. On older versions, fall back
# to a type variable bound to :class:`Function`.
try:  # pragma: specific no cover 3.11
    from typing import Self
except ImportError:  # pragma: specific no cover 3.8 3.9 3.10
    Self = TypeVar("Self", bound="Function")

# Type variable used to annotate helpers that return an exception of the same type
# as the one they were given.
SomeExceptionType = TypeVar("SomeExceptionType", bound=Exception)
26

27

28
def _convert(obj: Any, target_type: TypeHint) -> Any:
    """Convert an object to a particular type. Only converts if `target_type` is set.

    Args:
        obj (object): Object to convert.
        target_type (type): Type to convert to. `typing.Any` means that no
            conversion is requested.

    Returns:
        object: `obj` itself if `target_type` is `typing.Any`, and otherwise the
            result of `_promised_convert(obj, target_type)`.
    """
    if target_type is Any:
        return obj
    # `_promised_convert` is a module-level hook that starts out as `None`; it must
    # have been set before a conversion to a concrete type is requested.
    return _promised_convert(obj, target_type)
42

43

44
def _change_function_name(f: Callable, name: str) -> Callable:
    """Return a wrapper around `f` that carries the name `name`.

    It is not always the case that `f.__name__` is writable. To solve this, first
    create a temporary function that wraps `f` and then change the name of that
    wrapper. `f` itself is left untouched.

    Args:
        f (function): Function to change the name of.
        name (str): New name.

    Returns:
        function: Function that wraps `f` and has name `name`.
    """

    @wraps(f)
    def f_renamed(*args, **kw_args):
        return f(*args, **kw_args)

    # `wraps` copied the name of `f` onto the wrapper, so override it afterwards.
    f_renamed.__name__ = name
    return f_renamed
62

63

64
_owner_transfer = {}
"""dict[type, type]: When the keys of this dictionary are detected as the owner of
a function (see :meth:`Function.owner`), make the corresponding value the owner."""
67

68

69
class _FunctionMeta(type):
    """:class:`Function` implements `__doc__`, which overrides the docstring of the
    class. This simple metaclass ensures that `Function.__doc__` still prints as the
    docstring of the class."""

    @property
    def __doc__(self):
        return self._class_doc


class Function(metaclass=_FunctionMeta):
    """A function.

    Args:
        f (function): Function that is wrapped.
        owner (str, optional): Name of the class that owns the function.
    """

    # When we set `__doc__`, we will lose the docstring of the class, so we save it now.
    # Correctly printing the docstring is handled by :class:`_FunctionMeta`.
    _class_doc = __doc__

    _instances = []

    def __init__(self, f: Callable, owner: Optional[str] = None) -> None:
        Function._instances.append(self)

        self._f: Callable = f
        self._cache = {}
        # `wraps` assigns `__doc__`, which goes through the setter below and thereby
        # sets `self._doc`.
        wraps(f)(self)

        # `owner` is the name of the owner. We will later attempt to resolve to
        # which class it actually points.
        self._owner_name: Optional[str] = owner
        self._owner: Optional[type] = None

        # Initialise pending and resolved methods.
        self._pending: List[Tuple[Callable, Optional[Signature], int]] = []
        self._resolver = Resolver()
        self._resolved: List[Tuple[Callable, Signature, int]] = []

    @property
    def owner(self):
        """object or None: Owner of the function. If `None`, then there is no owner."""
        if self._owner is None and self._owner_name is not None:
            # Only the last component of a dotted name is looked up in the globals of
            # the wrapped function.
            name = self._owner_name.split(".")[-1]
            self._owner = self._f.__globals__[name]
            # Check if the ownership needs to be transferred to another class. This
            # can be very important for preventing infinite loops.
            while self._owner in _owner_transfer:
                self._owner = _owner_transfer[self._owner]
        return self._owner

    @property
    def __doc__(self) -> Optional[str]:
        """str or None: Documentation of the function. This consists of the
        documentation of the function given at initialisation with the documentation
        of all other registered methods appended.

        Upon instantiation, this property is available through `obj.__doc__`.
        """
        try:
            self._resolve_pending_registrations()
        except NameError:  # pragma: specific no cover 3.7 3.8 3.9
            # When `staticmethod` is combined with
            # `from __future__ import annotations`, in Python 3.10 and higher
            # `staticmethod` will attempt to inherit `__doc__` (see
            # https://docs.python.org/3/library/functions.html#staticmethod). Since
            # we are still in class construction, forward references are not yet
            # defined, so attempting to resolve all pending methods might fail with a
            # `NameError`. This is fine, because later calling `__doc__` on the
            # `staticmethod` will again call this `__doc__`, at which point all methods
            # will resolve properly. For now, we just ignore the error and undo the
            # partially completed :meth:`Function._resolve_pending_registrations` by
            # clearing the cache.
            self.clear_cache(reregister=False)

        # Don't do any fancy appending of docstrings when the environment variable
        # `PLUM_SIMPLE_DOC` is set to `1`.
        if os.environ.get("PLUM_SIMPLE_DOC") == "1":
            return self._doc

        # Derive the basis of the docstring from `self._f`, removing any indentation.
        doc = self._doc.strip()
        if doc:
            # Do not include the first line when removing the indentation.
            lines = doc.splitlines()
            doc = lines[0]
            # There might not be more than one line.
            if len(lines) > 1:
                doc += "\n" + textwrap.dedent("\n".join(lines[1:]))

        # Append the docstrings of all other implementations to it. Exclude the
        # docstring from `self._f`, because that one forms the basis (see above).
        resolver_doc = self._resolver.doc(exclude=self._f)
        if resolver_doc:
            # Add a newline if the documentation is non-empty.
            if doc:
                doc = doc + "\n\n"
            doc += resolver_doc
            # Replace separators with horizontal lines of the right length.
            separator_length = max(map(len, doc.splitlines()))
            doc = doc.replace("<separator>", "-" * separator_length)

        # If the docstring is empty, return `None`, which is consistent with omitting
        # the docstring.
        return doc if doc else None

    @__doc__.setter
    def __doc__(self, value: str) -> None:
        # Ensure that `self._doc` remains a string.
        self._doc = value if value else ""

    @property
    def methods(self) -> List[Signature]:
        """list[:class:`.signature.Signature`]: All available methods."""
        self._resolve_pending_registrations()
        return self._resolver.signatures

    def dispatch(
        self: Self, method: Optional[Callable] = None, precedence=0
    ) -> Union[Self, Callable[[Callable], Self]]:
        """Decorator to extend the function with another signature.

        Args:
            method (function, optional): Method to register. If it is not given, a
                decorator is returned that registers the method it is applied to.
            precedence (int, optional): Precedence of the signature. Defaults to `0`.

        Returns:
            function: This function if `method` is given, and otherwise a decorator.
        """
        if method is None:
            return lambda m: self.dispatch(m, precedence=precedence)

        self.register(method, precedence=precedence)
        return self

    def dispatch_multi(
        self: Self, *signatures: Union[Signature, Tuple[TypeHint, ...]]
    ) -> Callable[[Callable], Self]:
        """Decorator to extend the function with multiple signatures at once.

        Args:
            *signatures (tuple or :class:`.signature.Signature`): Signatures to
                register.

        Returns:
            function: Decorator.

        Raises:
            ValueError: If a signature is neither a tuple nor a
                :class:`.signature.Signature`.
        """
        resolved_signatures = []
        for signature in signatures:
            if isinstance(signature, Signature):
                resolved_signatures.append(signature)
            elif isinstance(signature, tuple):
                resolved_signatures.append(Signature(*signature))
            else:
                raise ValueError(
                    f"Signature `{signature}` must be a tuple or of type "
                    f"`plum.signature.Signature`."
                )

        def decorator(method):
            # The precedence will not be used, so we can safely set it to `None`.
            for signature in resolved_signatures:
                self.register(method, signature=signature, precedence=None)
            return self

        return decorator

    def clear_cache(self, reregister: bool = True) -> None:
        """Clear cache.

        Args:
            reregister (bool, optional): Also reregister all methods. Defaults to
                `True`.
        """
        self._cache.clear()

        if reregister:
            # Add all resolved to pending.
            self._pending.extend(self._resolved)

            # Clear resolved.
            self._resolved = []
            self._resolver = Resolver()

    def register(
        self, f: Callable, signature: Optional[Signature] = None, precedence=0
    ) -> None:
        """Register a method.

        Either `signature` or `precedence` must be given.

        The registration is only queued here; it is processed the next time the
        pending registrations are resolved.

        Args:
            f (function): Function that implements the method.
            signature (:class:`.signature.Signature`, optional): Signature. If it is
                not given, it will be derived from `f`.
            precedence (int, optional): Precedence of the function. If `signature` is
                given, then this argument will not be used. Defaults to `0`.
        """
        self._pending.append((f, signature, precedence))

    def _resolve_pending_registrations(self) -> None:
        """Register all pending methods with the resolver.

        If at least one signature was registered, the list of pending methods is
        emptied and the cache is cleared.
        """
        # Keep track of whether anything registered.
        registered = False

        # Perform any pending registrations.
        for f, signature, precedence in self._pending:
            # Add to resolved registrations.
            self._resolved.append((f, signature, precedence))

            # Obtain the signature if it is not available.
            if signature is None:
                signature = extract_signature(f, precedence=precedence)
            else:
                # Ensure that the implementation is `f`, but make a copy before
                # mutating.
                signature = copy(signature)
                signature.implementation = f

            # Ensure that the implementation has the right name, because this name
            # will show up in the docstring.
            if getattr(signature.implementation, "__name__", None) != self.__name__:
                signature.implementation = _change_function_name(
                    signature.implementation,
                    self.__name__,
                )

            # Process default values.
            for subsignature in append_default_args(signature, f):
                self._resolver.register(subsignature)
                registered = True

        if registered:
            self._pending = []

            # Clear cache.
            self.clear_cache(reregister=False)

    def _enhance_exception(self, e: SomeExceptionType) -> SomeExceptionType:
        """Enhance an exception by prepending a prefix to the message of the exception
        which specifies that the message is for this function.

        Args:
            e (:class:`Exception`): Exception.

        Returns:
            :class:`Exception`: New exception of the same type as `e`, but with a
                prefix prepended to the message.
        """
        # Specify to which function the message pertains.
        prefix = f"For function `{self.__name__}`"
        if self.owner:
            prefix += f" of `{repr_short(self.owner)}`"
        prefix = prefix + ", "
        # Return a new exception of the same type which incorporates the prefix. Use
        # slices rather than indexing so that an empty message does not raise an
        # `IndexError`.
        message = str(e)
        return type(e)(prefix + message[:1].lower() + message[1:])

    def resolve_method(
        self, target: Union[Tuple[object, ...], Signature]
    ) -> Tuple[Callable, TypeHint]:
        """Find the method and return type for arguments.

        Args:
            target (object): Target.

        Returns:
            function: Method.
            type: Return type.
        """
        self._resolve_pending_registrations()

        try:
            # Attempt to find the method using the resolver.
            signature = self._resolver.resolve(target)
            method = signature.implementation
            return_type = signature.return_type

        except AmbiguousLookupError as e:
            __tracebackhide__ = True
            raise self._enhance_exception(e) from None  # Specify this function.

        except NotFoundLookupError as e:
            __tracebackhide__ = True

            e = self._enhance_exception(e)  # Specify this function.
            method, return_type = self._handle_not_found_lookup_error(e)

        return method, return_type

    def _handle_not_found_lookup_error(
        self, ex: NotFoundLookupError
    ) -> Tuple[Callable, TypeHint]:
        """Attempt to fall back to a method of the same name on a parent of the owner.

        Args:
            ex (:class:`.resolver.NotFoundLookupError`): Exception to raise if no
                fallback can be found.

        Returns:
            function: Method found on a class in the MRO of the owner.
            type: Return type, which is always `object` here.

        Raises:
            :class:`.resolver.NotFoundLookupError`: `ex`, if the function has no
                owner or if no suitable method was found.
        """
        if not self.owner:
            # Not in a class. Nothing we can do.
            raise ex from None

        # In a class. Walk through the classes in the class's MRO, except for this
        # class, and try to get the method.
        method = None
        return_type = object

        for c in self.owner.__mro__[1:]:
            # Skip the top of the type hierarchy given by `object` and `type`. We do
            # not suddenly want to fall back to any unexpected default behaviour.
            if c in {object, type}:
                continue

            # We need to check `c.__dict__` here instead of using `hasattr` since e.g.
            # `c.__le__` resolves (it is inherited from `object`) even if `c` does not
            # implement `__le__`!
            if self._f.__name__ in c.__dict__:
                method = getattr(c, self._f.__name__)
            else:
                # For some reason, coverage fails to catch the `continue` below.
                # Adding the do-nothing `_ = None` fixes this.
                # TODO: Remove this once coverage properly catches this.
                _ = None
                continue

            # Ignore abstract methods.
            if getattr(method, "__isabstractmethod__", False):
                method = None
                continue

            # We found a good candidate. Break.
            break

        if not method:
            # If no method has been found after walking through the MRO, raise the
            # original exception.
            raise ex from None
        return method, return_type

    def __call__(self, *args, **kw_args):
        """Resolve the method for the positional arguments `args`, call it, and
        convert the result to the return type of the method."""
        __tracebackhide__ = True
        method, return_type = self._resolve_method_with_cache(args=args)
        return _convert(method(*args, **kw_args), return_type)

    def _resolve_method_with_cache(
        self,
        args: Union[Tuple[object, ...], Signature, None] = None,
        types: Optional[Tuple[TypeHint, ...]] = None,
    ) -> Tuple[Callable, TypeHint]:
        """Find the method and return type, consulting the cache first.

        At least one of `args` and `types` must be given.

        Args:
            args (tuple or :class:`.signature.Signature`, optional): Arguments to
                resolve for. If `types` is not given, the types of these arguments
                form the cache key.
            types (tuple, optional): Types to resolve for. If given, these form the
                cache key.

        Returns:
            function: Method.
            type: Return type.

        Raises:
            ValueError: If both `args` and `types` are `None`.
        """
        if args is None and types is None:
            raise ValueError(
                "Arguments `args` and `types` cannot both be `None`. "
                "This should never happen!"
            )

        # Before attempting to use the cache, resolve any unresolved registrations. Use
        # an `if`-statement to speed up the common case.
        if self._pending:
            self._resolve_pending_registrations()

        if types is None:
            # Attempt to use the cache based on the types of the arguments.
            types = tuple(map(type, args))
        try:
            return self._cache[types]
        except KeyError:
            __tracebackhide__ = True

            if args is None:
                args = Signature(*(resolve_type_hint(t) for t in types))

            # Cache miss. Run the resolver based on the arguments.
            method, return_type = self.resolve_method(args)
            # If the resolver is faithful, then we can perform caching using the types
            # of the arguments. If the resolver is not faithful, then we cannot.
            if self._resolver.is_faithful:
                self._cache[types] = method, return_type
            return method, return_type

    def invoke(self, *types: TypeHint) -> Callable:
        """Invoke a particular method.

        Args:
            *types: Types to resolve.

        Returns:
            function: Method.
        """
        method, return_type = self._resolve_method_with_cache(types=types)

        @wraps(self._f)
        def wrapped_method(*args, **kw_args):
            return _convert(method(*args, **kw_args), return_type)

        return wrapped_method

    def __get__(self, instance, owner):
        """Bind the function to `instance` when it is accessed through an instance.
        When accessed through the class (`instance` is `None`), return the function
        itself."""
        if instance is not None:
            return MethodType(_BoundFunction(self, instance), instance)
        else:
            return self

    def __repr__(self) -> str:
        return (
            f"<function {self._f} with {len(self._resolver)} registered and"
            f" {len(self._pending)} pending method(s)>"
        )
469

470

471
class _BoundFunction:
    """A bound instance of `.function.Function`.

    Args:
        f (:class:`.function.Function`): Bound function.
        instance (object): Instance to which the function is bound.
    """

    def __init__(self, f, instance):
        self._f = f
        wraps(f._f)(self)  # This will call the setter for `__doc__`.
        self._instance = instance

    @property
    def __doc__(self):
        """str or None: Documentation, taken from the bound function."""
        return self._f.__doc__

    @__doc__.setter
    def __doc__(self, value):
        # Don't need to do anything here. The docstring will be derived from `self._f`.
        # We, however, do need to implement this method, because :func:`wraps` calls
        # it.
        pass

    def __call__(self, _, *args, **kw_args):
        """Call the bound function with the stored instance as the first argument.
        The first positional argument that is passed in is ignored."""
        return self._f(self._instance, *args, **kw_args)

    def invoke(self, *types):
        """See :meth:`.Function.invoke`."""

        @wraps(self._f._f)
        def wrapped_method(*args, **kw_args):
            # The method is looked up at call time, with the type of the stored
            # instance prepended to `types`.
            # TODO: Can we do this without `type` here?
            method = self._f.invoke(type(self._instance), *types)
            return method(self._instance, *args, **kw_args)

        return wrapped_method
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc