ContinualAI / avalanche / build 4993189103 (pending completion)

Pull Request #1370: Add base elements to support distributed comms. Add supports_distributed plugin flag.

258 of 822 new or added lines in 27 files covered (31.39%).
80 existing lines in 5 files now uncovered.
15585 of 21651 relevant lines covered (71.98%).
2.88 hits per line.

Source file: /avalanche/benchmarks/generators/benchmark_generators.py (86.51% covered)

################################################################################
# Copyright (c) 2021 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 16-04-2021                                                             #
# Author(s): Lorenzo Pellegrini                                                #
# E-mail: contact@continualai.org                                              #
# Website: avalanche.continualai.org                                           #
################################################################################

""" This module lists the high-level benchmark generators. They are
based on the methods already implemented in the "scenario" module. The
specific generators are "New Classes" (NC) and "New Instances" (NI); the
generic ones are filelist_benchmark, tensors_benchmark, dataset_benchmark
and paths_benchmark.
"""
from itertools import tee
from typing import (
    Sequence,
    Optional,
    Dict,
    TypeVar,
    Union,
    Any,
    List,
    Callable,
    Set,
    Tuple,
    Iterable,
    Generator,
)

import torch
from avalanche.benchmarks.scenarios.classification_scenario import \
    ClassificationScenario

from avalanche.benchmarks.scenarios.dataset_scenario import (
    DatasetScenario,
    DatasetStream,
    FactoryBasedStream,
    StreamDef,
    TStreamsUserDict,
)
from avalanche.benchmarks.scenarios.detection_scenario import DetectionScenario
from avalanche.benchmarks.scenarios.generic_benchmark_creation import *
from avalanche.benchmarks.scenarios import (
    StreamUserDef,
)
from avalanche.benchmarks.scenarios.generic_scenario import (
    CLStream,
    DatasetExperience,
    SizedCLStream,
)
from avalanche.benchmarks.scenarios.lazy_dataset_sequence import (
    LazyDatasetSequence,
)
from avalanche.benchmarks.scenarios.new_classes.nc_scenario import NCScenario
from avalanche.benchmarks.scenarios.new_instances.ni_scenario import NIScenario
from avalanche.benchmarks.utils.classification_dataset import (
    SupervisedClassificationDataset,
    SupportedDataset,
    as_supervised_classification_dataset,
    make_classification_dataset,
    concat_classification_datasets_sequentially
)
from avalanche.benchmarks.utils.data import AvalancheDataset


TDatasetScenario = TypeVar(
    'TDatasetScenario',
    bound='DatasetScenario')
TCLStream = TypeVar(
    'TCLStream',
    bound='CLStream')
TSizedCLStream = TypeVar(
    'TSizedCLStream',
    bound='SizedCLStream')
TDatasetExperience = TypeVar(
    'TDatasetExperience',
    bound='DatasetExperience')
TCLDataset = TypeVar(
    'TCLDataset',
    bound='AvalancheDataset')

def nc_benchmark(
    train_dataset: Union[Sequence[SupportedDataset], SupportedDataset],
    test_dataset: Union[Sequence[SupportedDataset], SupportedDataset],
    n_experiences: int,
    task_labels: bool,
    *,
    shuffle: bool = True,
    seed: Optional[int] = None,
    fixed_class_order: Optional[Sequence[int]] = None,
    per_exp_classes: Optional[Dict[int, int]] = None,
    class_ids_from_zero_from_first_exp: bool = False,
    class_ids_from_zero_in_each_exp: bool = False,
    one_dataset_per_exp: bool = False,
    train_transform=None,
    eval_transform=None,
    reproducibility_data: Optional[Dict[str, Any]] = None
) -> NCScenario:
    """
    This is the high-level benchmark instances generator for the
    "New Classes" (NC) case. Given a sequence of train and test datasets, it
    creates the continual stream of data as a series of experiences. Each
    experience will contain all the instances belonging to a certain set of
    classes and a class won't be assigned to more than one experience.

    This is the reference helper function for creating instances of Class- or
    Task-Incremental benchmarks.

    The ``task_labels`` parameter determines if each incremental experience
    has an increasing task label or if, on the contrary, a default task label
    0 has to be assigned to all experiences. This can be useful when
    differentiating between Single-Incremental-Task and Multi-Task scenarios.

    There are other important parameters that can be specified in order to
    tweak the behaviour of the resulting benchmark. Please take a few minutes
    to read and understand them as they may save you a lot of work.

    This generator features an integrated reproducibility mechanism that
    allows the user to store and later re-load a benchmark. For more info see
    the ``reproducibility_data`` parameter.

    :param train_dataset: A list of training datasets, or a single dataset.
    :param test_dataset: A list of test datasets, or a single test dataset.
    :param n_experiences: The number of incremental experiences. This is not
        used when using multiple train/test datasets with the
        ``one_dataset_per_exp`` parameter set to True.
    :param task_labels: If True, each experience will have an ascending task
        label. If False, the task label will be 0 for all the experiences.
    :param shuffle: If True, the class (or experience) order will be shuffled.
        Defaults to True.
    :param seed: If ``shuffle`` is True and seed is not None, the class (or
        experience) order will be shuffled according to the seed. When None,
        the current PyTorch random number generator state will be used.
        Defaults to None.
    :param fixed_class_order: If not None, the class order to use (overrides
        the shuffle argument). Very useful for enhancing reproducibility.
        Defaults to None.
    :param per_exp_classes: If not None, a dictionary whose keys are
        (0-indexed) experience IDs and whose values are the number of classes
        to include in the respective experiences. The dictionary doesn't
        have to contain a key for each experience! All the remaining
        experiences will contain an equal amount of the remaining classes.
        The remaining number of classes must be divisible without remainder
        by the remaining number of experiences. For instance,
        if you want to include 50 classes in the first experience
        while equally distributing remaining classes across remaining
        experiences, just pass the "{0: 50}" dictionary as the
        per_exp_classes parameter. Defaults to None.
    :param class_ids_from_zero_from_first_exp: If True, original class IDs
        will be remapped so that they will appear as having an ascending
        order. For instance, if the resulting class order after shuffling
        (or defined by fixed_class_order) is [23, 34, 11, 7, 6, ...] and
        class_ids_from_zero_from_first_exp is True, then all the patterns
        belonging to class 23 will appear as belonging to class "0",
        class "34" will be mapped to "1", class "11" to "2" and so on.
        This is very useful when drawing confusion matrices and when dealing
        with algorithms with dynamic head expansion. Defaults to False.
        Mutually exclusive with the ``class_ids_from_zero_in_each_exp``
        parameter.
    :param class_ids_from_zero_in_each_exp: If True, original class IDs
        will be mapped to range [0, n_classes_in_exp) for each experience.
        Defaults to False. Mutually exclusive with the
        ``class_ids_from_zero_from_first_exp`` parameter.
    :param one_dataset_per_exp: available only when multiple train-test
        datasets are provided. If True, each dataset will be treated as an
        experience. Mutually exclusive with the ``per_exp_classes`` and
        ``fixed_class_order`` parameters. Overrides the ``n_experiences``
        parameter. Defaults to False.
    :param train_transform: The transformation to apply to the training data,
        e.g. a random crop, a normalization or a concatenation of different
        transformations (see torchvision.transform documentation for a
        comprehensive list of possible transformations). Defaults to None.
    :param eval_transform: The transformation to apply to the test data,
        e.g. a random crop, a normalization or a concatenation of different
        transformations (see torchvision.transform documentation for a
        comprehensive list of possible transformations). Defaults to None.
    :param reproducibility_data: If not None, overrides all the other
        benchmark definition options. This is usually a dictionary containing
        data used to reproduce a specific experiment. One can use the
        ``get_reproducibility_data`` method to get (and even distribute)
        the experiment setup so that it can be loaded by passing it as this
        parameter. In this way one can be sure that the same specific
        experimental setup is being used (for reproducibility purposes).
        Beware that, in order to reproduce an experiment, the same train and
        test datasets must be used. Defaults to None.

    :return: A properly initialized :class:`NCScenario` instance.
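
    A minimal usage sketch (the MNIST datasets and transforms below are
    illustrative assumptions, not part of this module)::

        from torchvision.datasets import MNIST
        from torchvision.transforms import ToTensor

        benchmark = nc_benchmark(
            MNIST('./data', train=True, download=True),
            MNIST('./data', train=False, download=True),
            n_experiences=5,
            task_labels=False,
            seed=1234,
            train_transform=ToTensor(),
            eval_transform=ToTensor(),
        )
        for experience in benchmark.train_stream:
            print(experience.classes_in_this_experience)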
    """
194

195
    if class_ids_from_zero_from_first_exp and class_ids_from_zero_in_each_exp:
4✔
196
        raise ValueError(
4✔
197
            "Invalid mutually exclusive options "
198
            "class_ids_from_zero_from_first_exp and "
199
            "classes_ids_from_zero_in_each_exp set at the "
200
            "same time"
201
        )
202

203
    if isinstance(train_dataset, (list, tuple)):
4✔
204
        # Multi-dataset setting
205

206
        if not isinstance(test_dataset, (list, tuple)):
4✔
UNCOV
207
            raise ValueError(
×
208
                "If a list is passed for train_dataset, "
209
                "then test_dataset must be a list, too."
210
            )
211

212
        if len(train_dataset) != len(test_dataset):
4✔
UNCOV
213
            raise ValueError(
×
214
                "Train/test dataset lists must contain the "
215
                "exact same number of datasets"
216
            )
217

218
        if per_exp_classes and one_dataset_per_exp:
4✔
UNCOV
219
            raise ValueError(
×
220
                "Both per_experience_classes and one_dataset_per_exp are"
221
                "used, but those options are mutually exclusive"
222
            )
223

224
        if fixed_class_order and one_dataset_per_exp:
4✔
UNCOV
225
            raise ValueError(
×
226
                "Both fixed_class_order and one_dataset_per_exp are"
227
                "used, but those options are mutually exclusive"
228
            )
229

230
        train_dataset_sup = list(
4✔
231
            map(as_supervised_classification_dataset, train_dataset)
232
        )
233
        test_dataset_sup = list(
4✔
234
            map(as_supervised_classification_dataset, test_dataset)
235
        )
236
        
237
        seq_train_dataset, seq_test_dataset, mapping = \
4✔
238
            concat_classification_datasets_sequentially(
239
                train_dataset_sup, test_dataset_sup
240
            )
241

242
        if one_dataset_per_exp:
4✔
243
            # If one_dataset_per_exp is True, each dataset will be treated as
244
            # a experience. In this benchmark, shuffle refers to the experience
245
            # order, not to the class one.
246
            (
4✔
247
                fixed_class_order,
248
                per_exp_classes,
249
            ) = _one_dataset_per_exp_class_order(mapping, shuffle, seed)
250

251
            # We pass a fixed_class_order to the NCGenericScenario
252
            # constructor, so we don't need shuffling.
253
            shuffle = False
4✔
254
            seed = None
4✔
255

256
            # Overrides n_experiences (and per_experience_classes, already done)
257
            n_experiences = len(train_dataset)
4✔
258
    else:
259
        seq_train_dataset = as_supervised_classification_dataset(train_dataset)
4✔
260
        seq_test_dataset = as_supervised_classification_dataset(test_dataset)
4✔
261

262
    transform_groups = dict(
4✔
263
        train=(train_transform, None), eval=(eval_transform, None)
264
    )
265

266
    # Set transformation groups
267
    final_train_dataset = as_supervised_classification_dataset(
4✔
268
        seq_train_dataset,
269
        transform_groups=transform_groups,
270
        initial_transform_group="train",
271
    )
272

273
    final_test_dataset = as_supervised_classification_dataset(
4✔
274
        seq_test_dataset,
275
        transform_groups=transform_groups,
276
        initial_transform_group="eval",
277
    )
278

279
    return NCScenario(
4✔
280
        train_dataset=final_train_dataset,
281
        test_dataset=final_test_dataset,
282
        n_experiences=n_experiences,
283
        task_labels=task_labels,
284
        shuffle=shuffle,
285
        seed=seed,
286
        fixed_class_order=fixed_class_order,
287
        per_experience_classes=per_exp_classes,
288
        class_ids_from_zero_from_first_exp=class_ids_from_zero_from_first_exp,
289
        class_ids_from_zero_in_each_exp=class_ids_from_zero_in_each_exp,
290
        reproducibility_data=reproducibility_data
291
    )
292

293

294
def ni_benchmark(
4✔
295
    train_dataset: Union[Sequence[SupportedDataset], SupportedDataset],
296
    test_dataset: Union[Sequence[SupportedDataset], SupportedDataset],
297
    n_experiences: int,
298
    *,
299
    task_labels: bool = False,
300
    shuffle: bool = True,
301
    seed: Optional[int] = None,
302
    balance_experiences: bool = False,
303
    min_class_patterns_in_exp: int = 0,
304
    fixed_exp_assignment: Optional[Sequence[Sequence[int]]] = None,
305
    train_transform=None,
306
    eval_transform=None,
307
    reproducibility_data: Optional[Dict[str, Any]] = None,
308
) -> NIScenario:
309
    """
310
    This is the high-level benchmark instances generator for the
311
    "New Instances" (NI) case. Given a sequence of train and test datasets
312
    creates the continual stream of data as a series of experiences.
313

314
    This is the reference helper function for creating instances of
315
    Domain-Incremental benchmarks.
316

317
    The ``task_labels`` parameter determines if each incremental experience has
318
    an increasing task label or if, at the contrary, a default task label 0
319
    has to be assigned to all experiences. This can be useful when
320
    differentiating between Single-Incremental-Task and Multi-Task scenarios.
321

322
    There are other important parameters that can be specified in order to tweak
323
    the behaviour of the resulting benchmark. Please take a few minutes to read
324
    and understand them as they may save you a lot of work.
325

326
    This generator features an integrated reproducibility mechanism that allows
327
    the user to store and later re-load a benchmark. For more info see the
328
    ``reproducibility_data`` parameter.
329

330
    :param train_dataset: A list of training datasets, or a single dataset.
331
    :param test_dataset: A list of test datasets, or a single test dataset.
332
    :param n_experiences: The number of experiences.
333
    :param task_labels: If True, each experience will have an ascending task
334
            label. If False, the task label will be 0 for all the experiences.
335
    :param shuffle: If True, patterns order will be shuffled.
336
    :param seed: A valid int used to initialize the random number generator.
337
        Can be None.
338
    :param balance_experiences: If True, pattern of each class will be equally
339
        spread across all experiences. If False, patterns will be assigned to
340
        experiences in a complete random way. Defaults to False.
341
    :param min_class_patterns_in_exp: The minimum amount of patterns of
342
        every class that must be assigned to every experience. Compatible with
343
        the ``balance_experiences`` parameter. An exception will be raised if
344
        this constraint can't be satisfied. Defaults to 0.
345
    :param fixed_exp_assignment: If not None, the pattern assignment
346
        to use. It must be a list with an entry for each experience. Each entry
347
        is a list that contains the indexes of patterns belonging to that
348
        experience. Overrides the ``shuffle``, ``balance_experiences`` and
349
        ``min_class_patterns_in_exp`` parameters.
350
    :param train_transform: The transformation to apply to the training data,
351
        e.g. a random crop, a normalization or a concatenation of different
352
        transformations (see torchvision.transform documentation for a
353
        comprehensive list of possible transformations). Defaults to None.
354
    :param eval_transform: The transformation to apply to the test data,
355
        e.g. a random crop, a normalization or a concatenation of different
356
        transformations (see torchvision.transform documentation for a
357
        comprehensive list of possible transformations). Defaults to None.
358
    :param reproducibility_data: If not None, overrides all the other
359
        benchmark definition options, including ``fixed_exp_assignment``.
360
        This is usually a dictionary containing data used to
361
        reproduce a specific experiment. One can use the
362
        ``get_reproducibility_data`` method to get (and even distribute)
363
        the experiment setup so that it can be loaded by passing it as this
364
        parameter. In this way one can be sure that the same specific
365
        experimental setup is being used (for reproducibility purposes).
366
        Beware that, in order to reproduce an experiment, the same train and
367
        test datasets must be used. Defaults to None.
368

369
    :return: A properly initialized :class:`NIScenario` instance.
370
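
    A minimal usage sketch (the CIFAR-10 datasets and transforms below are
    illustrative assumptions, not part of this module)::

        from torchvision.datasets import CIFAR10
        from torchvision.transforms import ToTensor

        benchmark = ni_benchmark(
            CIFAR10('./data', train=True, download=True),
            CIFAR10('./data', train=False, download=True),
            n_experiences=4,
            balance_experiences=True,
            seed=1234,
            train_transform=ToTensor(),
            eval_transform=ToTensor(),
        )
        for experience in benchmark.train_stream:
            print(len(experience.dataset))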
    """
371
    
372
    seq_train_dataset, seq_test_dataset = train_dataset, test_dataset
4✔
373
    if isinstance(train_dataset, (list, tuple)):
4✔
374
        if not isinstance(test_dataset, (list, tuple)):
4✔
UNCOV
375
            raise ValueError(
×
376
                "If a list is passed for train_dataset, "
377
                "then test_dataset must be a list, too."
378
            )
379
        
380
        if len(train_dataset) != len(test_dataset):
4✔
UNCOV
381
            raise ValueError(
×
382
                "Train/test dataset lists must contain the "
383
                "exact same number of datasets"
384
            )
385

386
        train_dataset_sup = list(
4✔
387
            map(as_supervised_classification_dataset, train_dataset)
388
        )
389
        test_dataset_sup = list(
4✔
390
            map(as_supervised_classification_dataset, test_dataset)
391
        )
392

393
        seq_train_dataset, seq_test_dataset, _ = \
4✔
394
            concat_classification_datasets_sequentially(
395
                train_dataset_sup, test_dataset_sup
396
            )
397
    else:
398
        seq_train_dataset = as_supervised_classification_dataset(train_dataset)
4✔
399
        seq_test_dataset = as_supervised_classification_dataset(test_dataset)
4✔
400

401
    transform_groups = dict(
4✔
402
        train=(train_transform, None), eval=(eval_transform, None)
403
    )
404

405
    # Set transformation groups
406
    final_train_dataset = make_classification_dataset(
4✔
407
        seq_train_dataset,
408
        transform_groups=transform_groups,
409
        initial_transform_group="train",
410
    )
411

412
    final_test_dataset = make_classification_dataset(
4✔
413
        seq_test_dataset,
414
        transform_groups=transform_groups,
415
        initial_transform_group="eval",
416
    )
417

418
    return NIScenario(
4✔
419
        train_dataset=final_train_dataset,
420
        test_dataset=final_test_dataset,
421
        n_experiences=n_experiences,
422
        task_labels=task_labels,
423
        shuffle=shuffle,
424
        seed=seed,
425
        balance_experiences=balance_experiences,
426
        min_class_patterns_in_exp=min_class_patterns_in_exp,
427
        fixed_exp_assignment=fixed_exp_assignment,
428
        reproducibility_data=reproducibility_data
429
    )
430

431

432
# Here we define some high-level APIs an alias of their mid-level counterparts.
433
# This was done mainly because the implementation for the mid-level API is now
434
# quite stable and not particularly complex.
435
dataset_benchmark = create_multi_dataset_generic_benchmark
4✔
436
filelist_benchmark = create_generic_benchmark_from_filelists
4✔
437
paths_benchmark = create_generic_benchmark_from_paths
4✔
438
tensors_benchmark = create_generic_benchmark_from_tensor_lists
4✔
439
lazy_benchmark = create_lazy_generic_benchmark
4✔
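
# For instance (a hypothetical sketch; the dataset names are illustrative),
# the generic multi-dataset generator can be used as:
#
#     benchmark = dataset_benchmark(
#         train_datasets=[train_exp_0, train_exp_1],
#         test_datasets=[test_exp_0, test_exp_1],
#     )
#
# which creates one experience per dataset in each stream.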


def _one_dataset_per_exp_class_order(
    class_list_per_exp: Sequence[Sequence[int]],
    shuffle: bool,
    seed: Optional[int],
) -> Tuple[List[int], Dict[int, int]]:
    """
    Utility function that shuffles the class order by keeping classes from the
    same experience together. Each experience is defined by a different entry
    in the class_list_per_exp parameter.

    :param class_list_per_exp: A list of class lists, one for each experience.
    :param shuffle: If True, the experience order will be shuffled. If False,
        this function will return the concatenation of lists from the
        class_list_per_exp parameter.
    :param seed: If not None, an integer used to initialize the random
        number generator.

    :returns: A tuple: a class order that keeps class IDs from the same
        experience together (adjacent), and a dictionary mapping each
        experience position to its number of classes.
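
    For instance, with ``class_list_per_exp=[[0, 1], [2, 3, 4]]`` and
    ``shuffle=False``, this returns ``([0, 1, 2, 3, 4], {0: 2, 1: 3})``.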
    """
462
    dataset_order = list(range(len(class_list_per_exp)))
4✔
463
    if shuffle:
4✔
464
        if seed is not None:
4✔
465
            torch.random.manual_seed(seed)
4✔
466
        dataset_order = torch.as_tensor(dataset_order)[
4✔
467
            torch.randperm(len(dataset_order))
468
        ].tolist()
469
    fixed_class_order: List[int] = []
4✔
470
    classes_per_exp: Dict[int, int] = {}
4✔
471
    for dataset_position, dataset_idx in enumerate(dataset_order):
4✔
472
        fixed_class_order.extend(class_list_per_exp[dataset_idx])
4✔
473
        classes_per_exp[dataset_position] = len(class_list_per_exp[dataset_idx])
4✔
474
    return fixed_class_order, classes_per_exp
4✔
475

476

477
def fixed_size_experience_split_strategy(
4✔
478
    experience_size: int,
479
    shuffle: bool,
480
    drop_last: bool,
481
    experience: DatasetExperience[TCLDataset]
482
) -> Sequence[TCLDataset]:
483
    """
484
    The default splitting strategy used by :func:`data_incremental_benchmark`.
485

486
    This splitting strategy simply splits the experience in smaller experiences
487
    of size `experience_size`.
488

489
    When taking inspiration for your custom splitting strategy, please consider
490
    that all parameters preceding `experience` are filled by
491
    :func:`data_incremental_benchmark` by using `partial` from the `functools`
492
    standard library. A custom splitting strategy must have only a single
493
    parameter: the experience. Consider wrapping your custom splitting strategy
494
    with `partial` if more parameters are needed.
495

496
    Also consider that the stream name of the experience can be obtained by
497
    using `experience.origin_stream.name`.
498

499
    :param experience_size: The experience size (number of instances).
500
    :param shuffle: If True, instances will be shuffled before splitting.
501
    :param drop_last: If True, the last mini-experience will be dropped if
502
        not of size `experience_size`
503
    :param experience: The experience to split.
504
    :return: The list of datasets that will be used to create the
505
        mini-experiences.
506
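
    A sketch of the `partial` wrapping described above (names are
    illustrative)::

        from functools import partial

        split_100 = partial(
            fixed_size_experience_split_strategy, 100, False, False
        )
        # split_100(experience) -> datasets of (at most) 100 instances each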
    """
507

508
    exp_dataset = experience.dataset
4✔
509
    exp_indices = list(range(len(exp_dataset)))
4✔
510

511
    result_datasets = []
4✔
512

513
    if shuffle:
4✔
UNCOV
514
        exp_indices = torch.as_tensor(exp_indices)[
×
515
            torch.randperm(len(exp_indices))
516
        ].tolist()
517

518
    init_idx = 0
4✔
519
    while init_idx < len(exp_indices):
4✔
520
        final_idx = init_idx + experience_size  # Exclusive
4✔
521
        if final_idx > len(exp_indices):
4✔
522
            if drop_last:
4✔
UNCOV
523
                break
×
524

525
            final_idx = len(exp_indices)
4✔
526

527
        result_datasets.append(
4✔
528
            exp_dataset.subset(exp_indices[init_idx:final_idx])
529
        )
530
        init_idx = final_idx
4✔
531

532
    return result_datasets
4✔
533

534

535
TDatasetStream = TypeVar(
4✔
536
    'TDatasetStream',
537
    bound='DatasetStream'
538
)
539

540

541
def _make_plain_experience(
4✔
542
    stream: DatasetStream[DatasetExperience[TCLDataset]],
543
    experience_idx: int
544
) -> DatasetExperience[TCLDataset]:
UNCOV
545
    dataset = stream.benchmark.stream_definitions[
×
546
        stream.name
547
    ].exps_data[experience_idx]
548

UNCOV
549
    return DatasetExperience(
×
550
        current_experience=experience_idx,
551
        origin_stream=stream,
552
        benchmark=stream.benchmark,
553
        dataset=dataset
554
    )
555

556

557
def _smart_benchmark_factory(
4✔
558
    original_benchmark: DatasetScenario,
559
    new_streams_definitions: TStreamsUserDict,
560
    complete_test_set_only: bool
561
) -> DatasetScenario:
562
    
563
    if isinstance(original_benchmark, ClassificationScenario):
4✔
564
        return ClassificationScenario(
4✔
565
            stream_definitions=new_streams_definitions,
566
            complete_test_set_only=complete_test_set_only
567
        )
UNCOV
568
    elif isinstance(original_benchmark, DetectionScenario):
×
UNCOV
569
        return DetectionScenario(
×
570
            stream_definitions=new_streams_definitions,
571
            complete_test_set_only=complete_test_set_only
572
        )
573
    else:
574
        # Generic scenario
UNCOV
575
        return DatasetScenario(
×
576
            stream_definitions=new_streams_definitions,
577
            complete_test_set_only=complete_test_set_only,
578
            stream_factory=FactoryBasedStream,
579
            experience_factory=_make_plain_experience,
580
        )
581

582

583
def data_incremental_benchmark(
4✔
584
    benchmark_instance: DatasetScenario[TDatasetStream,
585
                                        TDatasetExperience,
586
                                        TCLDataset],
587
    experience_size: int,
588
    shuffle: bool = False,
589
    drop_last: bool = False,
590
    split_streams: Sequence[str] = ("train",),
591
    custom_split_strategy: Optional[Callable[
592
        [DatasetExperience[TCLDataset]],
593
        Sequence[TCLDataset]
594
    ]] = None,
595
    *,
596
    benchmark_factory: Optional[Callable[
597
        [
598
            DatasetScenario[TDatasetStream,
599
                            TDatasetExperience,
600
                            TCLDataset],
601
            TStreamsUserDict,
602
            bool
603
        ], DatasetScenario[
604
            DatasetStream[DatasetExperience[TCLDataset]],
605
            DatasetExperience[TCLDataset],
606
            TCLDataset]
607
        ]
608
    ] = _smart_benchmark_factory,
609
    experience_factory: Optional[Callable[
610
        [DatasetStream[DatasetExperience[TCLDataset]], int], 
611
        DatasetExperience[TCLDataset]
612
    ]] = _make_plain_experience,
613
) -> DatasetScenario[
614
        DatasetStream[DatasetExperience[TCLDataset]],
615
        DatasetExperience[TCLDataset],
616
        TCLDataset]:
617
    """
618
    High-level benchmark generator for a Data Incremental setup.
619

620
    This generator accepts an existing benchmark instance and returns a version
621
    of it in which experiences have been split in order to produce a
622
    Data Incremental stream.
623

624
    In its base form this generator will split train experiences in experiences
625
    of a fixed, configurable, size. The split can be also performed on other
626
    streams (like the test one) if needed.
627

628
    The `custom_split_strategy` parameter can be used if a more specific
629
    splitting is required.
630

631
    Beware that experience splitting is NOT executed in a lazy way. This
632
    means that the splitting process takes place immediately. Consider
633
    optimizing the split process for speed when using a custom splitting
634
    strategy.
635

636
    Please note that each mini-experience will have a task labels field
637
    equal to the one of the originating experience.
638

639
    The `complete_test_set_only` field of the resulting benchmark instance
640
    will be `True` only if the same field of original benchmark instance is
641
    `True` and if the resulting test stream contains exactly one experience.
642

643
    :param benchmark_instance: The benchmark to split.
644
    :param experience_size: The size of the experience, as an int. Ignored
645
        if `custom_split_strategy` is used.
646
    :param shuffle: If True, experiences will be split by first shuffling
647
        instances in each experience. This will use the default PyTorch
648
        random number generator at its current state. Defaults to False.
649
        Ignored if `custom_split_strategy` is used.
650
    :param drop_last: If True, if the last experience doesn't contain
651
        `experience_size` instances, then the last experience will be dropped.
652
        Defaults to False. Ignored if `custom_split_strategy` is used.
653
    :param split_streams: The list of streams to split. By default only the
654
        "train" stream will be split.
655
    :param custom_split_strategy: A function that implements a custom splitting
656
        strategy. The function must accept an experience and return a list
657
        of datasets each describing an experience. Defaults to None, which means
658
        that the standard splitting strategy will be used (which creates
659
        experiences of size `experience_size`).
660
        A good starting to understand the mechanism is to look at the
661
        implementation of the standard splitting function
662
        :func:`fixed_size_experience_split_strategy`.
663
    :param benchmark_factory: The scenario factory. Defaults to 
664
        `_smart_experience_factory`, which will try to create a benchmark of the
665
        same class of the originating one. Can be None, in which case a generic
666
        :class:`DatasetScenario` will be used coupled with the factory defined
667
        by the `experience_factory` parameter.
668
    :param experience_factory: The experience factory. Ignored if
669
        `scenario_factory` is not None. Otherwise, defaults to
670
        :class:`DatasetExperience`.
671
    :return: The Data Incremental benchmark instance.
672
    """
673

674
    split_strategy: Callable[
675
        [DatasetExperience[TCLDataset]], 
676
        Sequence[TCLDataset]
677
    ]
678
    if custom_split_strategy is None:
4✔
679
        # functools.partial is a more compact option
680
        # However, MyPy does not understand what a partial is -_-
681
        def fixed_size_experience_split_strategy_wrapper(exp):
4✔
682
            return fixed_size_experience_split_strategy(
4✔
683
                experience_size,
684
                shuffle,
685
                drop_last,
686
                exp
687
            )
688

689
        split_strategy = fixed_size_experience_split_strategy_wrapper
4✔
690
    else:
UNCOV
691
        split_strategy = custom_split_strategy
×
692

693
    stream_definitions: Dict[str, StreamDef[TCLDataset]] = dict(
4✔
694
        benchmark_instance.stream_definitions
695
    )
696

697
    for stream_name in split_streams:
4✔
698
        if stream_name not in stream_definitions:
4✔
UNCOV
699
            raise ValueError(
×
700
                f"Stream {stream_name} could not be found in the "
701
                f"benchmark instance"
702
            )
703

704
        stream: TDatasetStream = getattr(
4✔
705
            benchmark_instance,
706
            f"{stream_name}_stream")
707

708
        split_datasets: List[TCLDataset] = []
4✔
709
        split_task_labels: List[Set[int]] = []
4✔
710

711
        exp: DatasetExperience[TCLDataset]
712
        for exp in stream:
4✔
713
            experiences = split_strategy(exp)
4✔
714
            split_datasets += experiences
4✔
715
            for _ in range(len(experiences)):
4✔
716
                split_task_labels.append(set(exp.task_labels))
4✔
717

718
        stream_def = StreamDef(
4✔
719
            LazyDatasetSequence(split_datasets, len(split_datasets)),
720
            split_task_labels,
721
            stream_definitions[stream_name].origin_dataset,
722
            False
723
        )
724
        stream_def.exps_data.load_all_experiences()
4✔
725

726
        stream_definitions[stream_name] = stream_def
4✔
727

728
    complete_test_set_only = (
4✔
729
        benchmark_instance.complete_test_set_only
730
        and len(stream_definitions["test"].exps_data) == 1
731
    )
732

733
    if benchmark_factory is not None:
4✔
734
        # Try to create a benchmark of the same class of the
735
        # initial benchmark.
736
        return benchmark_factory(
4✔
737
            benchmark_instance,
738
            stream_definitions,
739
            complete_test_set_only
740
        )
741

742
    # Generic benchmark class
UNCOV
743
    if experience_factory is None:
×
UNCOV
744
        experience_factory = _make_plain_experience
×
745

UNCOV
746
    return DatasetScenario(
×
747
        stream_definitions=stream_definitions,
748
        complete_test_set_only=complete_test_set_only,
749
        stream_factory=FactoryBasedStream,
750
        experience_factory=experience_factory,
751
    )
752

753

754
def random_validation_split_strategy(
4✔
755
    validation_size: Union[int, float],
756
    shuffle: bool,
757
    experience: DatasetExperience[TCLDataset],
758
) -> Tuple[TCLDataset, TCLDataset]:
759
    """
760
    The default splitting strategy used by
761
    :func:`benchmark_with_validation_stream`.
762

763
    This splitting strategy simply splits the experience in two experiences (
764
    train and validation) of size `validation_size`.
765

766
    When taking inspiration for your custom splitting strategy, please consider
767
    that all parameters preceding `experience` are filled by
768
    :func:`benchmark_with_validation_stream` by using `partial` from the
769
    `functools` standard library. A custom splitting strategy must have only
770
    a single parameter: the experience. Consider wrapping your custom
771
    splitting strategy with `partial` if more parameters are needed.
772

773
    Also consider that the stream name of the experience can be obtained by
774
    using `experience.origin_stream.name`.
775

776
    :param validation_size: The number of instances to allocate to the
777
    validation experience. Can be an int value or a float between 0 and 1.
778
    :param shuffle: If True, instances will be shuffled before splitting.
779
        Otherwise, the first instances will be allocated to the training
780
        dataset by leaving the last ones to the validation dataset.
781
    :param experience: The experience to split.
782
    :return: A tuple containing 2 elements: the new training and validation
783
        datasets.
784
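
    For instance, with ``validation_size=0.2`` and ``shuffle=False``, an
    experience of 100 instances is split into a training dataset containing
    the first 80 instances and a validation dataset containing the last 20.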
    """
785

786
    exp_dataset = experience.dataset
4✔
787
    exp_indices = list(range(len(exp_dataset)))
4✔
788

789
    if shuffle:
4✔
UNCOV
790
        exp_indices = torch.as_tensor(exp_indices)[
×
791
            torch.randperm(len(exp_indices))
792
        ].tolist()
793

794
    if 0.0 <= validation_size <= 1.0:
4✔
795
        valid_n_instances = int(validation_size * len(exp_dataset))
4✔
796
    else:
797
        valid_n_instances = int(validation_size)
4✔
798
        if valid_n_instances > len(exp_dataset):
4✔
UNCOV
799
            raise ValueError(
×
800
                f"Can't create the validation experience: not enough "
801
                f"instances. Required {valid_n_instances}, got only"
802
                f"{len(exp_dataset)}"
803
            )
804

805
    train_n_instances = len(exp_dataset) - valid_n_instances
4✔
806
    result_train_dataset = exp_dataset.subset(exp_indices[:train_n_instances])
4✔
807
    result_valid_dataset = exp_dataset.subset(exp_indices[train_n_instances:])
4✔
808
    return result_train_dataset, result_valid_dataset
4✔
809

810

811
def class_balanced_split_strategy(
4✔
812
    validation_size: Union[int, float],
813
    experience: DatasetExperience[SupervisedClassificationDataset],
814
) -> Tuple[SupervisedClassificationDataset, SupervisedClassificationDataset]:
815
    """Class-balanced train/validation splits.
816

817
    This splitting strategy splits `experience` into two experiences
818
    (train and validation) of size `validation_size` using a class-balanced
819
    split. Sample of each class are chosen randomly.
820

821
    You can use this split strategy to split a benchmark with::
822

823
        validation_size = 0.2
824
        foo = lambda exp: class_balanced_split_strategy(validation_size, exp)
825
        bm = benchmark_with_validation_stream(bm, custom_split_strategy=foo)
826

827
    :param validation_size: The percentage of samples to allocate to the
828
        validation experience as a float between 0 and 1.
829
    :param experience: The experience to split.
830
    :return: A tuple containing 2 elements: the new training and validation
831
        datasets.
832
    """
833
    if not isinstance(validation_size, float):
4✔
UNCOV
834
        raise ValueError("validation_size must be an integer")
×
835
    if not 0.0 <= validation_size <= 1.0:
4✔
UNCOV
836
        raise ValueError("validation_size must be a float in [0, 1].")
×
837

838
    exp_dataset = experience.dataset
4✔
839
    if validation_size > len(exp_dataset):
4✔
UNCOV
840
        raise ValueError(
×
841
            f"Can't create the validation experience: not enough "
842
            f"instances. Required {validation_size}, got only"
843
            f"{len(exp_dataset)}"
844
        )
845

846
    exp_indices = list(range(len(exp_dataset)))
4✔
847
    targets_as_tensor = torch.as_tensor(experience.dataset.targets)
4✔
848
    exp_classes: List[int] = targets_as_tensor.unique().tolist()
4✔
849

850
    # shuffle exp_indices
851
    exp_indices = torch.as_tensor(exp_indices)[torch.randperm(len(exp_indices))]
4✔
852
    # shuffle the targets as well
853
    exp_targets = targets_as_tensor[exp_indices]
4✔
854

855
    train_exp_indices = []
4✔
856
    valid_exp_indices = []
4✔
857
    for cid in exp_classes:  # split indices for each class separately.
4✔
858
        c_indices = exp_indices[exp_targets == cid]
4✔
859
        valid_n_instances = int(validation_size * len(c_indices))
4✔
860
        valid_exp_indices.extend(c_indices[:valid_n_instances])
4✔
861
        train_exp_indices.extend(c_indices[valid_n_instances:])
4✔
862

863
    result_train_dataset = exp_dataset.subset(train_exp_indices)
4✔
864
    result_valid_dataset = exp_dataset.subset(valid_exp_indices)
4✔
865
    return result_train_dataset, result_valid_dataset
4✔
866

867

868
def _gen_split(
4✔
869
    split_generator: Iterable[
870
        Tuple[TCLDataset, TCLDataset]
871
    ]
872
) -> Tuple[
873
    Generator[TCLDataset, None, None],
874
    Generator[TCLDataset, None, None],
875
]:
876
    """
877
    Internal utility function to split the train-validation generator
878
    into two distinct generators (one for the train stream and another one
879
    for the valid stream).
880

881
    :param split_generator: The lazy stream generator returning tuples of train
882
        and valid datasets.
883
    :return: Two generators (one for the train, one for the valuid).
884
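
    For instance, ``_gen_split(iter([(t0, v0), (t1, v1)]))`` returns a
    generator yielding ``t0, t1`` and a generator yielding ``v0, v1``.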
    """
885

886
    # For more info: https://stackoverflow.com/a/28030261
887
    gen_a, gen_b = tee(split_generator, 2)
4✔
888
    return (a for a, b in gen_a), (b for a, b in gen_b)
4✔
889

890

891
def _lazy_train_val_split(
4✔
892
    split_strategy: Callable[
893
        [DatasetExperience[TCLDataset]],
894
        Tuple[TCLDataset, TCLDataset],
895
    ],
896
    experiences: Iterable[DatasetExperience[TCLDataset]],
897
) -> Generator[
898
    Tuple[TCLDataset, TCLDataset], None, None
899
]:
900
    """
901
    Creates a generator operating around the split strategy and the
902
    experiences stream.
903

904
    :param split_strategy: The strategy used to split each experience in train
905
        and validation datasets.
906
    :return: A generator returning a 2 elements tuple (the train and validation
907
        datasets).
908
    """
909

910
    for new_experience in experiences:
4✔
911
        yield split_strategy(new_experience)
4✔
912

913

914
def benchmark_with_validation_stream(
4✔
915
    benchmark_instance: DatasetScenario[TDatasetStream,
916
                                        TDatasetExperience,
917
                                        TCLDataset],
918
    validation_size: Union[int, float] = 0.5,
919
    shuffle: bool = False,
920
    input_stream: str = "train",
921
    output_stream: str = "valid",
922
    custom_split_strategy: Optional[Callable[
923
        [DatasetExperience[TCLDataset]],
924
        Tuple[TCLDataset, TCLDataset],
925
    ]] = None,
926
    *,
927
    benchmark_factory: Optional[Callable[
928
        [
929
            DatasetScenario[TDatasetStream,
930
                            TDatasetExperience,
931
                            TCLDataset],
932
            TStreamsUserDict,
933
            bool
934
        ], DatasetScenario[
935
                DatasetStream[DatasetExperience[TCLDataset]],
936
                DatasetExperience[TCLDataset],
937
                TCLDataset]]
938
    ] = _smart_benchmark_factory,
939
    experience_factory: Optional[Callable[
940
        [DatasetStream[DatasetExperience[TCLDataset]], int],
941
        DatasetExperience[TCLDataset]
942
    ]] = _make_plain_experience,
943
    lazy_splitting: Optional[bool] = None
944
) -> DatasetScenario[
945
        DatasetStream[DatasetExperience[TCLDataset]],
946
        DatasetExperience[TCLDataset],
947
        TCLDataset]:
948
    """
949
    Helper that can be used to obtain a benchmark with a validation stream.
950

951
    This generator accepts an existing benchmark instance and returns a version
952
    of it in which a validation stream has been added.
953

954
    In its base form this generator will split train experiences to extract
955
    validation experiences of a fixed (by number of instances or relative
956
    size), configurable, size. The split can be also performed on other
957
    streams if needed and the name of the resulting validation stream can
958
    be configured too.
959

960
    Each validation experience will be extracted directly from a single training
961
    experience. Patterns selected for the validation experience will be removed
962
    from the training one.
963

964
    If shuffle is True, the validation stream will be created randomly.
965
    Beware that no kind of class balancing is done.
966

967
    The `custom_split_strategy` parameter can be used if a more specific
968
    splitting is required.
969

970
    Please note that the resulting experiences will have a task labels field
971
    equal to the one of the originating experience.
972

973
    Experience splitting can be executed in a lazy way. This behavior can be
974
    controlled using the `lazy_splitting` parameter. By default, experiences
975
    are split in a lazy way only when the input stream is lazily generated.
976

977
    The default splitting strategy is a random split. A class-balanced split
978
    is also available using `class_balanced_split_strategy`::
979

980
        validation_size = 0.2
981
        foo = lambda exp: class_balanced_split_strategy(validation_size, exp)
982
        bm = benchmark_with_validation_stream(bm, custom_split_strategy=foo)
983

984
    :param benchmark_instance: The benchmark to split.
985
    :param validation_size: The size of the validation experience, as an int
986
        or a float between 0 and 1. Ignored if `custom_split_strategy` is used.
987
    :param shuffle: If True, patterns will be allocated to the validation
988
        stream randomly. This will use the default PyTorch random number
989
        generator at its current state. Defaults to False. Ignored if
990
        `custom_split_strategy` is used. If False, the first instances will be
991
        allocated to the training  dataset by leaving the last ones to the
992
        validation dataset.
993
    :param input_stream: The name of the input stream. Defaults to 'train'.
994
    :param output_stream: The name of the output stream. Defaults to 'valid'.
995
    :param custom_split_strategy: A function that implements a custom splitting
996
        strategy. The function must accept an experience and return a tuple
997
        containing the new train and validation dataset. Defaults to None,
998
        which means that the standard splitting strategy will be used (which
999
        creates experiences according to `validation_size` and `shuffle`).
1000
        A good starting to understand the mechanism is to look at the
1001
        implementation of the standard splitting function
1002
        :func:`random_validation_split_strategy`.
1003
    :param benchmark_factory: The scenario factory. Defaults to 
1004
        `_smart_experience_factory`, which will try to create a benchmark of the
1005
        same class of the originating one. Can be None, in which case a generic
1006
        :class:`DatasetScenario` will be used coupled with the factory defined
1007
        by the `experience_factory` parameter.
1008
    :param experience_factory: The experience factory. Ignored if
1009
        `scenario_factory` is not None. Otherwise, defaults to
1010
        :class:`DatasetExperience`.
1011
    :param lazy_splitting: If True, the stream will be split in a lazy way.
1012
        If False, the stream will be split immediately. Defaults to None, which
1013
        means that the stream will be split in a lazy or non-lazy way depending
1014
        on the laziness of the `input_stream`.
1015
    :return: A benchmark instance in which the validation stream has been added.
1016
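
    A minimal usage sketch (``bm`` stands for any existing benchmark
    instance)::

        bm_with_valid = benchmark_with_validation_stream(
            bm, validation_size=0.2, shuffle=True
        )
        train_stream = bm_with_valid.train_stream
        valid_stream = bm_with_valid.valid_stream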
    """
1017

1018
    split_strategy: Callable[
1019
        [DatasetExperience[TCLDataset]],
1020
        Tuple[TCLDataset, TCLDataset],
1021
    ]
1022
    if custom_split_strategy is None:
4✔
1023
        # functools.partial is a more compact option
1024
        # However, MyPy does not understand what a partial is -_-
1025
        def random_validation_split_strategy_wrapper(exp):
4✔
1026
            return random_validation_split_strategy(
4✔
1027
                validation_size,
1028
                shuffle,
1029
                exp
1030
            )
1031

1032
        split_strategy = random_validation_split_strategy_wrapper
4✔
1033
    else:
UNCOV
1034
        split_strategy = custom_split_strategy
×
1035

1036
    original_stream_definitions: Dict[str, StreamDef[TCLDataset]] = \
4✔
1037
        benchmark_instance.stream_definitions
1038
    streams = benchmark_instance.streams
4✔
1039

1040
    if input_stream not in streams:
4✔
UNCOV
1041
        raise ValueError(
×
1042
            f"Stream {input_stream} could not be found in the "
1043
            f"benchmark instance"
1044
        )
1045

1046
    if output_stream in streams:
4✔
UNCOV
1047
        raise ValueError(
×
1048
            f"Stream {output_stream} already exists in the "
1049
            f"benchmark instance"
1050
        )
1051

1052
    stream: TDatasetStream = streams[input_stream]
4✔
1053

1054
    if lazy_splitting is None:
4✔
1055
        split_lazily = original_stream_definitions[input_stream].is_lazy
4✔
1056
    else:
1057
        split_lazily = lazy_splitting
4✔
1058

1059
    exps_tasks_labels = list(
4✔
1060
        original_stream_definitions[input_stream].exps_task_labels
1061
    )
1062

1063
    train_exps_source: Union[Iterable[TCLDataset], 
1064
                             Tuple[Iterable[TCLDataset], int]]
1065
    valid_exps_source: Union[Iterable[TCLDataset], 
1066
                             Tuple[Iterable[TCLDataset], int]]
1067
    if not split_lazily:
4✔
1068
        # Classic static splitting
1069
        train_exps_source = []
4✔
1070
        valid_exps_source = []
4✔
1071

1072
        exp: DatasetExperience[TCLDataset]
1073
        for exp in stream:
4✔
1074
            train_exp, valid_exp = split_strategy(exp)
4✔
1075
            train_exps_source.append(train_exp)
4✔
1076
            valid_exps_source.append(valid_exp)
4✔
1077
    else:
1078
        # Lazy splitting (based on a generator)
1079
        split_generator = _lazy_train_val_split(split_strategy, stream)
4✔
1080
        train_exps_gen, valid_exps_gen = _gen_split(split_generator)
4✔
1081
        train_exps_source = (train_exps_gen, len(stream))
4✔
1082
        valid_exps_source = (valid_exps_gen, len(stream))
4✔
1083
    
1084
    stream_definitions: Dict[str, Union[StreamUserDef[TCLDataset], 
4✔
1085
                                        StreamDef[TCLDataset]]] = \
1086
        dict(original_stream_definitions)
1087

1088
    train_stream_def: StreamUserDef[TCLDataset] = StreamUserDef(
4✔
1089
        train_exps_source,
1090
        exps_tasks_labels,
1091
        stream_definitions[input_stream].origin_dataset,
1092
        split_lazily,
1093
    )
1094

1095
    valid_stream_def: StreamUserDef[TCLDataset] = StreamUserDef(
4✔
1096
        valid_exps_source,
1097
        exps_tasks_labels,
1098
        stream_definitions[input_stream].origin_dataset,
1099
        split_lazily,
1100
    )
1101

1102
    stream_definitions[input_stream] = train_stream_def
4✔
1103
    stream_definitions[output_stream] = valid_stream_def
4✔
1104

1105
    complete_test_set_only = benchmark_instance.complete_test_set_only
4✔
1106

1107
    if benchmark_factory is not None:
4✔
1108
        # Try to create a benchmark of the same class of the
1109
        # initial benchmark.
1110
        return benchmark_factory(
4✔
1111
            benchmark_instance,
1112
            stream_definitions,
1113
            complete_test_set_only
1114
        )
1115

1116
    # Generic benchmark class
UNCOV
1117
    if experience_factory is None:
×
UNCOV
1118
        experience_factory = _make_plain_experience
×
1119

UNCOV
1120
    return DatasetScenario(
×
1121
        stream_definitions=stream_definitions,
1122
        complete_test_set_only=complete_test_set_only,
1123
        stream_factory=FactoryBasedStream,
1124
        experience_factory=experience_factory,
1125
    )
1126

1127

1128
__all__ = [
4✔
1129
    "nc_benchmark",
1130
    "ni_benchmark",
1131
    "dataset_benchmark",
1132
    "filelist_benchmark",
1133
    "paths_benchmark",
1134
    "tensors_benchmark",
1135
    "data_incremental_benchmark",
1136
    "benchmark_with_validation_stream",
1137
    "random_validation_split_strategy",
1138
    "class_balanced_split_strategy",
1139
]