• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jmanuel1 / concat / 11359781012

16 Oct 2024 05:57AM UTC coverage: 82.027% (-0.5%) from 82.554%
11359781012

push

github

web-flow
Merge pull request #35 from jmanuel1/python-3.12

Upgrade to Python 3.12

217 of 281 new or added lines in 11 files covered. (77.22%)

10 existing lines in 5 files now uncovered.

4080 of 4974 relevant lines covered (82.03%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.87
/concat/parser_combinators/__init__.py
1
"""Parser combinators.
1✔
2

3
Error handling is based on the blog post "Parser Combinators and Error
4
Reporting" for better error messages.
5
"""
6

7
import itertools
1✔
8
from typing import (
1✔
9
    Any,
10
    Callable,
11
    Generator,
12
    Generic,
13
    Iterable,
14
    Iterator,
15
    Sequence,
16
    Tuple,
17
    TypeVar,
18
    Optional,
19
    Union,
20
    List,
21
    cast,
22
    overload,
23
)
24
from typing_extensions import Protocol
1✔
25

26

27
class _SupportsPlus(Protocol):
1✔
28
    def __add__(self, other: '_SupportsPlus') -> '_SupportsPlus':
1✔
29
        ...
×
30

31

32
T = TypeVar('T')
1✔
33
_T_co = TypeVar('_T_co', covariant=True)
1✔
34
_T_contra = TypeVar('_T_contra', contravariant=True)
1✔
35
U = TypeVar('U')
1✔
36
_U_co = TypeVar('_U_co', covariant=True)
1✔
37
_U_supports_plus = TypeVar('_U_supports_plus', bound=_SupportsPlus)
1✔
38
_V = TypeVar('_V')
1✔
39
V = TypeVar('V')
1✔
40

41

42
def _maybe_inf_range(minimum: int, maximum: float) -> Iterable[int]:
1✔
43
    if maximum == float('inf'):
1✔
44
        return itertools.count(minimum)
1✔
45
    return range(minimum, int(maximum))
1✔
46

47

48
class FailureTree:
1✔
49
    """Failure messages and positions from parsing.
1✔
50

51
    Failures can be nested."""
52

53
    def __init__(
1✔
54
        self, expected: str, furthest_index: int, children: List['FailureTree']
55
    ) -> None:
56
        self.expected = expected
1✔
57
        self.furthest_index = furthest_index
1✔
58
        self.children = children
1✔
59

60
    def __repr__(self) -> str:
1✔
61
        return f'{type(self).__qualname__}({self.expected!r}, {self.furthest_index!r}, {self.children!r})'
1✔
62

63
    def __eq__(self, other: object) -> bool:
1✔
64
        if isinstance(other, FailureTree):
1✔
65
            return (self.expected, self.furthest_index, self.children) == (
1✔
66
                other.expected,
67
                other.furthest_index,
68
                other.children,
69
            )
70
        return NotImplemented
×
71

72
    def __hash__(self) -> int:
1✔
73
        return hash((self.expected, self.furthest_index, tuple(self.children)))
×
74

75

76
def furthest_failure(failures: Iterable[FailureTree]) -> Optional[FailureTree]:
1✔
77
    furthest_index = -1
1✔
78
    result = None
1✔
79
    for failure in failures:
1✔
80
        if failure.furthest_index > furthest_index:
1✔
81
            furthest_index = failure.furthest_index
1✔
82
            result = failure
1✔
83
    return result
1✔
84

85

86
class Result(Generic[_T_co]):
1✔
87
    """Class representing the output of a parser and the parsing state."""
1✔
88

89
    def __init__(
1✔
90
        self,
91
        output: _T_co,
92
        current_index: int,
93
        is_success: bool,
94
        failures: Optional[FailureTree] = None,
95
        is_committed: bool = False,
96
    ) -> None:
97
        self.output = output
1✔
98
        self.current_index = current_index
1✔
99
        if not is_success and failures is None:
1✔
100
            raise ValueError(
×
101
                f'{type(self).__qualname__} representing failure should have a failure tree'
102
            )
103
        self.is_success = is_success
1✔
104
        self.failures = failures
1✔
105
        self.is_committed = is_committed
1✔
106

107
    def __repr__(self) -> str:
1✔
UNCOV
108
        return f'{type(self).__qualname__}({self.output!r}, {self.current_index!r}, {self.is_success!r}, {self.failures!r}, is_committed={self.is_committed!r})'
×
109

110
    def __eq__(self, other: object) -> bool:
1✔
111
        if isinstance(other, Result):
1✔
112
            return (
1✔
113
                self.output,
114
                self.current_index,
115
                self.is_success,
116
                self.failures,
117
                self.is_committed,
118
            ) == (
119
                other.output,
120
                other.current_index,
121
                other.is_success,
122
                other.failures,
123
                other.is_committed,
124
            )
125
        return NotImplemented
×
126

127
    def __hash__(self) -> int:
1✔
128
        return hash(
×
129
            (
130
                self.output,
131
                self.current_index,
132
                self.is_success,
133
                self.failures,
134
                self.is_committed,
135
            )
136
        )
137

138

139
class Parser(Generic[_T_contra, _U_co]):
1✔
140
    """A parser in the functional style."""
1✔
141

142
    def __init__(
1✔
143
        self, f: Callable[[Sequence[_T_contra], int], Result[_U_co]]
144
    ) -> None:
145
        self._f = f
1✔
146

147
    def __or__(
1✔
148
        self, other: 'Parser[_T_contra, _V]'
149
    ) -> 'Parser[_T_contra, Union[_U_co, _V]]':
150
        @Parser
1✔
151
        def new_parser(
1✔
152
            stream: Sequence[_T_contra], index: int
153
        ) -> Result[Union[_U_co, _V]]:
154
            left_result = self(stream, index)
1✔
155
            if (
1✔
156
                left_result.is_success
157
                or left_result.is_committed
158
                and left_result.current_index > index
159
            ):
160
                return left_result
1✔
161
            right_result = other(stream, index)
1✔
162
            new_failure: Optional[FailureTree]
163
            if right_result.is_success:
1✔
164
                if left_result.current_index > right_result.current_index:
1✔
165
                    if right_result.failures is not None:
1✔
166
                        assert left_result.failures is not None
×
167
                        new_failure = FailureTree(
×
168
                            f'{left_result.failures.expected} or {right_result.failures.expected}',
169
                            left_result.failures.furthest_index,
170
                            left_result.failures.children
171
                            + right_result.failures.children,
172
                        )
173
                    else:
174
                        new_failure = left_result.failures
1✔
175
                    return Result(
1✔
176
                        right_result.output,
177
                        right_result.current_index,
178
                        True,
179
                        new_failure,
180
                    )
181
                return right_result
1✔
182
            if (
1✔
183
                right_result.is_committed
184
                and right_result.current_index > index
185
            ):
186
                return right_result
×
187
            assert left_result.failures is not None
1✔
188
            assert right_result.failures is not None
1✔
189
            new_failure = furthest_failure(
1✔
190
                [left_result.failures, right_result.failures]
191
            )
192
            output: Union[_U_co, _V] = (
1✔
193
                left_result.output
194
                if new_failure is left_result.failures
195
                else right_result.output
196
            )
197
            current_index = (
1✔
198
                left_result.current_index
199
                if new_failure is left_result.failures
200
                else right_result.current_index
201
            )
202
            return Result(output, current_index, False, new_failure)
1✔
203

204
        return new_parser
1✔
205

206
    def __add__(
1✔
207
        self: 'Parser[T, _U_supports_plus]',
208
        other: 'Parser[T, _U_supports_plus]',
209
    ) -> 'Parser[T, _U_supports_plus]':
210
        @generate
1✔
211
        def new_parser() -> Generator:
1✔
212
            first = yield self
1✔
213
            second = yield other
1✔
214
            return first + second
1✔
215

216
        return new_parser
1✔
217

218
    # This is based upon parsy's desc combinator: see license.
219
    def desc(self, description: str) -> 'Parser[_T_contra, _U_co]':
1✔
220
        @Parser
1✔
221
        def new_parser(
1✔
222
            stream: Sequence[_T_contra], index: int
223
        ) -> Result[_U_co]:
224
            result = self(stream, index)
1✔
225
            if not result.is_success and result.failures is not None:
1✔
226
                if result.current_index == index:
1✔
227
                    new_failure = FailureTree(
1✔
228
                        description, result.failures.furthest_index, []
229
                    )
230
                    return Result(result.output, index, False, new_failure)
1✔
231
                new_failure = FailureTree(
1✔
232
                    description,
233
                    result.failures.furthest_index,
234
                    [result.failures],
235
                )
236
                return Result(
1✔
237
                    result.output, result.current_index, False, new_failure
238
                )
239
            return result
1✔
240

241
        return new_parser
1✔
242

243
    def map(
1✔
244
        self, fn: Callable[[_U_co], V]
245
    ) -> 'Parser[_T_contra, Union[_U_co, V]]':
246
        @Parser
1✔
247
        def new_parser(
1✔
248
            stream: Sequence[_T_contra], index: int
249
        ) -> Result[Union[_U_co, V]]:
250
            result = self(stream, index)
1✔
251
            if result.is_success:
1✔
252
                return Result(
1✔
253
                    fn(result.output),
254
                    result.current_index,
255
                    True,
256
                    result.failures,
257
                )
258
            return result
1✔
259

260
        return new_parser
1✔
261

262
    def bind(
1✔
263
        self, f: Callable[[_U_co], 'Parser[_T_contra, V]']
264
    ) -> 'Parser[_T_contra, V]':
265
        @generate
1✔
266
        def new_parser() -> Generator:
1✔
267
            a = yield self
1✔
268
            return (yield f(a))
1✔
269

270
        return new_parser
1✔
271

272
    def __rshift__(
1✔
273
        self, other: 'Parser[_T_contra, V]'
274
    ) -> 'Parser[_T_contra, Union[tuple, V]]':
275
        return seq(self, other).map(lambda tup: tup[1])
1✔
276

277
    def __lshift__(
1✔
278
        self, other: 'Parser[_T_contra, V]'
279
    ) -> 'Parser[_T_contra, Union[tuple, _U_co]]':
280
        return seq(self, other).map(lambda tup: tup[0])
1✔
281

282
    def times(
1✔
283
        self, min: int, max: float = float('inf')
284
    ) -> 'Parser[_T_contra, List[_U_co]]':
285
        @generate
1✔
286
        def new_parser() -> Generator:
1✔
287
            output = []
1✔
288
            for _ in range(min):
1✔
289
                result = yield self
1✔
290
                output.append(result)
1✔
291
            for _ in _maybe_inf_range(min, max):
1✔
292
                result = yield self.map(lambda val: (val,)).optional()  # type: ignore
1✔
293
                if result is not None:
1✔
294
                    output.append(result[0])
1✔
295
                    continue
1✔
296
                break
1✔
297
            return output
1✔
298

299
        return new_parser
1✔
300

301
    def many(self) -> 'Parser[_T_contra, List[_U_co]]':
1✔
302
        return self.times(min=0)
1✔
303

304
    def at_least(self, n: int) -> 'Parser[_T_contra, List[_U_co]]':
1✔
305
        return self.times(min=n)
1✔
306

307
    def result(self, res: V) -> 'Parser[_T_contra, V]':
1✔
308
        @generate
1✔
309
        def new_parser() -> Generator:
1✔
310
            yield self
1✔
311
            return res
1✔
312

313
        return new_parser
1✔
314

315
    def sep_by(
1✔
316
        self, sep: 'Parser[_T_contra, V]', min=0, max=float('inf')
317
    ) -> 'Parser[_T_contra, List[_U_co]]':
318
        @generate
1✔
319
        def new_parser() -> Generator:
1✔
320
            output = []
1✔
321
            for i in range(min):
1✔
322
                output.append((yield self))
1✔
323
                if i != min - 1:
1✔
324
                    yield sep
1✔
325
            if max <= min:
1✔
326
                return output
1✔
327
            for i in _maybe_inf_range(min, max):
1✔
328
                if i == 0:
1✔
329
                    maybe_item = yield self.map(lambda val: (val,)).optional()  # type: ignore
1✔
330
                else:
331
                    maybe_item = yield (
1✔
332
                        sep >> self.map(lambda val: (val,))  # type: ignore
333
                    ).optional()
334
                if maybe_item is None:
1✔
335
                    break
1✔
336
                output.append(maybe_item[0])
1✔
337
            return output
1✔
338

339
        return new_parser
1✔
340

341
    def optional(self) -> 'Parser[_T_contra, Optional[_U_co]]':
1✔
342
        return self | success(None)
1✔
343

344
    # See
345
    # https://elixirforum.com/t/parser-combinators-how-to-know-when-many-should-return-an-error/46048/12
346
    # on the problem this solves.
347
    def commit(self) -> 'Parser[_T_contra, _U_co]':
1✔
348
        """Do not try alteratives adjacent to this parser if it consumes input before failure.
349

350
        This is useful with combinators like many(): parser.many() succeeds
351
        even if `parser` fails in a way that you know is an error and should be
352
        reported for a better error message."""
353

354
        @Parser
1✔
355
        def new_parser(
1✔
356
            stream: Sequence[_T_contra], index: int
357
        ) -> Result[_U_co]:
358
            result = self(stream, index)
1✔
359
            return Result(
1✔
360
                result.output,
361
                result.current_index,
362
                result.is_success,
363
                result.failures,
364
                is_committed=True,
365
            )
366

367
        return new_parser
1✔
368

369
    def concat(
1✔
370
        self: 'Parser[_T_contra, Iterable[str]]',
371
    ) -> 'Parser[_T_contra, Iterable[str]]':
372
        return self.map(''.join)
1✔
373

374
    def __call__(
1✔
375
        self, stream: Sequence[_T_contra], index: int
376
    ) -> Result[_U_co]:
377
        return self._f(stream, index)
1✔
378

379
    def parse(self, seq: Sequence[_T_contra]) -> _U_co:
1✔
380
        result = self(seq, 0)
1✔
381
        if result.current_index < len(seq):
1✔
382
            if result.failures is None:
1✔
383
                failure_children = []
×
384
            else:
385
                failure_children = [result.failures]
1✔
386
            result = Result(
1✔
387
                result.output,
388
                result.current_index,
389
                False,
390
                FailureTree(
391
                    'end of input', result.current_index, failure_children,
392
                ),
393
            )
394
        if result.is_success:
1✔
395
            return result.output
1✔
396
        raise ParseError(result)
1✔
397

398

399
class ParseError(Exception):
1✔
400
    """Exception raised for unrecoverable parser errors."""
1✔
401

402

403
def success(val: T) -> Parser[Any, T]:
1✔
404
    return seq().result(val)
1✔
405

406

407
def seq(*parsers: 'Parser[T, Any]') -> 'Parser[T, tuple]':
1✔
408
    @Parser
1✔
409
    def new_parser(stream: Sequence[T], index: int) -> Result:
1✔
410
        failures = []
1✔
411
        output = []
1✔
412
        for parser in parsers:
1✔
413
            result = parser(stream, index)
1✔
414
            if result.failures is not None:
1✔
415
                failures.append(result.failures)
1✔
416
            output.append(result.output)
1✔
417
            if result.is_success:
1✔
418
                index = result.current_index
1✔
419
                continue
1✔
420
            failure = furthest_failure(failures)
1✔
421
            return Result(tuple(output), index, False, failure)
1✔
422
        return Result(tuple(output), index, True, furthest_failure(failures))
1✔
423

424
    return new_parser
1✔
425

426

427
def alt(*parsers: 'Parser[T, Any]') -> 'Parser[T, Any]':
1✔
428
    parser: Parser[T, None] = fail('nothing')
1✔
429
    for p in parsers:
1✔
430
        parser |= p
1✔
431
    return parser
1✔
432

433

434
def fail(expected: str) -> Parser[T, None]:
1✔
435
    @Parser
1✔
436
    def parser(_: Sequence[T], index: int) -> Result[None]:
1✔
437
        failure = FailureTree(expected, index, [])
1✔
438
        return Result(None, index, False, failure)
1✔
439

440
    return parser
1✔
441

442

443
@Parser
1✔
444
def peek_prev(stream: Sequence[T], index: int) -> Result[T]:
1✔
445
    if index:
1✔
446
        return Result(stream[index - 1], index, True)
1✔
NEW
447
    return Result(
×
448
        None, index, False, FailureTree('not the start of file', index, [])
449
    )
450

451

452
def test_item(
1✔
453
    func: Callable[[T], bool], description: str
454
) -> Parser[T, Optional[T]]:
455
    @Parser
1✔
456
    def parser(stream: Sequence[T], index: int) -> Result[Optional[T]]:
1✔
457
        if index < len(stream) and func(stream[index]):
1✔
458
            return Result(stream[index], index + 1, True, None)
1✔
459
        return Result(None, index, False, FailureTree(description, index, []))
1✔
460

461
    return parser
1✔
462

463

464
ParserGeneratingFunction = Callable[[], Generator[Parser[T, Any], Any, V]]
1✔
465

466

467
@overload
1✔
468
def generate(
1✔
469
    desc_or_generator: str,
470
) -> Callable[[ParserGeneratingFunction[T, V]], Parser[T, V]]:
471
    ...
×
472

473

474
@overload
1✔
475
def generate(
1✔
476
    desc_or_generator: ParserGeneratingFunction[T, V]
477
) -> Parser[T, V]:
478
    ...
×
479

480

481
def generate(
1✔
482
    desc_or_generator: Union[str, ParserGeneratingFunction[T, V]]
483
) -> Union[
484
    Callable[[ParserGeneratingFunction[T, V]], Parser[T, V]], Parser[T, V],
485
]:
486
    if isinstance(desc_or_generator, str):
1✔
487
        return lambda generator: generate(generator).desc(desc_or_generator)
1✔
488

489
    @Parser
1✔
490
    def new_parser(stream: Sequence[T], index: int) -> Result[V]:
1✔
491
        failures = []
1✔
492
        iterator = desc_or_generator()
1✔
493
        results = []
1✔
494
        output = None
1✔
495
        try:
1✔
496
            while True:
1✔
497
                parser = iterator.send(output)
1✔
498
                result = parser(stream, index)
1✔
499
                results.append(result)
1✔
500
                output = result.output
1✔
501
                if not result.is_success:
1✔
502
                    result = cast(
1✔
503
                        Result[Any], _result_with_furthest_failure(results)
504
                    )
505
                    return Result(
1✔
506
                        result.output,
507
                        result.current_index,
508
                        False,
509
                        result.failures,
510
                    )
511
                if result.failures is not None:
1✔
512
                    failures.append(result.failures)
1✔
513
                index = result.current_index
1✔
514
        except StopIteration as e:
1✔
515
            return Result(e.value, index, True, furthest_failure(failures))
1✔
516

517
    return new_parser
1✔
518

519

520
def _result_with_furthest_failure(
1✔
521
    results: List[Result[T]],
522
) -> Optional[Result[T]]:
523
    furthest_index = -1
1✔
524
    ret = None
1✔
525
    for result in results:
1✔
526
        if (
1✔
527
            result.failures is not None
528
            and result.failures.furthest_index > furthest_index
529
        ):
530
            furthest_index = result.failures.furthest_index
1✔
531
            ret = result
1✔
532
    return ret
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc