• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

feihoo87 / waveforms / 7111190278

06 Dec 2023 06:51AM UTC coverage: 33.465% (-9.2%) from 42.666%
7111190278

push

github

feihoo87
v1.9.0

1 of 1 new or added line in 1 file covered. (100.0%)

225 existing lines in 3 files now uncovered.

2795 of 8352 relevant lines covered (33.47%)

2.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/waveforms/cache.py
UNCOV
1
import functools
×
UNCOV
2
import hashlib
×
UNCOV
3
import pathlib
×
UNCOV
4
import pickle
×
UNCOV
5
import time
×
UNCOV
6
from typing import Any, Callable, Hashable, KeysView, Optional, Union
×
7

UNCOV
8
try:
×
UNCOV
9
    from portalocker import Lock
×
10
except:
×
11
    import warnings
×
12

13
    warnings.warn('portalocker not installed, cache will not be thread safe!')
×
14

15
    class Lock():
×
16

17
        def __init__(self, file, mode):
×
18
            self.file = file
×
19
            self.mode = mode
×
20
            self.fh = None
×
21

22
        def __enter__(self):
×
23
            self.fh = open(self.file, self.mode)
×
24
            return self.fh
×
25

26
        def __exit__(self, exc_type, exc_value, traceback):
×
27
            self.fh.close()
×
28

29

UNCOV
30
cache_dir = pathlib.Path.home() / '.waveforms' / 'cache'
×
UNCOV
31
cache_dir.mkdir(parents=True, exist_ok=True)
×
32

UNCOV
33
MAXVALUESIZE = 1024
×
UNCOV
34
Decorator = Callable[[Callable], Callable]
×
35

36

UNCOV
37
class Cache(dict):
×
38

UNCOV
39
    def __init__(self, name: str, path: pathlib.Path = cache_dir):
×
UNCOV
40
        name = name.split('.')
×
UNCOV
41
        self.path: pathlib.Path = path.joinpath(*name[:-1]) / (name[-1] +
×
42
                                                               '.cache.d')
UNCOV
43
        self._buffer = {}
×
UNCOV
44
        self._index = {}
×
UNCOV
45
        self._mtime = 0
×
UNCOV
46
        (self.path / 'values').mkdir(parents=True, exist_ok=True)
×
47

UNCOV
48
    @property
×
UNCOV
49
    def index(self) -> pathlib.Path:
×
UNCOV
50
        return self.path / 'index'
×
51

UNCOV
52
    @staticmethod
×
UNCOV
53
    def _hash(key: Hashable) -> str:
×
UNCOV
54
        return hashlib.md5(pickle.dumps(key)).hexdigest()
×
55

UNCOV
56
    def _syncIndex(self) -> None:
×
UNCOV
57
        if not self.index.exists() or self._mtime > self.index.stat().st_mtime:
×
UNCOV
58
            with Lock(self.index, 'wb') as fh:
×
UNCOV
59
                pickle.dump(self._index, fh)
×
UNCOV
60
        elif self._mtime < self.index.stat().st_mtime:
×
61
            with Lock(self.index, 'rb') as fh:
×
62
                self._index = pickle.load(fh)
×
UNCOV
63
        self._mtime = self.index.stat().st_mtime
×
64

UNCOV
65
    def _valuePath(self, key: Hashable) -> pathlib.Path:
×
UNCOV
66
        hashedKey = self._hash(key)
×
UNCOV
67
        return self.path / 'values' / hashedKey
×
68

UNCOV
69
    def _storeValue(self, k: Hashable, buf: bytes) -> None:
×
UNCOV
70
        with Lock(self._valuePath(k), 'wb') as fh:
×
UNCOV
71
            fh.write(buf)
×
72

UNCOV
73
    def _loadValue(self, k: Hashable) -> None:
×
74
        with Lock(self._valuePath(k), 'rb') as fh:
×
75
            buf = fh.read()
×
76
        hashstr = self._hash(buf)
×
77
        self._index[k] = self._valuePath(k).stat().st_mtime, None, hashstr
×
78
        self._buffer[k] = pickle.loads(buf)
×
79

UNCOV
80
    def __setitem__(self, k: Hashable, v: Any) -> None:
×
UNCOV
81
        buf = pickle.dumps(v)
×
UNCOV
82
        if len(buf) <= MAXVALUESIZE:
×
83
            mtime = time.time()
×
84
            self._index[k] = mtime, buf, ''
×
85
            self._mtime = mtime
×
86
            self._valuePath(k).unlink(missing_ok=True)
×
87
        else:
UNCOV
88
            hashstr = self._hash(buf)
×
UNCOV
89
            if k not in self._index or hashstr != self._index[k][2]:
×
UNCOV
90
                self._storeValue(k, buf)
×
UNCOV
91
                mtime = self._valuePath(k).stat().st_mtime
×
UNCOV
92
                self._index[k] = mtime, None, hashstr
×
UNCOV
93
                self._mtime = mtime
×
UNCOV
94
        self._syncIndex()
×
UNCOV
95
        self._buffer[k] = v
×
96

UNCOV
97
    def __getitem__(self, k: Hashable) -> Any:
×
UNCOV
98
        self._syncIndex()
×
UNCOV
99
        mtime, buf, hashstr = self._index[k]
×
UNCOV
100
        if k not in self._buffer and buf is not None:
×
101
            self._buffer[k] = pickle.loads(buf)
×
UNCOV
102
        elif k not in self._buffer:
×
103
            self._loadValue(k)
×
UNCOV
104
        return self._buffer[k]
×
105

UNCOV
106
    def __contains__(self, x: Hashable) -> bool:
×
107
        self._syncIndex()
×
108
        return x in self._index
×
109

UNCOV
110
    def keys(self) -> KeysView[Hashable]:
×
111
        self._syncIndex()
×
112
        return self._index.keys()
×
113

UNCOV
114
    def clear(self) -> None:
×
115
        self._buffer.clear()
×
116
        self._index.clear()
×
117
        self.index.unlink()
×
118
        for f in (self.path / 'values').iterdir():
×
119
            f.unlink()
×
120
        self._mtime = 0
×
121

122

UNCOV
123
__caches = {}
×
124

125

UNCOV
126
def _getCache(name: str) -> Cache:
×
UNCOV
127
    try:
×
UNCOV
128
        return __caches[name]
×
UNCOV
129
    except:
×
UNCOV
130
        __caches[name] = Cache(name)
×
UNCOV
131
        return __caches[name]
×
132

133

UNCOV
134
def cache(storage: Optional[Union[str, Cache]] = None) -> Decorator:
×
135

UNCOV
136
    def decorator(func: Callable,
×
137
                  storage: Optional[Union[str, Cache]] = storage) -> Callable:
UNCOV
138
        if storage is None:
×
UNCOV
139
            storage = _getCache('.'.join([func.__module__, func.__name__]))
×
140
        elif isinstance(storage, str):
×
141
            storage = _getCache(storage)
×
142
        elif not isinstance(storage, Cache):
×
143
            raise Exception(f'storage type {type(storage)} not understand!')
×
144

UNCOV
145
        @functools.wraps(func)
×
UNCOV
146
        def wrapper(*args, **kwds):
×
UNCOV
147
            kwds = {k: kwds[k] for k in sorted(kwds)}
×
UNCOV
148
            try:
×
UNCOV
149
                ret = storage[pickle.dumps((args, kwds))]
×
UNCOV
150
            except:
×
UNCOV
151
                ret = func(*args, **kwds)
×
UNCOV
152
                storage[pickle.dumps((args, kwds))] = ret
×
UNCOV
153
            return ret
×
154

UNCOV
155
        wrapper.cache = storage
×
UNCOV
156
        return wrapper
×
157

UNCOV
158
    return decorator
×
159

160

UNCOV
161
def clear(name: str) -> None:
×
162
    cache = _getCache(name)
×
163
    cache.clear()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc