• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jmanuel1 / concat / 8128448627

03 Mar 2024 06:50AM UTC coverage: 81.51% (+0.4%) from 81.14%
8128448627

push

github

web-flow
Merge pull request #32 from jmanuel1/parser

Replace parsy with my own parser combinator library

423 of 478 new or added lines in 9 files covered. (88.49%)

12 existing lines in 3 files now uncovered.

3130 of 3840 relevant lines covered (81.51%)

0.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.56
/concat/parser_combinators/__init__.py
1
"""Parser combinators.
1✔
2

3
Error handling is based on the blog post "Parser Combinators and Error
4
Reporting" for better error messages.
5
"""
6

7
import itertools
1✔
8
from typing import (
1✔
9
    Any,
10
    Callable,
11
    Generator,
12
    Generic,
13
    Iterable,
14
    Iterator,
15
    Sequence,
16
    Tuple,
17
    TypeVar,
18
    Optional,
19
    Union,
20
    List,
21
    cast,
22
    overload,
23
)
24
from typing_extensions import Protocol
1✔
25

26

27
class _SupportsPlus(Protocol):
    """Structural type for values that can be combined with ``+``."""

    def __add__(self, other: '_SupportsPlus') -> '_SupportsPlus':
        """Return the result of ``self + other``."""
        ...
×
30

31

32
T = TypeVar('T')
1✔
33
_T_co = TypeVar('_T_co', covariant=True)
1✔
34
_T_contra = TypeVar('_T_contra', contravariant=True)
1✔
35
U = TypeVar('U')
1✔
36
_U_co = TypeVar('_U_co', covariant=True)
1✔
37
_U_supports_plus = TypeVar('_U_supports_plus', bound=_SupportsPlus)
1✔
38
_V = TypeVar('_V')
1✔
39
V = TypeVar('V')
1✔
40

41

42
def _maybe_inf_range(minimum: int, maximum: float) -> Iterable[int]:
    """Iterate integers from `minimum` up to, but not including, `maximum`.

    When `maximum` is positive infinity the iteration never ends; otherwise
    `maximum` is truncated to an int and used as the exclusive upper bound.
    """
    is_unbounded = maximum == float('inf')
    # The conditional expression is lazy, so int() never sees infinity.
    return (
        itertools.count(minimum)
        if is_unbounded
        else range(minimum, int(maximum))
    )
1✔
46

47

48
class FailureTree:
    """A parse failure: what was expected, where, and any nested failures.

    Instances compare equal (and hash alike) when their expected text,
    furthest index and children all match.
    """

    def __init__(
        self, expected: str, furthest_index: int, children: List['FailureTree']
    ) -> None:
        # Description of what the parser expected to find.
        self.expected = expected
        # Index associated with this failure.
        self.furthest_index = furthest_index
        # Nested failures that this failure summarises.
        self.children = children

    def __repr__(self) -> str:
        class_name = type(self).__qualname__
        fields = ', '.join(
            repr(field)
            for field in (self.expected, self.furthest_index, self.children)
        )
        return f'{class_name}({fields})'

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, FailureTree):
            # Let Python try the reflected comparison.
            return NotImplemented
        mine = (self.expected, self.furthest_index, self.children)
        theirs = (other.expected, other.furthest_index, other.children)
        return mine == theirs

    def __hash__(self) -> int:
        # The children list is not hashable, so freeze it into a tuple first.
        frozen_children = tuple(self.children)
        return hash((self.expected, self.furthest_index, frozen_children))
×
74

75

76
def furthest_failure(failures: Iterable[FailureTree]) -> Optional[FailureTree]:
    """Pick the failure with the greatest `furthest_index`.

    Ties go to the earliest failure in iteration order. Returns None when
    `failures` is empty or no failure has an index greater than -1.
    """
    best: Optional[FailureTree] = None
    best_index = -1
    for candidate in failures:
        if not candidate.furthest_index > best_index:
            continue
        best, best_index = candidate, candidate.furthest_index
    return best
1✔
84

85

86
class Result(Generic[_T_co]):
    """The outcome of running a parser: its output plus the parsing state.

    A result that represents a failure must carry a failure tree; a
    successful result may carry one too.
    """

    def __init__(
        self,
        output: _T_co,
        current_index: int,
        is_success: bool,
        failures: Optional[FailureTree] = None,
    ) -> None:
        """Store the fields of the result.

        Raises:
            ValueError: if `is_success` is false and `failures` is None.
        """
        self.output = output
        self.current_index = current_index
        missing_failure_tree = failures is None and not is_success
        if missing_failure_tree:
            raise ValueError(
                f'{type(self).__qualname__} representing failure should have a failure tree'
            )
        self.is_success = is_success
        self.failures = failures

    def _fields(self) -> tuple:
        # All state, in constructor order; shared by __eq__ and __hash__.
        return (
            self.output,
            self.current_index,
            self.is_success,
            self.failures,
        )

    def __repr__(self) -> str:
        class_name = type(self).__qualname__
        return f'{class_name}({self.output!r}, {self.current_index!r}, {self.is_success!r}, {self.failures!r})'

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Result):
            # Let Python try the reflected comparison.
            return NotImplemented
        return Result._fields(self) == Result._fields(other)

    def __hash__(self) -> int:
        return hash(Result._fields(self))
127

128

129
class Parser(Generic[_T_contra, _U_co]):
    """A parser in the functional style.

    A parser wraps a function that is called with an input sequence and a
    starting index and returns a Result holding the output, the index
    reached, whether parsing succeeded, and any failure tree. The combinator
    methods build new parsers out of existing ones.
    """

    def __init__(
        self, f: Callable[[Sequence[_T_contra], int], Result[_U_co]]
    ) -> None:
        """Wrap the parsing function `f`, which is called as f(stream, index)."""
        self._f = f

    def __or__(
        self, other: 'Parser[_T_contra, _V]'
    ) -> 'Parser[_T_contra, Union[_U_co, _V]]':
        """Try `self`; if it fails, try `other` from the same index.

        If both fail, the result carries whichever failure tree
        `furthest_failure` selects. If `other` succeeds after `self` failed
        further into the input, the successful result keeps failure
        information from `self` so it is not lost.
        """

        @Parser
        def new_parser(
            stream: Sequence[_T_contra], index: int
        ) -> Result[Union[_U_co, _V]]:
            left_result = self(stream, index)
            if left_result.is_success:
                return left_result
            right_result = other(stream, index)
            new_failure: Optional[FailureTree]
            if right_result.is_success:
                if left_result.current_index > right_result.current_index:
                    left_failure = left_result.failures
                    right_failure = right_result.failures
                    if left_failure is not None and right_failure is not None:
                        new_failure = FailureTree(
                            f'{left_failure.expected} or {right_failure.expected}',
                            left_failure.furthest_index,
                            left_failure.children + right_failure.children,
                        )
                    elif left_failure is not None:
                        # The right side succeeded without recording any
                        # failure, so only the left side's failure is
                        # available to carry forward. (This case used to
                        # raise a bare Exception('todo').)
                        new_failure = left_failure
                    else:
                        new_failure = right_failure
                    return Result(
                        right_result.output,
                        right_result.current_index,
                        True,
                        new_failure,
                    )
                return right_result
            # Both sides failed; a failed Result always has a failure tree.
            assert left_result.failures is not None
            assert right_result.failures is not None
            new_failure = furthest_failure(
                [left_result.failures, right_result.failures]
            )
            # Report the output and index of the side whose failure was kept.
            output: Union[_U_co, _V] = (
                left_result.output
                if new_failure is left_result.failures
                else right_result.output
            )
            current_index = (
                left_result.current_index
                if new_failure is left_result.failures
                else right_result.current_index
            )
            return Result(output, current_index, False, new_failure)

        return new_parser

    def __add__(
        self: 'Parser[T, _U_supports_plus]',
        other: 'Parser[T, _U_supports_plus]',
    ) -> 'Parser[T, _U_supports_plus]':
        """Run `self` then `other` and combine their outputs with `+`."""

        @generate
        def new_parser() -> Generator:
            first = yield self
            second = yield other
            return first + second

        return new_parser

    # This is based upon parsy's desc combinator: see license.
    def desc(self, description: str) -> 'Parser[_T_contra, _U_co]':
        """Replace the expected-text of this parser's failures.

        If the failed result's index equals the starting index, the new
        failure has no children; otherwise the original failure tree is kept
        as the single child of the new one. Successes pass through unchanged.
        """

        @Parser
        def new_parser(
            stream: Sequence[_T_contra], index: int
        ) -> Result[_U_co]:
            result = self(stream, index)
            if not result.is_success and result.failures is not None:
                if result.current_index == index:
                    new_failure = FailureTree(
                        description, result.failures.furthest_index, []
                    )
                    return Result(result.output, index, False, new_failure)
                new_failure = FailureTree(
                    description,
                    result.failures.furthest_index,
                    [result.failures],
                )
                return Result(
                    result.output, result.current_index, False, new_failure
                )
            return result

        return new_parser

    def map(
        self, fn: Callable[[_U_co], V]
    ) -> 'Parser[_T_contra, Union[_U_co, V]]':
        """Apply `fn` to the output of a successful parse.

        A failed result is returned unchanged, which is why the output type
        is a union of the original and mapped types.
        """

        @Parser
        def new_parser(
            stream: Sequence[_T_contra], index: int
        ) -> Result[Union[_U_co, V]]:
            result = self(stream, index)
            if result.is_success:
                return Result(
                    fn(result.output),
                    result.current_index,
                    True,
                    result.failures,
                )
            return result

        return new_parser

    def bind(
        self, f: Callable[[_U_co], 'Parser[_T_contra, V]']
    ) -> 'Parser[_T_contra, V]':
        """Run `self`, pass its output to `f`, then run the parser `f` returns."""

        @generate
        def new_parser() -> Generator:
            a = yield self
            return (yield f(a))

        return new_parser

    def __rshift__(
        self, other: 'Parser[_T_contra, V]'
    ) -> 'Parser[_T_contra, Union[tuple, V]]':
        """Run `self` then `other`, keeping only the output of `other`."""
        return seq(self, other).map(lambda tup: tup[1])

    def __lshift__(
        self, other: 'Parser[_T_contra, V]'
    ) -> 'Parser[_T_contra, Union[tuple, _U_co]]':
        """Run `self` then `other`, keeping only the output of `self`."""
        return seq(self, other).map(lambda tup: tup[0])

    def times(
        self, min: int, max: float = float('inf')
    ) -> 'Parser[_T_contra, List[_U_co]]':
        """Repeat this parser at least `min` times, collecting outputs in a list.

        After the `min` required repetitions, further repetitions are
        attempted (bounded by `max`) until one does not match.
        """

        @generate
        def new_parser() -> Generator:
            output = []
            for _ in range(min):
                result = yield self
                output.append(result)
            for _ in _maybe_inf_range(min, max):
                # Wrap the output in a 1-tuple so that a genuine None output
                # can be told apart from optional()'s None for "no match".
                result = yield self.map(lambda val: (val,)).optional()  # type: ignore
                if result is not None:
                    output.append(result[0])
                    continue
                break
            return output

        return new_parser

    def many(self) -> 'Parser[_T_contra, List[_U_co]]':
        """Repeat this parser zero or more times."""
        return self.times(min=0)

    def at_least(self, n: int) -> 'Parser[_T_contra, List[_U_co]]':
        """Repeat this parser `n` or more times."""
        return self.times(min=n)

    def result(self, res: V) -> 'Parser[_T_contra, V]':
        """Run this parser, discard its output, and output `res` instead."""

        @generate
        def new_parser() -> Generator:
            yield self
            return res

        return new_parser

    def sep_by(
        self, sep: 'Parser[_T_contra, V]', min=0, max=float('inf')
    ) -> 'Parser[_T_contra, List[_U_co]]':
        """Parse items matched by this parser, separated by `sep`.

        At least `min` items are required; further items are attempted
        (bounded by `max`) until one does not match. Separator outputs are
        discarded and the item outputs are returned as a list.
        """

        @generate
        def new_parser() -> Generator:
            output = []
            for i in range(min):
                output.append((yield self))
                if i != min - 1:
                    yield sep
            if max <= min:
                return output
            for i in _maybe_inf_range(min, max):
                # Items are wrapped in 1-tuples so a None item output is
                # distinguishable from optional()'s None for "no match".
                if i == 0:
                    # No item has been parsed yet, so no separator is needed.
                    maybe_item = yield self.map(lambda val: (val,)).optional()  # type: ignore
                else:
                    maybe_item = yield (
                        sep >> self.map(lambda val: (val,))  # type: ignore
                    ).optional()
                if maybe_item is None:
                    break
                output.append(maybe_item[0])
            return output

        return new_parser

    def optional(self) -> 'Parser[_T_contra, Optional[_U_co]]':
        """Make this parser optional.

        If it fails, the new parser succeeds with output None at the
        starting index, keeping the failure tree for error reporting.
        """

        @Parser
        def new_parser(
            stream: Sequence[_T_contra], index: int
        ) -> Result[Optional[_U_co]]:
            result = self(stream, index)
            if result.is_success:
                return result
            return Result(None, index, True, result.failures)

        return new_parser

    def concat(
        self: 'Parser[_T_contra, Iterable[str]]',
    ) -> 'Parser[_T_contra, Iterable[str]]':
        """Join the strings output by this parser into a single string."""
        return self.map(''.join)

    def __call__(
        self, stream: Sequence[_T_contra], index: int
    ) -> Result[_U_co]:
        """Run the wrapped parsing function on `stream` starting at `index`."""
        return self._f(stream, index)

    def parse(self, seq: Sequence[_T_contra]) -> _U_co:
        """Parse all of `seq` from index 0 and return the output.

        If the parser stops before the end of `seq`, the result is turned
        into a failure expecting 'end of input', with any existing failure
        tree kept as its child.

        Raises:
            ParseError: if the final result is a failure; the Result is the
                exception's argument.
        """
        result = self(seq, 0)
        if result.current_index < len(seq):
            if result.failures is None:
                failure_children = []
            else:
                failure_children = [result.failures]
            result = Result(
                result.output,
                result.current_index,
                False,
                FailureTree(
                    'end of input', result.current_index, failure_children,
                ),
            )
        if result.is_success:
            return result.output
        raise ParseError(result)
×
363

364

365
class ParseError(Exception):
    """Signal that parsing failed and cannot be recovered from.

    `Parser.parse` raises this with the failed Result as its argument.
    """
1✔
367

368

369
def success(val: T) -> Parser[Any, T]:
    """Build a parser that runs no sub-parsers and outputs `val`."""
    # An empty sequence has nothing to run, so it leaves the index untouched.
    empty_sequence = seq()
    return empty_sequence.result(val)
1✔
371

372

373
def seq(*parsers: 'Parser[T, Any]') -> 'Parser[T, tuple]':
    """Build a parser that runs `parsers` one after another.

    Each parser starts where the previous one stopped. The output is a tuple
    of every output gathered so far, including the failing parser's output
    when the sequence stops early. The failure tree attached to the result is
    the one `furthest_failure` picks from those seen.
    """

    def run_in_order(stream: Sequence[T], index: int) -> Result:
        position = index
        seen_failures = []
        outputs = []
        for step in parsers:
            outcome = step(stream, position)
            if outcome.failures is not None:
                seen_failures.append(outcome.failures)
            outputs.append(outcome.output)
            if not outcome.is_success:
                # Stop at the first failing step; the index stays where the
                # last successful step left it.
                return Result(
                    tuple(outputs),
                    position,
                    False,
                    furthest_failure(seen_failures),
                )
            position = outcome.current_index
        return Result(
            tuple(outputs), position, True, furthest_failure(seen_failures)
        )

    return Parser(run_in_order)
1✔
391

392

393
def alt(*parsers: 'Parser[T, Any]') -> 'Parser[T, Any]':
    """Combine `parsers` into one that tries each alternative in order.

    The chain starts from a parser that always fails expecting 'nothing', so
    calling this with no parsers yields that always-failing parser.
    """
    combined: Parser[T, Any] = fail('nothing')
    for alternative in parsers:
        combined = combined | alternative
    return combined
1✔
398

399

400
def fail(expected: str) -> Parser[T, None]:
    """Build a parser that always fails, reporting `expected` at its index."""

    def always_fail(_: Sequence[T], index: int) -> Result[None]:
        return Result(None, index, False, FailureTree(expected, index, []))

    return Parser(always_fail)
1✔
407

408

409
def test_item(
    func: Callable[[T], bool], description: str
) -> Parser[T, Optional[T]]:
    """Build a parser that accepts one item for which `func` is true.

    On success the item is the output and the index advances by one. At the
    end of the stream, or when `func` rejects the item, the parser fails at
    the current index with `description` as the expected text.
    """

    def match_one(stream: Sequence[T], index: int) -> Result[Optional[T]]:
        at_end = index >= len(stream)
        if at_end or not func(stream[index]):
            mismatch = FailureTree(description, index, [])
            return Result(None, index, False, mismatch)
        return Result(stream[index], index + 1, True, None)

    return Parser(match_one)
1✔
419

420

421
# Type of the zero-argument generator functions accepted by `generate`: they
# yield parsers, are sent each yielded parser's output, and return the final
# value of the combined parser.
ParserGeneratingFunction = Callable[[], Generator[Parser[T, Any], Any, V]]
1✔
422

423

424
@overload
def generate(
    desc_or_generator: str,
) -> Callable[[ParserGeneratingFunction[T, V]], Parser[T, V]]:
    ...


@overload
def generate(
    desc_or_generator: ParserGeneratingFunction[T, V]
) -> Parser[T, V]:
    ...


def generate(
    desc_or_generator: Union[str, ParserGeneratingFunction[T, V]]
) -> Union[
    Callable[[ParserGeneratingFunction[T, V]], Parser[T, V]], Parser[T, V],
]:
    """Turn a generator function into a parser.

    The generator yields parsers; each one is run where the previous one
    stopped and its output is sent back into the generator. The generator's
    return value becomes the output of the combined parser. If any yielded
    parser fails, the combined parser fails with the output, index and
    failure tree of the result chosen by `_result_with_furthest_failure`.

    When called with a string instead, returns a decorator that builds the
    parser and then applies `.desc()` with that string.
    """
    if isinstance(desc_or_generator, str):
        description = desc_or_generator

        def decorate(generator):
            return generate(generator).desc(description)

        return decorate

    def run_generator(stream: Sequence[T], index: int) -> Result[V]:
        position = index
        success_failures = []
        history = []
        generator = desc_or_generator()
        to_send = None
        try:
            while True:
                # The first send(None) starts the generator; later sends
                # deliver the previous parser's output to the `yield`.
                step = generator.send(to_send)
                outcome = step(stream, position)
                history.append(outcome)
                to_send = outcome.output
                if outcome.is_success:
                    if outcome.failures is not None:
                        success_failures.append(outcome.failures)
                    position = outcome.current_index
                    continue
                furthest = cast(
                    Result[Any], _result_with_furthest_failure(history)
                )
                return Result(
                    furthest.output,
                    furthest.current_index,
                    False,
                    furthest.failures,
                )
        except StopIteration as stop:
            # The generator returned: its return value is the final output.
            return Result(
                stop.value,
                position,
                True,
                furthest_failure(success_failures),
            )

    return Parser(run_generator)
1✔
475

476

477
def _result_with_furthest_failure(
    results: List[Result[T]],
) -> Optional[Result[T]]:
    """Pick the result whose failure tree has the greatest `furthest_index`.

    Results without a failure tree are skipped and ties go to the earliest
    result. Returns None when no result has a failure index greater than -1.
    """
    winner = None
    winning_index = -1
    for candidate in results:
        tree = candidate.failures
        if tree is None:
            continue
        if tree.furthest_index > winning_index:
            winner, winning_index = candidate, tree.furthest_index
    return winner
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc