aas-core-works / abnf-to-regexp / 8387904128

22 Mar 2024 09:05AM UTC. Coverage: 86.974% (remained the same).
Build 8387904128 · push · github · web-flow

Discontinue Python 3.7 and include 3.11 (#33)

We discontinue Python 3.7 as we cannot run `black` on it anymore.

We also include Python 3.11 in CI to support a more modern version.

661 of 760 relevant lines covered (86.97%)

3.48 hits per line

Source file: /abnf_to_regexp/nested_python.py (95.08% covered)
"""Translate an ABNF to a code snippet of Python."""

import collections
import enum
import io
import string
from typing import (
    Mapping,
    Type,
    MutableMapping,
    List,
    Sequence,
    Optional,
    Tuple,
)

import abnf
import regex as re
from icontract import require, ensure
import sortedcontainers

import abnf_to_regexp.abnf_transformation
import abnf_to_regexp.compression
from abnf_to_regexp.base import (
    Element,
    Repetition,
    Alternation,
    Concatenation,
    Range,
    CaseInsensitivity,
    Literal,
    CharacterClass,
    Reference,
    Transformer,
    Visitor,
)
from abnf_to_regexp.representation import escape_for_character_class

# Short-circuit the rules from RFC 5234 to regular expressions
_RFC_5234 = {
    "CR": Literal("\x0D"),
    "LF": Literal("\x0A"),
    "CRLF": Literal("\x0D\x0A"),
    "HTAB": Literal("\x09"),
    "DQUOTE": Literal('"'),
    "SP": Literal(" "),
    "WSP": CharacterClass([Literal(" "), Literal("\x09")]),
    "VCHAR": Range(start="\x21", end="\x7E"),
    "ALPHA": CharacterClass(
        elements=[Range(start="a", end="z"), Range(start="A", end="Z")]
    ),
    "DIGIT": Range(start="0", end="9"),
    "HEXDIG": CharacterClass(
        elements=[
            Range(start="0", end="9"),
            Range(start="A", end="F"),
            Range(start="a", end="f"),
        ]
    ),
    "BIT": CharacterClass(elements=[Literal(value="0"), Literal(value="1")]),
}

class ABNFTransformer(abnf_to_regexp.abnf_transformation.TransformerToElement):
    """Translate an ABNF rule to a regular expression, including references."""

    def transform_rule(self, rule: abnf.Rule) -> Element:
        # Short-circuit the rules from RFC 5234
        short_circuited = _RFC_5234.get(rule.name, None)
        if short_circuited is not None:
            return short_circuited
        else:
            return Reference(name=rule.name)
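
# For illustration (hypothetical example): thanks to the short-circuit table
# above, a core rule such as ALPHA is inlined as the character class [a-zA-Z]
# rather than emitted as Reference(name="ALPHA"), so the generated snippet
# needs no separate definitions for the RFC 5234 core rules.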

class _RenameRules(Transformer):
    def __init__(self, mapping: Mapping[str, str]) -> None:
        self.mapping = mapping

    def transform_reference(self, element: Reference) -> Element:
        if element.name not in self.mapping:
            raise KeyError(
                f"The reference to a rule name has not been mapped: {element.name!r}"
            )

        return Reference(name=self.mapping[element.name])


@ensure(lambda result: all(name.isidentifier() for name in result[0]))
def _rename_rules_to_variable_identifiers(
    table: Mapping[str, Element]
) -> Tuple["collections.OrderedDict[str, Element]", Mapping[str, str]]:
    """
    Rename all rules and the references to make them valid variable identifiers.

    Return (table with renamed rules, mapping original name -> valid identifier).
    """
    mapping = dict()  # type: MutableMapping[str, str]
    for name in table:
        proposed_name = re.sub("[^a-zA-Z_0-9]", "_", name).lower()
        i = 1
        while proposed_name in mapping:
            proposed_name = proposed_name + str(i)
            i += 1

        mapping[name] = proposed_name

    transformer = _RenameRules(mapping=mapping)

    new_table = collections.OrderedDict()  # type: collections.OrderedDict[str, Element]
    for name, element in table.items():
        new_table[mapping[name]] = transformer.transform(element)

    return new_table, mapping
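
# For illustration (hypothetical example): a rule name such as "URI-reference"
# is mapped by _rename_rules_to_variable_identifiers to the identifier
# "uri_reference", since every character outside [a-zA-Z_0-9] is replaced with
# an underscore and the result is lower-cased; the references inside the
# elements are renamed consistently via _RenameRules.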

class _ReferenceVisitor(Visitor):
    """List all the references in the given regular expression."""

    def __init__(self) -> None:
        self.references = []  # type: List[str]

    def visit_reference(self, element: Reference) -> None:
        self.references.append(element.name)

@ensure(lambda result: (result[0] is None) ^ (result[1] is None))
def _topological_sort(
    graph: Mapping[str, List[str]]
) -> Tuple[Optional[List[str]], Optional[str]]:
    """
    Topologically sort the identifiers of the dependency ``graph``.

    Return (sorted identifiers, None), or (None, identifier) if a cycle has
    been observed at that identifier.
    """
    # See https://en.wikipedia.org/wiki/Topological_sorting#Depth-first%20search
    trace = []  # type: List[str]
    # We use sorted containers to avoid non-deterministic behavior.
    identifiers_without_permanent_marks = sortedcontainers.SortedSet(graph.keys())
    permanent_marks = sortedcontainers.SortedSet()  # Set[str]
    temporary_marks = sortedcontainers.SortedSet()  # Set[str]

    visited_more_than_once = None  # type: Optional[str]

    def visit(an_identifier: str) -> None:
        nonlocal visited_more_than_once
        nonlocal trace

        if visited_more_than_once:
            return

        if an_identifier in permanent_marks:
            return

        if an_identifier in temporary_marks:
            visited_more_than_once = an_identifier
            return

        temporary_marks.add(an_identifier)

        for reference in graph[an_identifier]:
            visit(reference)

        temporary_marks.remove(an_identifier)
        permanent_marks.add(an_identifier)
        identifiers_without_permanent_marks.remove(an_identifier)
        trace.insert(0, an_identifier)

    while len(identifiers_without_permanent_marks) > 0 and not visited_more_than_once:
        visit(identifiers_without_permanent_marks[0])

    if visited_more_than_once:
        return None, visited_more_than_once

    return trace, None
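
# For illustration (hypothetical input): given
#     graph = {"uri": ["scheme", "path"], "scheme": [], "path": []}
# _topological_sort returns (["uri", "scheme", "path"], None), i.e. each
# identifier precedes the identifiers it references; a cyclic input such as
# {"a": ["b"], "b": ["a"]} yields (None, "a"), naming an identifier that was
# visited more than once.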

def _reorder_table_by_dependencies(
    table: "collections.OrderedDict[str, Element]",
) -> Tuple[Optional["collections.OrderedDict[str, Element]"], Optional[str]]:
    """
    Order the table so that the rules are defined after their dependencies.

    Return (re-ordered table, identifier where a cycle has been observed)
    """
    # We construct the graph using a sorted dict to avoid non-deterministic
    # behavior.
    graph = sortedcontainers.SortedDict()  # type: MutableMapping[str, List[str]]
    for identifier, regexp in table.items():
        visitor = _ReferenceVisitor()
        visitor.visit(regexp)

        graph[identifier] = visitor.references

    trace, duplicate_visit = _topological_sort(graph=graph)
    if duplicate_visit is not None:
        return None, duplicate_visit

    assert trace is not None

    # Change the order
    new_table = collections.OrderedDict()  # type: collections.OrderedDict[str, Element]
    for identifier in reversed(trace):
        new_table[identifier] = table[identifier]

    return new_table, None

@ensure(lambda result: (result[0] is None) ^ (result[1] is None))
def translate(
    rule_cls: Type[abnf.Rule],
) -> Tuple[Optional["collections.OrderedDict[str, Element]"], Optional[str]]:
    """Translate the ABNF rule to a regular expression."""
    table = collections.OrderedDict()  # type: collections.OrderedDict[str, Element]

    transformer = ABNFTransformer()

    for rule in rule_cls.rules():  # type: ignore
        table[rule.name] = abnf_to_regexp.compression.compress(
            transformer.transform_parser(rule.definition)
        )

    table, name_mapping = _rename_rules_to_variable_identifiers(table=table)

    reordered_table, duplicate_visit = _reorder_table_by_dependencies(table=table)
    if duplicate_visit:
        return (
            None,
            f"You have a cycle in your ABNF. "
            f"The rule has been visited "
            f"more than once: {name_mapping[duplicate_visit]}",
        )
    assert reordered_table is not None

    return reordered_table, None
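
# For illustration (hypothetical usage; it assumes one of the grammars shipped
# with the ``abnf`` package, e.g. ``abnf.grammars.rfc3986``):
#
#     from abnf.grammars import rfc3986
#
#     table, error = translate(rule_cls=rfc3986.Rule)
#     if error is not None:
#         raise RuntimeError(error)
#     assert table is not None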

class _TokenKind(enum.Enum):
    TEXT = 0
    REFERENCE = 1
    BREAK_POINT = 3


class _Token:
    """Capture a part of the string representing a regular expression."""

    # fmt: off
    @require(
        lambda value, kind:
        not (kind == _TokenKind.BREAK_POINT) or value == ''
    )
    # fmt: on
    def __init__(self, value: str, kind: _TokenKind) -> None:
        self.value = value
        self.kind = kind

class _Stream:
    """Write tokens as you visit the regular expression tree."""

    def __init__(self) -> None:
        """Initialize the stream with an empty list of tokens."""
        self.tokens = []  # type: List[_Token]

    def write_text(self, text: str) -> None:
        """Add a text token."""
        self.tokens.append(_Token(value=text, kind=_TokenKind.TEXT))

    def mark_breakpoint(self) -> None:
        """Mark the position where the line can be broken."""
        self.tokens.append(_Token(value="", kind=_TokenKind.BREAK_POINT))

    def write_reference(self, name: str) -> None:
        """Add a reference token."""
        self.tokens.append(_Token(value=name, kind=_TokenKind.REFERENCE))

class _Representer(Visitor):
    """
    Represent a regular expression as a stream of tokens.

    Please see ``.stream`` for the result.
    """

    def __init__(self) -> None:
        self.stream = _Stream()

    def visit_concatenation(self, element: Concatenation) -> None:
        for i, subelement in enumerate(element.elements):
            if i > 0:
                self.stream.mark_breakpoint()

            self.visit(subelement)

    def visit_alternation(self, element: Alternation) -> None:
        self.stream.write_text("(")

        for i, subelement in enumerate(element.elements):
            if i > 0:
                self.stream.write_text("|")
                self.stream.mark_breakpoint()

            self.visit(subelement)

        self.stream.write_text(")")

    def visit_repetition(self, element: Repetition) -> None:
        if element.min_occurrences == 0 and element.max_occurrences is None:
            suffix = "*"
        elif element.min_occurrences == 0 and element.max_occurrences == 1:
            suffix = "?"
        elif (
            (element.min_occurrences is None or element.min_occurrences == 0)
            and element.max_occurrences is not None
            and element.max_occurrences > 0
        ):
            suffix = f"{{{element.max_occurrences}}}"
        elif (
            element.min_occurrences is not None
            and element.min_occurrences > 0
            and element.max_occurrences is None
        ):
            if element.min_occurrences == 1:
                suffix = "+"
            else:
                suffix = f"{{{element.min_occurrences},}}"
        elif (
            element.min_occurrences is not None
            and element.max_occurrences is not None
            and element.min_occurrences == element.max_occurrences
        ):
            suffix = f"{{{element.min_occurrences}}}"
        else:
            suffix = f"{{{element.min_occurrences},{element.max_occurrences}}}"

        needs_parentheses = not isinstance(
            element.element, (Alternation, Range, CharacterClass)
        )

        if needs_parentheses:
            self.stream.write_text("(")

        self.visit(element.element)

        if needs_parentheses:
            self.stream.write_text(f"){suffix}")
        else:
            self.stream.write_text(suffix)

    def visit_case_insensitivity(self, element: CaseInsensitivity) -> None:
        self.stream.write_text("(?i:")
        self.visit(element.element)
        self.stream.write_text(")")

    _NO_NEED_TO_ESCAPE_RE = re.compile(r"[a-zA-Z_0-9\-]*")

    def visit_literal(self, element: Literal) -> None:
        # ``re.escape`` is a bit too conservative and produces unreadable regular
        # expressions. To make the expressions more readable, we avoid escaping
        # the cases where we are sure no escapes are necessary.
        if _Representer._NO_NEED_TO_ESCAPE_RE.fullmatch(element.value):
            self.stream.write_text(element.value)
        else:
            for character in element.value:
                if character not in string.printable and ord(character) <= 255:
                    escaped_value = f"\\x{ord(character):02x}"
                elif 255 < ord(character) < 0x10000:
                    escaped_value = f"\\u{ord(character):04x}"
                elif 0x10000 <= ord(character) <= 0x10FFFF:
                    escaped_value = f"\\U{ord(character):08x}"
                else:
                    escaped_value = re.escape(character)

                assert isinstance(escaped_value, str)

                self.stream.write_text(escaped_value)

    def visit_range(self, element: Range) -> None:
        self.stream.write_text(
            "".join(
                (
                    "[",
                    escape_for_character_class(element.start),
                    "-",
                    escape_for_character_class(element.end),
                    "]",
                )
            )
        )

    def visit_character_class(self, element: CharacterClass) -> None:
        self.stream.write_text("[")

        for i, subelement in enumerate(element.elements):
            if i > 0:
                self.stream.mark_breakpoint()

            if isinstance(subelement, Range):
                self.stream.write_text(
                    "".join(
                        (
                            escape_for_character_class(subelement.start),
                            "-",
                            escape_for_character_class(subelement.end),
                        )
                    )
                )
            elif isinstance(subelement, Literal):
                self.stream.write_text(escape_for_character_class(subelement.value))
            else:
                raise NotImplementedError(
                    f"sub-element {subelement} in element {element}"
                )

        self.stream.write_text("]")

    def visit_reference(self, element: Reference) -> None:
        self.stream.write_reference(element.name)

@ensure(lambda result: not (result[0] == "'") or result[-1] == "'")
4✔
422
@ensure(lambda result: not (result[0] == '"') or result[-1] == '"')
4✔
423
@ensure(lambda result: len(result) > 0)
4✔
424
def _tokens_to_str_literal(tokens: List[_Token]) -> str:
4✔
425
    """Concatenate the tokens and return a Python string literal."""
426
    has_reference = any(token.kind == _TokenKind.REFERENCE for token in tokens)
4✔
427

428
    if not has_reference:
4✔
429
        str_literal = repr("".join(token.value for token in tokens))
4✔
430
    else:
431
        regexp_writer = io.StringIO()
4✔
432
        for token in tokens:
4✔
433
            if token.kind == _TokenKind.TEXT:
4✔
434
                regexp_writer.write(token.value.replace("{", "{{").replace("}", "}}"))
4✔
435
            elif token.kind == _TokenKind.REFERENCE:
4✔
436
                regexp_writer.write("{")
4✔
437
                regexp_writer.write(token.value)
4✔
438
                regexp_writer.write("}")
4✔
439
            elif token.kind == _TokenKind.BREAK_POINT:
4✔
440
                pass
4✔
441
            else:
442
                raise NotImplementedError(token.kind)
×
443

444
        str_literal = f"f{repr(regexp_writer.getvalue())}"
4✔
445

446
    return str_literal
4✔
447

448

449
class _Segment:
4✔
450
    """Represent a list of tokens which should not be broken in the middle."""
451

452
    def __init__(self, tokens: Sequence[_Token]) -> None:
4✔
453
        self.tokens = tokens
4✔
454
        self.length = sum(len(token.value) for token in tokens)
4✔
455

456

457
# fmt: off
458
@require(lambda line_width: line_width > 0)
4✔
459
@ensure(
4✔
460
    lambda segments, result:
461
    all(
462
        segment == other_segment
463
        for segment, other_segment in zip(
464
            segments, (seg
465
                       for line in result
466
                       for seg in line))
467
    )
468
)
469
@ensure(
4✔
470
    lambda segments, result:
471
    len(segments) == sum(len(line) for line in result))
472
# fmt: on
473
def _wrap_segments(segments: List[_Segment], line_width: int) -> List[List[_Segment]]:
4✔
474
    """Wrap ``segments`` into lines that all optimistically fit on ``line_width``."""
475
    lines = []  # type: List[List[_Segment]]
4✔
476

477
    accumulator = []  # type: List[_Segment]
4✔
478
    accumulator_length = 0
4✔
479

480
    for segment in segments:
4✔
481
        if accumulator_length + segment.length > line_width:
4✔
482
            if len(accumulator) > 0:
4✔
483
                lines.append(accumulator)
4✔
484
                accumulator = []
4✔
485
                accumulator_length = 0
4✔
486

487
        accumulator.append(segment)
4✔
488
        accumulator_length += segment.length
4✔
489

490
    if len(accumulator) > 0:
4✔
491
        lines.append(accumulator)
4✔
492

493
    return lines
4✔
494

495

496
# fmt: off
497
@require(
4✔
498
    lambda table:
499
    all(
500
        identifier.isidentifier()
501
        for identifier in table
502
    )
503
)
504
@ensure(lambda result: not result.startswith('\n'))
4✔
505
@ensure(lambda result: not result.endswith('\n'))
4✔
506
# fmt: on
507
def represent(table: "collections.OrderedDict[str, Element]") -> str:
4✔
508
    """Compile the rule table to a snippet of Python code."""
509
    writer = io.StringIO()
4✔
510

511
    for rule_i, (identifier, regexp) in enumerate(table.items()):
4✔
512
        if rule_i > 0:
4✔
513
            writer.write("\n")
4✔
514

515
        representer = _Representer()
4✔
516

517
        representer.visit(regexp)
4✔
518

519
        # Apply a very naive estimation of the total length. This is a very vague
520
        # estimate: we do not account for curly brackets in references nor do we count
521
        # escape characters in string representation. However, this usually does the job
522
        # decently well.
523
        estimated_length = (
4✔
524
            len(identifier)
525
            + 3
526
            + sum(len(token.value) for token in representer.stream.tokens)
527
        )
528

529
        if estimated_length <= 70:
4✔
530
            regexp_str_literal = _tokens_to_str_literal(
4✔
531
                tokens=representer.stream.tokens
532
            )
533
            writer.write(f"{identifier} = {regexp_str_literal}")
4✔
534
        else:
535
            writer.write(f"{identifier} = (\n")
4✔
536

537
            # Split the tokens into segments. The lines should not break in the
538
            # middle of the segment.
539
            segments = []  # type: List[_Segment]
4✔
540
            accumulator = []  # type: List[_Token]
4✔
541

542
            for token in representer.stream.tokens:
4✔
543
                if token.kind == _TokenKind.BREAK_POINT:
4✔
544
                    if len(accumulator) > 0:
4✔
545
                        segments.append(_Segment(accumulator))
4✔
546
                        accumulator = []
4✔
547

548
                else:
549
                    accumulator.append(token)
4✔
550

551
            if len(accumulator) > 0:
4✔
552
                segments.append(_Segment(accumulator))
4✔
553

554
            lines_of_segments = _wrap_segments(
4✔
555
                segments=segments,
556
                # 70: arbitrary line width, 4: indention, 3: " = "
557
                line_width=70 - 4 - len(identifier) - 3,
558
            )
559

560
            for line_i, line in enumerate(lines_of_segments):
4✔
561
                if line_i > 0:
4✔
562
                    writer.write("\n")
4✔
563

564
                writer.write(" " * 4)
4✔
565

566
                # fmt: off
567
                regexp_str_literal = _tokens_to_str_literal(
4✔
568
                    [
569
                        token
570
                        for segment in line
571
                        for token in segment.tokens
572
                    ]
573
                )
574
                # fmt: on
575

576
                writer.write(regexp_str_literal)
4✔
577

578
            writer.write("\n)")
4✔
579

580
    return writer.getvalue()
4✔