• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pymorphy2-fork / DAWG-Python / 13367587766

17 Feb 2025 10:00AM UTC coverage: 86.722% (-0.6%) from 87.367%
13367587766

Pull #41

github

web-flow
Merge 7af4a19cf into 44e13ac5b
Pull Request #41: Add annotations to units.py, fix some other typing problems

168 of 218 branches covered (77.06%)

Branch coverage included in aggregate %.

75 of 82 new or added lines in 3 files covered. (91.46%)

2 existing lines in 1 file now uncovered.

668 of 746 relevant lines covered (89.54%)

5.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.65
/dawg_python/dawgs.py
1
from __future__ import annotations
6✔
2

3
import struct
6✔
4
from binascii import a2b_base64
6✔
5
from typing import TYPE_CHECKING
6✔
6

7
from . import wrapper
6✔
8

9
if TYPE_CHECKING:
    # Imports and aliases below are only needed by static type checkers;
    # nothing in this branch runs at import time.
    from pathlib import Path
    from typing import Any, Generator, Mapping

    from typing_extensions import Self, TypeAlias

    # Input accepted by ``DAWG.compile_replaces``: a single-char string mapped
    # to one single-char string or to a list of them.
    Replaces: TypeAlias = Mapping[str, str | list[str]]
    # Output of ``DAWG.compile_replaces``. Keys are the UTF-8 *encoded* source
    # characters (bytes, not str); each value pairs the encoded replacement
    # with its original str form.
    CompiledReplaces: TypeAlias = Mapping[bytes, list[tuple[bytes, str]]]
17

18

19
class DAWG:
    """
    Base DAWG wrapper.

    Delegates all lookups to a ``wrapper.Dictionary`` stored in ``dct``.
    ``dct`` is ``None`` until ``load`` has been called.
    """

    dct: wrapper.Dictionary | None

    def __init__(self) -> None:
        self.dct = None

    def __contains__(self, key: str | bytes) -> bool:
        """
        Returns True if ``key`` is stored in this DAWG.

        ``str`` keys are encoded to UTF-8 before the lookup.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")
        return self.dct.contains(key)

    def load(self, path: str | Path) -> Self:
        """
        Loads DAWG from a file.

        Returns ``self`` so that calls can be chained.
        """
        self.dct = wrapper.Dictionary.load(path)
        return self

    def _has_value(self, index: int) -> bool:
        """
        Returns whether the dictionary unit at ``index`` carries a value.
        """
        return self.dct.has_value(index)

    def _similar_keys(self, current_prefix: str, key: str, index: int, replace_chars: CompiledReplaces) -> list[str]:
        """
        Collects the variants of ``key`` reachable from ``index``.

        ``current_prefix`` is the (possibly already substituted) text that
        led to ``index``; only characters of ``key`` from position
        ``len(current_prefix)`` onwards are examined. The variant built
        without any further substitution, if stored, is placed first.
        """
        res = []
        start_pos = len(current_prefix)
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                for b_replace_char, u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        # Branch off with the substituted character and let
                        # the recursion handle the remainder of the key.
                        prefix = current_prefix + key[start_pos:word_pos] + u_replace_char
                        res += self._similar_keys(prefix, key, next_index, replace_chars)

            index = self.dct.follow_bytes(b_step, index)
            if index is None:
                break
            word_pos += 1

        else:
            # Runs only when the whole key was followed without a break.
            if self._has_value(index):
                found_key = current_prefix + key[start_pos:]
                res.insert(0, found_key)

        return res

    def similar_keys(self, key: str, replaces: CompiledReplaces) -> list[str]:
        """
        Returns all variants of ``key`` in this DAWG according to
        ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.

        This may be useful e.g. for handling single-character umlauts.
        """
        return self._similar_keys("", key, self.dct.ROOT, replaces)

    @classmethod
    def compile_replaces(cls, replaces: Replaces) -> CompiledReplaces:
        """
        Validates ``replaces`` and converts it to the form expected by the
        ``similar_*`` methods.

        Each key must be a single-char string. Each value must be either a
        single-char string or a non-empty collection of single-char strings;
        any collection type (list, tuple, ...) is validated the same way.

        Returns a dict mapping the UTF-8 encoded key to a list of
        ``(encoded replacement, replacement)`` tuples.

        Raises ``ValueError`` if a key or a value violates the rules above.
        """
        compiled = {}
        for k, v in replaces.items():
            if len(k) != 1:
                msg = "Keys must be single-char unicode strings."
                raise ValueError(msg)

            # A bare string is one replacement; anything else is a collection.
            entries = [v] if isinstance(v, str) else list(v)
            if not entries or any(len(entry) != 1 for entry in entries):
                msg = "Values must be single-char unicode strings or non-empty lists of such."
                raise ValueError(msg)

            compiled[k.encode("utf8")] = [(entry.encode("utf8"), entry) for entry in entries]

        return compiled

    def prefixes(self, key: str | bytes) -> list[str]:
        """
        Returns a list with keys of this DAWG that are prefixes of the ``key``.
        """
        res = []
        index = self.dct.ROOT
        if not isinstance(key, bytes):
            key = key.encode("utf8")

        # ``pos`` is the number of bytes consumed so far, i.e. the slice end.
        for pos, ch in enumerate(key, start=1):
            index = self.dct.follow_char(ch, index)
            if not index:
                break

            if self._has_value(index):
                res.append(key[:pos].decode("utf8"))

        return res
126

127

128
class CompletionDAWG(DAWG):
    """
    DAWG with key completion support.

    Besides the dictionary it keeps a ``wrapper.Guide`` in ``guide``; both
    are handed to ``wrapper.Completer`` to enumerate keys.
    """

    dct: wrapper.Dictionary
    guide: wrapper.Guide | None

    def __init__(self) -> None:
        super().__init__()
        self.guide = None

    def keys(self, prefix: str = "") -> list[str]:
        """
        Returns a list of the keys the completer yields for ``prefix``.

        The list is empty when ``prefix`` cannot be followed in the dictionary.
        """
        encoded = prefix.encode("utf8")
        found = []

        start = self.dct.follow_bytes(encoded, self.dct.ROOT)
        if start is None:
            return found

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(start, encoded)
        while walker.next():
            found.append(walker.key.decode("utf8"))

        return found

    def iterkeys(self, prefix: str = "") -> Generator[str, None, None]:
        """
        Lazy counterpart of ``keys``: yields the keys one at a time.
        """
        encoded = prefix.encode("utf8")
        start = self.dct.follow_bytes(encoded, self.dct.ROOT)
        if start is None:
            return

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(start, encoded)
        while walker.next():
            yield walker.key.decode("utf8")

    def load(self, path: str | Path) -> Self:
        """
        Loads DAWG from a file.

        The dictionary is read first and the guide second, from the same
        binary stream. Returns ``self``.
        """
        self.dct, self.guide = wrapper.Dictionary(), wrapper.Guide()

        with open(path, "rb") as stream:
            for part in (self.dct, self.guide):
                part.read(stream)

        return self
181

182

183
# Default ``payload_separator`` for BytesDAWG and RecordDAWG: the byte followed
# after a key to reach its payload(s).
PAYLOAD_SEPARATOR = b"\x01"
6✔
184
# NOTE(review): not referenced anywhere in the visible code; presumably an
# upper bound on payload size, confirm against callers.
MAX_VALUE_SIZE = 32768
6✔
185

186

187
class BytesDAWG(CompletionDAWG):
    """
    DAWG that is able to transparently store extra binary payload in keys;
    there may be several payloads for the same key.

    In other words, this class implements read-only DAWG-based
    {unicode -> list of bytes objects} mapping.

    A stored entry has the form ``key + payload_separator + payload``, where
    the payload is base64-encoded.
    """

    def __init__(self, payload_separator: bytes | None = PAYLOAD_SEPARATOR) -> None:
        super().__init__()
        self._payload_separator = payload_separator

    def __contains__(self, key: str | bytes) -> bool:
        """
        Returns True if ``key`` followed by the payload separator is stored.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")
        return bool(self._follow_key(key))

    def __getitem__(self, key: str | bytes) -> list[bytes]:
        """
        Returns the payloads for ``key``; raises ``KeyError`` if it is missing.
        """
        res = self.get(key)
        if res is None:
            raise KeyError(key)
        return res

    def get(self, key: str | bytes, default: list[bytes] | None = None) -> list[bytes] | None:
        """
        Returns a list of payloads (as byte objects) for a given key
        or ``default`` if the key is not found.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")

        # An empty payload list is falsy, so it falls through to ``default``.
        return self.b_get_value(key) or default

    def _follow_key(self, b_key: bytes) -> int | None:
        """
        Follows ``b_key`` and then the payload separator from the root.

        Returns the resulting index, or ``None`` if either step fails.
        """
        index = self.dct.follow_bytes(b_key, self.dct.ROOT)
        if not index:
            return None

        index = self.dct.follow_bytes(self._payload_separator, index)
        if not index:
            return None

        return index

    def _value_for_index(self, index: int) -> list[bytes]:
        """
        Returns every payload completed from ``index``, base64-decoded.
        """
        res = []

        completer = wrapper.Completer(self.dct, self.guide)

        completer.start(index)
        while completer.next():
            b64_data = completer.key
            res.append(a2b_base64(b64_data))

        return res

    def b_get_value(self, b_key: bytes) -> list[bytes]:
        """
        Returns the payloads for the already-encoded key ``b_key``,
        or an empty list if the key is not found.
        """
        index = self._follow_key(b_key)
        if not index:
            return []
        return self._value_for_index(index)

    def keys(self, prefix: str | bytes = "") -> list[str]:
        """
        Returns a list of the keys completed from ``prefix``, one entry per
        stored (key, payload) pair.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")
        res = []

        index = self.dct.ROOT

        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return res

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            # Strip the separator and the payload that follows it.
            payload_idx = completer.key.index(self._payload_separator)
            u_key = completer.key[:payload_idx].decode("utf8")
            res.append(u_key)
        return res

    def iterkeys(self, prefix: str | bytes = "") -> Generator[str, None, None]:
        """
        Lazy counterpart of ``keys``: yields decoded keys one at a time.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")

        index = self.dct.ROOT

        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            payload_idx = completer.key.index(self._payload_separator)
            u_key = completer.key[:payload_idx].decode("utf8")
            yield u_key

    def items(self, prefix: str | bytes = "") -> list[tuple[str, bytes]]:
        """
        Returns a list of ``(key, payload)`` tuples completed from ``prefix``;
        payloads are base64-decoded.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")
        res = []

        index = self.dct.ROOT
        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return res

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            key, value = completer.key.split(self._payload_separator)
            res.append((key.decode("utf8"), a2b_base64(value)))

        return res

    def iteritems(self, prefix: str | bytes = "") -> Generator[tuple[str, bytes], None, None]:
        """
        Lazy counterpart of ``items``: yields ``(key, payload)`` tuples.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")

        index = self.dct.ROOT
        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            key, value = completer.key.split(self._payload_separator)
            item = (key.decode("utf8"), a2b_base64(value))
            yield item

    def _has_value(self, index: int) -> int | None:
        """
        Returns a truthy index if a payload separator can be followed from
        ``index``, i.e. if a key ends there; otherwise a falsy value.

        Uses this instance's separator so that DAWGs built with a custom
        ``payload_separator`` work with ``prefixes`` and ``similar_keys``.
        """
        return self.dct.follow_bytes(self._payload_separator, index)

    def _similar_items(
        self,
        current_prefix: str,
        key: str,
        index: int,
        replace_chars: CompiledReplaces,
    ) -> list[tuple[str, list[bytes]]]:
        """
        Collects ``(variant, payloads)`` tuples for the variants of ``key``
        reachable from ``index``.

        ``current_prefix`` is the (possibly substituted) text that led to
        ``index``; only characters of ``key`` from ``len(current_prefix)``
        onwards are examined. The variant built without further substitution,
        if stored, is placed first.
        """
        res = []
        start_pos = len(current_prefix)
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                for b_replace_char, u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        prefix = current_prefix + key[start_pos:word_pos] + u_replace_char
                        res += self._similar_items(prefix, key, next_index, replace_chars)

            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            # Whole key was followed; check that a payload hangs off it.
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                found_key = current_prefix + key[start_pos:]
                value = self._value_for_index(index)
                res.insert(0, (found_key, value))

        return res

    def similar_items(self, key: str, replaces: CompiledReplaces) -> list[tuple[str, list[bytes]]]:
        """
        Returns a list of (key, value) tuples for all variants of ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_items("", key, self.dct.ROOT, replaces)

    def _similar_item_values(
        self,
        start_pos: int,
        key: str,
        index: int,
        replace_chars: CompiledReplaces,
    ) -> list[list[bytes]]:
        """
        Like ``_similar_items`` but collects only the payload lists.

        ``start_pos`` is the position in ``key`` at which examination resumes
        from ``index``.
        """
        res = []
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                for b_replace_char, _u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        res += self._similar_item_values(word_pos + 1, key, next_index, replace_chars)

            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                value = self._value_for_index(index)
                res.insert(0, value)

        return res

    def similar_item_values(self, key: str, replaces: CompiledReplaces) -> list[list[bytes]]:
        """
        Returns a list of values for all variants of the ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_item_values(0, key, self.dct.ROOT, replaces)
431

432

433
class RecordDAWG(BytesDAWG):
    """
    BytesDAWG whose payloads are unpacked with a ``struct`` format, so that
    values come back as tuples instead of raw bytes.
    """

    def __init__(self, fmt: str | bytes, payload_separator: bytes = PAYLOAD_SEPARATOR) -> None:
        super().__init__(payload_separator)
        # Compile the format once; it is applied to every payload.
        self._struct = struct.Struct(fmt)
        self.fmt = fmt

    def _value_for_index(self, index: int) -> list[tuple[Any, ...]]:
        """
        Returns the payloads found at ``index``, each unpacked into a tuple.
        """
        unpack = self._struct.unpack
        return list(map(unpack, super()._value_for_index(index)))

    def items(self, prefix: str | bytes = "") -> list[tuple[str, tuple[Any, ...]]]:
        """
        Returns ``(key, unpacked payload)`` tuples completed from ``prefix``.
        """
        unpack = self._struct.unpack
        pairs = []
        for name, raw in super().items(prefix):
            pairs.append((name, unpack(raw)))
        return pairs

    def iteritems(self, prefix: str | bytes = "") -> Generator[tuple[str, tuple[Any, ...]], None, None]:
        """
        Lazy counterpart of ``items``.
        """
        unpack = self._struct.unpack
        for name, raw in super().iteritems(prefix):
            yield name, unpack(raw)
450

451

452
# Sentinel that IntDAWG compares against the result of ``dct.find`` to detect
# a missing key.
LOOKUP_ERROR = -1
6✔
453

454

455
class IntDAWG(DAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys.
    """

    def __getitem__(self, key: str | bytes) -> int | None:
        """
        Returns the value stored for ``key``; raises ``KeyError`` if missing.
        """
        found = self.get(key, LOOKUP_ERROR)
        if found != LOOKUP_ERROR:
            return found
        raise KeyError(key)

    def get(self, key: str | bytes, default: int | None = None) -> int | None:
        """
        Return value for the given key or ``default`` if the key is not found.
        """
        raw_key = key if isinstance(key, bytes) else key.encode("utf8")
        found = self.b_get_value(raw_key)
        return default if found == LOOKUP_ERROR else found

    def b_get_value(self, key: bytes) -> int:
        """
        Returns the dictionary's ``find`` result for the already-encoded key.
        """
        return self.dct.find(key)

481

482
class IntCompletionDAWG(CompletionDAWG, IntDAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys and support key completion.
    """

    def items(self, prefix: str | bytes = "") -> list[tuple[str, int]]:
        """
        Returns a list of ``(key, value)`` tuples completed from ``prefix``.

        ``str`` prefixes are encoded to UTF-8. The list is empty when a
        non-empty ``prefix`` cannot be followed in the dictionary.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")
        res = []
        index = self.dct.ROOT

        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return res

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            res.append((completer.key.decode("utf8"), completer.value()))

        return res

    def iteritems(self, prefix: str | bytes = "") -> Generator[tuple[str, int], None, None]:
        """
        Lazy counterpart of ``items``: yields ``(key, value)`` tuples.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")
        index = self.dct.ROOT

        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            yield completer.key.decode("utf8"), completer.value()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc