• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pymorphy2-fork / DAWG-Python / 13369628967

17 Feb 2025 11:53AM UTC coverage: 89.379% (+2.0%) from 87.367%
13369628967

Pull #41

github

web-flow
Merge 096eb9956 into 44e13ac5b
Pull Request #41: Add annotations to units.py, fix some other typing problems

155 of 188 branches covered (82.45%)

Branch coverage included in aggregate %.

82 of 90 new or added lines in 3 files covered. (91.11%)

2 existing lines in 1 file now uncovered.

636 of 697 relevant lines covered (91.25%)

5.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.84
/dawg_python/dawgs.py
1
from __future__ import annotations
6✔
2

3
import struct
6✔
4
from binascii import a2b_base64
6✔
5
from typing import TYPE_CHECKING
6✔
6

7
from . import wrapper
6✔
8

9
if TYPE_CHECKING:
    from pathlib import Path
    from typing import Any, Generator, Mapping

    from typing_extensions import Self, TypeAlias

    # User-facing replacement table: single character -> one replacement
    # character or a list of them (validated by ``DAWG.compile_replaces``).
    Replaces: TypeAlias = Mapping[str, str | list[str]]
    # Result of ``DAWG.compile_replaces``: keys are the UTF-8 *encoded* source
    # characters (lookups are done with encoded bytes), values are lists of
    # (encoded replacement, decoded replacement) pairs.
    CompiledReplaces: TypeAlias = Mapping[bytes, list[tuple[bytes, str]]]
17

18

19
class DAWG:
    """
    Base DAWG wrapper.

    ``dct`` holds the underlying ``wrapper.Dictionary``. It is ``None`` until
    ``load`` has been called, so the lookup methods require a loaded instance.
    """

    dct: wrapper.Dictionary | None

    def __init__(self) -> None:
        self.dct = None

    def __contains__(self, key: str | bytes) -> bool:
        """
        Returns the result of ``dct.contains`` for ``key``;
        ``str`` keys are encoded as UTF-8 first.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")
        return self.dct.contains(key)

    def load(self, path: str | Path) -> Self:
        """
        Loads DAWG from a file.
        """
        self.dct = wrapper.Dictionary.load(path)
        return self

    def _has_value(self, index: int) -> bool:
        """
        Returns whether the dictionary reports a value at ``index``.
        """
        return self.dct.has_value(index)

    def _similar_keys(self, current_prefix: str, key: str, index: int, replace_chars: CompiledReplaces) -> list[str]:
        """
        Recursive worker for ``similar_keys``.

        ``current_prefix`` is the part of the result that is already fixed
        (with replacements applied) and ``index`` is the dictionary index
        reached after following it. Returns the matching keys; the variant
        that applies no further replacement, if it exists, comes first.
        """
        res = []
        # Replacements are single characters, so the prefix is exactly as long
        # as the part of ``key`` that has been consumed so far.
        start_pos = len(current_prefix)
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                # Branch: try every replacement for this character and let the
                # recursion finish the rest of the key from there.
                for b_replace_char, u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        prefix = current_prefix + key[start_pos:word_pos] + u_replace_char
                        res += self._similar_keys(prefix, key, next_index, replace_chars)

            # Main line: continue with the original character.
            index = self.dct.follow_bytes(b_step, index)
            if index is None:
                break
            word_pos += 1

        else:
            # Reached only when the loop was not broken, i.e. the whole key
            # could be followed without a replacement at the remaining positions.
            if self._has_value(index):
                found_key = current_prefix + key[start_pos:]
                res.insert(0, found_key)

        return res

    def similar_keys(self, key: str, replaces: CompiledReplaces) -> list[str]:
        """
        Returns all variants of ``key`` in this DAWG according to
        ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.

        This may be useful e.g. for handling single-character umlauts.
        """
        return self._similar_keys("", key, self.dct.ROOT, replaces)

    @classmethod
    def compile_replaces(cls, replaces: Replaces) -> CompiledReplaces:
        """
        Validates ``replaces`` and converts it to the form expected by
        ``similar_keys`` and friends.

        Each key must be a single-char string. Each value must be a
        single-char string or a non-empty sequence of single-char strings
        (lists, and also other sequences such as tuples, are checked).

        Returns a dict mapping the UTF-8 encoded key to a list of
        ``(encoded replacement, replacement)`` pairs.

        Raises ``ValueError`` when a key or a value entry is not exactly one
        character long or when a value sequence is empty.
        """
        # Validate everything first so that a bad entry is reported before
        # any encoding is attempted.
        normalized = {}
        for k, v in replaces.items():
            if len(k) != 1:
                msg = "Keys must be single-char unicode strings."
                raise ValueError(msg)

            # A bare string is a single replacement; anything else is treated
            # as a sequence of replacements and validated entry by entry.
            entries = [v] if isinstance(v, str) else list(v)
            if not entries or any(len(entry) != 1 for entry in entries):
                msg = "Values must be single-char unicode strings or non-empty lists of such."
                raise ValueError(msg)
            normalized[k] = entries

        return {
            k.encode("utf8"): [(entry.encode("utf8"), entry) for entry in entries]
            for k, entries in normalized.items()
        }

    def prefixes(self, key: str | bytes) -> list[str]:
        """
        Returns a list with keys of this DAWG that are prefixes of the ``key``.
        """
        res = []
        index = self.dct.ROOT
        if not isinstance(key, bytes):
            key = key.encode("utf8")

        # Walk the key byte by byte; ``pos`` is the length of the prefix that
        # has been followed once ``ch`` is consumed.
        for pos, ch in enumerate(key, 1):
            index = self.dct.follow_char(ch, index)
            if not index:
                break

            if self._has_value(index):
                res.append(key[:pos].decode("utf8"))

        return res
126

127

128
class CompletionDAWG(DAWG):
    """
    DAWG with key completion support.

    Besides the dictionary it keeps a ``wrapper.Guide`` which is passed to
    ``wrapper.Completer`` for key enumeration; ``guide`` is ``None`` until
    ``load`` has been called.
    """

    dct: wrapper.Dictionary
    guide: wrapper.Guide | None

    def __init__(self) -> None:
        super().__init__()
        self.guide = None

    def keys(self, prefix: str = "") -> list[str]:
        """
        Returns a list with everything ``iterkeys(prefix)`` yields.
        """
        return [*self.iterkeys(prefix)]

    def iterkeys(self, prefix: str = "") -> Generator[str, None, None]:
        """
        Yields, decoded from UTF-8, every key the completer produces for
        ``prefix``; yields nothing when ``prefix`` cannot be followed in
        the dictionary.
        """
        encoded = prefix.encode("utf8")
        start = self.dct.follow_bytes(encoded, self.dct.ROOT)
        if start is not None:
            walker = wrapper.Completer(self.dct, self.guide)
            walker.start(start, encoded)

            while walker.next():
                yield walker.key.decode("utf8")

    def load(self, path: str | Path) -> Self:
        """
        Loads DAWG from a file.

        The file is opened in binary mode and read into a fresh dictionary
        first and a fresh guide second.
        """
        self.dct = wrapper.Dictionary()
        self.guide = wrapper.Guide()

        with open(path, "rb") as stream:
            for part in (self.dct, self.guide):
                part.read(stream)

        return self
167

168

169
# Default byte placed between a key and its base64-encoded payload in BytesDAWG entries.
PAYLOAD_SEPARATOR = b"\x01"
6✔
170
# NOTE(review): not referenced in the visible code — presumably a payload size limit used when DAWGs are built; confirm.
MAX_VALUE_SIZE = 32768
6✔
171

172

173
class BytesDAWG(CompletionDAWG):
    """
    DAWG that is able to transparently store extra binary payload in keys;
    there may be several payloads for the same key.

    In other words, this class implements read-only DAWG-based
    {unicode -> list of bytes objects} mapping.

    Every stored entry has the form
    ``<UTF-8 key><payload separator><base64-encoded payload>``.
    """

    def __init__(self, payload_separator: bytes | None = PAYLOAD_SEPARATOR) -> None:
        super().__init__()
        self._payload_separator = payload_separator

    def __contains__(self, key: str | bytes) -> bool:
        """
        Returns True when ``key`` followed by the payload separator can be
        followed in the dictionary.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")
        return bool(self._follow_key(key))

    def __getitem__(self, key: str | bytes) -> list[bytes]:
        """
        Returns the list of payloads for ``key``.

        Raises ``KeyError`` when ``get`` returns ``None``.
        """
        res = self.get(key)
        if res is None:
            raise KeyError(key)
        return res

    def get(self, key: str | bytes, default: list[bytes] | None = None) -> list[bytes] | None:
        """
        Returns a list of payloads (as byte objects) for a given key
        or ``default`` if the key is not found.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")

        # ``b_get_value`` returns an empty list for a missing key, hence ``or``.
        return self.b_get_value(key) or default

    def _follow_key(self, b_key: bytes) -> int | None:
        """
        Returns the index reached after following ``b_key`` and then the
        payload separator, or ``None`` when either step fails.
        """
        index = self.dct.follow_bytes(b_key, self.dct.ROOT)
        if not index:
            return None

        index = self.dct.follow_bytes(self._payload_separator, index)
        if not index:
            return None

        return index

    def _value_for_index(self, index: int) -> list[bytes]:
        """
        Returns every payload the completer produces from ``index``,
        base64-decoded.
        """
        res = []

        completer = wrapper.Completer(self.dct, self.guide)

        completer.start(index)
        while completer.next():
            res.append(a2b_base64(completer.key))

        return res

    def b_get_value(self, b_key: bytes) -> list[bytes]:
        """
        Returns the payloads stored for the already encoded ``b_key``
        or an empty list when the key cannot be followed.
        """
        index = self._follow_key(b_key)
        if not index:
            return []
        return self._value_for_index(index)

    def keys(self, prefix: str | bytes = "") -> list[str]:
        """
        Returns a list with everything ``iterkeys(prefix)`` yields.
        """
        return list(self.iterkeys(prefix))

    def iterkeys(self, prefix: str | bytes = "") -> Generator[str, None, None]:
        """
        Yields the key part (decoded from UTF-8) of every entry the completer
        produces for ``prefix``; one item is yielded per stored entry.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")

        index = self.dct.ROOT

        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            # The key is everything before the first payload separator.
            payload_idx = completer.key.index(self._payload_separator)
            u_key = completer.key[:payload_idx].decode("utf8")
            yield u_key

    def items(self, prefix: str | bytes = "") -> list[tuple[str, bytes]]:
        """
        Returns a list with everything ``iteritems(prefix)`` yields.
        """
        return list(self.iteritems(prefix))

    def iteritems(self, prefix: str | bytes = "") -> Generator[tuple[str, bytes], None, None]:
        """
        Yields ``(key, payload)`` tuples for every entry the completer
        produces for ``prefix``; the payload is base64-decoded.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")

        index = self.dct.ROOT
        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            # Split at the first separator only, like ``iterkeys`` does, so a
            # separator byte inside the payload part cannot break unpacking.
            key, value = completer.key.split(self._payload_separator, 1)
            item = (key.decode("utf8"), a2b_base64(value))
            yield item

    def _has_value(self, index: int) -> int | None:
        """
        Returns the (truthy) index reached by following the payload separator
        from ``index``, or a falsy value when there is no such transition.
        """
        # Use the instance separator, not the module default, so that
        # ``prefixes`` also works for DAWGs built with a custom separator.
        return self.dct.follow_bytes(self._payload_separator, index)

    def _similar_items(
        self,
        current_prefix: str,
        key: str,
        index: int,
        replace_chars: CompiledReplaces,
    ) -> list[tuple[str, list[bytes]]]:
        """
        Recursive worker for ``similar_items``.

        ``current_prefix`` is the part of the result key that is already
        fixed (with replacements applied) and ``index`` is the dictionary
        index reached after following it. The variant that applies no
        further replacement, if it exists, comes first.
        """
        res = []
        # Replacements are single characters, so the prefix is exactly as long
        # as the part of ``key`` that has been consumed so far.
        start_pos = len(current_prefix)
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                # Branch: try every replacement for this character.
                for b_replace_char, u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        prefix = current_prefix + key[start_pos:word_pos] + u_replace_char
                        res += self._similar_items(prefix, key, next_index, replace_chars)

            # Main line: continue with the original character.
            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            # Reached only when the whole key could be followed.
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                found_key = current_prefix + key[start_pos:]
                value = self._value_for_index(index)
                res.insert(0, (found_key, value))

        return res

    def similar_items(self, key: str, replaces: CompiledReplaces) -> list[tuple[str, list[bytes]]]:
        """
        Returns a list of (key, value) tuples for all variants of ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_items("", key, self.dct.ROOT, replaces)

    def _similar_item_values(
        self,
        start_pos: int,
        key: str,
        index: int,
        replace_chars: CompiledReplaces,
    ) -> list[list[bytes]]:
        """
        Recursive worker for ``similar_item_values``.

        ``start_pos`` is the position in ``key`` to continue from and
        ``index`` the dictionary index reached so far. The value of the
        variant that applies no further replacement, if it exists, comes first.
        """
        res = []
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                # Branch: try every replacement for this character.
                for b_replace_char, _u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        res += self._similar_item_values(word_pos + 1, key, next_index, replace_chars)

            # Main line: continue with the original character.
            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            # Reached only when the whole key could be followed.
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                value = self._value_for_index(index)
                res.insert(0, value)

        return res

    def similar_item_values(self, key: str, replaces: CompiledReplaces) -> list[list[bytes]]:
        """
        Returns a list of values for all variants of the ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_item_values(0, key, self.dct.ROOT, replaces)
382

383

384
class RecordDAWG(BytesDAWG):
    """
    BytesDAWG whose payloads are unpacked with a ``struct`` format,
    so values are tuples instead of raw bytes.
    """

    def __init__(self, fmt: str | bytes, payload_separator: bytes = PAYLOAD_SEPARATOR) -> None:
        super().__init__(payload_separator)
        # Compile the format once; the original format is kept in ``fmt``.
        self._struct, self.fmt = struct.Struct(fmt), fmt

    def _value_for_index(self, index: int) -> list[tuple[Any, ...]]:
        """
        Returns the payloads found from ``index``, each unpacked
        with the record format.
        """
        records = []
        for raw in super()._value_for_index(index):
            records.append(self._struct.unpack(raw))
        return records

    def items(self, prefix: str | bytes = "") -> list[tuple[str, tuple[Any, ...]]]:
        """
        Returns a list with everything ``iteritems(prefix)`` yields.
        """
        return [*self.iteritems(prefix)]

    def iteritems(self, prefix: str | bytes = "") -> Generator[tuple[str, tuple[Any, ...]], None, None]:
        """
        Yields ``(key, record)`` tuples where ``record`` is the payload
        unpacked with the record format.
        """
        for name, raw in super().iteritems(prefix):
            yield name, self._struct.unpack(raw)
400

401

402
# Result of ``IntDAWG.b_get_value`` (``dct.find``) that ``IntDAWG.get`` treats as "key not found".
LOOKUP_ERROR = -1
6✔
403

404

405
class IntDAWG(DAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys.
    """

    def __getitem__(self, key: str | bytes) -> int | None:
        """
        Returns the value for ``key``; raises ``KeyError`` when the lookup
        yields ``LOOKUP_ERROR``.
        """
        value = self.get(key, LOOKUP_ERROR)
        if value != LOOKUP_ERROR:
            return value
        raise KeyError(key)

    def get(self, key: str | bytes, default: int | None = None) -> int | None:
        """
        Return value for the given key or ``default`` if the key is not found.
        """
        # ``str`` keys are looked up by their UTF-8 encoding.
        raw_key = key if isinstance(key, bytes) else key.encode("utf8")
        found = self.b_get_value(raw_key)
        return default if found == LOOKUP_ERROR else found

    def b_get_value(self, key: bytes) -> int:
        """
        Returns the result of ``dct.find`` for the already encoded ``key``.
        """
        return self.dct.find(key)
430

431

432
class IntCompletionDAWG(CompletionDAWG, IntDAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys and support key completion.
    """

    def items(self, prefix: str | bytes = "") -> list[tuple[str, int]]:
        """
        Returns a list with everything ``iteritems(prefix)`` yields.
        """
        return [*self.iteritems(prefix)]

    def iteritems(self, prefix: str | bytes = "") -> Generator[tuple[str, int], None, None]:
        """
        Yields ``(key, value)`` tuples for every key the completer produces
        for ``prefix``; yields nothing when a non-empty ``prefix`` cannot be
        followed in the dictionary.
        """
        raw_prefix = prefix if isinstance(prefix, bytes) else prefix.encode("utf8")
        start = self.dct.ROOT

        # An empty prefix means "start from the root"; no lookup is needed.
        if raw_prefix:
            start = self.dct.follow_bytes(raw_prefix, start)
            if not start:
                return

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(start, raw_prefix)

        while walker.next():
            yield walker.key.decode("utf8"), walker.value()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc