
renatahodovan / grammarinator, build 18014802027 (push · github · web-flow)

25 Sep 2025 04:55PM UTC, coverage: 84.053% (-0.5%) from 84.559%
Fix when listeners get notified about entry to / exit from lexer rules in C++ (#330)

The Python runtime already properly handles the notification of
listeners about token fragments, i.e., listeners see the name of
the fragment on entry / exit. In the C++ runtime, the listeners
saw the name of the enclosing lexer rule. This commit fixes this
issue.

2161 of 2571 relevant lines covered (84.05%)

0.84 hits per line

Source File: /grammarinator/tool/generator.py (68.69% covered)
1
# Copyright (c) 2017-2025 Renata Hodovan, Akos Kiss.
2
#
3
# Licensed under the BSD 3-Clause License
4
# <LICENSE.rst or https://opensource.org/licenses/BSD-3-Clause>.
5
# This file may not be copied, modified, or distributed except
6
# according to those terms.
7

8
import codecs
1✔
9
import logging
1✔
10
import os
1✔
11
import random
1✔
12

13
from contextlib import nullcontext
1✔
14
from copy import deepcopy
1✔
15
from os.path import abspath, dirname
1✔
16
from shutil import rmtree
1✔
17
from typing import Callable, Optional, Sequence, Union
1✔
18

19
import xxhash
1✔
20

21
from ..runtime import Generator, DefaultModel, Individual, Listener, Model, Population, Rule, RuleSize, UnlexerRule, UnparserRule, WeightedModel
1✔
22

23
logger = logging.getLogger(__name__)
1✔
24

25

26
class GeneratorFactory:
1✔
27
    """
28
    Base class of generator factories. A generator factory is a generalization
29
    of a generator class. It must be a callable that, when called, returns
30
    a generator instance. It must also expose some properties of the generator
31
    class it generalizes that are required to guide generation or mutation by
32
    :class:`GeneratorTool`.
33

34
    This factory generalizes a generator class by simply wrapping it and
35
    forwarding call operations to instantiations of the wrapped class.
36
    Furthermore, generator factories deriving from this base class are
37
    guaranteed to expose all the required generator class properties.
38
    """
39

40
    def __init__(self, generator_class: type[Generator]) -> None:
1✔
41
        """
42
        :param generator_class: The class of the wrapped generator.
43
        """
44
        self._generator_class: type[Generator] = generator_class  #: The class of the wrapped generator.
1✔
45
        # Exposing some class variables of the encapsulated generator.
46
        # In the generator class, they start with `_` to avoid any kind of
47
        # collision with rule names, so they start with `_` here as well.
48
        self._rule_sizes: dict[str, RuleSize] = generator_class._rule_sizes  # Sizes of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor.
1✔
49
        self._alt_sizes: tuple[tuple[RuleSize, ...], ...] = generator_class._alt_sizes  # Sizes of the alternatives of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor.
1✔
50
        self._quant_sizes: tuple[RuleSize, ...] = generator_class._quant_sizes  # Sizes of the quantifiers of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor.
1✔
51

52
    def __call__(self, limit: Optional[RuleSize] = None) -> Generator:
1✔
53
        """
54
        Create a new generator instance.
55

56
        :param limit: The limit on the depth of the trees and on the number of
57
            tokens (number of unlexer rule calls), i.e., it must be possible to
58
            finish generation from the selected node so that the overall depth
59
            and token count of the tree does not exceed these limits (default:
60
            :class:`~grammarinator.runtime.RuleSize.max`). Used to
61
            instantiate the generator.
62
        :return: The created generator instance.
63
        """
64
        return self._generator_class(limit=limit)
×
65

66

67
class DefaultGeneratorFactory(GeneratorFactory):
1✔
68
    """
69
    The default generator factory implementation. When called, a new generator
70
    instance is created backed by a new decision model instance and a set of
71
    newly created listener objects is attached.
72
    """
73

74
    def __init__(self, generator_class: type[Generator], *,
1✔
75
                 model_class: Optional[type[Model]] = None, weights: Optional[dict[tuple[str, int, int], float]] = None,
76
                 listener_classes: Optional[list[type[Listener]]] = None) -> None:
77
        """
78
        :param generator_class: The class of the generator to instantiate.
79
        :param model_class: The class of the model to instantiate. The model
80
            instance is used to instantiate the generator.
81
        :param weights: Initial multipliers of alternatives. Used to instantiate
82
            a :class:`~grammarinator.runtime.WeightedModel` wrapper around the
83
            model.
84
        :param listener_classes: List of listener classes to instantiate and
85
            attach to the generator.
86
        """
87
        super().__init__(generator_class)
1✔
88
        self._model_class: type[Model] = model_class or DefaultModel
1✔
89
        self._weights: Optional[dict[tuple[str, int, int], float]] = weights
1✔
90
        self._listener_classes: list[type[Listener]] = listener_classes or []
1✔
91

92
    def __call__(self, limit: Optional[RuleSize] = None) -> Generator:
1✔
93
        """
94
        Create a new generator instance according to the settings specified for
95
        the factory instance and for this method.
96

97
        :param limit: The limit on the depth of the trees and on the number of
98
            tokens (number of unlexer rule calls), i.e., it must be possible to
99
            finish generation from the selected node so that the overall depth
100
            and token count of the tree does not exceed these limits (default:
101
            :class:`~grammarinator.runtime.RuleSize.max`). Used to
102
            instantiate the generator.
103
        :return: The created generator instance.
104
        """
105
        model = self._model_class()
1✔
106
        if self._weights:
1✔
107
            model = WeightedModel(model, weights=self._weights)
1✔
108

109
        listeners = []
1✔
110
        for listener_class in self._listener_classes:
1✔
111
            listeners.append(listener_class())
1✔
112

113
        generator = self._generator_class(model=model, listeners=listeners, limit=limit)
1✔
114

115
        return generator
1✔
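# Usage sketch: wiring the default factory around a processor-generated
# generator class. ``MyGenerator`` below is a hypothetical stand-in for a
# subclass emitted by ``grammarinator-process``; the other names are already
# imported at the top of this module.
#
#     factory = DefaultGeneratorFactory(
#         MyGenerator,
#         model_class=DefaultModel,   # decision model backing each generator
#         listener_classes=[],        # no listeners attached in this sketch
#     )
#     generator = factory(limit=RuleSize(depth=20, tokens=200))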
116

117

118
class GeneratorTool:
1✔
119
    """
120
    Tool to create new test cases using the generator produced by ``grammarinator-process``.
121
    """
122

123
    def __init__(self, generator_factory: Union[type[Generator], GeneratorFactory], out_format: str, lock=None, rule: Optional[str] = None, limit: Optional[RuleSize] = None,
1✔
124
                 population: Optional[Population] = None, generate: bool = True, mutate: bool = True, recombine: bool = True, unrestricted: bool = True, keep_trees: bool = False,
125
                 transformers: Optional[list[Callable[[Rule], Rule]]] = None, serializer: Optional[Callable[[Rule], str]] = None, memo_size: int = 0, unique_attempts: int = 2,
126
                 cleanup: bool = True, encoding: str = 'utf-8', errors: str = 'strict', dry_run: bool = False):
127
        """
128
        :param generator_factory: A callable that can produce instances of a
129
            generator. It is a generalization of a generator class: it has to
130
            instantiate a generator object, and it may also set the decision
131
            model and the listeners of the generator. It also has to
132
            expose some properties of the generator class necessary to guide
133
            generation or mutation. In the simplest case, it can be a
134
            ``grammarinator-process``-created subclass of :class:`Generator`,
135
            but in more complex scenarios a factory can be used, e.g., an
136
            instance of a subclass of :class:`GeneratorFactory`,
137
            like :class:`DefaultGeneratorFactory`.
138
        :param rule: Name of the rule to start generation from (default: the
139
            default rule of the generator).
140
        :param out_format: Test output description. It can be a file path pattern, possibly including the ``%d``
141
            placeholder, which will be replaced by the index of the test case. Otherwise, it can be an empty string,
142
            which results in printing the test case to stdout (i.e., not saving it to the file system).
143
        :param lock: Lock object necessary when printing test cases in parallel (optional).
144
        :type lock: :class:`multiprocessing.Lock` | None
145
        :param limit: The limit on the depth of the trees and on the
146
            number of tokens (number of unlexer rule calls), i.e., it must be
147
            possible to finish generation from the selected node so that the
148
            overall depth and token count of the tree does not exceed these
149
            limits (default: :class:`~grammarinator.runtime.RuleSize.max`).
150
        :param population: Tree pool for mutation and recombination, e.g., an
151
            instance of :class:`DefaultPopulation`.
152
        :param generate: Enable generating new test cases from scratch, i.e., purely based on grammar.
153
        :param mutate: Enable mutating existing test cases, i.e., re-generate part of an existing test case based on grammar.
154
        :param recombine: Enable recombining existing test cases, i.e., replace part of a test case with a compatible part from another test case.
155
        :param unrestricted: Enable applying possibly grammar-violating creators.
156
        :param keep_trees: Keep generated trees to participate in further mutations or recombinations
157
            (otherwise, only the initial population will be mutated or recombined). It takes effect only if
158
            population is defined.
159
        :param transformers: List of transformers to be applied to postprocess
160
            the generated tree before serializing it.
161
        :param serializer: A serializer that takes a tree and produces a string from it (default: :class:`str`).
162
            See :func:`grammarinator.runtime.simple_space_serializer` for a simple solution that concatenates tokens with spaces.
163
        :param memo_size: The number of most recently created unique tests
164
            memoized (default: 0).
165
        :param unique_attempts: The limit on how many times to try to
166
            generate a unique (i.e., non-memoized) test case. It has no effect
167
            if ``memo_size`` is 0 (default: 2).
168
        :param cleanup: Enable deleting the generated tests at :meth:`__exit__`.
169
        :param encoding: Output file encoding.
170
        :param errors: Encoding error handling scheme.
171
        :param dry_run: If enabled, the result of generation is neither saved nor printed.
172
        """
173

174
        self._generator_factory = generator_factory
1✔
175
        self._transformers = transformers or []
1✔
176
        self._serializer = serializer or str
1✔
177
        self._rule = rule
1✔
178

179
        if out_format and not dry_run:
1✔
180
            os.makedirs(abspath(dirname(out_format)), exist_ok=True)
1✔
181

182
        self._out_format = out_format
1✔
183
        self._lock = lock or nullcontext()
1✔
184
        self._limit = limit or RuleSize.max
1✔
185
        self._population = population
1✔
186
        self._enable_generation = generate
1✔
187
        self._enable_mutation = mutate
1✔
188
        self._enable_recombination = recombine
1✔
189
        self._keep_trees = keep_trees
1✔
190
        self._memo: dict[int, None] = {}  # NOTE: value associated to test is unimportant, but dict keeps insertion order while set does not
1✔
191
        self._memo_size = memo_size
1✔
192
        self._unique_attempts = max(unique_attempts, 1)
1✔
193
        self._cleanup = cleanup
1✔
194
        self._encoding = encoding
1✔
195
        self._errors = errors
1✔
196
        self._dry_run = dry_run
1✔
197

198
        self._generators: list[Callable[[Optional[Individual], Optional[Individual]], Optional[Rule]]] = [self.generate]
1✔
199
        self._mutators: list[Callable[[Optional[Individual], Optional[Individual]], Optional[Rule]]] = [
1✔
200
            self.regenerate_rule,
201
            self.delete_quantified,
202
            self.replicate_quantified,
203
            self.shuffle_quantifieds,
204
            self.hoist_rule,
205
            self.swap_local_nodes,
206
            self.insert_local_node,
207
        ]
208
        self._recombiners: list[Callable[[Optional[Individual], Optional[Individual]], Optional[Rule]]] = [
1✔
209
            self.replace_node,
210
            self.insert_quantified,
211
        ]
212
        if unrestricted:
1✔
213
            self._mutators += [
1✔
214
                self.unrestricted_delete,
215
                self.unrestricted_hoist_rule,
216
            ]
217

218
    def __enter__(self):
1✔
219
        return self
1✔
220

221
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
222
        """
223
        Delete the output directory if the tests were saved to files and if ``cleanup`` was enabled.
224
        """
225
        if self._cleanup and self._out_format and not self._dry_run:
1✔
226
            rmtree(dirname(self._out_format))
×
227

228
    def create_test(self, index: int) -> Optional[str]:
1✔
229
        """
230
        Create a new test case with a randomly selected generator method from the
231
        available options (see :meth:`generate`, :meth:`mutate`, and
232
        :meth:`recombine`). The generated tree is transformed, serialized and saved
233
        according to the parameters used to initialize the current tool object.
234

235
        :param index: Index of the test case to be generated.
236
        :return: Path to the generated serialized test file. It may be empty if
237
            the tool object was initialized with an empty ``out_format``, or
238
            ``None`` if ``dry_run`` was enabled, and hence the test file was not
239
            saved.
240
        """
241
        for attempt in range(1, self._unique_attempts + 1):
1✔
242
            root = self.create()
1✔
243
            test = self._serializer(root)
1✔
244
            if self._memoize_test(test):
1✔
245
                break
1✔
246
            logger.debug("test case #%d, attempt %d/%d: already generated among the last %d unique test cases", index, attempt, self._unique_attempts, len(self._memo))
×
247
        if self._dry_run:
1✔
248
            return None
×
249

250
        test_fn = self._out_format % index if '%d' in self._out_format else self._out_format
1✔
251

252
        if self._population is not None and self._keep_trees:
1✔
253
            self._population.add_individual(root, path=test_fn)
1✔
254

255
        if test_fn:
1✔
256
            with codecs.open(test_fn, 'w', self._encoding, self._errors) as f:
1✔
257
                f.write(test)
1✔
258
        else:
259
            with self._lock:
1✔
260
                print(test)
1✔
261

262
        return test_fn
1✔
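    # Usage sketch: generating a batch of test cases with the tool.
    # ``MyGenerator`` is the same hypothetical processor-generated class as in
    # the factory sketch above; ``out_format`` follows the ``%d`` convention
    # documented in ``__init__``.
    #
    #     with GeneratorTool(DefaultGeneratorFactory(MyGenerator),
    #                        out_format='tests/test_%d.txt',
    #                        cleanup=False) as tool:
    #         for i in range(100):
    #             tool.create_test(i)   # writes tests/test_0.txt .. tests/test_99.txt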
263

264
    def _memoize_test(self, input: str) -> bool:
1✔
265
        # Memoize the (hash of the) test case. The size of the memo is capped
266
        # by ``memo_size``, i.e., it contains at most that many test cases.
267
        # Returns ``False`` if the test case was already in the memo, ``True``
268
        # if it got added now (or memoization is disabled by ``memo_size=0``).
269
        # When the memo is full and a new test case is added, the oldest entry
270
        # is evicted.
271
        if self._memo_size < 1:
1✔
272
            return True
1✔
273
        test = xxhash.xxh3_64_intdigest(input)
×
274
        if test in self._memo:
×
275
            return False
×
276
        self._memo[test] = None
×
277
        if len(self._memo) > self._memo_size:
×
278
            del self._memo[next(iter(self._memo))]
×
279
        return True
×
280

281
    def _select_creator(self, creators: list[Callable[[Optional[Individual], Optional[Individual]], Optional[Rule]]], individual1: Optional[Individual], individual2: Optional[Individual]) -> Callable[[Optional[Individual], Optional[Individual]], Optional[Rule]]:  # pylint: disable=unused-argument
1✔
282
        # NOTE: May be overridden.
283
        return random.choice(creators)
1✔
284

285
    def _create_tree(self, creators: list[Callable[[Optional[Individual], Optional[Individual]], Optional[Rule]]], individual1: Optional[Individual], individual2: Optional[Individual]) -> Rule:
1✔
286
        # Note: creators is potentially modified (creators that return None are removed). Always ensure it is a copy when calling this method.
287
        while creators:
1✔
288
            creator = self._select_creator(creators, individual1, individual2)
1✔
289
            root = creator(individual1, individual2)
1✔
290
            if root:
1✔
291
                break
1✔
292
            creators.remove(creator)
1✔
293
        else:
294
            assert individual1 is not None
×
295
            root = individual1.root
×
296

297
        for transformer in self._transformers:
1✔
298
            root = transformer(root)
×
299
        return root
1✔
300

301
    def create(self) -> Rule:
1✔
302
        """
303
        Create a new tree with a randomly selected generator method from the
304
        available options (see :meth:`generate`, :meth:`mutate`, and
305
        :meth:`recombine`). The generated tree is also transformed according to the
306
        parameters used to initialize the current tool object.
307

308
        :return: The root of the created tree.
309
        """
310
        individual1, individual2 = (self._ensure_individuals(None, None)) if self._population else (None, None)
1✔
311
        creators = []
1✔
312
        if self._enable_generation:
1✔
313
            creators.extend(self._generators)
1✔
314
        if self._population:
1✔
315
            if self._enable_mutation:
1✔
316
                creators.extend(self._mutators)
1✔
317
            if self._enable_recombination:
1✔
318
                creators.extend(self._recombiners)
1✔
319
        return self._create_tree(creators, individual1, individual2)
1✔
320

321
    def mutate(self, individual: Optional[Individual] = None) -> Rule:
1✔
322
        """
323
        Dispatcher method for mutation operators: it picks one operator randomly and
324
        creates a new tree by applying the operator to an individual. The generated
325
        tree is also transformed according to the parameters used to initialize the
326
        current tool object.
327

328
        Supported mutation operators: :meth:`regenerate_rule`,
329
        :meth:`delete_quantified`, :meth:`replicate_quantified`,
330
        :meth:`shuffle_quantifieds`, :meth:`hoist_rule`,
331
        :meth:`unrestricted_delete`, :meth:`unrestricted_hoist_rule`,
332
        :meth:`swap_local_nodes`, :meth:`insert_local_node`
333

334
        :param individual: The population item to be mutated.
335
        :return: The root of the mutated tree.
336
        """
337
        # NOTE: Intentionally does not check self._enable_mutation!
338
        # If you call this explicitly, then so be it, even if mutation is disabled.
339
        # If individual is None, population MUST exist.
340
        individual = self._ensure_individual(individual)
×
341
        return self._create_tree(self._mutators[:], individual, None)
×
342

343
    def recombine(self, individual1: Optional[Individual] = None, individual2: Optional[Individual] = None) -> Rule:
1✔
344
        """
345
        Dispatcher method for recombination operators: it picks one operator
346
        randomly and creates a new tree by applying the operator to an individual.
347
        The generated tree is also transformed according to the parameters used to
348
        initialize the current tool object.
349

350
        Supported recombination operators: :meth:`replace_node`,
351
        :meth:`insert_quantified`
352

353
        :param individual1: The population item to be used as a recipient during
354
            crossover.
355
        :param individual2: The population item to be used as a donor during
356
            crossover.
357
        :return: The root of the recombined tree.
358
        """
359
        # NOTE: Intentionally does not check self._enable_recombination!
360
        # If you call this explicitly, then so be it, even if recombination is disabled.
361
        # If any of the individuals is None, population MUST exist.
362
        individual1, individual2 = self._ensure_individuals(individual1, individual2)
×
363
        return self._create_tree(self._recombiners[:], individual1, individual2)
×
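    # Usage sketch: calling the dispatchers directly. When no individuals are
    # passed, both methods pick them from the population, so a ``population``
    # (e.g. the ``DefaultPopulation`` referenced in the docstrings; its
    # construction is not shown here) must have been passed to ``__init__``.
    #
    #     tool = GeneratorTool(DefaultGeneratorFactory(MyGenerator),
    #                          out_format='', population=population)
    #     mutated_root = tool.mutate()        # applies a randomly chosen mutator
    #     recombined_root = tool.recombine()  # applies a randomly chosen recombiner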
364

365
    def generate(self, _individual1: Optional[Individual] = None, _individual2: Optional[Individual] = None, *, rule: Optional[str] = None, reserve: Optional[RuleSize] = None) -> Union[UnlexerRule, UnparserRule]:
1✔
366
        """
367
        Instantiate a new generator and generate a new tree from scratch.
368

369
        :param rule: Name of the rule to start generation from.
370
        :param reserve: Size budget that needs to be put in reserve before
371
            generating the tree. Practically, deduced from the initially
372
            specified limit. (default values: 0, 0)
373
        :return: The root of the generated tree.
374
        """
375
        # NOTE: Intentionally does not check self._enable_generation!
376
        # If you call this explicitly, then so be it, even if generation is disabled.
377
        reserve = reserve if reserve is not None else RuleSize()
1✔
378
        generator = self._generator_factory(limit=self._limit - reserve)
1✔
379
        rule = rule or self._rule or generator._default_rule.__name__
1✔
380
        return getattr(generator, rule)()
1✔
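    # Sketch of the ``reserve`` semantics: the new generator is instantiated
    # with ``self._limit - reserve``, i.e. the reserve sets aside the size
    # budget already consumed by the surrounding tree (see how regenerate_rule
    # computes it below). ``'start'`` is a hypothetical rule name.
    #
    #     root = tool.generate(rule='start',
    #                          reserve=RuleSize(depth=2, tokens=10))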
381

382
    def _ensure_individual(self, individual: Optional[Individual]) -> Individual:
1✔
383
        if individual is None:
1✔
384
            assert self._population is not None
1✔
385
            individual = self._population.select_individual()
1✔
386
        return individual
1✔
387

388
    def _ensure_individuals(self, individual1: Optional[Individual], individual2: Optional[Individual]) -> tuple[Individual, Individual]:
1✔
389
        individual1 = self._ensure_individual(individual1)
1✔
390
        if individual2 is None:
1✔
391
            assert self._population is not None
1✔
392
            individual2 = self._population.select_individual(individual1)
1✔
393
        return individual1, individual2
1✔
394

395
    def regenerate_rule(self, individual: Optional[Individual] = None, _=None) -> Rule:
1✔
396
        """
397
        Mutate a tree at a random position, i.e., discard and re-generate its
398
        sub-tree at a randomly selected node.
399

400
        :param individual: The population item to be mutated.
401
        :return: The root of the mutated tree.
402
        """
403
        individual = self._ensure_individual(individual)
1✔
404
        root, annot = individual.root, individual.annotations
1✔
405

406
        # Filter items from the nodes of the selected tree that can be regenerated
407
        # within the current maximum depth and token limit (except immutable nodes).
408
        root_tokens = annot.node_tokens[root]
1✔
409
        options = [node for nodes in annot.rules_by_name.values() for node in nodes
1✔
410
                   if (node.parent is not None
411
                       and annot.node_levels[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).depth < self._limit.depth
412
                       and root_tokens - annot.node_tokens[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).tokens < self._limit.tokens)]
413
        if options:
1✔
414
            mutated_node = random.choice(options)
1✔
415
            reserve = RuleSize(depth=annot.node_levels[mutated_node],
1✔
416
                               tokens=root_tokens - annot.node_tokens[mutated_node])
417
            mutated_node = mutated_node.replace(self.generate(rule=mutated_node.name, reserve=reserve))  # type: ignore[assignment]
1✔
418
            return mutated_node.root
1✔
419

420
        # If selection strategy fails, we fall back and discard the whole tree
421
        # and generate a brand new one instead.
422
        return self.generate(rule=root.name)
×
423

424
    def replace_node(self, recipient_individual: Optional[Individual] = None, donor_individual: Optional[Individual] = None) -> Optional[Rule]:
1✔
425
        """
426
        Recombine two trees at random positions where the nodes are compatible
427
        with each other (i.e., they share the same node name). One of the trees
428
        is called the recipient while the other is the donor. The sub-tree
429
        rooted at a random node of the recipient is discarded and replaced
430
        by the sub-tree rooted at a random node of the donor.
431

432
        :param recipient_individual: The population item to be used as a
433
            recipient during crossover.
434
        :param donor_individual: The population item to be used as a donor
435
            during crossover.
436
        :return: The root of the recombined tree.
437
        """
438
        recipient_individual, donor_individual = self._ensure_individuals(recipient_individual, donor_individual)
1✔
439
        recipient_root, recipient_annot = recipient_individual.root, recipient_individual.annotations
1✔
440
        donor_annot = donor_individual.annotations
1✔
441

442
        recipient_lookup: dict[str, Sequence[Rule]] = dict(recipient_annot.rules_by_name)
1✔
443
        recipient_lookup.update(recipient_annot.quants_by_name)
1✔
444
        recipient_lookup.update(recipient_annot.alts_by_name)
1✔
445

446
        donor_lookup: dict[str, Sequence[Rule]] = dict(donor_annot.rules_by_name)
1✔
447
        donor_lookup.update(donor_annot.quants_by_name)
1✔
448
        donor_lookup.update(donor_annot.alts_by_name)
1✔
449
        common_types = sorted(set(recipient_lookup.keys()) & set(donor_lookup.keys()))
1✔
450

451
        recipient_options = [(rule_name, node) for rule_name in common_types for node in recipient_lookup[rule_name] if node.parent]
1✔
452
        recipient_root_tokens = recipient_annot.node_tokens[recipient_root]
1✔
453
        # Shuffle suitable nodes with sample.
454
        for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)):
1✔
455
            donor_options = donor_lookup[rule_name]
1✔
456
            recipient_node_level = recipient_annot.node_levels[recipient_node]
1✔
457
            recipient_node_tokens = recipient_annot.node_tokens[recipient_node]
1✔
458
            for donor_node in random.sample(donor_options, k=len(donor_options)):
1✔
459
                # Make sure that the output tree won't exceed the depth and token limits.
460
                if (recipient_node_level + donor_annot.node_depths[donor_node] <= self._limit.depth
1✔
461
                        and recipient_root_tokens - recipient_node_tokens + donor_annot.node_tokens[donor_node] < self._limit.tokens):
462
                    recipient_node.replace(donor_node)
1✔
463
                    return recipient_root
1✔
464

465
        return None
×
466

467
    def insert_quantified(self, recipient_individual: Optional[Individual] = None, donor_individual: Optional[Individual] = None) -> Optional[Rule]:
1✔
468
        """
469
        Select two compatible quantifier nodes from two trees randomly and, if
470
        the quantifier node of the recipient tree is not full (the number of
471
        its children is less than the maximum count), add one new child to it
472
        at a random position, chosen from the children of the donor's quantifier node.
473

474
        :param recipient_individual: The population item to be used as a
475
            recipient during crossover.
476
        :param donor_individual: The population item to be used as a donor
477
            during crossover.
478
        :return: The root of the extended tree.
479
        """
480
        recipient_individual, donor_individual = self._ensure_individuals(recipient_individual, donor_individual)
1✔
481
        recipient_root, recipient_annot = recipient_individual.root, recipient_individual.annotations
1✔
482
        donor_annot = donor_individual.annotations
1✔
483

484
        common_types = sorted(set(recipient_annot.quants_by_name.keys()) & set(donor_annot.quants_by_name.keys()))
1✔
485
        recipient_options = [(name, node) for name in common_types for node in recipient_annot.quants_by_name[name] if len(node.children) < node.stop]
1✔
486
        recipient_root_tokens = recipient_annot.node_tokens[recipient_root]
1✔
487
        for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)):
1✔
488
            recipient_node_level = recipient_annot.node_levels[recipient_node]
×
489
            donor_options = [quantified for quantifier in donor_annot.quants_by_name[rule_name] for quantified in quantifier.children]
×
490
            for donor_node in random.sample(donor_options, k=len(donor_options)):
×
491
                # Make sure that the output tree won't exceed the depth and token limits.
492
                if (recipient_node_level + donor_annot.node_depths[donor_node] <= self._limit.depth
×
493
                        and recipient_root_tokens + donor_annot.node_tokens[donor_node] < self._limit.tokens):
494
                    recipient_node.insert_child(random.randint(0, len(recipient_node.children)), donor_node)
×
495
                    return recipient_root
×
496
        return None
1✔
497

498
    def delete_quantified(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
499
        """
500
        Remove an optional subtree randomly from a quantifier node.
501

502
        :param individual: The population item to be mutated.
503
        :return: The root of the modified tree.
504
        """
505
        individual = self._ensure_individual(individual)
1✔
506
        root, annot = individual.root, individual.annotations
1✔
507
        options = [child for node in annot.quants if len(node.children) > node.start for child in node.children]
1✔
508
        if options:
1✔
509
            removed_node = random.choice(options)
×
510
            removed_node.remove()
×
511
            return root
×
512
        return None
1✔
513

514
    def unrestricted_delete(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
515
        """
516
        Remove a subtree rooted in any kind of rule node randomly without any
517
        further restriction.
518

519
        :param individual: The population item to be mutated.
520
        :return: The root of the modified tree.
521
        """
522
        individual = self._ensure_individual(individual)
1✔
523
        root, annot = individual.root, individual.annotations
1✔
524
        options = [node for node in annot.rules if node != root]
1✔
525
        if options:
1✔
526
            removed_node = random.choice(options)
1✔
527
            removed_node.remove()
1✔
528
            return root
1✔
529
        return None
×
530

531
    def replicate_quantified(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
532
        """
533
        Select a quantified sub-tree randomly, replicate it, and insert it again if
534
        the maximum quantification count has not been reached yet.
535

536
        :param individual: The population item to be mutated.
537
        :return: The root of the modified tree.
538
        """
539
        individual = self._ensure_individual(individual)
1✔
540
        root, annot = individual.root, individual.annotations
1✔
541
        root_options = [node for node in annot.quants if node.stop > len(node.children)]
1✔
542
        recipient_root_tokens = annot.node_tokens[root]
1✔
543
        node_options = [child for root in root_options for child in root.children if
1✔
544
                        recipient_root_tokens < recipient_root_tokens + annot.node_tokens[child] <= self._limit.tokens]
545
        if node_options:
1✔
546
            node_to_repeat = random.choice(node_options)
×
547
            max_repeat = (self._limit.tokens - recipient_root_tokens) // annot.node_tokens[node_to_repeat] if self._limit.tokens != RuleSize.max.tokens else 1
×
548
            repeat = random.randint(1, int(max_repeat)) if max_repeat > 1 else 1
×
549
            for _ in range(repeat):
×
550
                node_to_repeat.parent.insert_child(idx=random.randint(0, len(node_to_repeat.parent.children)), node=deepcopy(node_to_repeat))  # type: ignore[union-attr]
×
551
            return root
×
552
        return None
1✔
553

554
    def shuffle_quantifieds(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
555
        """
556
        Select a quantifier node and shuffle its quantified sub-trees.
557

558
        :param individual: The population item to be mutated.
559
        :return: The root of the modified tree.
560
        """
561
        individual = self._ensure_individual(individual)
×
562
        root, annot = individual.root, individual.annotations
×
563
        options = [node for node in annot.quants if len(node.children) > 1]
×
564
        if options:
×
565
            node_to_shuffle = random.choice(options)
×
566
            random.shuffle(node_to_shuffle.children)
×
567
            return root
×
568
        return None
×
569

570
    def hoist_rule(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
571
        """
572
        Select an individual of the population to be mutated and select two
573
        rule nodes from it which share the same rule name and are in an
574
        ancestor-descendant relationship, making it possible for the descendant
575
        to replace its ancestor.
576

577
        :param individual: The population item to be mutated.
578
        :return: The root of the hoisted tree.
579
        """
580
        individual = self._ensure_individual(individual)
1✔
581
        root, annot = individual.root, individual.annotations
1✔
582
        for rule in random.sample(annot.rules, k=len(annot.rules)):
1✔
583
            parent = rule.parent
1✔
584
            while parent:
1✔
585
                if parent.name == rule.name:
1✔
586
                    parent.replace(rule)
×
587
                    return root
×
588
                parent = parent.parent
1✔
589
        return None
1✔
590

591
    def unrestricted_hoist_rule(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
592
        """
593
        Select two rule nodes from the input individual which are in an
594
        ancestor-descendant relationship (without a type compatibility check) and
595
        replace the ancestor with the selected descendant.
596

597
        :param individual: The population item to be mutated.
598
        :return: The root of the modified tree.
599
        """
600
        individual = self._ensure_individual(individual)
1✔
601
        root, annot = individual.root, individual.annotations
1✔
602
        for rule in random.sample(annot.rules, k=len(annot.rules)):
1✔
603
            options = []
1✔
604
            parent = rule.parent
1✔
605
            while parent and parent != root:
1✔
606
                if isinstance(parent, UnparserRule) and len(parent.children) > 1 and not rule.equalTokens(parent):
1✔
607
                    options.append(parent)
×
608
                parent = parent.parent
1✔
609

610
            if options:
1✔
611
                random.choice(options).replace(rule)
×
612
                return root
×
613
        return None
1✔
614

615
    def swap_local_nodes(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
616
        """
617
        Swap two non-overlapping subtrees at random positions in a single test
618
        where the nodes are compatible with each other (i.e., they share the same node name).
619

620
        :param individual: The population item to be mutated.
621
        :return: The root of the mutated tree.
622
        """
623
        individual = self._ensure_individual(individual)
×
624
        root, annot = individual.root, individual.annotations
×
625

626
        options: dict[str, Sequence[Rule]] = dict(annot.rules_by_name)
×
627
        options.update(annot.quants_by_name)
×
628
        options.update(annot.alts_by_name)
×
629

630
        for _, nodes in random.sample(list(options.items()), k=len(options)):
×
631
            # Skip node types without two instances.
632
            if len(nodes) < 2:
×
633
                continue
×
634

635
            shuffled = random.sample(nodes, k=len(nodes))
×
636
            for i, first_node in enumerate(shuffled[:-1]):
×
637
                first_node_level = annot.node_levels[first_node]
×
638
                first_node_depth = annot.node_depths[first_node]
×
639
                for second_node in shuffled[i + 1:]:
×
640
                    second_node_level = annot.node_levels[second_node]
×
641
                    second_node_depth = annot.node_depths[second_node]
×
642
                    if (first_node_level + second_node_depth > self._limit.depth
×
643
                            and second_node_level + first_node_depth > self._limit.depth):
644
                        continue
×
645

646
                    # Avoid swapping two identical nodes with each other.
647
                    if first_node.equalTokens(second_node):
×
648
                        continue
×
649

650
                    # Ensure the subtrees rooted at recipient and donor nodes are disjunct.
651
                    upper_node, lower_node = (first_node, second_node) if first_node_level < second_node_level else (second_node, first_node)
×
652
                    disjunct = True
×
653
                    parent = lower_node.parent
×
654
                    while parent and disjunct:
×
655
                        disjunct = parent != upper_node
×
656
                        parent = parent.parent
×
657

658
                    if not disjunct:
×
659
                        continue
×
660

661
                    first_parent = first_node.parent
×
662
                    second_parent = second_node.parent
×
663
                    assert first_parent is not None and second_parent is not None, 'Both nodes must have a parent.'
×
664
                    first_parent.children[first_parent.children.index(first_node)] = second_node
×
665
                    second_parent.children[second_parent.children.index(second_node)] = first_node
×
666
                    first_node.parent = second_parent
×
667
                    second_node.parent = first_parent
×
668
                    return root
×
669
        return None
×
670

671
    def insert_local_node(self, individual: Optional[Individual] = None, _=None) -> Optional[Rule]:
1✔
672
        """
673
        Select two compatible quantifier nodes from a single test and
674
        insert a random quantified subtree of the second one into the
675
        first one at a random position, while respecting the quantifier
676
        restrictions.
677

678
        :param individual: The population item to be mutated.
679
        :return: The root of the mutated tree.
680
        """
681
        individual = self._ensure_individual(individual)
1✔
682
        root, annot = individual.root, individual.annotations
1✔
683
        options = [quantifiers for quantifiers in annot.quants_by_name.values() if len(quantifiers) > 1]
1✔
684
        if not options:
1✔
685
            return root
1✔
686

687
        root_tokens = annot.node_tokens[root]
×
688
        for quantifiers in random.sample(options, k=len(options)):
×
689
            shuffled = random.sample(quantifiers, k=len(quantifiers))
×
690
            for i, recipient_node in enumerate(shuffled[:-1]):
×
691
                if len(recipient_node.children) >= recipient_node.stop:
×
692
                    continue
×
693

694
                recipient_node_level = annot.node_levels[recipient_node]
×
695
                for donor_quantifier in shuffled[i + 1:]:
×
696
                    for donor_quantified_node in donor_quantifier.children:
×
697
                        if (recipient_node_level + annot.node_depths[donor_quantified_node] <= self._limit.depth
×
698
                                and root_tokens + annot.node_tokens[donor_quantified_node] <= self._limit.tokens):
699
                            recipient_node.insert_child(random.randint(0, len(recipient_node.children)) if recipient_node.children else 0,
×
700
                                                        deepcopy(donor_quantified_node))
701
                            return root
×
702
        return None
×