renatahodovan / grammarinator / 19830531080 (push, github)
01 Dec 2025 10:53AM UTC coverage: 84.064% (-0.3%) from 84.369%
Commit: Ensure that allow and blocklist field of Tool are initialised
2205 of 2623 relevant lines covered (84.06%), 0.84 hits per line

Source file: /grammarinator/tool/generator.py (69.51% covered)
# Copyright (c) 2017-2025 Renata Hodovan, Akos Kiss.
#
# Licensed under the BSD 3-Clause License
# <LICENSE.rst or https://opensource.org/licenses/BSD-3-Clause>.
# This file may not be copied, modified, or distributed except
# according to those terms.

import logging
import os
import random

from contextlib import nullcontext
from copy import deepcopy
from os.path import abspath, dirname
from shutil import rmtree
from typing import Callable, Sequence

import xxhash

from ..runtime import Generator, DefaultModel, Individual, Listener, Model, Population, Rule, RuleSize, UnlexerRule, UnparserRule, WeightedModel

logger = logging.getLogger(__name__)


class GeneratorFactory:
    """
    Base class of generator factories. A generator factory is a generalization
    of a generator class. It has to be a callable that, when called, must return
    a generator instance. It must also expose some properties of the generator
    class it generalizes that are required to guide generation or mutation by
    :class:`GeneratorTool`.

    This factory generalizes a generator class by simply wrapping it and
    forwarding call operations to instantiations of the wrapped class.
    Furthermore, generator factories deriving from this base class are
    guaranteed to expose all the required generator class properties.
    """

    def __init__(self, generator_class: type[Generator]) -> None:
        """
        :param generator_class: The class of the wrapped generator.
        """
        self._generator_class: type[Generator] = generator_class  #: The class of the wrapped generator.
        # Exposing some class variables of the encapsulated generator.
        # In the generator class, they start with `_` to avoid any kind of
        # collision with rule names, so they start with `_` here as well.
        self._rule_sizes: dict[str, RuleSize] = generator_class._rule_sizes  # Sizes of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor.
        self._alt_sizes: tuple[tuple[RuleSize, ...], ...] = generator_class._alt_sizes  # Sizes of the alternatives of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor.
        self._quant_sizes: tuple[RuleSize, ...] = generator_class._quant_sizes  # Sizes of the quantifiers of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor.

    def __call__(self, limit: RuleSize | None = None) -> Generator:
        """
        Create a new generator instance.

        :param limit: The limit on the depth of the trees and on the number of
            tokens (number of unlexer rule calls), i.e., it must be possible to
            finish generation from the selected node so that the overall depth
            and token count of the tree does not exceed these limits (default:
            :class:`~grammarinator.runtime.RuleSize`. ``max``). Used to
            instantiate the generator.
        :return: The created generator instance.
        """
        return self._generator_class(limit=limit)


class DefaultGeneratorFactory(GeneratorFactory):
    """
    The default generator factory implementation. When called, a new generator
    instance is created, backed by a new decision model instance, and a set of
    newly created listener objects is attached to it.
    """

    def __init__(self, generator_class: type[Generator], *,
                 model_class: type[Model] | None = None, weights: dict[tuple[str, int, int], float] | None = None,
                 listener_classes: list[type[Listener]] | None = None) -> None:
        """
        :param generator_class: The class of the generator to instantiate.
        :param model_class: The class of the model to instantiate. The model
            instance is used to instantiate the generator.
        :param weights: Initial multipliers of alternatives. Used to instantiate
            a :class:`~grammarinator.runtime.WeightedModel` wrapper around the
            model.
        :param listener_classes: List of listener classes to instantiate and
            attach to the generator.
        """
        super().__init__(generator_class)
        self._model_class: type[Model] = model_class or DefaultModel
        self._weights: dict[tuple[str, int, int], float] | None = weights
        self._listener_classes: list[type[Listener]] = listener_classes or []

    def __call__(self, limit: RuleSize | None = None) -> Generator:
        """
        Create a new generator instance according to the settings specified for
        the factory instance and for this method.

        :param limit: The limit on the depth of the trees and on the number of
            tokens (number of unlexer rule calls), i.e., it must be possible to
            finish generation from the selected node so that the overall depth
            and token count of the tree does not exceed these limits (default:
            :class:`~grammarinator.runtime.RuleSize`. ``max``). Used to
            instantiate the generator.
        :return: The created generator instance.
        """
        model = self._model_class()
        if self._weights:
            model = WeightedModel(model, weights=self._weights)

        listeners = []
        for listener_class in self._listener_classes:
            listeners.append(listener_class())

        generator = self._generator_class(model=model, listeners=listeners, limit=limit)

        return generator


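# A minimal usage sketch (illustrative only, not part of the library): wiring a
# DefaultGeneratorFactory around a generator class produced by
# ``grammarinator-process``. ``my_generator`` and ``MyGenerator`` are placeholder
# names for such a user-supplied module and class.
#
#     from my_generator import MyGenerator
#
#     factory = DefaultGeneratorFactory(
#         MyGenerator,
#         model_class=DefaultModel,        # decision model class (this is the default)
#         listener_classes=[],             # optional Listener subclasses to attach
#     )
#     generator = factory(limit=RuleSize(depth=20, tokens=200))
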
class GeneratorTool:
    """
    Tool to create new test cases using the generator produced by
    ``grammarinator-process``.
    """

    def __init__(self, generator_factory: type[Generator] | GeneratorFactory, out_format: str, lock=None, rule: str | None = None, limit: RuleSize | None = None,
                 population: Population | None = None, keep_trees: bool = False,
                 generate: bool = True, mutate: bool = True, recombine: bool = True, unrestricted: bool = True,
                 allowlist: list[str] | None = None, blocklist: list[str] | None = None,
                 transformers: list[Callable[[Rule], Rule]] | None = None, serializer: Callable[[Rule], str] | None = None, memo_size: int = 0, unique_attempts: int = 2,
                 cleanup: bool = True, encoding: str = 'utf-8', errors: str = 'strict', dry_run: bool = False):
        """
        :param generator_factory: A callable that can produce instances of a
            generator. It is a generalization of a generator class: it has to
            instantiate a generator object, and it may also set the decision
            model and the listeners of the generator. It also has to
            expose some properties of the generator class necessary to guide
            generation or mutation. In the simplest case, it can be a
            ``grammarinator-process``-created subclass of :class:`Generator`,
            but in more complex scenarios a factory can be used, e.g., an
            instance of a subclass of :class:`GeneratorFactory`,
            like :class:`DefaultGeneratorFactory`.
        :param rule: Name of the rule to start generation from (default: the
            default rule of the generator).
        :param out_format: Test output description. It can be a file path
            pattern possibly including the ``%d`` placeholder which will be
            replaced by the index of the test case. Otherwise, it can be an
            empty string, which will result in printing the test case to
            stdout (i.e., not saving to file system).
        :param lock: Lock object necessary when printing test cases in parallel
            (optional).
        :type lock: :class:`multiprocessing.Lock` | None
        :param limit: The limit on the depth of the trees and on the number of
            tokens (number of unlexer rule calls), i.e., it must be possible to
            finish generation from the selected node so that the overall depth
            and token count of the tree does not exceed these limits (default:
            :class:`~grammarinator.runtime.RuleSize`. ``max``).
        :param population: Tree pool for mutation and recombination, e.g., an
            instance of :class:`FilePopulation`.
        :param keep_trees: Keep generated trees to participate in further
            mutations or recombinations (otherwise, only the initial population
            will be mutated or recombined). It has an effect only if population
            is defined.
        :param generate: Enable generating new test cases from scratch, i.e.,
            purely based on grammar.
        :param mutate: Enable mutating existing test cases, i.e., re-generate
            part of an existing test case based on grammar.
        :param recombine: Enable recombining existing test cases, i.e., replace
            part of a test case with a compatible part from another test case.
        :param unrestricted: Enable applying possibly grammar-violating
            creators.
        :param allowlist: List of mutators to allow (by default, all mutators
            are allowed).
        :param blocklist: List of mutators to block (by default, no mutators are
            blocked).
        :param transformers: List of transformers to be applied to postprocess
            the generated tree before serializing it.
        :param serializer: A serializer that takes a tree and produces a string
            from it (default: :class:`str`). See
            :func:`grammarinator.runtime.simple_space_serializer` for a simple
            solution that concatenates tokens with spaces.
        :param memo_size: The number of most recently created unique tests
            memoized (default: 0).
        :param unique_attempts: The limit on how many times to try to generate a
            unique (i.e., non-memoized) test case. It has no effect if
            ``memo_size`` is 0 (default: 2).
        :param cleanup: Enable deleting the generated tests at :meth:`__exit__`.
        :param encoding: Output file encoding.
        :param errors: Encoding error handling scheme.
        :param dry_run: Enable or disable the saving or printing of the result
            of generation.
        """

        self._generator_factory = generator_factory
        self._transformers = transformers or []
        self._serializer = serializer or str
        self._rule = rule

        if out_format and not dry_run:
            os.makedirs(abspath(dirname(out_format)), exist_ok=True)

        self._out_format = out_format
        self._lock = lock or nullcontext()
        self._limit = limit or RuleSize.max
        self._population = population
        self._enable_generation = generate
        self._enable_mutation = mutate
        self._enable_recombination = recombine
        self._keep_trees = keep_trees
        self._memo: dict[int, None] = {}  # NOTE: the value associated with a test is unimportant, but dict keeps insertion order while set does not
        self._memo_size = memo_size
        self._unique_attempts = max(unique_attempts, 1)
        self._cleanup = cleanup
        self._encoding = encoding
        self._errors = errors
        self._dry_run = dry_run

        self.allowlist = allowlist or []
        self.blocklist = blocklist or []

        self._generators: list[Callable[[Individual | None, Individual | None], Rule | None]] = []
        self._mutators: list[Callable[[Individual | None, Individual | None], Rule | None]] = []
        self._recombiners: list[Callable[[Individual | None, Individual | None], Rule | None]] = []

        if generate:
            self._allow_creator(self._generators, "generate", self.generate)

        if mutate:
            self._allow_creator(self._mutators, "regenerate_rule", self.regenerate_rule)
            self._allow_creator(self._mutators, "delete_quantified", self.delete_quantified)
            self._allow_creator(self._mutators, "replicate_quantified", self.replicate_quantified)
            self._allow_creator(self._mutators, "shuffle_quantifieds", self.shuffle_quantifieds)
            self._allow_creator(self._mutators, "hoist_rule", self.hoist_rule)
            self._allow_creator(self._mutators, "swap_local_nodes", self.swap_local_nodes)
            self._allow_creator(self._mutators, "insert_local_node", self.insert_local_node)
            if unrestricted:
                self._allow_creator(self._mutators, "unrestricted_delete", self.unrestricted_delete)
                self._allow_creator(self._mutators, "unrestricted_hoist_rule", self.unrestricted_hoist_rule)

        if recombine:
            self._allow_creator(self._recombiners, "replace_node", self.replace_node)
            self._allow_creator(self._recombiners, "insert_quantified", self.insert_quantified)

1✔
242
        return self
1✔
243

244
    def __exit__(self, exc_type, exc_val, exc_tb):
1✔
245
        """
246
        Delete the output directory if the tests were saved to files and if
247
        ``cleanup`` was enabled.
248
        """
249
        if self._cleanup and self._out_format and not self._dry_run:
1✔
250
            rmtree(dirname(self._out_format))
×
251

252
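    # A minimal usage sketch (illustrative only, not part of the library): driving the
    # tool as a context manager. ``MyGenerator`` is a placeholder for a
    # ``grammarinator-process``-created generator class; ``cleanup=False`` keeps the
    # written test files after __exit__.
    #
    #     with GeneratorTool(DefaultGeneratorFactory(MyGenerator),
    #                        out_format='tests/test_%d.txt',
    #                        limit=RuleSize(depth=20, tokens=200),
    #                        cleanup=False) as tool:
    #         for i in range(10):
    #             tool.create_test(i)   # writes tests/test_0.txt .. tests/test_9.txt
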
    def _allow_creator(self, creator_map: list[Callable[[Individual | None, Individual | None], Rule | None]], creator_name: str, creator: Callable[[Individual | None, Individual | None], Rule | None]):
        if creator_name not in self.blocklist and (not self.allowlist or creator_name in self.allowlist):
            creator_map.append(creator)

    def create_test(self, index: int) -> str | None:
        """
        Create a new test case with a randomly selected generator method from
        the available options (see :meth:`generate`, :meth:`mutate`, and
        :meth:`recombine`). The generated tree is transformed, serialized and
        saved according to the parameters used to initialize the current tool
        object.

        :param index: Index of the test case to be generated.
        :return: Path to the generated serialized test file. It may be empty if
            the tool object was initialized with an empty ``out_format`` or
            ``None`` if ``dry_run`` was enabled, and hence the test file was not
            saved.
        """
        for attempt in range(1, self._unique_attempts + 1):
            root = self.create()
            test = self._serializer(root)
            if self._memoize_test(test):
                break
            logger.debug("test case #%d, attempt %d/%d: already generated among the last %d unique test cases", index, attempt, self._unique_attempts, len(self._memo))
        if self._dry_run:
            return None

        test_fn = self._out_format % index if '%d' in self._out_format else self._out_format

        if self._population is not None and self._keep_trees:
            self._population.add_individual(root, path=test_fn)

        if test_fn:
            with open(test_fn, 'w', encoding=self._encoding, errors=self._errors, newline='') as f:
                f.write(test)
        else:
            with self._lock:
                print(test)

        return test_fn

    def _memoize_test(self, input: str) -> bool:
        # Memoize the (hash of the) test case. The size of the memo is capped
        # by ``memo_size``, i.e., it contains at most that many test cases.
        # Returns ``False`` if the test case was already in the memo, ``True``
        # if it got added now (or memoization is disabled by ``memo_size=0``).
        # When the memo is full and a new test case is added, the oldest entry
        # is evicted.
        if self._memo_size < 1:
            return True
        test = xxhash.xxh3_64_intdigest(input)
        if test in self._memo:
            return False
        self._memo[test] = None
        if len(self._memo) > self._memo_size:
            del self._memo[next(iter(self._memo))]
        return True

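    # Illustrative note (not part of the library): why a plain ``dict`` suffices as a
    # bounded, insertion-ordered memo. With ``memo_size == 2``, the oldest hash is
    # evicted when a third unique test arrives:
    #
    #     memo = {}                            # hash -> None, insertion order preserved
    #     for h in (0xA, 0xB, 0xC):
    #         memo[h] = None
    #         if len(memo) > 2:
    #             del memo[next(iter(memo))]   # evicts 0xA, the oldest entry
    #     # memo == {0xB: None, 0xC: None}
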
    def _select_creator(self, creators: list[Callable[[Individual | None, Individual | None], Rule | None]], individual1: Individual | None, individual2: Individual | None) -> Callable[[Individual | None, Individual | None], Rule | None]:  # pylint: disable=unused-argument
        # NOTE: May be overridden.
        return random.choice(creators)

    def _create_tree(self, creators: list[Callable[[Individual | None, Individual | None], Rule | None]], individual1: Individual | None, individual2: Individual | None) -> Rule:
        # Note: creators is potentially modified (creators that return None are removed). Always ensure it is a copy when calling this method.
        while creators:
            creator = self._select_creator(creators, individual1, individual2)
            root = creator(individual1, individual2)
            if root:
                break
            creators.remove(creator)
        else:
            assert individual1 is not None
            root = individual1.root

        for transformer in self._transformers:
            root = transformer(root)
        return root

    def create(self) -> Rule:
        """
        Create a new tree with a randomly selected generator method from the
        available options (see :meth:`generate`, :meth:`mutate`, and
        :meth:`recombine`). The generated tree is also transformed according to
        the parameters used to initialize the current tool object.

        :return: The root of the created tree.
        """
        individual1, individual2 = (self._ensure_individuals(None, None)) if self._population else (None, None)
        creators = []
        creators.extend(self._generators)
        if self._population:
            creators.extend(self._mutators)
            creators.extend(self._recombiners)
        return self._create_tree(creators, individual1, individual2)

    def mutate(self, individual: Individual | None = None) -> Rule:
        """
        Dispatcher method for mutation operators: it picks one operator randomly
        and creates a new tree by applying the operator to an individual. The
        generated tree is also transformed according to the parameters used to
        initialize the current tool object.

        Supported mutation operators: :meth:`regenerate_rule`,
        :meth:`delete_quantified`, :meth:`replicate_quantified`,
        :meth:`shuffle_quantifieds`, :meth:`hoist_rule`,
        :meth:`unrestricted_delete`, :meth:`unrestricted_hoist_rule`,
        :meth:`swap_local_nodes`, :meth:`insert_local_node`

        :param individual: The population item to be mutated.
        :return: The root of the mutated tree.
        """
        # NOTE: Intentionally does not check self._enable_mutation!
        # If you call this explicitly, then so be it, even if mutation is disabled.
        # If individual is None, population MUST exist.
        individual = self._ensure_individual(individual)
        return self._create_tree(self._mutators[:], individual, None)

    def recombine(self, individual1: Individual | None = None, individual2: Individual | None = None) -> Rule:
        """
        Dispatcher method for recombination operators: it picks one operator
        randomly and creates a new tree by applying the operator to a pair of
        individuals. The generated tree is also transformed according to the
        parameters used to initialize the current tool object.

        Supported recombination operators: :meth:`replace_node`,
        :meth:`insert_quantified`

        :param individual1: The population item to be used as a recipient during
            crossover.
        :param individual2: The population item to be used as a donor during
            crossover.
        :return: The root of the recombined tree.
        """
        # NOTE: Intentionally does not check self._enable_recombination!
        # If you call this explicitly, then so be it, even if recombination is disabled.
        # If any of the individuals is None, population MUST exist.
        individual1, individual2 = self._ensure_individuals(individual1, individual2)
        return self._create_tree(self._recombiners[:], individual1, individual2)

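    # A minimal usage sketch (illustrative only, not part of the library): calling the
    # dispatchers directly. ``population`` is assumed to be a Population instance
    # (e.g., the tree pool passed to the constructor); individuals may also be
    # supplied explicitly instead of being selected from the population.
    #
    #     tool = GeneratorTool(DefaultGeneratorFactory(MyGenerator),
    #                          out_format='', population=population)
    #     mutated_root = tool.mutate()        # selects an individual and a mutator
    #     recombined_root = tool.recombine()  # selects a recipient and a donor
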
    def generate(self, _individual1: Individual | None = None, _individual2: Individual | None = None, *, rule: str | None = None, reserve: RuleSize | None = None) -> UnlexerRule | UnparserRule:
        """
        Instantiate a new generator and generate a new tree from scratch.

        :param rule: Name of the rule to start generation from.
        :param reserve: Size budget that needs to be put in reserve before
            generating the tree. Practically, it is deducted from the initially
            specified limit. (default values: 0, 0)
        :return: The root of the generated tree.
        """
        # NOTE: Intentionally does not check self._enable_generation!
        # If you call this explicitly, then so be it, even if generation is disabled.
        reserve = reserve if reserve is not None else RuleSize()
        generator = self._generator_factory(limit=self._limit - reserve)
        rule = rule or self._rule or generator._default_rule.__name__
        return getattr(generator, rule)()

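    # A minimal usage sketch (illustrative only, not part of the library): generating
    # from a specific rule while reserving part of the size budget, as regenerate_rule
    # does when it re-grows a subtree in place. 'expression' is a placeholder rule name.
    #
    #     subtree_root = tool.generate(rule='expression',
    #                                  reserve=RuleSize(depth=5, tokens=20))
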
    def _ensure_individual(self, individual: Individual | None) -> Individual:
        if individual is None:
            assert self._population is not None
            individual = self._population.select_individual()
        return individual

    def _ensure_individuals(self, individual1: Individual | None, individual2: Individual | None) -> tuple[Individual, Individual]:
        individual1 = self._ensure_individual(individual1)
        if individual2 is None:
            assert self._population is not None
            individual2 = self._population.select_individual(individual1)
        return individual1, individual2

    def regenerate_rule(self, individual: Individual | None = None, _=None) -> Rule:
        """
        Mutate a tree at a random position, i.e., discard and re-generate its
        sub-tree at a randomly selected node.

        :param individual: The population item to be mutated.
        :return: The root of the mutated tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations

        # Filter items from the nodes of the selected tree that can be regenerated
        # within the current maximum depth and token limit (except immutable nodes).
        root_tokens = annot.node_tokens[root]
        options = [node for nodes in annot.rules_by_name.values() for node in nodes
                   if (node.parent is not None
                       and annot.node_levels[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).depth < self._limit.depth
                       and root_tokens - annot.node_tokens[node] + self._generator_factory._rule_sizes.get(node.name, RuleSize(0, 0)).tokens < self._limit.tokens)]
        if options:
            mutated_node = random.choice(options)
            reserve = RuleSize(depth=annot.node_levels[mutated_node],
                               tokens=root_tokens - annot.node_tokens[mutated_node])
            mutated_node = mutated_node.replace(self.generate(rule=mutated_node.name, reserve=reserve))  # type: ignore[assignment]
            return mutated_node.root

        # If the selection strategy fails, we fall back to discarding the whole tree
        # and generating a brand new one instead.
        return self.generate(rule=root.name)

    def replace_node(self, recipient_individual: Individual | None = None, donor_individual: Individual | None = None) -> Rule | None:
        """
        Recombine two trees at random positions where the nodes are compatible
        with each other (i.e., they share the same node name). One of the trees
        is called the recipient while the other is the donor. The sub-tree
        rooted at a random node of the recipient is discarded and replaced by
        the sub-tree rooted at a random node of the donor.

        :param recipient_individual: The population item to be used as a
            recipient during crossover.
        :param donor_individual: The population item to be used as a donor
            during crossover.
        :return: The root of the recombined tree.
        """
        recipient_individual, donor_individual = self._ensure_individuals(recipient_individual, donor_individual)
        recipient_root, recipient_annot = recipient_individual.root, recipient_individual.annotations
        donor_annot = donor_individual.annotations

        recipient_lookup: dict[str, Sequence[Rule]] = dict(recipient_annot.rules_by_name)
        recipient_lookup.update(recipient_annot.quants_by_name)
        recipient_lookup.update(recipient_annot.alts_by_name)

        donor_lookup: dict[str, Sequence[Rule]] = dict(donor_annot.rules_by_name)
        donor_lookup.update(donor_annot.quants_by_name)
        donor_lookup.update(donor_annot.alts_by_name)
        common_types = sorted(set(recipient_lookup.keys()) & set(donor_lookup.keys()))

        recipient_options = [(rule_name, node) for rule_name in common_types for node in recipient_lookup[rule_name] if node.parent]
        recipient_root_tokens = recipient_annot.node_tokens[recipient_root]
        # Shuffle suitable nodes with sample.
        for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)):
            donor_options = donor_lookup[rule_name]
            recipient_node_level = recipient_annot.node_levels[recipient_node]
            recipient_node_tokens = recipient_annot.node_tokens[recipient_node]
            for donor_node in random.sample(donor_options, k=len(donor_options)):
                # Make sure that the output tree won't exceed the depth and token limits.
                if (recipient_node_level + donor_annot.node_depths[donor_node] <= self._limit.depth
                        and recipient_root_tokens - recipient_node_tokens + donor_annot.node_tokens[donor_node] < self._limit.tokens):
                    recipient_node.replace(donor_node)
                    return recipient_root

        return None

    def insert_quantified(self, recipient_individual: Individual | None = None, donor_individual: Individual | None = None) -> Rule | None:
        """
        Select two compatible quantifier nodes from two trees randomly, and if
        the quantifier node of the recipient tree is not full (i.e., the number
        of its children is less than the maximum count), add a new child to it
        at a random position, taken from the children of the donor's quantifier
        node.

        :param recipient_individual: The population item to be used as a
            recipient during crossover.
        :param donor_individual: The population item to be used as a donor
            during crossover.
        :return: The root of the extended tree.
        """
        recipient_individual, donor_individual = self._ensure_individuals(recipient_individual, donor_individual)
        recipient_root, recipient_annot = recipient_individual.root, recipient_individual.annotations
        donor_annot = donor_individual.annotations

        common_types = sorted(set(recipient_annot.quants_by_name.keys()) & set(donor_annot.quants_by_name.keys()))
        recipient_options = [(name, node) for name in common_types for node in recipient_annot.quants_by_name[name] if len(node.children) < node.stop]
        recipient_root_tokens = recipient_annot.node_tokens[recipient_root]
        for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)):
            recipient_node_level = recipient_annot.node_levels[recipient_node]
            donor_options = [quantified for quantifier in donor_annot.quants_by_name[rule_name] for quantified in quantifier.children]
            for donor_node in random.sample(donor_options, k=len(donor_options)):
                # Make sure that the output tree won't exceed the depth and token limits.
                if (recipient_node_level + donor_annot.node_depths[donor_node] <= self._limit.depth
                        and recipient_root_tokens + donor_annot.node_tokens[donor_node] < self._limit.tokens):
                    recipient_node.insert_child(random.randint(0, len(recipient_node.children)), donor_node)
                    return recipient_root
        return None

    def delete_quantified(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Remove an optional subtree randomly from a quantifier node.

        :param individual: The population item to be mutated.
        :return: The root of the modified tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations
        options = [child for node in annot.quants if len(node.children) > node.start for child in node.children]
        if options:
            removed_node = random.choice(options)
            removed_node.remove()
            return root
        return None

    def unrestricted_delete(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Remove a subtree rooted in any kind of rule node randomly without any
        further restriction.

        :param individual: The population item to be mutated.
        :return: The root of the modified tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations
        options = [node for node in annot.rules if node != root]
        if options:
            removed_node = random.choice(options)
            removed_node.remove()
            return root
        return None

    def replicate_quantified(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Select a quantified sub-tree randomly, replicate it and insert it again
        if the maximum quantification count has not been reached yet.

        :param individual: The population item to be mutated.
        :return: The root of the modified tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations
        root_options = [node for node in annot.quants if node.stop > len(node.children)]
        recipient_root_tokens = annot.node_tokens[root]
        node_options = [child for root in root_options for child in root.children if
                        recipient_root_tokens < recipient_root_tokens + annot.node_tokens[child] <= self._limit.tokens]
        if node_options:
            node_to_repeat = random.choice(node_options)
            max_repeat = (self._limit.tokens - recipient_root_tokens) // annot.node_tokens[node_to_repeat] if self._limit.tokens != RuleSize.max.tokens else 1
            repeat = random.randint(1, int(max_repeat)) if max_repeat > 1 else 1
            for _ in range(repeat):
                node_to_repeat.parent.insert_child(idx=random.randint(0, len(node_to_repeat.parent.children)), node=deepcopy(node_to_repeat))  # type: ignore[union-attr]
            return root
        return None

    def shuffle_quantifieds(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Select a quantifier node and shuffle its quantified sub-trees.

        :param individual: The population item to be mutated.
        :return: The root of the modified tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations
        options = [node for node in annot.quants if len(node.children) > 1]
        if options:
            node_to_shuffle = random.choice(options)
            random.shuffle(node_to_shuffle.children)
            return root
        return None

    def hoist_rule(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Select an individual of the population to be mutated and select two rule
        nodes from it which share the same rule name and are in an
        ancestor-descendant relationship, making it possible for the descendant
        to replace its ancestor.

        :param individual: The population item to be mutated.
        :return: The root of the hoisted tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations
        for rule in random.sample(annot.rules, k=len(annot.rules)):
            parent = rule.parent
            while parent:
                if parent.name == rule.name:
                    parent.replace(rule)
                    return root
                parent = parent.parent
        return None

    def unrestricted_hoist_rule(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Select two rule nodes from the input individual which are in an
        ancestor-descendant relationship (without type compatibility check) and
        replace the ancestor with the selected descendant.

        :param individual: The population item to be mutated.
        :return: The root of the modified tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations
        for rule in random.sample(annot.rules, k=len(annot.rules)):
            options = []
            parent = rule.parent
            while parent and parent != root:
                if isinstance(parent, UnparserRule) and len(parent.children) > 1 and not rule.equalTokens(parent):
                    options.append(parent)
                parent = parent.parent

            if options:
                random.choice(options).replace(rule)
                return root
        return None

    def swap_local_nodes(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Swap two non-overlapping subtrees at random positions in a single test
        where the nodes are compatible with each other (i.e., they share the
        same node name).

        :param individual: The population item to be mutated.
        :return: The root of the mutated tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations

        options: dict[str, Sequence[Rule]] = dict(annot.rules_by_name)
        options.update(annot.quants_by_name)
        options.update(annot.alts_by_name)

        for _, nodes in random.sample(list(options.items()), k=len(options)):
            # Skip node types without two instances.
            if len(nodes) < 2:
                continue

            shuffled = random.sample(nodes, k=len(nodes))
            for i, first_node in enumerate(shuffled[:-1]):
                first_node_level = annot.node_levels[first_node]
                first_node_depth = annot.node_depths[first_node]
                for second_node in shuffled[i + 1:]:
                    second_node_level = annot.node_levels[second_node]
                    second_node_depth = annot.node_depths[second_node]
                    if (first_node_level + second_node_depth > self._limit.depth
                            and second_node_level + first_node_depth > self._limit.depth):
                        continue

                    # Avoid swapping two identical nodes with each other.
                    if first_node.equalTokens(second_node):
                        continue

                    # Ensure the subtrees rooted at the two nodes are disjoint.
                    upper_node, lower_node = (first_node, second_node) if first_node_level < second_node_level else (second_node, first_node)
                    disjunct = True
                    parent = lower_node.parent
                    while parent and disjunct:
                        disjunct = parent != upper_node
                        parent = parent.parent

                    if not disjunct:
                        continue

                    first_parent = first_node.parent
                    second_parent = second_node.parent
                    assert first_parent is not None and second_parent is not None, 'Both nodes must have a parent.'
                    first_parent.children[first_parent.children.index(first_node)] = second_node
                    second_parent.children[second_parent.children.index(second_node)] = first_node
                    first_node.parent = second_parent
                    second_node.parent = first_parent
                    return root
        return None

    def insert_local_node(self, individual: Individual | None = None, _=None) -> Rule | None:
        """
        Select two compatible quantifier nodes from a single test and insert a
        random quantified subtree of the second one into the first one at a
        random position, while ensuring the quantifier restrictions.

        :param individual: The population item to be mutated.
        :return: The root of the mutated tree.
        """
        individual = self._ensure_individual(individual)
        root, annot = individual.root, individual.annotations
        options = [quantifiers for quantifiers in annot.quants_by_name.values() if len(quantifiers) > 1]
        if not options:
            return root

        root_tokens = annot.node_tokens[root]
        for quantifiers in random.sample(options, k=len(options)):
            shuffled = random.sample(quantifiers, k=len(quantifiers))
            for i, recipient_node in enumerate(shuffled[:-1]):
                if len(recipient_node.children) >= recipient_node.stop:
                    continue

                recipient_node_level = annot.node_levels[recipient_node]
                for donor_quantifier in shuffled[i + 1:]:
                    for donor_quantified_node in donor_quantifier.children:
                        if (recipient_node_level + annot.node_depths[donor_quantified_node] <= self._limit.depth
                                and root_tokens + annot.node_tokens[donor_quantified_node] <= self._limit.tokens):
                            recipient_node.insert_child(random.randint(0, len(recipient_node.children)) if recipient_node.children else 0,
                                                        deepcopy(donor_quantified_node))
                            return root
        return None