• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5030698672717824

15 May 2026 04:19PM UTC coverage: 64.902% (-0.4%) from 65.282%
5030698672717824

Pull #10401

CirrusCI

ecdsa
add support for LevelDB
Pull Request #10401: add support for LevelDB

463 of 801 new or added lines in 20 files covered. (57.8%)

20 existing lines in 6 files now uncovered.

24964 of 38464 relevant lines covered (64.9%)

0.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/electrum/level_db.py
NEW
1
from __future__ import annotations
×
2

NEW
3
import json
×
NEW
4
import threading
×
NEW
5
import os
×
NEW
6
from typing import Any, Optional, Tuple, Union, Iterator, Iterable
×
NEW
7
import plyvel
×
8

NEW
9
from .stored_dict import BaseDB, FLEX_KEY, key_to_str, json_default
×
10

11
# Todo:
12
# - simplify path: first element is unused
13

14

NEW
15
def locked(func):
    """Decorator: run the wrapped method while holding ``self.lock``.

    The decorated callable must be a method of an object exposing a
    context-manager ``lock`` attribute. The wrapper forwards all positional
    and keyword arguments unchanged and returns whatever ``func`` returns.

    Note: if ``func`` is a generator function, calling it only creates the
    generator object, so the lock is held during creation and not while the
    generator is being iterated.
    """
    import functools  # local import keeps this block self-contained

    @functools.wraps(func)  # preserve __name__/__doc__ of the wrapped method
    def wrapper(self, *args, **kwargs):
        with self.lock:
            return func(self, *args, **kwargs)
    return wrapper
×
20

21

NEW
22
class JsonCodec:
    """Value codec that stores Python values as compact UTF-8 encoded JSON."""

    @staticmethod
    def dumps(value: Any) -> bytes:
        """Serialize *value* to JSON text (no extra whitespace) and encode it as UTF-8."""
        text = json.dumps(
            value,
            separators=(",", ":"),
            ensure_ascii=False,
            default=json_default,
        )
        return text.encode("utf-8")

    @staticmethod
    def loads(data: bytes) -> Any:
        """Decode UTF-8 *data* and parse it as JSON."""
        text = data.decode("utf-8")
        return json.loads(text)
×
31

32

NEW
33
def _to_bytes_key(k: str) -> bytes:
×
NEW
34
    return k.encode("utf-8")
×
35

NEW
36
def _to_str_key(k: bytes) -> str:
×
NEW
37
    return k.decode("utf-8")
×
38

39

40
# bump this if key/value encoding changes
NEW
41
# Contents of the version marker file, compared byte-for-byte on open.
STORAGE_VERSION = "0".encode("utf-8")
# Name of the version marker file stored inside the database directory.
VERSION_FILENAME = 'ELECTRUM_LEVELDB_VERSION'
×
43

NEW
44
class LevelDB(BaseDB):
    """Hierarchical key/value store kept in a LevelDB directory (via plyvel).

    A node is addressed by a ``path`` (a list of string elements whose first
    element must be the empty string) plus a ``key``. The elements are joined
    with ``self.delimiter`` and UTF-8 encoded to form the LevelDB key. Values
    are encoded with ``self.codec`` (JSON). Dicts and lists are flattened:
    an empty ``{}`` / ``[]`` marker is stored at the container's own key and
    every item is stored under ``<container key>/<item key>``.

    Methods decorated with ``@locked`` run while holding ``self.lock``.
    """

    # Class-level defaults so that close() and get_write_batch() are usable
    # before init_db() / set_write_batch() have been called.
    db = None
    _write_batch = None

    def __init__(
            self,
            path: str,
            init_db = True,
    ):
        """Create the store for directory *path*; open it unless *init_db* is false."""
        assert path # in-memory only is only allowed with JsonDB
        BaseDB.__init__(self, path)
        self.lock = threading.RLock()
        self.delimiter = "/"
        self.codec = JsonCodec
        if init_db:
            self.init_db()

    def basename(self) -> str:
        """Return the last component of the database path."""
        return os.path.basename(self.path)

    def is_encrypted(self):
        return False

    def file_exists(self):
        """Return True if the database path exists on disk."""
        return os.path.exists(self.path)

    def supports_file_encryption(self):
        return False

    def is_encrypted_with_hw_device(self):
        return False

    def init_db(self):
        """Open (creating if missing) the LevelDB directory at ``self.path``.

        Raises:
            Exception: if the path exists but has no version file, or the
                version file content differs from ``STORAGE_VERSION``.
        """
        # if path exists, check version file
        version_file = os.path.join(self.path, VERSION_FILENAME)
        if os.path.exists(self.path):
            if not os.path.exists(version_file):
                raise Exception('Not an Electrum DB')
            with open(version_file, "rb") as f:
                v = f.read()
            # no upgrades support for the moment.
            # Explicit raise instead of assert: asserts are stripped under -O.
            if v != STORAGE_VERSION:
                raise Exception(f'Unsupported Electrum LevelDB version: {v!r}')
        # create DB
        # according to the docs, setting write_buffer_size
        # to zero forces levelDB to write directly to disk
        self.db = plyvel.DB(
            self.path,
            create_if_missing=True,
            write_buffer_size=0,
        )
        # create version file
        if not os.path.exists(version_file):
            with open(version_file, "wb") as f:
                f.write(STORAGE_VERSION)
        # set permissions
        self._set_permissions()

    def _set_permissions(self):
        """Restrict the DB directory tree to the owner (0o700 dirs, 0o600 files)."""
        os.chmod(self.path, 0o700)
        for path, dirs, files in os.walk(self.path):
            for x in files:
                os.chmod(os.path.join(path, x), 0o600)
            for x in dirs:
                os.chmod(os.path.join(path, x), 0o700)

    def _require_db(self):
        """Return the open plyvel DB handle, or raise RuntimeError if closed."""
        db = self.db
        if db is None:
            raise RuntimeError("DB is closed")
        return db

    def _debug(self):
        """Log every raw key/value pair in the database."""
        for k, v in self._require_db().iterator():
            self.logger.info(f"{k} -> {v}")

    def close(self) -> None:
        """Close the underlying database; a no-op if it is not open."""
        if self.db is not None:
            self.logger.info('closing database')
            self.db.close()
            self.db = None

    def set_modified(self, b):
        # fixme: callers should not have to do that
        pass

    def __enter__(self) -> "LevelDB":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    def write(self):
        """No-op."""
        pass

    def write_and_force_consolidation(self):
        # called after password update.
        # remove remnants encrypted with old password
        self._require_db().compact_range()

    def _prefix_bytes(self, path) -> bytes:
        """Join *path* with the delimiter into a bytes key, without a trailing delimiter."""
        assert path[0] == ''
        d = self.delimiter.encode("utf-8")
        p = d.join([_to_bytes_key(x) for x in path])
        # Strip one trailing delimiter (present when the last element is '');
        # callers append the delimiter themselves when they need a prefix.
        if p.endswith(d):
            p = p[:-len(d)]
        return p

    def _full_key(self, path, key: FLEX_KEY) -> bytes:
        """Return the bytes key of the node *key* located under *path*."""
        return self._prefix_bytes(path + [key])

    def _child_prefix(self, path, key: FLEX_KEY) -> bytes:
        """Return the prefix shared by all descendants of node *key* under *path*."""
        d = self.delimiter.encode("utf-8")
        return self._full_key(path, key) + d

    def _has_children(self, path, key: FLEX_KEY) -> bool:
        """Return True if at least one key is stored below node *key*."""
        db = self._require_db()
        pfx = self._child_prefix(path, key)
        it = db.iterator(prefix=pfx, include_value=False)
        try:
            next(it)
            return True
        except StopIteration:
            return False

    def iter_keys(self, path) -> Iterator[str]:
        """
        Iterate unique top-level keys at this view's prefix.

        Keys are yielded in the order the database iterator returns them,
        each one once, even if it has descendants of its own.
        """
        db = self._require_db()
        d = self.delimiter.encode("utf-8")
        pb = self._prefix_bytes(path) + d
        seen = set()
        # values are not needed here, only the keys
        for k in db.iterator(prefix=pb, include_value=False):
            rel = k[len(pb):]
            first = rel.split(d, 1)[0]
            if first not in seen:
                seen.add(first)
                yield _to_str_key(first)

    @locked
    def remove(self, path, key):
        """Delete node *key* under *path* together with all of its descendants."""
        self._delete_subtree(path, key, wb=None)

    def _delete_subtree(self, path, key: FLEX_KEY, wb: Optional[plyvel.WriteBatch] = None) -> None:
        """Delete node *key* and its descendants, through *wb* if given, else directly."""
        db = self._require_db()
        pfx = self._child_prefix(path, key)
        it = db.iterator(prefix=pfx, include_value=False)
        deleter = wb.delete if wb is not None else db.delete
        for k in it:
            deleter(k)
        # delete scalar at node itself, if present
        k = self._full_key(path, key)
        deleter(k)
        if wb is None:
            # sanity check; only meaningful when the delete was applied directly
            r = db.get(k)
            assert r is None, r

    @locked
    def clear(self, path) -> None:
        """Delete every key stored below *path*.

        The prefix ends with the delimiter (the same prefix iter_keys uses),
        so sibling nodes whose name merely starts with the same characters,
        and the marker stored at *path* itself, are left untouched.
        """
        db = self._require_db()
        d = self.delimiter.encode("utf-8")
        pb = self._prefix_bytes(path) + d
        with db.write_batch() as wb:
            for k in db.iterator(prefix=pb, include_value=False):
                wb.delete(k)

    @locked
    def get(self, path, key: FLEX_KEY) -> Any:
        """Return the decoded value stored at node *key* under *path*.

        Raises:
            KeyError: if nothing is stored at that key.
        """
        db = self._require_db()
        fullkey = self._full_key(path, key)
        raw = db.get(fullkey)
        if raw is None:
            raise KeyError((path, key, fullkey))
        return self.codec.loads(raw) # json to python

    def _flatten_into_batch(self, base_key: bytes, value: Any, wb: plyvel.WriteBatch) -> None:
        """Recursively write *value* at *base_key* into *wb*.

        Dicts and lists get an empty marker at *base_key* and one entry per
        item below it; any other value is encoded and stored at *base_key*.
        """
        d = self.delimiter.encode("utf-8")
        if isinstance(value, dict):
            wb.put(base_key, self.codec.dumps({}))
            for k, v in value.items():
                k = key_to_str(k)
                child_key = base_key + d + _to_bytes_key(k)
                self._flatten_into_batch(child_key, v, wb)
        elif isinstance(value, list):
            wb.put(base_key, self.codec.dumps([]))
            for k, v in enumerate(value):
                k = key_to_str(k)
                child_key = base_key + d + _to_bytes_key(k)
                self._flatten_into_batch(child_key, v, wb)
        else:
            wb.put(base_key, self.codec.dumps(value))

    def set_write_batch(self):
        """Create a write batch that subsequent put() calls will share."""
        self._write_batch = self._require_db().write_batch()

    def clear_write_batch(self):
        """Forget the shared write batch."""
        self._write_batch = None

    def get_write_batch(self):
        """Return the shared write batch if one is set, else a fresh one."""
        if self._write_batch is not None:
            return self._write_batch
        return self._require_db().write_batch()

    @locked
    def put(self, path, key: FLEX_KEY, value: Any) -> None:
        """Store *value* at node *key* under *path*, replacing any previous subtree."""
        self._require_db()
        with self.get_write_batch() as wb:
            # delete any pre-existing dict
            self._delete_subtree(path, key, wb=wb)
            # _flatten_into_batch handles scalars too: containers get a
            # marker at the node plus their items, scalars a single entry.
            self._flatten_into_batch(self._full_key(path, key), value, wb)

    @locked
    def replace(self, path, key: FLEX_KEY, value: Any) -> None:
        """Set field *key* to *value* inside the object stored at *path*.

        The object stored at the node addressed by *path* is decoded,
        updated and written back directly to the database.

        Raises:
            KeyError: if nothing is stored at *path*.
        """
        # called by StoredObject in setattr
        db = self._require_db()
        fullkey = self._full_key(path[:-1], path[-1])
        raw = db.get(fullkey)
        if raw is None:
            raise KeyError((path, key, fullkey))
        d = self.codec.loads(raw)
        d[key] = value
        db.put(fullkey, self.codec.dumps(d))

    @locked
    def contains(self, path, key: object) -> bool:
        """Return True if a value (or container marker) is stored at node *key*."""
        db = self._require_db()
        # descendants alone do not count: only the node's own entry is checked
        return db.get(self._full_key(path, key)) is not None


    # list methods

    @locked
    def get_list_item(self, path, s: Union[int, slice]):
        """Return the item at index *s*, or the list of items selected by slice *s*.

        Slices are normalised with ``slice.indices``; negative integer
        indices count from the end.

        Raises:
            TypeError: if *s* is neither an int nor a slice.
            KeyError: (from get) if an integer index is out of range.
        """
        n = self.list_len(path)
        if isinstance(s, slice):
            return [self.get(path, str(i)) for i in range(*s.indices(n))]
        if isinstance(s, int):
            i = n + s if s < 0 else s
            return self.get(path, str(i))
        raise TypeError(f"list indices must be integers or slices, not {type(s).__name__}")

    @locked
    def list_append(self, path, item):
        """Store *item* at the index equal to the current length of the list."""
        n = self.list_len(path)
        self.put(path, str(n), item)

    @locked
    def list_clear(self, path):
        """Replace the list stored at *path* with an empty list."""
        path, key = path[:-1], path[-1]
        self.put(path, key, [])

    @locked
    def dict_len(self, path):
        """Return the number of distinct keys directly below *path*."""
        # fixme: slow
        return sum(1 for _ in self.iter_keys(path))

    @locked
    def list_len(self, path):
        """Return the number of distinct keys directly below *path*."""
        return sum(1 for _ in self.iter_keys(path))

    @locked
    def list_iter(self, path):
        """Yield the items of the list at *path* in index order.

        This is a generator, so @locked only covers its creation; the
        length and each item are read through locked calls during iteration.
        """
        for i in range(self.list_len(path)):
            yield self.get(path, str(i))

    @locked
    def list_index(self, path, item):
        """Return the lowest index whose stored value equals *item*.

        Indices are scanned numerically rather than in database key order,
        where "10" would come before "2".

        Raises:
            KeyError: if no stored value equals *item*.
        """
        for i in range(self.list_len(path)):
            v = self.get(path, str(i))
            if item == v:
                return i
        raise KeyError(item)

    @locked
    def list_remove(self, path, item):
        """Remove the first occurrence of *item*, shifting later items down by one."""
        k = self.list_index(path, item)
        n = self.list_len(path)
        for i in range(k, n-1):
            self.put(path, str(i), self.get(path, str(i+1)))
        self.remove(path, str(n-1))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc