• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gocept / gocept.cache / 6453157996

09 Oct 2023 06:33AM UTC coverage: 100.0%. Remained the same
6453157996

push

github

sallner
[pre-commit.ci] pre-commit autoupdate

updates:
- https://github.com/pre-commit/mirrors-autopep8 → https://github.com/hhatto/autopep8
- [github.com/hhatto/autopep8: v2.0.2 → v2.0.4](https://github.com/hhatto/autopep8/compare/v2.0.2...v2.0.4)

34 of 34 branches covered (100.0%)

Branch coverage included in aggregate %.

211 of 211 relevant lines covered (100.0%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

100.0
/src/gocept/cache/method.py
1
import decorator
2
import inspect
1✔
3
import time
1✔
4
import zope.testing.cleanup
1✔
5

6

7
# Default registry used by Memoize: maps each memoized function to its
# result cache, a dict of ``key -> (value, time the value was stored)``.
_caches = {}
1✔
8
# Default registry used by Memoize: maps each memoized function to its
# timeout in seconds; collect() reads it to decide which entries to drop.
_timeouts = {}
1✔
9

10

11
def collect():
    """Evict every cached result whose timeout has elapsed.

    Walks all per-function caches in the module-level registry and removes
    each entry whose age is at least the timeout recorded for its function.
    """
    for function, results in _caches.items():
        # Snapshot the keys so entries can be removed while looping.
        for cache_key in list(results):
            age = time.time() - results[cache_key][1]
            if age >= _timeouts[function]:
                results.pop(cache_key, None)
1✔
18

19

20
def clear():
    """Empty the module-level result caches and the timeout registry."""
    for registry in (_caches, _timeouts):
        registry.clear()
1✔
23

24

25
# Register clear() as a zope.testing cleanup callback so the module-level
# registries are emptied whenever the registered cleanups run
# (presumably between tests -- confirm against zope.testing.cleanup).
zope.testing.cleanup.addCleanUp(clear)
1✔
26

27

28
class do_not_cache_and_return:
    """Wrapper marking a return value that must not be memoized.

    When a memoized function returns an instance of this class, Memoize
    hands the wrapped value to the caller without storing it in the cache.
    """

    def __init__(self, value):
        # Keep the payload so it can be handed back when the wrapper is called.
        self.value = value

    def __call__(self):
        """Return the wrapped value."""
        payload = self.value
        return payload
1✔
36

37

38
def Memoize(timeout, ignore_self=False, _caches=_caches, _timeouts=_timeouts):
    """Memoize With Timeout

    Return a decorator that caches the results of the decorated function,
    keyed by its positional and keyword arguments.

    timeout ... in seconds; a cached result older than this is recomputed
        on the next call.
    ignore_self ... if true and the first parameter of the decorated
        function is named ``self``, the first positional argument is left
        out of the cache key.
    _caches, _timeouts ... registries mapping each function to its result
        cache and to its timeout; they default to the module-level ones.

    Calls whose arguments are not hashable are computed every time and
    never stored. A function can opt out of caching a single result by
    returning it wrapped in ``do_not_cache_and_return``.

    Based on http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/325905

    """
    @decorator.decorator
    def func(f, *args, **kwargs):
        cache = _caches.setdefault(f, {})
        _timeouts.setdefault(f, timeout)

        cache_args = args
        if ignore_self:
            parameters = inspect.signature(f).parameters
            if parameters and next(iter(parameters)) == 'self':
                cache_args = args[1:]

        # Sort the keyword arguments so their order does not affect the key.
        key = (cache_args, tuple(sorted(kwargs.items())))

        try:
            hash(key)
        except TypeError:
            # Not hashable.
            key = None

        try:
            value, cached_time = cache[key]
        except KeyError:
            # Cache miss: fall through and compute the value below.
            pass
        else:
            expired = (time.time() - cached_time) > timeout
            if not expired:
                return value

        # Call ``f`` outside of the ``except`` block so that an exception it
        # raises is not chained to the internal cache-miss KeyError.
        value = f(*args, **kwargs)
        if isinstance(value, do_not_cache_and_return):
            # Hand back the wrapped value without storing it.
            return value()
        if key is not None:
            cache[key] = (value, time.time())
        return value

    return func
1✔
80

81

82
def memoize_on_attribute(attribute_name, timeout, ignore_self=False):
    """Memoize with timeout, keeping the cache on an attribute of ``self``.

    Return a decorator for methods. On every call the object found in the
    attribute ``attribute_name`` of the first positional argument is passed
    to ``Memoize`` as its cache registry, so results are stored on that
    object instead of in the module-level registry.

    attribute_name ... name of the attribute holding the cache registry
    timeout ... in seconds, passed on to ``Memoize``
    ignore_self ... passed on to ``Memoize``

    The decorated function raises TypeError if it is called without
    positional arguments or if the first argument has no attribute named
    ``attribute_name``.
    """
    @decorator.decorator
    def func(function, *args, **kw):
        try:
            self = args[0]
            cache = getattr(self, attribute_name)
        except (IndexError, AttributeError) as error:
            # Chain explicitly so the traceback names the original failure
            # as the cause of the TypeError.
            raise TypeError(
                "gocept.cache.method.memoize_on_attribute could"
                " not retrieve cache attribute '%s' for function %r"
                % (attribute_name, function)) from error
        return Memoize(timeout, _caches=cache,
                       ignore_self=ignore_self)(function)(*args, **kw)
    return func
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc