• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pymorphy2-fork / DAWG-Python / 10228458181

03 Aug 2024 01:39PM UTC coverage: 87.992% (-0.3%) from 88.259%
10228458181

push

github

insolor
Add comment about silenced annotation warnings

215 of 263 branches covered (81.75%)

Branch coverage included in aggregate %.

657 of 728 relevant lines covered (90.25%)

5.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.73
/dawg_python/dawgs.py
1
import struct
6✔
2
from binascii import a2b_base64
6✔
3

4
from . import wrapper
6✔
5

6

7
class DAWG:
    """
    Base DAWG wrapper.

    ``self.dct`` is ``None`` until :meth:`load` replaces it with the object
    returned by ``wrapper.Dictionary.load``; every lookup method requires a
    loaded dictionary.
    """

    def __init__(self) -> None:
        self.dct = None

    def __contains__(self, key) -> bool:
        """
        Returns the result of ``self.dct.contains`` for ``key``.

        ``key`` may be ``bytes`` (used as is) or a string, which is
        encoded to UTF-8 first.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")
        return self.dct.contains(key)

    def load(self, path):
        """
        Loads DAWG from a file.

        Returns ``self`` so the call can be chained.
        """
        self.dct = wrapper.Dictionary.load(path)
        return self

    def _has_value(self, index):
        """
        Returns ``self.dct.has_value(index)``.

        Used by :meth:`prefixes` and :meth:`similar_keys` to decide whether
        the path walked so far is a stored key.
        """
        return self.dct.has_value(index)

    def _similar_keys(self, current_prefix, key, index, replace_chars):
        """
        Recursive worker behind :meth:`similar_keys`.

        ``current_prefix`` is the text matched so far (with replacements
        already applied), ``index`` is the dictionary index reached after it
        and ``key`` is the complete original key. Returns the list of
        matching keys; the variant that keeps the remaining original
        characters, when it exists, is placed first.
        """
        res = []
        start_pos = len(current_prefix)
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                # Branch off once per allowed replacement of this character
                # and let the recursion handle the rest of the key.
                for b_replace_char, u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)
                    if next_index:
                        prefix = current_prefix + key[start_pos:word_pos] + u_replace_char
                        res += self._similar_keys(prefix, key, next_index, replace_chars)

            # Continue along the unmodified character.
            index = self.dct.follow_bytes(b_step, index)
            if index is None:
                break
            word_pos += 1

        else:
            # Reached only when the whole key was walked without a ``break``.
            if self._has_value(index):
                found_key = current_prefix + key[start_pos:]
                res.insert(0, found_key)

        return res

    def similar_keys(self, key, replaces):
        """
        Returns all variants of ``key`` in this DAWG according to
        ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.

        This may be useful e.g. for handling single-character umlauts.
        """
        return self._similar_keys("", key, self.dct.ROOT, replaces)

    @classmethod
    def compile_replaces(cls, replaces):
        """
        Compiles a replacement mapping for :meth:`similar_keys`.

        ``replaces`` maps single-char strings either to a single-char string
        or to a non-empty collection (list, tuple, ...) of single-char
        strings. The result maps each UTF-8 encoded key to a list of
        ``(encoded_replacement, replacement)`` tuples.

        Raises ``ValueError`` when a key or a replacement is not exactly one
        character long, or when a collection of replacements is empty.
        """
        value_msg = "Values must be single-char unicode strings or non-empty lists of such."
        compiled = {}

        for k, v in replaces.items():
            if len(k) != 1:
                msg = "Keys must be single-char unicode strings."
                raise ValueError(msg)

            # A bare string is one replacement; anything else is treated as a
            # collection, so tuples are validated exactly like lists instead
            # of slipping through unchecked.
            entries = [v] if isinstance(v, str) else list(v)
            if not entries or any(len(v_entry) != 1 for v_entry in entries):
                raise ValueError(value_msg)

            compiled[k.encode("utf8")] = [
                (v_entry.encode("utf8"), v_entry) for v_entry in entries
            ]

        return compiled

    def prefixes(self, key):
        """
        Returns a list with keys of this DAWG that are prefixes of the ``key``.
        """
        res = []
        index = self.dct.ROOT
        if not isinstance(key, bytes):
            key = key.encode("utf8")

        # ``pos`` is the length in bytes of the prefix consumed so far.
        for pos, ch in enumerate(key, 1):
            index = self.dct.follow_char(ch, index)
            if not index:
                break

            if self._has_value(index):
                res.append(key[:pos].decode("utf8"))

        return res
6✔
117

118

119
class CompletionDAWG(DAWG):
    """
    DAWG with key completion support.

    Besides the dictionary, :meth:`load` reads a ``wrapper.Guide`` from the
    same file; both are passed to ``wrapper.Completer`` to enumerate keys.
    """

    def __init__(self) -> None:
        super().__init__()
        self.guide = None

    def _iter_decoded_keys(self, prefix):
        """
        Yields, decoded from UTF-8, every key the completer produces for
        ``prefix``.

        ``prefix`` may be a string or UTF-8 ``bytes``, matching the other
        lookup methods in this module. Nothing is yielded when the prefix
        cannot be followed from the root.
        """
        b_prefix = prefix if isinstance(prefix, bytes) else prefix.encode("utf8")

        index = self.dct.follow_bytes(b_prefix, self.dct.ROOT)
        # Compare with ``None`` explicitly: an empty prefix stays at the root
        # index, which must still be completed.
        if index is None:
            return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, b_prefix)

        while completer.next():
            yield completer.key.decode("utf8")

    def keys(self, prefix=""):
        """
        Returns a list of all keys that start with ``prefix``
        (all keys when ``prefix`` is empty).
        """
        return list(self._iter_decoded_keys(prefix))

    def iterkeys(self, prefix=""):
        """
        Lazy variant of :meth:`keys`: yields the keys one at a time.
        """
        yield from self._iter_decoded_keys(prefix)

    def load(self, path):
        """
        Loads DAWG from a file.

        The dictionary is read first and the guide second, from the same
        binary stream. Returns ``self`` so the call can be chained.
        """
        self.dct = wrapper.Dictionary()
        self.guide = wrapper.Guide()

        with open(path, "rb") as f:
            self.dct.read(f)
            self.guide.read(f)

        return self
6✔
169

170

171
# Default byte string that separates a key from its payload in BytesDAWG
# entries; used as the default ``payload_separator`` constructor argument.
PAYLOAD_SEPARATOR = b"\x01"
6✔
172
# NOTE(review): not referenced anywhere else in this module; presumably an
# upper bound on payload size used elsewhere — confirm before changing.
MAX_VALUE_SIZE = 32768
6✔
173

174

175
class BytesDAWG(CompletionDAWG):
    """
    DAWG that is able to transparently store extra binary payload in keys;
    there may be several payloads for the same key.

    In other words, this class implements read-only DAWG-based
    {unicode -> list of bytes objects} mapping.

    Entries are read back as ``<utf8 key><payload separator><base64 payload>``.
    """

    def __init__(self, payload_separator=PAYLOAD_SEPARATOR) -> None:
        super().__init__()
        self._payload_separator = payload_separator

    def __contains__(self, key) -> bool:
        """
        Returns ``True`` when ``key`` followed by the payload separator can
        be walked in the dictionary.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")
        return bool(self._follow_key(key))

    def __getitem__(self, key):
        """
        Returns the list of payloads for ``key``; raises ``KeyError`` when
        the key is not found.
        """
        res = self.get(key)
        if res is None:
            raise KeyError(key)
        return res

    def get(self, key, default=None):
        """
        Returns a list of payloads (as byte objects) for a given key
        or ``default`` if the key is not found.
        """
        if not isinstance(key, bytes):
            key = key.encode("utf8")

        # ``b_get_value`` returns an empty list for a missing key.
        return self.b_get_value(key) or default

    def _follow_key(self, b_key):
        """
        Walks ``b_key`` and then the payload separator from the root.

        Returns the index reached, or ``False`` if either step fails.
        """
        index = self.dct.follow_bytes(b_key, self.dct.ROOT)
        if not index:
            return False

        index = self.dct.follow_bytes(self._payload_separator, index)
        if not index:
            return False

        return index

    def _value_for_index(self, index):
        """
        Returns the base64-decoded payloads the completer produces when
        started at ``index``.
        """
        res = []

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index)
        while completer.next():
            res.append(a2b_base64(completer.key))

        return res

    def b_get_value(self, b_key):
        """
        Returns the list of payloads stored for the ``bytes`` key ``b_key``
        (an empty list when the key is missing).
        """
        index = self._follow_key(b_key)
        if not index:
            return []
        return self._value_for_index(index)

    def _iter_raw_entries(self, prefix):
        """
        Yields ``completer.key`` for every entry that starts with ``prefix``.

        ``prefix`` may be a string or UTF-8 ``bytes``. Shared by
        :meth:`keys`, :meth:`iterkeys`, :meth:`items` and :meth:`iteritems`;
        it deliberately does not call any of those public methods, so
        subclasses that override them are unaffected.
        """
        if not isinstance(prefix, bytes):
            prefix = prefix.encode("utf8")

        index = self.dct.ROOT
        if prefix:
            index = self.dct.follow_bytes(prefix, index)
            if not index:
                return

        completer = wrapper.Completer(self.dct, self.guide)
        completer.start(index, prefix)

        while completer.next():
            yield completer.key

    def keys(self, prefix=""):
        """
        Returns a list with the key part of every entry starting with
        ``prefix``; a key appears once per stored payload.
        """
        separator = self._payload_separator
        return [
            raw[:raw.index(separator)].decode("utf8")
            for raw in self._iter_raw_entries(prefix)
        ]

    def iterkeys(self, prefix=""):
        """
        Lazy variant of :meth:`keys`.
        """
        separator = self._payload_separator
        for raw in self._iter_raw_entries(prefix):
            yield raw[:raw.index(separator)].decode("utf8")

    def items(self, prefix=""):
        """
        Returns a list of ``(key, payload)`` tuples for every entry starting
        with ``prefix``; payloads are base64-decoded ``bytes``.
        """
        separator = self._payload_separator
        res = []
        for raw in self._iter_raw_entries(prefix):
            # Split at the first separator only, the same cut ``keys`` makes.
            key, value = raw.split(separator, 1)
            res.append((key.decode("utf8"), a2b_base64(value)))
        return res

    def iteritems(self, prefix=""):
        """
        Lazy variant of :meth:`items`.
        """
        separator = self._payload_separator
        for raw in self._iter_raw_entries(prefix):
            key, value = raw.split(separator, 1)
            yield (key.decode("utf8"), a2b_base64(value))

    def _has_value(self, index):
        """
        Returns the result of following the payload separator from
        ``index`` (callers only test it for truthiness).

        The separator configured on this instance is used, so the inherited
        ``prefixes`` and ``similar_keys`` also work for DAWGs with a custom
        separator.
        """
        return self.dct.follow_bytes(self._payload_separator, index)

    def _similar_items(self, current_prefix, key, index, replace_chars):
        """
        Recursive worker behind :meth:`similar_items`.

        ``current_prefix`` is the text matched so far (with replacements
        already applied), ``index`` is the dictionary index reached after it
        and ``key`` is the complete original key. Returns ``(key, value)``
        tuples; the variant that keeps the remaining original characters,
        when it exists, is placed first.
        """
        res = []
        start_pos = len(current_prefix)
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                # Branch off once per allowed replacement of this character.
                for b_replace_char, u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)
                    if next_index:
                        prefix = current_prefix + key[start_pos:word_pos] + u_replace_char
                        res += self._similar_items(prefix, key, next_index, replace_chars)

            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            # Reached only when the whole key was walked without a ``break``.
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                found_key = current_prefix + key[start_pos:]
                value = self._value_for_index(index)
                res.insert(0, (found_key, value))

        return res

    def similar_items(self, key, replaces):
        """
        Returns a list of (key, value) tuples for all variants of ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_items("", key, self.dct.ROOT, replaces)

    def _similar_item_values(self, start_pos, key, index, replace_chars):
        """
        Recursive worker behind :meth:`similar_item_values`.

        Same traversal as :meth:`_similar_items`, but only the values are
        collected, so the position in ``key`` is tracked with ``start_pos``
        instead of a rebuilt prefix string.
        """
        res = []
        end_pos = len(key)
        word_pos = start_pos

        while word_pos < end_pos:
            b_step = key[word_pos].encode("utf8")

            if b_step in replace_chars:
                for b_replace_char, _u_replace_char in replace_chars[b_step]:
                    next_index = self.dct.follow_bytes(b_replace_char, index)
                    if next_index:
                        res += self._similar_item_values(
                            word_pos + 1, key, next_index, replace_chars
                        )

            index = self.dct.follow_bytes(b_step, index)
            if not index:
                break
            word_pos += 1

        else:
            # Reached only when the whole key was walked without a ``break``.
            index = self.dct.follow_bytes(self._payload_separator, index)
            if index:
                value = self._value_for_index(index)
                res.insert(0, value)

        return res

    def similar_item_values(self, key, replaces):
        """
        Returns a list of values for all variants of the ``key``
        in this DAWG according to ``replaces``.

        ``replaces`` is an object obtained from
        ``DAWG.compile_replaces(mapping)`` where mapping is a dict
        that maps single-char unicode strings to (one or more) single-char
        unicode strings.
        """
        return self._similar_item_values(0, key, self.dct.ROOT, replaces)
6✔
408

409

410
class RecordDAWG(BytesDAWG):
    """
    BytesDAWG whose payloads are unpacked with a ``struct`` format.

    Each payload is passed through ``struct.Struct(str(fmt)).unpack``, so
    values are returned as tuples instead of raw bytes.
    """

    def __init__(self, fmt, payload_separator=PAYLOAD_SEPARATOR) -> None:
        super().__init__(payload_separator)
        # Compile the format once; it is reused for every payload.
        self._struct = struct.Struct(str(fmt))
        self.fmt = fmt

    def _value_for_index(self, index):
        """
        Returns the payloads found at ``index``, each unpacked into a tuple.
        """
        raw_payloads = super()._value_for_index(index)
        return list(map(self._struct.unpack, raw_payloads))

    def items(self, prefix=""):
        """
        Returns a list of ``(key, unpacked_payload)`` tuples for entries
        starting with ``prefix``.
        """
        raw_items = super().items(prefix)
        return [(name, self._struct.unpack(payload)) for name, payload in raw_items]

    def iteritems(self, prefix=""):
        """
        Lazy variant of :meth:`items`; returns a generator.
        """
        raw_items = super().iteritems(prefix)
        return ((name, self._struct.unpack(payload)) for name, payload in raw_items)
×
427

428

429
# Sentinel that IntDAWG compares against the result of ``dct.find`` to
# detect a missing key.
LOOKUP_ERROR = -1
6✔
430

431

432
class IntDAWG(DAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys.
    """

    def __getitem__(self, key):
        """
        Returns the value stored for ``key``; raises ``KeyError`` when the
        lookup yields ``LOOKUP_ERROR``.
        """
        value = self.get(key, LOOKUP_ERROR)
        if value == LOOKUP_ERROR:
            raise KeyError(key)
        return value

    def get(self, key, default=None):
        """
        Return value for the given key or ``default`` if the key is not found.
        """
        b_key = key if isinstance(key, bytes) else key.encode("utf8")
        value = self.b_get_value(b_key)
        return default if value == LOOKUP_ERROR else value

    def b_get_value(self, key):
        """
        Returns ``self.dct.find(key)`` for an already encoded ``bytes`` key.
        """
        return self.dct.find(key)
6✔
457

458

459
class IntCompletionDAWG(CompletionDAWG, IntDAWG):
    """
    Dict-like class based on DAWG.
    It can store integer values for unicode keys and support key completion.
    """

    def items(self, prefix=""):
        """
        Returns a list of ``(key, value)`` tuples for every key starting
        with ``prefix`` (string or UTF-8 ``bytes``).
        """
        b_prefix = prefix if isinstance(prefix, bytes) else prefix.encode("utf8")
        pairs = []

        start = self.dct.ROOT
        if b_prefix:
            start = self.dct.follow_bytes(b_prefix, start)
            if not start:
                # The prefix is not present: nothing to complete.
                return pairs

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(start, b_prefix)

        while walker.next():
            pairs.append((walker.key.decode("utf8"), walker.value()))

        return pairs

    def iteritems(self, prefix=""):
        """
        Lazy variant of :meth:`items`: yields ``(key, value)`` tuples.
        """
        b_prefix = prefix if isinstance(prefix, bytes) else prefix.encode("utf8")

        start = self.dct.ROOT
        if b_prefix:
            start = self.dct.follow_bytes(b_prefix, start)
            if not start:
                return

        walker = wrapper.Completer(self.dct, self.guide)
        walker.start(start, b_prefix)

        while walker.next():
            yield walker.key.decode("utf8"), walker.value()
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc