renatahodovan / grammarinator / build 6391267379

03 Oct 2023 09:38AM UTC coverage: 88.402% (-0.1%) from 88.502%
push (github, web-flow): Raise error if trying to create empty ranges (#137)

5 of 5 new or added lines in 1 file covered. (100.0%)
1250 of 1414 relevant lines covered (88.4%)
0.88 hits per line

Source file: /grammarinator/tool/processor.py (90.24% covered)
# Copyright (c) 2017-2023 Renata Hodovan, Akos Kiss.
# Copyright (c) 2020 Sebastian Kimberk.
#
# Licensed under the BSD 3-Clause License
# <LICENSE.rst or https://opensource.org/licenses/BSD-3-Clause>.
# This file may not be copied, modified, or distributed except
# according to those terms.

import logging
import re

from collections import defaultdict, OrderedDict
from itertools import chain
from math import inf
from os import getcwd
from os.path import dirname, exists, join
from pkgutil import get_data
from shutil import copy
from sys import maxunicode

import autopep8

from antlr4 import CommonTokenStream, FileStream, ParserRuleContext
from jinja2 import Environment

from ..pkgdata import __version__
from .g4 import ANTLRv4Lexer, ANTLRv4Parser

logger = logging.getLogger(__name__)


class Edge(object):

    def __init__(self, dst, args=None):
        self.dst = dst
        self.args = args


class Node(object):

    _cnt = 0

    def __init__(self, id=None):
        if id is None:
            id = Node._cnt
            Node._cnt += 1
        self.id = id
        self.out_edges = []

    @property
    def out_neighbours(self):
        return [edge.dst for edge in self.out_edges]

    def print_tree(self):
        def _walk(node):
            nonlocal indent
            print(f'{"  " * indent}{str(node)}{"" if node not in visited else " (...recursion)"}')
            if node in visited:
                return

            visited.add(node)
            indent += 1
            for child in node.out_neighbours:
                _walk(child)
            indent -= 1

        visited = set()
        indent = 0
        _walk(self)

    def __str__(self):
        return f'cls: {self.__class__.__name__}'


class RuleNode(Node):

    def __init__(self, name, label, type):
        super().__init__(name if label is None else '_'.join((name, label)))
        self.name = name
        self.type = type
        self.min_depth = None

        self.labels = {}
        self.args = {}
        self.locals = {}
        self.returns = {}

    @property
    def has_var(self):
        return self.labels or self.attributes

    @property
    def attributes(self):
        return dict(self.args, **self.locals, **self.returns)

    def __str__(self):
        return f'{super().__str__()}; name: {self.id}; var: {self.has_var}'


class UnlexerRuleNode(RuleNode):

    _lit_cnt = 0

    def __init__(self, name=None):
        if not name:
            name = f'T__{UnlexerRuleNode._lit_cnt}'
            UnlexerRuleNode._lit_cnt += 1
        super().__init__(name, None, 'UnlexerRule')
        self.start_ranges = None


class UnparserRuleNode(RuleNode):

    def __init__(self, name, label=None):
        super().__init__(name, label, 'UnparserRule')


class ImagRuleNode(Node):

    def __init__(self, id):
        super().__init__(id)


class LiteralNode(Node):

    def __init__(self, src):
        super().__init__()
        self.src = src

    def __str__(self):
        return f'{super().__str__()}; src: {self.src!r}'


class CharsetNode(Node):

    def __init__(self, rule_id, idx, charset):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.idx = idx  # Index of the charset inside the current rule.
        self.charset = charset  # Global identifier of the charset.

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}; charset: {self.charset}'


class LambdaNode(Node):

    def __init__(self):
        super().__init__()


class AlternationNode(Node):

    def __init__(self, rule_id, idx, conditions):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.idx = idx  # Index of the alternation in the container rule.
        self.conditions = conditions
        self.min_depths = None

    def simple_alternatives(self):
        # Check if an alternation contains simple alternatives only (simple
        # literals or rule references), and return a 2-tuple. If the alternation
        # contains any non-simple alternatives, return None, None. If the
        # alternation contains simple literals only, the first element of the
        # tuple is a list of the literal values, while the second element is None.
        # If the alternation contains rule references only, the first element is
        # None, while the second element is a list of rule ids. If the alternation
        # contains both simple literals and rule references, then both elements of
        # the tuple are lists, which are of identical length, and exactly one of
        # them contains a non-None value at every index position.
        if not self.out_neighbours or any(len(alt.out_neighbours) != 1 or not isinstance(alt.out_neighbours[0], (LiteralNode, RuleNode)) for alt in self.out_neighbours):
            return None, None

        simple_lits = [alt.out_neighbours[0].src if isinstance(alt.out_neighbours[0], LiteralNode) else None for alt in self.out_neighbours]
        if all(lit is None for lit in simple_lits):
            simple_lits = None

        simple_rules = [alt.out_neighbours[0].id if isinstance(alt.out_neighbours[0], RuleNode) else None for alt in self.out_neighbours]
        if all(rule is None for rule in simple_rules):
            simple_rules = None

        return simple_lits, simple_rules
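
    # Editor's illustration (not part of the original file): for an alternation
    # built from a lexer rule such as `K : 'a' | 'b' | X ;`, where X is another
    # token rule, the contract described above yields two parallel lists of
    # equal length with exactly one non-None entry per index position:
    #
    #   simple_lits == ['a', 'b', None]
    #   simple_rules == [None, None, 'X']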

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}; cond: {", ".join(self.conditions)}'


class AlternativeNode(Node):

    def __init__(self, rule_id, alt_idx, idx):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.alt_idx = alt_idx  # Index of the container alternation inside the container rule.
        self.idx = idx  # Index of the alternative in the container alternation.

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}'


class QuantifierNode(Node):

    def __init__(self, rule_id, idx, min, max):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.idx = idx  # Index of the quantifier in the container rule.
        self.min = min
        self.max = max
        self.min_depth = None

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}; min: {self.min}; max: {self.max}'


class ActionNode(Node):

    def __init__(self, src):
        super().__init__()
        self.src = src

    def __str__(self):
        return f'{super().__str__()}; src: {self.src}'


class VariableNode(Node):

    def __init__(self, name, is_list):
        super().__init__()
        self.name = name
        self.is_list = is_list

    def __str__(self):
        return f'{super().__str__()}; name: {self.name}; list: {self.is_list}'


def printable_ranges(lower_bound, upper_bound):
    ranges = []
    range_start = None
    for c in range(lower_bound, upper_bound):
        if chr(c).isprintable():
            if range_start is None:
                range_start = c
        else:
            if range_start is not None:
                ranges.append((range_start, c))
                range_start = None

    if range_start is not None:
        ranges.append((range_start, upper_bound))
    return ranges


def multirange_diff(r1_list, r2_list):
    def range_diff(r1, r2):
        s1, e1 = r1
        s2, e2 = r2
        endpoints = sorted((s1, s2, e1, e2))
        result = []
        if endpoints[0] == s1:
            result.append((endpoints[0], endpoints[1]))
        if endpoints[3] == e1:
            result.append((endpoints[2], endpoints[3]))
        return result

    for r2 in r2_list:
        r1_list = list(chain.from_iterable(range_diff(r1, r2) for r1 in r1_list))
    return r1_list
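
# Editor's illustration (not part of the original file): a worked example of
# multirange_diff, which subtracts a list of ranges from another. Removing
# [(2, 4)] from [(0, 10)] splits the single range into its two surviving
# pieces:
#
#   multirange_diff([(0, 10)], [(2, 4)]) == [(0, 2), (4, 10)]
#
# This is how negated sets (`~[...]`) are computed further below: the negated
# ranges are subtracted from the ranges of the `dot` charset.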


class Charset(object):

    dot = {
        'any_ascii_letter': [(ord('A'), ord('Z') + 1), (ord('a'), ord('z') + 1)],
        'any_ascii_char': printable_ranges(0x00, 0x80),
        'any_unicode_char': printable_ranges(0, maxunicode + 1),
    }

    _cnt = 0

    def __init__(self, ranges):
        if not ranges:
            raise ValueError('Charset must contain at least one range')
        for start, end in ranges:
            if end <= start:
                raise ValueError(f"Charset range must not be empty: '\\u{{{start:x}}}'..'\\u{{{end - 1:x}}}', '{chr(start)}'..'{chr(end - 1)}'")
        self.id = Charset._cnt
        Charset._cnt += 1
        self.ranges = ranges
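
# Editor's illustration (not part of the original file): the validation above
# is what the commit "Raise error if trying to create empty ranges (#137)"
# adds. With the half-open range convention used here:
#
#   Charset([(ord('a'), ord('z') + 1)])  # valid, matches 'a'..'z'
#   Charset([(ord('a'), ord('a'))])      # ValueError: empty range
#   Charset([])                          # ValueError: no ranges at all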


class GrammarGraph(object):

    def __init__(self):
        self.name = None
        self.vertices = OrderedDict()
        self.options = {}
        self.charsets = []
        self.header = ''
        self.members = ''
        self.default_rule = None

    @property
    def superclass(self):
        return self.options.get('superClass', 'Generator')

    @property
    def dot(self):
        return self.options.get('dot', 'any_ascii_char')

    @property
    def rules(self):
        return (vertex for vertex in self.vertices.values() if isinstance(vertex, RuleNode))

    @property
    def imag_rules(self):
        return (vertex for vertex in self.vertices.values() if isinstance(vertex, ImagRuleNode))

    def print_tree(self, root=None):
        if not root and not self.default_rule:
            raise ValueError('Either `root` must be defined or `print` should be called after `default_rule` is set.')
        (root or self.vertices[self.default_rule]).print_tree()

    def add_node(self, node):
        self.vertices[node.id] = node
        return node.id

    def add_edge(self, frm, to, args=None):
        assert frm in self.vertices, f'{frm} not in vertices.'
        assert to in self.vertices, f'{to} not in vertices.'
        self.vertices[frm].out_edges.append(Edge(dst=self.vertices[to], args=args))

    def calc_min_depths(self):
        min_depths = defaultdict(lambda: inf)
        changed = True

        while changed:
            changed = False
            for ident, node in self.vertices.items():
                selector = min if isinstance(node, AlternationNode) else max
                min_depth = selector((min_depths[out_node.id] + int(isinstance(out_node, RuleNode))
                                      for out_node in node.out_neighbours if not isinstance(out_node, QuantifierNode) or out_node.min > 0), default=0)

                if min_depth < min_depths[ident]:
                    min_depths[ident] = min_depth
                    changed = True

        for ident, node in self.vertices.items():
            if isinstance(node, RuleNode):
                node.min_depth = min_depths[ident]
            elif isinstance(node, QuantifierNode):
                node.min_depth = 0 if node.min > 0 else min_depths[ident]
            elif isinstance(node, AlternationNode):
                # Lift the minimal depths of the alternatives to the alternations, where the decision will happen.
                node.min_depths = [min_depths[alt.id] for alt in node.out_neighbours]
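
    # Editor's note (not part of the original file): a small worked example of
    # the fixed point above. For a lexer rule such as `A : 'x' | A 'y' ;`, the
    # alternation node takes the minimum of its alternatives: the literal
    # alternative costs 0, while the recursive one costs min_depth(A) + 1,
    # since every rule reference adds one level. The fixed point thus yields
    # min_depth(A) == 0 with min_depths == [0, 1] on the alternation, so
    # generation can prefer the cheap branch near a depth limit. Alternatives
    # that never converge keep the initial `inf`, which _analyze_graph later
    # reports as an infinite derivation.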


def escape_string(s):
    # To be kept in sync with Python's unicode_escape encoding at CPython's
    # Objects/unicodeobject.c:PyUnicode_AsUnicodeEscapeString, with the addition
    # of also escaping quotes.
    escapes = {
        '\t': '\\t',
        '\n': '\\n',
        '\r': '\\r',
        '\\': '\\\\',
        '\'': '\\\''
    }

    def _iter_escaped_chars(si):
        for ch in si:
            esc = escapes.get(ch)
            if esc is not None:
                yield esc
            else:
                cp = ord(ch)
                if 0x20 <= cp < 0x7f:
                    yield ch
                elif cp < 0x100:
                    yield f'\\x{cp:02x}'
                elif cp < 0x10000:
                    yield f'\\u{cp:04x}'
                else:
                    yield f'\\U{cp:08x}'

    return ''.join(c for c in _iter_escaped_chars(s))
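
# Editor's illustration (not part of the original file): escape_string mirrors
# the unicode_escape codec but also escapes single quotes, so the result can be
# embedded into a single-quoted literal of the generated source:
#
#   escape_string("it's\n")     == "it\\'s\\n"
#   escape_string('\xe9')       == '\\xe9'
#   escape_string('\U0001f600') == '\\U0001f600'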


class ProcessorTool(object):
    """
    Class to process ANTLRv4 grammar files, build an internal representation
    from them, and create a generator class that is able to produce textual
    data according to the grammar files.
    """
    def __init__(self, lang, work_dir=None):
        """
        :param str lang: Language of the generated code (currently, only ``'py'`` is accepted as Python is the only supported language).
        :param str work_dir: Directory to generate fuzzers into (default: the current working directory).
        """
        self._lang = lang
        env = Environment(trim_blocks=True,
                          lstrip_blocks=True,
                          keep_trailing_newline=False)
        env.filters['substitute'] = lambda s, frm, to: re.sub(frm, to, str(s))
        env.filters['escape_string'] = escape_string
        self._template = env.from_string(get_data(__package__, 'resources/codegen/GeneratorTemplate.' + lang + '.jinja').decode('utf-8'))
        self._work_dir = work_dir or getcwd()

    def process(self, grammars, *, options=None, default_rule=None, encoding='utf-8', errors='strict', lib_dir=None, actions=True, pep8=False):
        """
        Perform the four main steps:

          1. Parse the grammar files.
          2. Build an internal representation of the grammar.
          3. Translate the internal representation into generator source code in the target language.
          4. Save the source code into a file.

        :param list[str] grammars: List of grammar files to produce the generator from.
        :param dict options: Options dictionary to override/extend the options set in the grammar.
               Currently, the following options are supported:

                 1. ``superClass``: Define the ancestor for the current grammar. The generator of this grammar will inherit from ``superClass``. (default: :class:`grammarinator.runtime.Generator`)
                 2. ``dot``: Define how to handle the ``.`` wildcard in the grammar. Three keywords are accepted:

                     1. ``any_ascii_letter``: generate any ASCII letter
                     2. ``any_ascii_char``: generate any ASCII character
                     3. ``any_unicode_char``: generate any Unicode character

                    (default: ``any_ascii_char``)

        :param str default_rule: Name of the default rule to start generation from (default: first parser rule in the grammar).
        :param str encoding: Grammar file encoding.
        :param str errors: Encoding error handling scheme.
        :param str lib_dir: Alternative directory to look for grammar imports beside the current working directory.
        :param bool actions: Boolean to enable grammar actions. If they are disabled, then the inline actions and semantic
               predicates of the input grammar (snippets in ``{...}`` and ``{...}?`` form) are disregarded (i.e., no code is
               generated from them).
        :param bool pep8: Boolean to enable autopep8 to beautify the generated fuzzer source.
        """
        lexer_root, parser_root = None, None

        for grammar in grammars:
            if grammar.endswith('.g4'):
                root = self._parse_grammar(grammar, encoding, errors, lib_dir)
                # Lexer and/or combined grammars are processed first to evaluate TOKEN_REF-s.
                if root.grammarDecl().grammarType().LEXER() or not root.grammarDecl().grammarType().PARSER():
                    lexer_root = root
                else:
                    parser_root = root
            else:
                copy(grammar, self._work_dir)

        graph = self._build_graph(actions, lexer_root, parser_root, options, default_rule)
        self._analyze_graph(graph)

        src = self._template.render(graph=graph, version=__version__).lstrip()
        with open(join(self._work_dir, graph.name + '.' + self._lang), 'w') as f:
            if pep8:
                src = autopep8.fix_code(src)
            f.write(src)
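
    # Editor's sketch (not part of the original file): a minimal, hypothetical
    # invocation, assuming a combined grammar `My.g4` in the current directory.
    # It would render `MyGenerator.py` into `out` (the grammar name's optional
    # `Lexer`/`Parser` suffix is replaced by `Generator`, see build_prequel):
    #
    #   ProcessorTool('py', work_dir='out').process(
    #       ['My.g4'],
    #       options={'dot': 'any_unicode_char'},
    #       pep8=True,
    #   )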

    def _parse_grammar(self, grammar, encoding, errors, lib_dir):
        work_list = {grammar}
        root = None

        while work_list:
            grammar = work_list.pop()

            antlr_parser = ANTLRv4Parser(CommonTokenStream(ANTLRv4Lexer(FileStream(grammar, encoding=encoding, errors=errors))))
            current_root = antlr_parser.grammarSpec()
            # assert antlr_parser._syntaxErrors > 0, 'Parse error in ANTLR grammar.'

            # Save the 'outermost' grammar.
            if not root:
                root = current_root
            else:
                # Unite the rules of the imported grammar with the host grammar's rules.
                for rule in current_root.rules().ruleSpec():
                    root.rules().addChild(rule)

            work_list |= self._collect_imports(current_root, dirname(grammar), lib_dir)

        return root

    @staticmethod
    def _collect_imports(root, base_dir, lib_dir):
        imports = set()
        for prequel in root.prequelConstruct():
            if prequel.delegateGrammars():
                for delegate_grammar in prequel.delegateGrammars().delegateGrammar():
                    ident = delegate_grammar.identifier(0)
                    grammar_fn = str(ident.RULE_REF() or ident.TOKEN_REF()) + '.g4'
                    if lib_dir is not None and exists(join(lib_dir, grammar_fn)):
                        imports.add(join(lib_dir, grammar_fn))
                    else:
                        imports.add(join(base_dir, grammar_fn))
        return imports

    @staticmethod
    def _build_graph(actions, lexer_root, parser_root, options, default_rule):

        def find_conditions(node):
            if not actions:
                return '1'

            if isinstance(node, str):
                return node

            action_block = getattr(node, 'actionBlock', None)
            if action_block:
                if action_block() and action_block().ACTION_CONTENT() and node.QUESTION():
                    return ''.join(str(child) for child in action_block().ACTION_CONTENT())
                return '1'

            element = getattr(node, 'element', None) or getattr(node, 'lexerElement', None)
            if element:
                if not element():
                    return '1'
                return find_conditions(element(0))

            child_ref = getattr(node, 'alternative', None) or getattr(node, 'lexerElements', None)

            # An alternative can be explicitly empty; in this case it won't have any of the attributes above.
            if not child_ref:
                return '1'

            return find_conditions(child_ref())

        def character_range_interval(node):
            start = str(node.characterRange().STRING_LITERAL(0))[1:-1]
            end = str(node.characterRange().STRING_LITERAL(1))[1:-1]
            start_cp, start_offset = process_lexer_char(start, 0)
            end_cp, end_offset = process_lexer_char(end, 0)

            if start_offset < len(start) or end_offset < len(end):
                raise ValueError(f'Only single characters are allowed in character ranges ({start!r}..{end!r})')

            return start_cp, end_cp + 1

        def process_lexer_char(s, offset):
            # To be kept in sync with org.antlr.v4.misc.EscapeSequenceParsing.parseEscape

            # The original Java code has to handle unicode codepoints which consist of more than one character;
            # however, in Python 3.3+, we don't have to worry about this: https://stackoverflow.com/a/42262842

            if s[offset] != '\\':
                return ord(s[offset]), offset + 1

            if offset + 2 > len(s):
                raise ValueError('Escape must have at least two characters')

            escaped = s[offset + 1]
            offset += 2  # Move past backslash and escaped character

            if escaped == 'u':
                if s[offset] == '{':
                    # \u{...}
                    hex_start_offset = offset + 1
                    hex_end_offset = s.find('}', hex_start_offset)
                    if hex_end_offset == -1:
                        raise ValueError('Missing closing bracket for unicode escape')
                    if hex_start_offset == hex_end_offset:
                        raise ValueError('Missing codepoint for unicode escape')

                    offset = hex_end_offset + 1  # Skip over last bracket
                else:
                    # \uXXXX
                    hex_start_offset = offset
                    hex_end_offset = hex_start_offset + 4
                    if hex_end_offset > len(s):
                        raise ValueError('Non-bracketed unicode escape must be of form \\uXXXX')

                    offset = hex_end_offset

                try:
                    codepoint = int(s[hex_start_offset:hex_end_offset], 16)
                except ValueError as exc:
                    raise ValueError('Invalid hex value') from exc

                if codepoint < 0 or codepoint > maxunicode:
                    raise ValueError('Invalid unicode codepoint')

                return codepoint, offset

            if escaped in ('p', 'P'):
                raise ValueError('Unicode properties (\\p{...}) are not supported')

            # To be kept in sync with org.antlr.v4.misc.CharSupport.ANTLRLiteralEscapedCharValue
            escaped_values = {
                'n': '\n',
                'r': '\r',
                'b': '\b',
                't': '\t',
                'f': '\f',
                '\\': '\\',
                # Additional escape sequences defined by org.antlr.v4.misc.EscapeSequenceParsing.parseEscape
                '-': '-',
                ']': ']',
                '\'': '\''
            }

            if escaped in escaped_values:
                return ord(escaped_values[escaped]), offset

            raise ValueError('Invalid escaped value')

        def lexer_charset_interval(s):
            # To be kept in sync with org.antlr.v4.automata.LexerATNFactory.getSetFromCharSetLiteral
            assert len(s) > 0, 'Charset cannot be empty'

            ranges = []

            offset = 0
            while offset < len(s):
                in_range = s[offset] == '-' and offset != 0 and offset != len(s) - 1
                if in_range:
                    offset += 1

                codepoint, offset = process_lexer_char(s, offset)

                if in_range:
                    ranges[-1] = (ranges[-1][0], codepoint + 1)
                else:
                    ranges.append((codepoint, codepoint + 1))

            return ranges
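
        # Editor's illustration (not part of the original file): the body of a
        # lexer charset like `[a-z0-9_]` is parsed into half-open codepoint
        # ranges:
        #
        #   lexer_charset_interval('a-z0-9_') == [(97, 123), (48, 58), (95, 96)]
        #
        # A leading or trailing '-' is taken literally, since `in_range` only
        # triggers when the dash is neither the first nor the last character.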

        def chars_from_set(node):
            if node.characterRange():
                return [character_range_interval(node)]

            if node.LEXER_CHAR_SET():
                return lexer_charset_interval(str(node.LEXER_CHAR_SET())[1:-1])

            if node.STRING_LITERAL():
                char = str(node.STRING_LITERAL())[1:-1]
                char_cp, char_offset = process_lexer_char(char, 0)
                if char_offset < len(char):
                    raise ValueError(f'Zero or multi-character literals are not allowed in lexer sets: {char!r}')
                return [(char_cp, char_cp + 1)]

            if node.TOKEN_REF():
                src = str(node.TOKEN_REF())
                assert graph.vertices[src].start_ranges is not None, f'{src} has no character start ranges.'
                return graph.vertices[src].start_ranges

            return []

        def unescape_string(s):
            def _iter_unescaped_chars(s):
                offset = 0
                while offset < len(s):
                    codepoint, offset = process_lexer_char(s, offset)
                    yield chr(codepoint)

            return ''.join(c for c in _iter_unescaped_chars(s))

        def argActionBlock(node):
            args = {}
            if node and node.argActionBlock():
                for arg in ''.join(str(chr_arg) for chr_arg in node.argActionBlock().ARGUMENT_CONTENT()).split(','):
                    arg_name, arg_value = arg, None
                    if '=' in arg_name:
                        arg_name, arg_value = arg_name.split('=')
                    args[arg_name.strip()] = arg_value.strip() if arg_value else arg_value
            return args
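
        # Editor's illustration (not part of the original file): for an ANTLR
        # argument action block such as `start[depth, max_len=10]`, the helper
        # above yields a name-to-default mapping:
        #
        #   {'depth': None, 'max_len': '10'}
        #
        # Note the naive split(','): a comma inside a default value would be
        # split apart, so defaults are expected to be comma-free here.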

        def build_rule(rule, node):
            lexer_rule = isinstance(rule, UnlexerRuleNode)
            alt_idx, quant_idx, chr_idx = 0, 0, 0  # pylint: disable=unused-variable

            def build_expr(node, parent_id):
                if isinstance(node, ANTLRv4Parser.ParserRuleSpecContext):
                    if actions:
                        rule.args = argActionBlock(node)
                        rule.locals = argActionBlock(node.localsSpec())
                        rule.returns = argActionBlock(node.ruleReturns())
                    build_expr(node.ruleBlock(), parent_id)

                elif isinstance(node, (ANTLRv4Parser.RuleAltListContext, ANTLRv4Parser.AltListContext, ANTLRv4Parser.LexerAltListContext)):
                    children = [child for child in node.children if isinstance(child, ParserRuleContext)]
                    if len(children) == 1:
                        build_expr(children[0], parent_id)
                        return

                    nonlocal alt_idx
                    alt_id = graph.add_node(AlternationNode(idx=alt_idx, conditions=[find_conditions(child) for child in children], rule_id=rule.id))
                    alt_idx += 1
                    graph.add_edge(frm=parent_id, to=alt_id)

                    for i, child in enumerate(children):
                        alternative_id = graph.add_node(AlternativeNode(rule_id=rule.id, alt_idx=graph.vertices[alt_id].idx, idx=i))
                        graph.add_edge(frm=alt_id, to=alternative_id)
                        build_expr(child, alternative_id)

                elif isinstance(node, ANTLRv4Parser.LabeledAltContext):
                    if not node.identifier():
                        build_expr(node.alternative(), parent_id)
                        return

                    rule_node = UnparserRuleNode(name=rule.name, label=str(node.identifier().TOKEN_REF() or node.identifier().RULE_REF()))
                    graph.add_edge(frm=parent_id, to=graph.add_node(rule_node))
                    build_rule(rule_node, node.alternative())

                elif isinstance(node, (ANTLRv4Parser.AlternativeContext, ANTLRv4Parser.LexerAltContext)):
                    children = node.element() if isinstance(node, ANTLRv4Parser.AlternativeContext) else node.lexerElements().lexerElement()
                    for child in children:
                        build_expr(child, parent_id)

                    if not graph.vertices[parent_id].out_neighbours:
                        graph.add_edge(frm=parent_id, to=lambda_id)

                elif isinstance(node, (ANTLRv4Parser.ElementContext, ANTLRv4Parser.LexerElementContext)):
                    if node.actionBlock():
                        # Conditions are handled during alternative processing.
                        if not actions or node.QUESTION():
                            return

                        graph.add_edge(frm=parent_id, to=graph.add_node(ActionNode(src=''.join(str(child) for child in node.actionBlock().ACTION_CONTENT()))))
                        return

                    suffix = None
                    if node.ebnfSuffix():
                        suffix = node.ebnfSuffix()
                    elif hasattr(node, 'ebnf') and node.ebnf() and node.ebnf().blockSuffix():
                        suffix = node.ebnf().blockSuffix().ebnfSuffix()

                    if not suffix:
                        build_expr(node.children[0], parent_id)
                        return

                    nonlocal quant_idx
                    suffix = str(suffix.children[0])
                    quant_ranges = {'?': {'min': 0, 'max': 1}, '*': {'min': 0, 'max': 'inf'}, '+': {'min': 1, 'max': 'inf'}}
                    quant_id = graph.add_node(QuantifierNode(rule_id=rule.id, idx=quant_idx, **quant_ranges[suffix]))
                    quant_idx += 1
                    graph.add_edge(frm=parent_id, to=quant_id)
                    build_expr(node.children[0], quant_id)

                elif isinstance(node, ANTLRv4Parser.LabeledElementContext):
                    build_expr(node.atom() or node.block(), parent_id)
                    ident = node.identifier()
                    name = str(ident.RULE_REF() or ident.TOKEN_REF())
                    is_list = node.PLUS_ASSIGN() is not None
                    graph.add_edge(frm=parent_id, to=graph.add_node(VariableNode(name=name, is_list=is_list)))
                    rule.labels[name] = is_list

                elif isinstance(node, ANTLRv4Parser.RulerefContext):
                    graph.add_edge(frm=parent_id, to=str(node.RULE_REF()), args=argActionBlock(node) if actions else None)

                elif isinstance(node, (ANTLRv4Parser.LexerAtomContext, ANTLRv4Parser.AtomContext)):
                    nonlocal chr_idx

                    if node.DOT():
                        if isinstance(node, ANTLRv4Parser.LexerAtomContext):
                            graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.id, idx=chr_idx, charset=dot_charset.id)))
                            chr_idx += 1
                        else:
                            if '_dot' not in graph.vertices:
                                # Create an artificial `_dot` rule with an alternation of all the lexer rules.
                                parser_dot_id = graph.add_node(UnparserRuleNode(name='_dot', label=None))
                                unlexer_ids = [v.name for vid, v in graph.vertices.items() if isinstance(v, UnlexerRuleNode) and v.id != 'EOF']
                                alt_id = graph.add_node(AlternationNode(rule_id=parser_dot_id, idx=0, conditions=[1] * len(unlexer_ids)))
                                graph.add_edge(frm=parser_dot_id, to=alt_id)
                                for i, lexer_id in enumerate(unlexer_ids):
                                    alternative_id = graph.add_node(AlternativeNode(rule_id=parser_dot_id, alt_idx=0, idx=i))
                                    graph.add_edge(frm=alt_id, to=alternative_id)
                                    graph.add_edge(frm=alternative_id, to=lexer_id)
                            graph.add_edge(frm=parent_id, to='_dot')

                    elif node.notSet():
                        if node.notSet().setElement():
                            not_ranges = chars_from_set(node.notSet().setElement())
                        else:
                            not_ranges = []
                            for set_element in node.notSet().blockSet().setElement():
                                not_ranges.extend(chars_from_set(set_element))

                        charset = Charset(multirange_diff(dot_charset.ranges, sorted(not_ranges, key=lambda x: x[0])))
                        graph.charsets.append(charset)
                        graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.id, idx=chr_idx, charset=charset.id)))
                        chr_idx += 1

                    elif isinstance(node, ANTLRv4Parser.LexerAtomContext) and node.characterRange():
                        start, end = character_range_interval(node)
                        if lexer_rule:
                            rule.start_ranges.append((start, end))

                        charset = Charset([(start, end)])
                        graph.charsets.append(charset)
                        graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.id, idx=chr_idx, charset=charset.id)))
                        chr_idx += 1

                    elif isinstance(node, ANTLRv4Parser.LexerAtomContext) and node.LEXER_CHAR_SET():
                        ranges = lexer_charset_interval(str(node.LEXER_CHAR_SET())[1:-1])
                        if lexer_rule:
                            rule.start_ranges.extend(ranges)

                        charset = Charset(sorted(ranges, key=lambda x: x[0]))
                        graph.charsets.append(charset)
                        graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.id, idx=chr_idx, charset=charset.id)))
                        chr_idx += 1

                    for child in node.children:
                        build_expr(child, parent_id)

                elif isinstance(node, ANTLRv4Parser.TerminalContext):
                    if node.TOKEN_REF():
                        graph.add_edge(frm=parent_id, to=str(node.TOKEN_REF()))

                    elif node.STRING_LITERAL():
                        src = unescape_string(str(node.STRING_LITERAL())[1:-1])

                        if lexer_rule:
                            rule.start_ranges.append((ord(src[0]), ord(src[0]) + 1))
                            graph.add_edge(frm=parent_id, to=graph.add_node(LiteralNode(src=src)))
                        else:
                            # Ensure that every inline literal in parser rules has its lexer rule
                            # found or implicitly created.
                            lit_id = literal_lookup.get(src)
                            if not lit_id:
                                lit_id = graph.add_node(UnlexerRuleNode())
                                literal_lookup[src] = lit_id
                                graph.add_edge(frm=lit_id, to=graph.add_node(LiteralNode(src=src)))
                            graph.add_edge(frm=parent_id, to=lit_id)

                elif isinstance(node, ParserRuleContext) and node.getChildCount():
                    for child in node.children:
                        build_expr(child, parent_id)

            if lexer_rule:
                rule.start_ranges = []

            build_expr(node, rule.id)

            # Save lexer rules with constant literals to enable resolving them in parser rules.
            if lexer_rule and len(rule.out_edges) == 1 and isinstance(rule.out_edges[0].dst, LiteralNode):
                literal_lookup[rule.out_edges[0].dst.src] = rule.id

        def build_prequel(node):
            assert isinstance(node, ANTLRv4Parser.GrammarSpecContext)

            if not graph.name:
                graph.name = re.sub(r'^(.+?)(Lexer|Parser)?$', r'\1Generator', str(node.grammarDecl().identifier().TOKEN_REF() or node.grammarDecl().identifier().RULE_REF()))

            for prequelConstruct in node.prequelConstruct() if node.prequelConstruct() else ():
                for option in prequelConstruct.optionsSpec().option() if prequelConstruct.optionsSpec() else ():
                    ident = option.identifier()
                    ident = str(ident.RULE_REF() or ident.TOKEN_REF())
                    graph.options[ident] = option.optionValue().getText()

                for identifier in prequelConstruct.tokensSpec().idList().identifier() if prequelConstruct.tokensSpec() and prequelConstruct.tokensSpec().idList() else ():
                    assert identifier.TOKEN_REF() is not None, 'Token names must start with uppercase letter.'
                    graph.add_node(ImagRuleNode(id=str(identifier.TOKEN_REF())))

                if prequelConstruct.action_() and actions:
                    action = prequelConstruct.action_()
                    action_ident = action.identifier()
                    action_type = str(action_ident.RULE_REF() or action_ident.TOKEN_REF())
                    raw_action_src = ''.join(str(child) for child in action.actionBlock().ACTION_CONTENT())

                    # We simply append both members and header code chunks to the generated source.
                    # It's the user's responsibility to define them in order.
                    if action_type == 'members':
                        graph.members += raw_action_src
                    elif action_type == 'header':
                        graph.header += raw_action_src

        def build_rules(node):
            generator_rules, duplicate_rules = [], []
            for rule in node.rules().ruleSpec():
                if rule.parserRuleSpec():
                    rule_spec = rule.parserRuleSpec()
                    rule_node = UnparserRuleNode(name=str(rule_spec.RULE_REF()))
                    antlr_node = rule_spec
                elif rule.lexerRuleSpec():
                    rule_spec = rule.lexerRuleSpec()
                    rule_node = UnlexerRuleNode(name=str(rule_spec.TOKEN_REF()))
                    antlr_node = rule_spec.lexerRuleBlock()
                else:
                    assert False, 'Should not get here.'

                if rule_node.id not in graph.vertices:
                    graph.add_node(rule_node)
                    generator_rules.append((rule_node, antlr_node))
                else:
                    duplicate_rules.append(rule_node.id)

            for mode_spec in node.modeSpec():
                for rule_spec in mode_spec.lexerRuleSpec():
                    rule_node = UnlexerRuleNode(name=str(rule_spec.TOKEN_REF()))
                    if rule_node.id not in graph.vertices:
                        graph.add_node(rule_node)
                        generator_rules.append((rule_node, rule_spec.lexerRuleBlock()))
                    else:
                        duplicate_rules.append(rule_node.id)

            if duplicate_rules:
                raise ValueError(f'Rule redefinition(s): {", ".join(duplicate_rules)}')

            # Ensure that lexer rules are processed first so that the lookup table of literal constants is populated.
            for rule_args in sorted(generator_rules, key=lambda r: int(isinstance(r[0], UnparserRuleNode))):
                build_rule(*rule_args)

            if default_rule:
                graph.default_rule = default_rule
            elif node.grammarDecl().grammarType().PARSER() or not (node.grammarDecl().grammarType().LEXER() or node.grammarDecl().grammarType().PARSER()):
                graph.default_rule = generator_rules[0][0].name

        graph = GrammarGraph()
        lambda_id = graph.add_node(LambdaNode())
        graph.add_node(UnlexerRuleNode(name='EOF'))

        for root in [lexer_root, parser_root]:
            if root:
                build_prequel(root)
        graph.options.update(options or {})

        dot_charset = Charset(Charset.dot[graph.dot])
        graph.charsets.append(dot_charset)

        literal_lookup = {}

        for root in [lexer_root, parser_root]:
            if root:
                build_rules(root)

        graph.calc_min_depths()
        return graph

    # Calculates the distance of every rule node from the start node. As a result, it can
    # point out rules that are not reachable from there; furthermore, it can give a hint
    # about the farthest node/rule, to help determine a max_depth that has a chance to
    # reach every rule. Also checks for infinite derivations.
    @staticmethod
    def _analyze_graph(graph, root=None):
        root = root or graph.default_rule
        min_distances = defaultdict(lambda: inf)
        min_distances[root] = 0

        work_list = [root]
        while work_list:
            v = work_list.pop(0)
            for out_v in graph.vertices[v].out_neighbours:
                d = min_distances[v] + int(isinstance(out_v, RuleNode))
                if d < min_distances[out_v.id]:
                    min_distances[out_v.id] = d
                    work_list.append(out_v.id)

        farthest_ident, max_distance = max(((v_id, d) for v_id, d in min_distances.items() if (isinstance(graph.vertices[v_id], RuleNode) and d != inf)), key=lambda item: item[1])
        unreachable_rules = [v_id for v_id, v in graph.vertices.items() if isinstance(v, RuleNode) and min_distances[v_id] == inf]

        logger.info('\tThe farthest rule from %r is %r (%d steps).', root, farthest_ident, max_distance)
        if unreachable_rules:
            logger.warning('\t%d rule(s) unreachable from %r: %s', len(unreachable_rules), root, ', '.join(map(repr, unreachable_rules)))

        inf_alts = []
        inf_rules = []
        for ident, node in graph.vertices.items():
            if isinstance(node, AlternationNode):
                for alternative_idx, alternative_node in enumerate(node.out_neighbours):
                    if node.min_depths[alternative_idx] == inf:
                        # Generate a human-readable description for an alternative in the graph. The output is a
                        # (rule node, alternation node, alternative node) string, where `rule` defines the container
                        # rule and the (alternation node, alternative node) sequence defines a derivation reaching the alternative.
                        inf_alts.append(', '.join(map(str, [graph.vertices[alternative_node.rule_id], node, alternative_node])))
            elif isinstance(node, RuleNode):
                if node.min_depth == inf:
                    inf_rules.append(ident)
        if inf_alts:
            logger.warning('\t%d alternative(s) with infinite derivation (rule node, alternation node, alternative node):\n\t%s', len(inf_alts), ',\n\t'.join(inf_alts))
        if inf_rules:
            logger.warning('\t%d rule(s) with infinite derivation (possible cycles): %s', len(inf_rules), ', '.join(map(repr, inf_rules)))