renatahodovan / grammarinator, build 4458095744 (push, via GitHub)
Status: pending completion
Commit: Support args, locals and returns in parser rules (#83)

44 of 44 new or added lines in 1 file covered (100.0%)
1130 of 1277 relevant lines covered (88.49%), 0.88 hits per line
Source file: /grammarinator/process.py (90.16% covered)

# Copyright (c) 2017-2023 Renata Hodovan, Akos Kiss.
# Copyright (c) 2020 Sebastian Kimberk.
#
# Licensed under the BSD 3-Clause License
# <LICENSE.rst or https://opensource.org/licenses/BSD-3-Clause>.
# This file may not be copied, modified, or distributed except
# according to those terms.

import re

from argparse import ArgumentParser
from collections import defaultdict, OrderedDict
from itertools import chain
from math import inf
from os import getcwd
from os.path import dirname, exists, join
from pkgutil import get_data
from shutil import copy
from sys import maxunicode

import autopep8

from antlr4 import CommonTokenStream, FileStream, ParserRuleContext
from inators.arg import add_log_level_argument, add_version_argument, process_log_level_argument
from jinja2 import Environment

from .cli import init_logging, logger
from .parser import ANTLRv4Lexer, ANTLRv4Parser
from .pkgdata import __version__


class Edge(object):

    def __init__(self, dst, args=None):
        self.dst = dst
        self.args = args


class Node(object):

    _cnt = 0

    def __init__(self, id=None):
        if id is None:
            id = Node._cnt
            Node._cnt += 1
        self.id = id
        self.out_edges = []
        self.min_depth = inf

    @property
    def out_neighbours(self):
        return [edge.dst for edge in self.out_edges]

    def print_tree(self):
        def _walk(node):
            nonlocal indent
            print(f'{"  " * indent}{str(node)}{"" if node not in visited else " (...recursion)"}')
            if node in visited:
                return

            visited.add(node)
            indent += 1
            for child in node.out_neighbours:
                _walk(child)
            indent -= 1

        visited = set()
        indent = 0
        _walk(self)

    def __str__(self):
        return f'cls: {self.__class__.__name__}'


class RuleNode(Node):

    def __init__(self, name, label, type):
        super().__init__(name if label is None else '_'.join((name, label)))
        self.name = name
        self.type = type

        self.labels = {}
        self.args = {}
        self.locals = {}
        self.returns = {}

    @property
    def has_var(self):
        return self.labels or self.attributes

    @property
    def attributes(self):
        return dict(self.args, **self.locals, **self.returns)

    def __str__(self):
        return f'{super().__str__()}; name: {self.name}; var: {self.has_var}'


class UnlexerRuleNode(RuleNode):

    def __init__(self, name):
        super().__init__(name, None, 'UnlexerRule')
        self.start_ranges = None


class UnparserRuleNode(RuleNode):

    def __init__(self, name, label=None):
        super().__init__(name, label, 'UnparserRule')


class ImagRuleNode(Node):

    def __init__(self, id):
        super().__init__(id)


class LiteralNode(Node):

    def __init__(self, src):
        super().__init__()
        self.src = src

    def __str__(self):
        return f'{super().__str__()}; src: {self.src!r}'


class CharsetNode(Node):

    def __init__(self, rule_id, idx, charset):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.idx = idx  # Index of the charset inside the current rule.
        self.charset = charset  # Global identifier of the charset.

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}; charset: {self.charset}'


class LambdaNode(Node):

    def __init__(self):
        super().__init__()


class AlternationNode(Node):

    def __init__(self, rule_id, idx, conditions):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.idx = idx  # Index of the alternation in the container rule.
        self.conditions = conditions

    def simple_alternatives(self):
        # Check if an alternation contains simple alternatives only (simple
        # literals or rule references), and return a 2-tuple. If the alternation
        # contains any non-simple alternatives, return None, None. If the
        # alternation contains simple literals only, the first element of the
        # tuple is a list of the literal values, while the second element is None.
        # If the alternation contains rule references only, the first element is
        # None, while the second element is a list of rule ids. If the alternation
        # contains both simple literals and rule references, then both elements of
        # the tuple are lists, which are of identical length, and exactly one of
        # them contains a non-None value at every index position.
        if not self.out_neighbours or any(len(alt.out_neighbours) != 1 or not isinstance(alt.out_neighbours[0], (LiteralNode, RuleNode)) for alt in self.out_neighbours):
            return None, None

        simple_lits = [alt.out_neighbours[0].src if isinstance(alt.out_neighbours[0], LiteralNode) else None for alt in self.out_neighbours]
        if all(lit is None for lit in simple_lits):
            simple_lits = None

        simple_rules = [alt.out_neighbours[0].id if isinstance(alt.out_neighbours[0], RuleNode) else None for alt in self.out_neighbours]
        if all(rule is None for rule in simple_rules):
            simple_rules = None

        return simple_lits, simple_rules
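
    # Illustration of the contract above (hypothetical rule and literal names):
    # for an alternation covering 'a' | 'b', simple_alternatives() returns
    # (['a', 'b'], None); for r1 | r2 it returns (None, ['r1', 'r2']); for
    # 'a' | r2 it returns (['a', None], [None, 'r2']); and for 'a' | r2 'b'
    # it returns (None, None), because the last alternative is not simple.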

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}; cond: {", ".join(self.conditions)}'


class AlternativeNode(Node):

    def __init__(self, rule_id, alt_idx, idx):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.alt_idx = alt_idx  # Index of the container alternation inside the container rule.
        self.idx = idx  # Index of the alternative in the container alternation.

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}'


class QuantifierNode(Node):

    def __init__(self, rule_id, idx, min, max):
        super().__init__()
        self.rule_id = rule_id  # Identifier of the container rule.
        self.idx = idx  # Index of the quantifier in the container rule.
        self.min = min
        self.max = max

    def __str__(self):
        return f'{super().__str__()}; idx: {self.idx}; min: {self.min}; max: {self.max}'


class ActionNode(Node):

    def __init__(self, src):
        super().__init__()
        self.src = src

    def __str__(self):
        return f'{super().__str__()}; src: {self.src}'


class VariableNode(Node):

    def __init__(self, name, is_list):
        super().__init__()
        self.name = name
        self.is_list = is_list

    def __str__(self):
        return f'{super().__str__()}; name: {self.name}; list: {self.is_list}'


def printable_ranges(lower_bound, upper_bound):
    ranges = []
    range_start = None
    for c in range(lower_bound, upper_bound):
        if chr(c).isprintable():
            if range_start is None:
                range_start = c
        else:
            if range_start is not None:
                ranges.append((range_start, c))
                range_start = None

    if range_start is not None:
        ranges.append((range_start, upper_bound))
    return ranges
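
# Example: within ASCII, the printable characters form a single range from
# ' ' (0x20) up to, but not including, DEL (0x7f):
#   >>> printable_ranges(0x00, 0x80)
#   [(32, 127)]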


def multirange_diff(r1_list, r2_list):
    def range_diff(r1, r2):
        s1, e1 = r1
        s2, e2 = r2
        endpoints = sorted((s1, s2, e1, e2))
        result = []
        if endpoints[0] == s1:
            result.append((endpoints[0], endpoints[1]))
        if endpoints[3] == e1:
            result.append((endpoints[2], endpoints[3]))
        return result

    for r2 in r2_list:
        r1_list = list(chain.from_iterable(range_diff(r1, r2) for r1 in r1_list))
    return r1_list
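
# Example: subtracting [3, 5) from [0, 10) splits it into the two surrounding
# pieces:
#   >>> multirange_diff([(0, 10)], [(3, 5)])
#   [(0, 3), (5, 10)]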


class Charset(object):

    dot = {
        'any_ascii_letter': [(ord('A'), ord('Z') + 1), (ord('a'), ord('z') + 1)],
        'any_ascii_char': printable_ranges(0x00, 0x80),
        'any_unicode_char': printable_ranges(0, maxunicode + 1),
    }

    _cnt = 0

    def __init__(self, ranges):
        self.id = Charset._cnt
        Charset._cnt += 1
        self.ranges = ranges


class GrammarGraph(object):

    def __init__(self):
        self.name = None
        self.vertices = OrderedDict()
        self.options = {}
        self.charsets = []
        self.header = ''
        self.member = ''
        self.default_rule = None

    @property
    def superclass(self):
        return self.options.get('superClass', 'Generator')

    @property
    def dot(self):
        return self.options.get('dot', 'any_ascii_char')

    @property
    def rules(self):
        return (vertex for vertex in self.vertices.values() if isinstance(vertex, RuleNode))

    @property
    def imag_rules(self):
        return (vertex for vertex in self.vertices.values() if isinstance(vertex, ImagRuleNode))

    def print_tree(self, root=None):
        if not root and not self.default_rule:
            raise ValueError('Either `root` must be defined or `print_tree` should be called after `default_rule` is set.')
        (root or self.vertices[self.default_rule]).print_tree()

    def add_node(self, node):
        self.vertices[node.id] = node
        return node.id

    def add_edge(self, frm, to, args=None):
        assert frm in self.vertices, f'{frm} not in vertices.'
        assert to in self.vertices, f'{to} not in vertices.'
        self.vertices[frm].out_edges.append(Edge(dst=self.vertices[to], args=args))

    def calc_min_depths(self):
        min_depths = defaultdict(lambda: inf)
        changed = True

        while changed:
            changed = False
            for ident, node in self.vertices.items():
                selector = min if isinstance(node, AlternationNode) else max
                min_depth = selector((min_depths[out_node.id] + int(isinstance(out_node, RuleNode))
                                      for out_node in node.out_neighbours if not isinstance(out_node, QuantifierNode) or out_node.min >= 1), default=0)

                if min_depth < min_depths[ident]:
                    min_depths[ident] = min_depth
                    changed = True

        # Lift the minimal depths of the alternatives to the alternations, where the decision will happen.
        inf_alt = []
        for ident in min_depths:
            if isinstance(self.vertices[ident], AlternationNode):
                for node in self.vertices[ident].out_neighbours:
                    if min_depths[node.id] == inf:
                        # Generate a human-readable description for an alternative in the graph. The output is a
                        # (rule node, alternation node, alternative node) string, where the rule node identifies the
                        # container rule and the (alternation node, alternative node) sequence defines a derivation
                        # reaching the alternative.
                        inf_alt.append(', '.join(map(str, [self.vertices[node.rule_id], self.vertices[ident], node])))
                min_depths[ident] = [min_depths[node.id] for node in self.vertices[ident].out_neighbours]
        if inf_alt:
            logger.warning('Alternative(s) with infinite derivation (rule node, alternation node, alternative node):\n\t%s', ',\n\t'.join(inf_alt))

        # Remove the lifted alternatives and check for infinite derivations.
        inf_rule = []
        for ident in list(min_depths.keys()):
            if isinstance(self.vertices[ident], AlternativeNode):
                del min_depths[ident]
            elif min_depths[ident] == inf and isinstance(self.vertices[ident], RuleNode):
                inf_rule.append(ident)
        if inf_rule:
            logger.warning('Rule(s) with infinite derivation (possible cycles): %s', ', '.join(map(repr, inf_rule)))

        for ident, min_depth in min_depths.items():
            self.vertices[ident].min_depth = min_depth
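
    # For illustration (hypothetical grammar): a rule such as
    #   r : r 'x' ;
    # can never terminate, so min_depths[r] stays inf and r is reported by the
    # warning above as a rule with infinite derivation.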

    # Calculates the distance of every rule node from the start node. As a result, it can
    # point out rules that are not reachable from the start node; furthermore, it can give
    # a hint about the farthest node/rule, helping to determine a max_depth that has a
    # chance to reach every rule.
    def analyze(self, root=None):
        root = root or self.default_rule
        min_distances = defaultdict(lambda: inf)
        min_distances[root] = 0

        work_list = [root]
        while work_list:
            v = work_list.pop(0)
            for out_v in self.vertices[v].out_neighbours:
                d = min_distances[v] + int(isinstance(out_v, RuleNode))
                if d < min_distances[out_v.id]:
                    min_distances[out_v.id] = d
                    work_list.append(out_v.id)

        farthest_ident, max_distance = max(((v_id, d) for v_id, d in min_distances.items() if (isinstance(self.vertices[v_id], RuleNode) and d != inf)), key=lambda item: item[1])
        unreachable_rules = [v_id for v_id, v in self.vertices.items() if isinstance(v, RuleNode) and min_distances[v_id] == inf]

        logger.info('\tThe farthest rule from %r is %r (%d steps).', root, farthest_ident, max_distance)
        if unreachable_rules:
            logger.warning('\t%d rule(s) unreachable from %r: %s', len(unreachable_rules), root, ', '.join(map(repr, unreachable_rules)))


def build_graph(actions, lexer_root, parser_root, default_rule):

    def find_conditions(node):
        if not actions:
            return '1'

        if isinstance(node, str):
            return node

        action_block = getattr(node, 'actionBlock', None)
        if action_block:
            if action_block() and action_block().ACTION_CONTENT() and node.QUESTION():
                return ''.join(str(child) for child in action_block().ACTION_CONTENT())
            return '1'

        element = getattr(node, 'element', None) or getattr(node, 'lexerElement', None)
        if element:
            if not element():
                return '1'
            return find_conditions(element(0))

        child_ref = getattr(node, 'alternative', None) or getattr(node, 'lexerElements', None)

        # An alternative can be explicitly empty; in this case it won't have any of the attributes above.
        if not child_ref:
            return '1'

        return find_conditions(child_ref())

    def character_range_interval(node):
        start = str(node.characterRange().STRING_LITERAL(0))[1:-1]
        end = str(node.characterRange().STRING_LITERAL(1))[1:-1]
        start_cp, start_offset = process_lexer_char(start, 0)
        end_cp, end_offset = process_lexer_char(end, 0)

        if start_offset < len(start) or end_offset < len(end):
            raise ValueError(f'Only single characters are allowed in character ranges ({start!r}..{end!r})')

        return start_cp, end_cp + 1

    def process_lexer_char(s, offset):
        # To be kept in sync with org.antlr.v4.misc.EscapeSequenceParsing.parseEscape

        # The original Java code has to handle unicode codepoints that consist of more than
        # one character; however, in Python 3.3+, we don't have to worry about this:
        # https://stackoverflow.com/a/42262842

        if s[offset] != '\\':
            return ord(s[offset]), offset + 1

        if offset + 2 > len(s):
            raise ValueError('Escape must have at least two characters')

        escaped = s[offset + 1]
        offset += 2  # Move past the backslash and the escaped character

        if escaped == 'u':
            if s[offset] == '{':
                # \u{...}
                hex_start_offset = offset + 1
                hex_end_offset = s.find('}', hex_start_offset)
                if hex_end_offset == -1:
                    raise ValueError('Missing closing bracket for unicode escape')
                if hex_start_offset == hex_end_offset:
                    raise ValueError('Missing codepoint for unicode escape')

                offset = hex_end_offset + 1  # Skip over the closing bracket
            else:
                # \uXXXX
                hex_start_offset = offset
                hex_end_offset = hex_start_offset + 4
                if hex_end_offset > len(s):
                    raise ValueError('Non-bracketed unicode escape must be of form \\uXXXX')

                offset = hex_end_offset

            try:
                codepoint = int(s[hex_start_offset:hex_end_offset], 16)
            except ValueError as exc:
                raise ValueError('Invalid hex value') from exc

            if codepoint < 0 or codepoint > maxunicode:
                raise ValueError('Invalid unicode codepoint')

            return codepoint, offset

        if escaped in ('p', 'P'):
            raise ValueError('Unicode properties (\\p{...}) are not supported')

        # To be kept in sync with org.antlr.v4.misc.CharSupport.ANTLRLiteralEscapedCharValue
        escaped_values = {
            'n': '\n',
            'r': '\r',
            'b': '\b',
            't': '\t',
            'f': '\f',
            '\\': '\\',
            # Additional escape sequences defined by org.antlr.v4.misc.EscapeSequenceParsing.parseEscape
            '-': '-',
            ']': ']',
            '\'': '\''
        }

        if escaped in escaped_values:
            return ord(escaped_values[escaped]), offset

        raise ValueError('Invalid escaped value')
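
    # Examples: a plain character is consumed as-is, while an escape sequence
    # advances the offset past the whole sequence:
    #   process_lexer_char('ab', 0)      == (97, 1)   # 'a'
    #   process_lexer_char('\\n', 0)     == (10, 2)   # escaped newline
    #   process_lexer_char('\\u0041', 0) == (65, 6)   # 'A'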

    def lexer_charset_interval(s):
        # To be kept in sync with org.antlr.v4.automata.LexerATNFactory.getSetFromCharSetLiteral
        assert len(s) > 0, 'Charset cannot be empty'

        ranges = []

        offset = 0
        while offset < len(s):
            in_range = s[offset] == '-' and offset != 0 and offset != len(s) - 1
            if in_range:
                offset += 1

            codepoint, offset = process_lexer_char(s, offset)

            if in_range:
                ranges[-1] = (ranges[-1][0], codepoint + 1)
            else:
                ranges.append((codepoint, codepoint + 1))

        return ranges
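
    # Example: the charset body 'a-z0-9_' (the text between the brackets of
    # [a-z0-9_]) expands to half-open codepoint ranges:
    #   lexer_charset_interval('a-z0-9_') == [(97, 123), (48, 58), (95, 96)]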

    def chars_from_set(node):
        if node.characterRange():
            return [character_range_interval(node)]

        if node.LEXER_CHAR_SET():
            return lexer_charset_interval(str(node.LEXER_CHAR_SET())[1:-1])

        if node.STRING_LITERAL():
            char = str(node.STRING_LITERAL())[1:-1]
            char_cp, char_offset = process_lexer_char(char, 0)
            if char_offset < len(char):
                raise ValueError(f'Zero or multi-character literals are not allowed in lexer sets: {char!r}')
            return [(char_cp, char_cp + 1)]

        if node.TOKEN_REF():
            src = str(node.TOKEN_REF())
            assert graph.vertices[src].start_ranges is not None, f'{src} has no character start ranges.'
            return graph.vertices[src].start_ranges

        return []

    def unescape_string(s):
        def _iter_unescaped_chars(s):
            offset = 0
            while offset < len(s):
                codepoint, offset = process_lexer_char(s, offset)
                yield chr(codepoint)

        return ''.join(c for c in _iter_unescaped_chars(s))

    def argActionBlock(node):
        args = {}
        if node and node.argActionBlock():
            for arg in ''.join(str(chr_arg) for chr_arg in node.argActionBlock().ARGUMENT_CONTENT()).split(','):
                arg_name, arg_value = arg, None
                if '=' in arg_name:
                    arg_name, arg_value = arg_name.split('=')
                args[arg_name.strip()] = arg_value.strip() if arg_value else arg_value
        return args
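
    # Note: for an ANTLR argument block such as [int x, int y=0], argActionBlock
    # yields {'int x': None, 'int y': '0'}: plain declarations map to None,
    # while initialized parameters map to their default value as a string.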

    def build_rule(rule, node):
        lexer_rule = isinstance(rule, UnlexerRuleNode)
        alt_idx, quant_idx, chr_idx = 0, 0, 0  # pylint: disable=unused-variable

        def build_expr(node, parent_id):
            if isinstance(node, ANTLRv4Parser.ParserRuleSpecContext):
                if actions:
                    rule.args = argActionBlock(node)
                    rule.locals = argActionBlock(node.localsSpec())
                    rule.returns = argActionBlock(node.ruleReturns())
                build_expr(node.ruleBlock(), parent_id)

            elif isinstance(node, (ANTLRv4Parser.RuleAltListContext, ANTLRv4Parser.AltListContext, ANTLRv4Parser.LexerAltListContext)):
                children = [child for child in node.children if isinstance(child, ParserRuleContext)]
                if len(children) == 1:
                    build_expr(children[0], parent_id)
                    return

                nonlocal alt_idx
                alt_id = graph.add_node(AlternationNode(idx=alt_idx, conditions=[find_conditions(child) for child in children], rule_id=rule.name))
                alt_idx += 1
                graph.add_edge(frm=parent_id, to=alt_id)

                for i, child in enumerate(children):
                    alternative_id = graph.add_node(AlternativeNode(rule_id=rule.name, alt_idx=graph.vertices[alt_id].idx, idx=i))
                    graph.add_edge(frm=alt_id, to=alternative_id)
                    build_expr(child, alternative_id)

            elif isinstance(node, ANTLRv4Parser.LabeledAltContext):
                if not node.identifier():
                    build_expr(node.alternative(), parent_id)
                    return

                rule_node = UnparserRuleNode(name=rule.name, label=str(node.identifier().TOKEN_REF() or node.identifier().RULE_REF()))
                graph.add_edge(frm=parent_id, to=graph.add_node(rule_node))
                build_rule(rule_node, node.alternative())

            elif isinstance(node, (ANTLRv4Parser.AlternativeContext, ANTLRv4Parser.LexerAltContext)):
                children = node.element() if isinstance(node, ANTLRv4Parser.AlternativeContext) else node.lexerElements().lexerElement()
                for child in children:
                    build_expr(child, parent_id)

                if not graph.vertices[parent_id].out_neighbours:
                    graph.add_edge(frm=parent_id, to=lambda_id)

            elif isinstance(node, (ANTLRv4Parser.ElementContext, ANTLRv4Parser.LexerElementContext)):
                if node.actionBlock():
                    # Conditions are handled at alternative processing.
                    if not actions or node.QUESTION():
                        return

                    graph.add_edge(frm=parent_id, to=graph.add_node(ActionNode(src=''.join(str(child) for child in node.actionBlock().ACTION_CONTENT()))))
                    return

                suffix = None
                if node.ebnfSuffix():
                    suffix = node.ebnfSuffix()
                elif hasattr(node, 'ebnf') and node.ebnf() and node.ebnf().blockSuffix():
                    suffix = node.ebnf().blockSuffix().ebnfSuffix()

                if not suffix:
                    build_expr(node.children[0], parent_id)
                    return

                nonlocal quant_idx
                suffix = str(suffix.children[0])
                quant_ranges = {'?': {'min': 0, 'max': 1}, '*': {'min': 0, 'max': 'inf'}, '+': {'min': 1, 'max': 'inf'}}
                quant_id = graph.add_node(QuantifierNode(rule_id=rule.name, idx=quant_idx, **quant_ranges[suffix]))
                quant_idx += 1
                graph.add_edge(frm=parent_id, to=quant_id)
                build_expr(node.children[0], quant_id)

            elif isinstance(node, ANTLRv4Parser.LabeledElementContext):
                build_expr(node.atom() or node.block(), parent_id)
                ident = node.identifier()
                name = str(ident.RULE_REF() or ident.TOKEN_REF())
                is_list = node.PLUS_ASSIGN() is not None
                graph.add_edge(frm=parent_id, to=graph.add_node(VariableNode(name=name, is_list=is_list)))
                rule.labels[name] = is_list

            elif isinstance(node, ANTLRv4Parser.RulerefContext):
                graph.add_edge(frm=parent_id, to=str(node.RULE_REF()), args=argActionBlock(node) if actions else None)

            elif isinstance(node, (ANTLRv4Parser.LexerAtomContext, ANTLRv4Parser.AtomContext)):
                nonlocal chr_idx

                if node.DOT():
                    graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.name, idx=chr_idx, charset=dot_charset.id)))
                    chr_idx += 1

                elif node.notSet():
                    if node.notSet().setElement():
                        options = chars_from_set(node.notSet().setElement())
                    else:
                        options = []
                        for set_element in node.notSet().blockSet().setElement():
                            options.extend(chars_from_set(set_element))

                    charset = Charset(multirange_diff(dot_charset.ranges, sorted(options, key=lambda x: x[0])))
                    graph.charsets.append(charset)
                    graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.name, idx=chr_idx, charset=charset.id)))
                    chr_idx += 1

                elif isinstance(node, ANTLRv4Parser.LexerAtomContext) and node.characterRange():
                    start, end = character_range_interval(node)
                    if lexer_rule:
                        rule.start_ranges.append((start, end))

                    charset = Charset([(start, end)])
                    graph.charsets.append(charset)
                    graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.name, idx=chr_idx, charset=charset.id)))
                    chr_idx += 1

                elif isinstance(node, ANTLRv4Parser.LexerAtomContext) and node.LEXER_CHAR_SET():
                    ranges = lexer_charset_interval(str(node.LEXER_CHAR_SET())[1:-1])
                    if lexer_rule:
                        rule.start_ranges.extend(ranges)

                    charset = Charset(sorted(ranges, key=lambda x: x[0]))
                    graph.charsets.append(charset)
                    graph.add_edge(frm=parent_id, to=graph.add_node(CharsetNode(rule_id=rule.name, idx=chr_idx, charset=charset.id)))
                    chr_idx += 1

                for child in node.children:
                    build_expr(child, parent_id)

            elif isinstance(node, ANTLRv4Parser.TerminalContext):
                if node.TOKEN_REF():
                    graph.add_edge(frm=parent_id, to=str(node.TOKEN_REF()))

                elif node.STRING_LITERAL():
                    src = unescape_string(str(node.STRING_LITERAL())[1:-1])

                    if lexer_rule:
                        rule.start_ranges.append((ord(src[0]), ord(src[0]) + 1))

                    graph.add_edge(frm=parent_id, to=graph.add_node(LiteralNode(src=src)))

            elif isinstance(node, ParserRuleContext) and node.getChildCount():
                for child in node.children:
                    build_expr(child, parent_id)

        if lexer_rule:
            rule.start_ranges = []

        build_expr(node, rule.id)

    def build_prequel(node):
        assert isinstance(node, ANTLRv4Parser.GrammarSpecContext)

        if not graph.name:
            graph.name = re.sub(r'^(.+?)(Lexer|Parser)?$', r'\1Generator', str(node.grammarDecl().identifier().TOKEN_REF() or node.grammarDecl().identifier().RULE_REF()))

        for prequelConstruct in node.prequelConstruct() if node.prequelConstruct() else ():
            for option in prequelConstruct.optionsSpec().option() if prequelConstruct.optionsSpec() else ():
                ident = option.identifier()
                ident = str(ident.RULE_REF() or ident.TOKEN_REF())
                graph.options[ident] = option.optionValue().getText()

            for identifier in prequelConstruct.tokensSpec().idList().identifier() if prequelConstruct.tokensSpec() and prequelConstruct.tokensSpec().idList() else ():
                assert identifier.TOKEN_REF() is not None, 'Token names must start with uppercase letter.'
                graph.add_node(ImagRuleNode(id=str(identifier.TOKEN_REF())))

            if prequelConstruct.action_() and actions:
                action = prequelConstruct.action_()
                action_ident = action.identifier()
                action_type = str(action_ident.RULE_REF() or action_ident.TOKEN_REF())
                raw_action_src = ''.join(str(child) for child in action.actionBlock().ACTION_CONTENT())

                # We simply append both member and header code chunks to the generated source.
                # It's the user's responsibility to define them in order.
                # Both 'member' and 'members' keywords are accepted.
                if action_type in ('member', 'members'):
                    graph.member += raw_action_src
                elif action_type == 'header':
                    graph.header += raw_action_src

    def build_rules(node):
        generator_rules, duplicate_rules = [], []
        for rule in node.rules().ruleSpec():
            if rule.parserRuleSpec():
                rule_spec = rule.parserRuleSpec()
                rule_node = UnparserRuleNode(name=str(rule_spec.RULE_REF()))
                antlr_node = rule_spec
            elif rule.lexerRuleSpec():
                rule_spec = rule.lexerRuleSpec()
                rule_node = UnlexerRuleNode(name=str(rule_spec.TOKEN_REF()))
                antlr_node = rule_spec.lexerRuleBlock()
            else:
                assert False, 'Should not get here.'

            if rule_node.id not in graph.vertices:
                graph.add_node(rule_node)
                generator_rules.append((rule_node, antlr_node))
            else:
                duplicate_rules.append(rule_node.id)

        for mode_spec in node.modeSpec():
            for rule_spec in mode_spec.lexerRuleSpec():
                rule_node = UnlexerRuleNode(name=str(rule_spec.TOKEN_REF()))
                if rule_node.id not in graph.vertices:
                    graph.add_node(rule_node)
                    generator_rules.append((rule_node, rule_spec.lexerRuleBlock()))
                else:
                    duplicate_rules.append(rule_node.id)

        if duplicate_rules:
            raise ValueError(f'Rule redefinition(s): {", ".join(duplicate_rules)}')

        for rule_args in generator_rules:
            build_rule(*rule_args)

        if default_rule:
            graph.default_rule = default_rule
        elif node.grammarDecl().grammarType().PARSER() or not (node.grammarDecl().grammarType().LEXER() or node.grammarDecl().grammarType().PARSER()):
            graph.default_rule = generator_rules[0][0].name

    graph = GrammarGraph()
    lambda_id = graph.add_node(LambdaNode())
    graph.add_node(UnlexerRuleNode(name='EOF'))

    for root in [lexer_root, parser_root]:
        if root:
            build_prequel(root)

    dot_charset = Charset(Charset.dot[graph.dot])
    graph.charsets.append(dot_charset)

    for root in [lexer_root, parser_root]:
        if root:
            build_rules(root)

    graph.calc_min_depths()
    return graph


def escape_string(s):
    # To be kept in sync with Python's unicode_escape encoding at CPython's
    # Objects/unicodeobject.c:PyUnicode_AsUnicodeEscapeString, with the addition
    # of also escaping quotes.
    escapes = {
        '\t': '\\t',
        '\n': '\\n',
        '\r': '\\r',
        '\\': '\\\\',
        '\'': '\\\''
    }

    def _iter_escaped_chars(si):
        for ch in si:
            esc = escapes.get(ch)
            if esc is not None:
                yield esc

            else:
                cp = ord(ch)
                if 0x20 <= cp < 0x7f:
                    yield ch
                elif cp < 0x100:
                    yield f'\\x{cp:02x}'
                elif cp < 0x10000:
                    yield f'\\u{cp:04x}'
                else:
                    yield f'\\U{cp:08x}'

    return ''.join(c for c in _iter_escaped_chars(s))
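
# Example: control characters and non-ASCII codepoints are escaped the way
# Python's unicode_escape codec would render them, with quotes escaped too:
#   >>> escape_string("it's\t\u20ac")
#   "it\\'s\\t\\u20ac"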


class FuzzerFactory(object):
    """
    Class that generates fuzzers from grammars.
    """

    def __init__(self, lang, work_dir=None):
        """
        :param lang: Language of the generated code.
        :param work_dir: Directory to generate fuzzers into.
        """
        self.lang = lang
        env = Environment(trim_blocks=True,
                          lstrip_blocks=True,
                          keep_trailing_newline=False)
        env.filters['substitute'] = lambda s, frm, to: re.sub(frm, to, str(s))
        env.filters['escape_string'] = escape_string
        self.template = env.from_string(get_data(__package__, 'resources/codegen/GeneratorTemplate.' + lang + '.jinja').decode('utf-8'))
        self.work_dir = work_dir or getcwd()

    def generate_fuzzer(self, grammars, *, options=None, default_rule=None, encoding='utf-8', lib_dir=None, actions=True, pep8=False):
        """
        Generates fuzzers from grammars.

        :param grammars: List of grammar files to generate from.
        :param options: Dictionary of options that override/extend those set in the grammar.
        :param default_rule: Name of the default rule to start generation from.
        :param encoding: Grammar file encoding.
        :param lib_dir: Alternative directory to look for imports.
        :param actions: Boolean to enable or disable grammar actions.
        :param pep8: Boolean to enable autopep8 to beautify the generated fuzzer source.
        """
        lexer_root, parser_root = None, None

        for grammar in grammars:
            if grammar.endswith('.g4'):
                root = self._parse(grammar, encoding, lib_dir)
                # Lexer and/or combined grammars are processed first to evaluate TOKEN_REF-s.
                if root.grammarDecl().grammarType().LEXER() or not root.grammarDecl().grammarType().PARSER():
                    lexer_root = root
                else:
                    parser_root = root
            else:
                copy(grammar, self.work_dir)

        graph = build_graph(actions, lexer_root, parser_root, default_rule)
        graph.options.update(options or {})
        graph.analyze()

        src = self.template.render(graph=graph, version=__version__).lstrip()
        with open(join(self.work_dir, graph.name + '.' + self.lang), 'w') as f:
            if pep8:
                src = autopep8.fix_code(src)
            f.write(src)
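
    # Usage sketch (hypothetical file and rule names): given a combined grammar
    # My.g4 whose first parser rule is `start`,
    #   FuzzerFactory('py', work_dir='out').generate_fuzzer(['My.g4'])
    # renders the generated fuzzer into out/MyGenerator.py.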

    @staticmethod
    def _collect_imports(root, base_dir, lib_dir):
        imports = set()
        for prequel in root.prequelConstruct():
            if prequel.delegateGrammars():
                for delegate_grammar in prequel.delegateGrammars().delegateGrammar():
                    ident = delegate_grammar.identifier(0)
                    grammar_fn = str(ident.RULE_REF() or ident.TOKEN_REF()) + '.g4'
                    if lib_dir is not None and exists(join(lib_dir, grammar_fn)):
                        imports.add(join(lib_dir, grammar_fn))
                    else:
                        imports.add(join(base_dir, grammar_fn))
        return imports

    def _parse(self, grammar, encoding, lib_dir):
        work_list = {grammar}
        root = None

        while work_list:
            grammar = work_list.pop()

            antlr_parser = ANTLRv4Parser(CommonTokenStream(ANTLRv4Lexer(FileStream(grammar, encoding=encoding))))
            current_root = antlr_parser.grammarSpec()
            # assert antlr_parser._syntaxErrors > 0, 'Parse error in ANTLR grammar.'

            # Save the 'outermost' grammar.
            if not root:
                root = current_root
            else:
                # Unite the rules of the imported grammar with the host grammar's rules.
                for rule in current_root.rules().ruleSpec():
                    root.rules().addChild(rule)

            work_list |= self._collect_imports(current_root, dirname(grammar), lib_dir)

        return root


def execute():
    parser = ArgumentParser(description='Grammarinator: Processor', epilog="""
        The tool processes a grammar in ANTLR v4 format (*.g4, either split
        into lexer and parser grammar files, or a single combined grammar) and
        creates a fuzzer that can generate randomized content conforming to
        the format described by the grammar.
        """)
    parser.add_argument('grammar', metavar='FILE', nargs='+',
                        help='ANTLR grammar files describing the expected format to generate.')
    parser.add_argument('-D', metavar='OPT=VAL', dest='options', default=[], action='append',
                        help='set/override a grammar-level option.')
    parser.add_argument('--language', metavar='LANG', choices=['py'], default='py',
                        help='language of the generated code (choices: %(choices)s; default: %(default)s).')
    parser.add_argument('--no-actions', dest='actions', default=True, action='store_false',
                        help='do not process inline actions.')
    parser.add_argument('--rule', '-r', metavar='NAME',
                        help='default rule to start generation from (default: the first parser rule).')
    parser.add_argument('--encoding', metavar='ENC', default='utf-8',
                        help='grammar file encoding (default: %(default)s).')
    parser.add_argument('--lib', metavar='DIR',
                        help='alternative location of import grammars.')
    parser.add_argument('--pep8', default=False, action='store_true',
                        help='enable autopep8 to format the generated fuzzer.')
    parser.add_argument('-o', '--out', metavar='DIR', default=getcwd(),
                        help='temporary working directory (default: %(default)s).')
    add_log_level_argument(parser, short_alias=())
    add_version_argument(parser, version=__version__)
    args = parser.parse_args()

    for grammar in args.grammar:
        if not exists(grammar):
            parser.error(f'{grammar} does not exist.')

    options = {}
    for option in args.options:
        parts = re.fullmatch('([^=]+)=(.*)', option)
        if not parts:
            parser.error(f'option not in OPT=VAL format: {option}')

        name, value = parts.group(1, 2)
        options[name] = value

    init_logging()
    process_log_level_argument(args, logger)

    FuzzerFactory(args.language, args.out).generate_fuzzer(args.grammar, options=options, default_rule=args.rule, encoding=args.encoding, lib_dir=args.lib, actions=args.actions, pep8=args.pep8)


if __name__ == '__main__':
    execute()
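
# Typical invocation (assuming the package installs this entry point as
# `grammarinator-process`; otherwise run this module directly):
#
#   grammarinator-process My.g4 --rule start -o out/ --pep8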