• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pymorphy2-fork / DAWG-Python / 13367104793

17 Feb 2025 09:33AM UTC coverage: 86.722% (-0.6%) from 87.367%
13367104793

Pull #41

github

web-flow
Merge 6eca861aa into 44e13ac5b
Pull Request #41: Add annotations to units.py, fix some other typing problems

168 of 218 branches covered (77.06%)

Branch coverage included in aggregate %.

42 of 49 new or added lines in 3 files covered. (85.71%)

32 existing lines in 3 files now uncovered.

668 of 746 relevant lines covered (89.54%)

5.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.65
/dawg_python/dawgs.py
1
from __future__ import annotations
6✔
2

3
import struct
6✔
4
from binascii import a2b_base64
6✔
5
from typing import TYPE_CHECKING
6✔
6

7
from . import wrapper
6✔
8

9
if TYPE_CHECKING:
    # Names below are needed for annotations only; nothing here runs at
    # import time.
    from pathlib import Path
    from typing import Generator, Mapping

    from typing_extensions import Self, TypeAlias

    # User-facing replacement table: a single character mapped either to one
    # replacement character or to a list of replacement characters.
    Replaces: TypeAlias = Mapping[str, str | list[str]]
    # Result of ``DAWG.compile_replaces``.  Keys are the UTF-8 *encoded*
    # characters (``bytes``, not ``str``): the lookup code encodes each
    # character of the query before testing membership.  Every value is a
    # list of ``(encoded replacement, replacement)`` pairs.
    CompiledReplaces: TypeAlias = Mapping[bytes, list[tuple[bytes, str]]]
×
17

18

19
class DAWG:
    """
    Base DAWG wrapper.

    All look-ups are delegated to ``dct``, which stays ``None`` until
    ``load`` has been called.
    """
    dct: wrapper.Dictionary | None

    def __init__(self) -> None:
        self.dct = None

    def __contains__(self, key) -> bool:
        """
        Report whether ``dct.contains`` accepts ``key``.

        Anything that is not ``bytes`` is UTF-8 encoded first.
        """
        b_key = key if isinstance(key, bytes) else key.encode("utf8")
        return self.dct.contains(b_key)

    def load(self, path: str | Path) -> Self:
        """
        Loads DAWG from the file at ``path`` and returns ``self``.
        """
        self.dct = wrapper.Dictionary.load(path)
        return self

    def _has_value(self, index: int) -> bool:
        """
        Forward to ``dct.has_value`` for the node at ``index``.
        """
        return self.dct.has_value(index)

    def _similar_keys(self, current_prefix: str, key: str, index: int, replace_chars: CompiledReplaces) -> list[str]:
        """
        Recursive worker behind ``similar_keys``.

        ``current_prefix`` is the (possibly substituted) text consumed so
        far and ``index`` is the node reached by it; the characters of
        ``key`` from ``len(current_prefix)`` onwards are still to be matched.
        """
        matches = []
        offset = len(current_prefix)

        for pos in range(offset, len(key)):
            b_char = key[pos].encode("utf8")

            # Try every allowed substitution for this character and collect
            # whatever the substituted branch yields for the rest of the key.
            for b_alt, u_alt in replace_chars.get(b_char, ()):
                branch = self.dct.follow_bytes(b_alt, index)
                if branch:
                    head = current_prefix + key[offset:pos] + u_alt
                    matches.extend(self._similar_keys(head, key, branch, replace_chars))

            # Then continue with the character exactly as written.
            index = self.dct.follow_bytes(b_char, index)
            if index is None:
                break
        else:
            # Whole key consumed without a dead end: the unsubstituted
            # variant goes first in the result.
            if self._has_value(index):
                matches.insert(0, current_prefix + key[offset:])

        return matches

    def similar_keys(self, key: str, replaces: CompiledReplaces):
        """
        Returns all variants of ``key`` in this DAWG according to
        ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.

        This may be useful e.g. for handling single-character umlauts.
        """
        root = self.dct.ROOT
        return self._similar_keys("", key, root, replaces)

    @classmethod
    def compile_replaces(cls, replaces: Replaces) -> CompiledReplaces:
        """
        Validate ``replaces`` and convert it to the form used by the
        ``similar_*`` methods.

        The result maps each UTF-8 encoded key to a list of
        ``(encoded replacement, replacement)`` tuples.  ``ValueError`` is
        raised when a key is not exactly one character long, or when a
        ``str`` value / an entry of a ``list`` value is not exactly one
        character long, or when a ``list`` value is empty.
        """
        for source, targets in replaces.items():
            if len(source) != 1:
                msg = "Keys must be single-char unicode strings."
                raise ValueError(msg)
            if isinstance(targets, str):
                if len(targets) != 1:
                    msg = "Values must be single-char unicode strings or non-empty lists of such."
                    raise ValueError(msg)
            elif isinstance(targets, list):
                if not all(len(entry) == 1 for entry in targets) or not targets:
                    msg = "Values must be single-char unicode strings or non-empty lists of such."
                    raise ValueError(msg)

        compiled = {}
        for source, targets in replaces.items():
            compiled[source.encode("utf8")] = [
                (entry.encode("utf8"), entry) for entry in targets
            ]
        return compiled

    def prefixes(self, key: str | bytes) -> list[str]:
        """
        Returns a list with keys of this DAWG that are prefixes of the ``key``.
        """
        found = []
        node = self.dct.ROOT
        b_key = key if isinstance(key, bytes) else key.encode("utf8")

        # ``length`` is the number of bytes consumed once ``byte`` is followed.
        for length, byte in enumerate(b_key, start=1):
            node = self.dct.follow_char(byte, node)
            if not node:
                break

            if self._has_value(node):
                found.append(b_key[:length].decode("utf8"))

        return found
6✔
130

131

132
class CompletionDAWG(DAWG):
    """
    DAWG with key completion support.

    Besides the dictionary it keeps a ``guide`` object; both are handed to
    ``wrapper.Completer`` to enumerate stored keys.
    """
    dct: wrapper.Dictionary
    guide: wrapper.Guide | None

    def __init__(self) -> None:
        super().__init__()
        self.guide = None

    def keys(self, prefix: str = "") -> list[str]:
        """
        Return a list of the keys produced by the completer for ``prefix``.

        An empty list is returned when ``prefix`` cannot be followed in the
        dictionary.
        """
        b_prefix = prefix.encode("utf8")
        found = []

        start = self.dct.follow_bytes(b_prefix, self.dct.ROOT)
        if start is None:
            return found

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(start, b_prefix)

        while walker.next():
            found.append(walker.key.decode("utf8"))

        return found

    def iterkeys(self, prefix: str = "") -> Generator[str, None, None]:
        """
        Lazy counterpart of ``keys``: yield the keys one by one.
        """
        b_prefix = prefix.encode("utf8")
        start = self.dct.follow_bytes(b_prefix, self.dct.ROOT)
        if start is None:
            return

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(start, b_prefix)

        while walker.next():
            yield walker.key.decode("utf8")

    def load(self, path: str | Path) -> Self:
        """
        Loads DAWG from a file.

        The dictionary is read first and the guide second, both from the
        same binary stream; ``self`` is returned.
        """
        self.dct = wrapper.Dictionary()
        self.guide = wrapper.Guide()

        with open(path, "rb") as stream:
            self.dct.read(stream)
            self.guide.read(stream)

        return self
6✔
184

185

186
# Default byte sequence that ``BytesDAWG`` expects between a key and its
# payload; it can be overridden per instance via ``payload_separator``.
PAYLOAD_SEPARATOR = b"\x01"
6✔
187
# NOTE(review): not referenced by the code visible in this module; presumably
# an upper bound (in bytes) on a stored payload -- confirm against the code
# that builds the DAWG files.
MAX_VALUE_SIZE = 32768
6✔
188

189

190
class BytesDAWG(CompletionDAWG):
    """
    DAWG that is able to transparently store extra binary payload in keys;
    there may be several payloads for the same key.

    In other words, this class implements read-only DAWG-based
    {unicode -> list of bytes objects} mapping.

    Each entry read back from the completer is treated as
    ``<utf8 key><payload separator><base64 payload>``.
    """

    def __init__(self, payload_separator=PAYLOAD_SEPARATOR) -> None:
        """
        ``payload_separator`` is the byte sequence expected between a key
        and its payload.
        """
        super().__init__()
        self._payload_separator = payload_separator

    def __contains__(self, key) -> bool:
        """
        Return True when ``key`` followed by the payload separator can be
        followed in the dictionary.  Non-bytes keys are UTF-8 encoded.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")
        return bool(self._follow_key(key))

    def __getitem__(self, key):
        """
        Return the list of payloads for ``key``; raise ``KeyError`` when
        there are none.
        """
        res = self.get(key)
        if res is None:
            raise KeyError(key)
        return res

    def get(self, key, default=None):
        """
        Returns a list of payloads (as byte objects) for a given key
        or ``default`` if the key is not found.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")

        # An empty payload list is falsy, so a missing key yields ``default``.
        return self.b_get_value(key) or default

    def _follow_key(self, b_key):
        """
        Follow ``b_key`` and then the payload separator from the root.

        Returns the resulting index, or ``False`` when either step fails.
        """
        index = self.dct.follow_bytes(b_key, self.dct.ROOT)
        if not index:
            return False

        index = self.dct.follow_bytes(self._payload_separator, index)
        if not index:
            return False

        return index

    def _value_for_index(self, index):
        """
        Collect and base64-decode every completion reachable from ``index``
        (the node just past the payload separator).
        """
        res = []

        completer = wrapper.Completer(self.dct, self.guide)

        completer.start(index)
        while completer.next():
            b64_data = completer.key
            res.append(a2b_base64(b64_data))

        return res

    def b_get_value(self, b_key):
        """
        Return the list of payloads for the bytes key ``b_key``
        (an empty list when the key is absent).
        """
        index = self._follow_key(b_key)
        if not index:
            return []
        return self._value_for_index(index)

    def keys(self, prefix=""):
        """
        Return a list with the key part of every entry starting with
        ``prefix`` (``str`` or UTF-8 ``bytes``).

        NOTE(review): one element is produced per completer entry, so a key
        presumably repeats once per payload -- confirm.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")
        res = []

        index = self.dct.ROOT

        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return res

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            # Everything before the separator is the key itself.
            payload_idx = completer.key.index(self._payload_separator)
            u_key = completer.key[:payload_idx].decode("utf8")
            res.append(u_key)
        return res

    def iterkeys(self, prefix=""):
        """
        Generator counterpart of ``keys``.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")

        index = self.dct.ROOT

        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            payload_idx = completer.key.index(self._payload_separator)
            u_key = completer.key[:payload_idx].decode("utf8")
            yield u_key

    def items(self, prefix=""):
        """
        Return a list of ``(key, payload)`` tuples for every entry starting
        with ``prefix``; the payload is base64-decoded to ``bytes``.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")
        res = []

        index = self.dct.ROOT
        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return res

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            key, value = completer.key.split(self._payload_separator)
            res.append((key.decode("utf8"), a2b_base64(value)))

        return res

    def iteritems(self, prefix=""):
        """
        Generator counterpart of ``items``.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")

        index = self.dct.ROOT
        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            key, value = completer.key.split(self._payload_separator)
            item = (key.decode("utf8"), a2b_base64(value))
            yield item

    def _has_value(self, index):
        """
        Return True when the payload separator can be followed from
        ``index``, i.e. the bytes consumed so far form a complete key.

        The instance's own separator is used (not the module default) so
        that ``prefixes`` and ``similar_keys`` stay correct for DAWGs created
        with a custom ``payload_separator``.
        """
        return bool(self.dct.follow_bytes(self._payload_separator, index))

    def _similar_items(self, current_prefix, key, index, replace_chars):
        """
        Recursive worker behind ``similar_items``.

        ``current_prefix`` is the (possibly substituted) text consumed so
        far and ``index`` the node reached by it; the characters of ``key``
        from ``len(current_prefix)`` onwards are still to be matched.
        """
        res = []
        start_pos = len(current_prefix)
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                # Explore each substitution allowed for this character.
                for (b_replace_char, u_replace_char) in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        prefix = current_prefix + key[start_pos:word_pos] + u_replace_char
                        extra_items = self._similar_items(prefix, key, next_index, replace_chars)
                        res += extra_items

            # Then continue with the character exactly as written.
            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            # Whole key consumed: the unsubstituted variant goes first.
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                found_key = current_prefix + key[start_pos:]
                value = self._value_for_index(index)
                res.insert(0, (found_key, value))

        return res

    def similar_items(self, key, replaces):
        """
        Returns a list of (key, value) tuples for all variants of ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_items("", key, self.dct.ROOT, replaces)

    def _similar_item_values(self, start_pos, key, index, replace_chars):
        """
        Recursive worker behind ``similar_item_values``.

        Same traversal as ``_similar_items`` but only the values are
        collected, so the position in ``key`` is tracked by ``start_pos``
        instead of a rebuilt prefix string.
        """
        res = []
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                for (b_replace_char, _u_replace_char) in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)

                    if next_index:
                        extra_items = self._similar_item_values(word_pos + 1, key, next_index, replace_chars)
                        res += extra_items

            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                value = self._value_for_index(index)
                res.insert(0, value)

        return res

    def similar_item_values(self, key, replaces):
        """
        Returns a list of values for all variants of the ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_item_values(0, key, self.dct.ROOT, replaces)
6✔
423

424

425
class RecordDAWG(BytesDAWG):
    """
    ``BytesDAWG`` whose payloads are unpacked with a ``struct`` format, so
    values come back as tuples instead of raw bytes.
    """

    def __init__(self, fmt, payload_separator=PAYLOAD_SEPARATOR) -> None:
        """
        ``fmt`` is the ``struct`` format used to unpack every payload;
        ``payload_separator`` is passed on to ``BytesDAWG``.
        """
        super().__init__(payload_separator)
        # Compile the format once; every payload is unpacked with it.
        self._struct = struct.Struct(str(fmt))
        self.fmt = fmt

    def _value_for_index(self, index):
        """Return the payloads found at ``index``, each unpacked to a tuple."""
        unpack = self._struct.unpack
        raw_payloads = super()._value_for_index(index)
        return list(map(unpack, raw_payloads))

    def items(self, prefix=""):
        """Same as ``BytesDAWG.items`` but with unpacked values."""
        unpack = self._struct.unpack
        return [(name, unpack(raw)) for name, raw in super().items(prefix)]

    def iteritems(self, prefix=""):
        """Same as ``BytesDAWG.iteritems`` but with unpacked values."""
        raw_pairs = super().iteritems(prefix)
        return ((name, self._struct.unpack(raw)) for name, raw in raw_pairs)
×
442

443

444
# Sentinel that ``IntDAWG`` treats as "key not found" when it comes back from
# ``b_get_value``.
LOOKUP_ERROR = -1
6✔
445

446

447
class IntDAWG(DAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys.
    """

    def __getitem__(self, key):
        """Return the value stored for ``key`` or raise ``KeyError``."""
        value = self.get(key, LOOKUP_ERROR)
        if value != LOOKUP_ERROR:
            return value
        raise KeyError(key)

    def get(self, key, default=None):
        """
        Return value for the given key or ``default`` if the key is not found.

        Keys that are not ``bytes`` are UTF-8 encoded before the look-up.
        """
        b_key = key if isinstance(key, bytes) else key.encode("utf8")
        value = self.b_get_value(b_key)
        return default if value == LOOKUP_ERROR else value

    def b_get_value(self, key):
        """Return whatever ``dct.find`` reports for the bytes ``key``."""
        return self.dct.find(key)
6✔
472

473

474
class IntCompletionDAWG(CompletionDAWG, IntDAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys and support key completion.
    """

    def items(self, prefix=""):
        """
        Return a list of ``(key, value)`` tuples for the entries the
        completer produces for ``prefix`` (``str`` or UTF-8 ``bytes``).
        """
        b_prefix = prefix if isinstance(prefix, bytes) else prefix.encode("utf8")
        pairs = []
        node = self.dct.ROOT

        # Only walk the dictionary when there is a prefix to consume.
        if b_prefix:
            node = self.dct.follow_bytes(b_prefix, node)
            if not node:
                return pairs

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(node, b_prefix)

        while walker.next():
            pairs.append((walker.key.decode("utf8"), walker.value()))

        return pairs

    def iteritems(self, prefix=""):
        """
        Generator counterpart of ``items``.
        """
        b_prefix = prefix if isinstance(prefix, bytes) else prefix.encode("utf8")
        node = self.dct.ROOT

        if b_prefix:
            node = self.dct.follow_bytes(b_prefix, node)
            if not node:
                return

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(node, b_prefix)

        while walker.next():
            yield walker.key.decode("utf8"), walker.value()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc