
ContinualAI / avalanche, build 4993189103 (pending completion)

Pull Request #1370 (GitHub): Add base elements to support distributed comms. Add supports_distributed plugin flag.
Committer: unknown. Commit message: unknown.

258 of 822 new or added lines in 27 files covered (31.39%).
80 existing lines in 5 files are now uncovered.
15585 of 21651 relevant lines covered (71.98%).
2.88 hits per line.

Source File: /avalanche/evaluation/metrics/cumulative_accuracies.py (42.19% covered)
################################################################################
# Copyright (c) 2021 ContinualAI.                                              #
# Copyrights licensed under the MIT License.                                   #
# See the accompanying LICENSE file for terms.                                 #
#                                                                              #
# Date: 30-12-2020                                                             #
# Author(s): Lorenzo Pellegrini                                                #
# E-mail: contact@continualai.org                                              #
# Website: www.continualai.org                                                 #
################################################################################

from collections import defaultdict
from typing import Dict, List, Union, TYPE_CHECKING

import numpy as np
import torch
from torch import Tensor

from avalanche.benchmarks import OnlineCLExperience
from avalanche.evaluation import GenericPluginMetric, Metric, PluginMetric
from avalanche.evaluation.metrics.mean import Mean
from avalanche.evaluation.metric_utils import (phase_and_task,
                                               stream_type,
                                               generic_get_metric_name,
                                               default_metric_name_template)
from avalanche.evaluation.metric_results import MetricValue

if TYPE_CHECKING:
    from avalanche.evaluation.metric_results import MetricResult
    from avalanche.training.templates import SupervisedTemplate


class CumulativeAccuracy(Metric[Dict[int, float]]):
    """
    Metric used by the CumulativeAccuracyPluginMetric.

    Holds a dictionary of per-task cumulative accuracies and updates the
    cumulative accuracy based on the class splits provided for the growing
    incremental task. The update is performed as described in the paper
    "On the importance of cross-task features for class-incremental
    learning", Soutif et al., https://arxiv.org/abs/2106.11930
    """

    def __init__(self):
        self._mean_accuracy = defaultdict(lambda: Mean())

    @torch.no_grad()
    def update(
        self,
        classes_splits,
        predicted_y: Tensor,
        true_y: Tensor,
    ) -> None:
        true_y = torch.as_tensor(true_y)
        predicted_y = torch.as_tensor(predicted_y)
        if len(true_y) != len(predicted_y):
            raise ValueError("Size mismatch for true_y "
                             "and predicted_y tensors")
        for t, classes in classes_splits.items():

            # Only compute the accuracy for classes that are in this split.
            if len(set(true_y.cpu().numpy()).intersection(classes)) == 0:
                # This assumes that true_y only comes from the current
                # classes split. That shortcut does not always hold, so
                # additional filtering is performed below to make sure.
                continue

            idxs = np.where(np.isin(true_y.cpu(), list(classes)))[0]
            y = true_y[idxs]
            logits_exp = predicted_y[idxs, :]

            logits_exp = logits_exp[:, list(classes)]
            prediction = torch.argmax(logits_exp, dim=1)

            true_positives = float(torch.sum(torch.eq(prediction, y)))
            total_patterns = len(y)
            self._mean_accuracy[t].update(
                true_positives / total_patterns, total_patterns
            )

    def result(self) -> Dict[int, float]:
        """Retrieves the running accuracy.

        Calling this method will not change the internal state of the metric.

        :return: The current running accuracy, which is a float value
            between 0 and 1.
        """
        return {t: self._mean_accuracy[t].result() for t in self._mean_accuracy}

    def reset(self) -> None:
        """Resets the metric.

        :return: None.
        """
        for t in self._mean_accuracy:
            self._mean_accuracy[t].reset()

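# Illustrative sketch (not part of the original module): how the standalone
# CumulativeAccuracy metric behaves on toy data. The tensors below are
# assumptions, chosen so that each logit column index coincides with its
# class label, as in a typical class-incremental setup.
#
#     splits = {0: {0, 1}, 1: {0, 1, 2, 3}}
#     logits = torch.tensor([[2.0, 0.1, 0.0, 0.0],   # argmax -> class 0
#                            [0.0, 1.5, 0.2, 0.0],   # argmax -> class 1
#                            [0.0, 0.0, 3.0, 0.1]])  # argmax -> class 2
#     targets = torch.tensor([0, 1, 3])
#
#     metric = CumulativeAccuracy()
#     metric.update(splits, logits, targets)
#     metric.result()  # -> {0: 1.0, 1: 0.667} (approximately)
#
# Task 0 is scored only on the samples whose labels fall in {0, 1} and only
# on the logit columns of those classes (2/2 correct), while task 1 is
# scored on all classes seen so far (2/3 correct). Each per-task accuracy is
# accumulated in a weighted Mean, so repeated update() calls maintain a
# running average until reset() is called.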


class CumulativeAccuracyPluginMetric(
    GenericPluginMetric[Dict[int, float], CumulativeAccuracy]
):
    def __init__(self, reset_at="stream", emit_at="stream", mode="eval"):
        """
        Creates the CumulativeAccuracy plugin metric.

        This plugin stores and updates the Cumulative Accuracy metric
        described in "On the importance of cross-task features for
        class-incremental learning", Soutif et al.,
        https://arxiv.org/abs/2106.11930
        """

        self.classes_seen_so_far = set()
        self.classes_splits = {}
        super().__init__(CumulativeAccuracy(),
                         reset_at=reset_at,
                         emit_at=emit_at,
                         mode=mode)

    def before_training_exp(self, strategy, **kwargs):
        super().before_training_exp(strategy, **kwargs)
        if isinstance(strategy.experience, OnlineCLExperience):
            if strategy.experience.access_task_boundaries:
                new_classes = set(
                    strategy.experience.
                    origin_experience.
                    classes_in_this_experience
                )
                task_id = (strategy.experience.
                           origin_experience.
                           current_experience)
            else:
                raise AttributeError(
                    "Online scenarios have to allow access to task "
                    "boundaries for the Cumulative Accuracy metric "
                    "to be computed"
                )
        else:
            new_classes = set(strategy.experience.classes_in_this_experience)
            task_id = strategy.experience.current_experience

        self.classes_seen_so_far = self.classes_seen_so_far.union(new_classes)
        self.classes_splits[task_id] = self.classes_seen_so_far

    def reset(self) -> None:
        self._metric.reset()

    def result(self) -> Dict[int, float]:
        return self._metric.result()

    def update(self, strategy):
        self._metric.update(
            self.classes_splits,
            strategy.mb_output,
            strategy.mb_y)

    def _package_result(self, strategy: "SupervisedTemplate") -> "MetricResult":
        assert strategy.experience is not None
        metric_value = self.result()
        plot_x_position = strategy.clock.train_iterations

        phase_name, task_label = phase_and_task(strategy)
        stream = stream_type(strategy.experience)

        metrics = []
        for k, v in metric_value.items():
            metric_name = generic_get_metric_name(
                default_metric_name_template,
                {
                    "metric_name": str(self),
                    "task_label": None,
                    "phase_name": phase_name,
                    "experience_id": k,
                    "stream_name": stream,
                },
            )
            metrics.append(
                MetricValue(self, metric_name, v, plot_x_position)
            )
        return metrics

    def __repr__(self):
        return "CumulativeAccuracy"

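# Illustrative sketch (not part of the original module): how classes_splits
# grows as the plugin observes training experiences. The class contents are
# assumptions for a two-experience class-incremental benchmark.
#
#     plugin = CumulativeAccuracyPluginMetric()
#     # after before_training_exp() on experience 0 (classes {0, 1}):
#     #     plugin.classes_splits == {0: {0, 1}}
#     # after before_training_exp() on experience 1 (classes {2, 3}):
#     #     plugin.classes_splits == {0: {0, 1}, 1: {0, 1, 2, 3}}
#
# Because union() returns a new set, each task id keeps a snapshot of the
# classes seen up to that point; during eval, update() then scores every
# stored split against the current minibatch and _package_result() emits one
# MetricValue per task id.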


class CumulativeForgettingPluginMetric(
    GenericPluginMetric[Dict[int, float], CumulativeAccuracy]
):
    """
    The CumulativeForgetting metric, describing the accuracy loss
    detected for a certain experience.

    This plugin metric, computed separately for each experience,
    is the difference between the cumulative accuracy result obtained after
    first training on an experience and the accuracy result obtained
    on the same experience at the end of successive experiences.

    This metric is computed during the eval phase only.
    """

    def __init__(self, reset_at="stream", emit_at="stream", mode="eval"):
        """
        Creates an instance of the CumulativeForgetting metric.
        """

        self.classes_splits = {}
        self.classes_seen_so_far = set()

        self.initial = {}
        self.last = {}

        self.train_task_id = None

        super().__init__(CumulativeAccuracy(),
                         reset_at=reset_at,
                         emit_at=emit_at,
                         mode=mode)

    def before_training_exp(self, strategy, **kwargs):
        super().before_training_exp(strategy, **kwargs)
        if isinstance(strategy.experience, OnlineCLExperience):
            if strategy.experience.access_task_boundaries:
                new_classes = set(
                    strategy.experience.
                    origin_experience.
                    classes_in_this_experience
                )
                task_id = (strategy.experience.
                           origin_experience.
                           current_experience)
            else:
                raise AttributeError(
                    "Online scenarios have to allow access to task "
                    "boundaries for the Cumulative Accuracy metric "
                    "to be computed"
                )
        else:
            new_classes = set(strategy.experience.classes_in_this_experience)
            task_id = strategy.experience.current_experience

        self.classes_seen_so_far = self.classes_seen_so_far.union(new_classes)
        self.classes_splits[task_id] = self.classes_seen_so_far

        # Update the train task id
        experience = strategy.experience
        if isinstance(experience, OnlineCLExperience):
            self.train_task_id = experience.origin_experience.current_experience
        else:
            self.train_task_id = experience.current_experience

    def reset(self):
        self._metric.reset()

    def result(self) -> Dict[int, float]:
        forgetting = self._compute_forgetting()
        return forgetting

    def _package_result(self, strategy: "SupervisedTemplate") -> "MetricResult":
        assert strategy.experience is not None
        metric_value = self.result()
        plot_x_position = strategy.clock.train_iterations

        phase_name, task_label = phase_and_task(strategy)
        stream = stream_type(strategy.experience)

        metrics = []
        for k, v in metric_value.items():
            metric_name = generic_get_metric_name(
                default_metric_name_template,
                {
                    "metric_name": str(self),
                    "task_label": None,
                    "phase_name": phase_name,
                    "experience_id": k,
                    "stream_name": stream,
                },
            )
            metrics.append(
                MetricValue(self, metric_name, v, plot_x_position)
            )
        return metrics

    def update(self, strategy):
        self._metric.update(
            self.classes_splits,
            strategy.mb_output,
            strategy.mb_y)

    def _compute_forgetting(self):
        for t, item in self._metric.result().items():
            if t not in self.initial:
                self.initial[t] = item
            else:
                self.last[t] = item

        forgetting = {}
        for k, v in self.last.items():
            forgetting[k] = self.initial[k] - self.last[k]

        return forgetting

    def __str__(self):
        return "CumulativeForgetting"

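# Illustrative sketch (not part of the original module): the arithmetic done
# by _compute_forgetting(). The accuracy values below are assumptions.
#
#     # First eval stream after training on task 0: the wrapped metric
#     # returns {0: 0.875}, so initial == {0: 0.875}, last is empty, and no
#     # forgetting is emitted yet.
#     # A later eval stream returns {0: 0.750, 1: 0.800}: 0.750 becomes
#     # last[0], 0.800 becomes initial[1], and the emitted value is
#     #     {0: initial[0] - last[0]} == {0: 0.125}
#
# Note that reset() only clears the wrapped CumulativeAccuracy, so initial
# and last persist across eval streams; this is what lets the metric compare
# the first accuracy measured on a task with accuracies measured later.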


__all__ = [
    "CumulativeAccuracyPluginMetric",
    "CumulativeForgettingPluginMetric",
    "CumulativeAccuracy",
]
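
For context, below is a minimal wiring sketch showing how these plugin metrics are typically attached to an Avalanche training loop. It is an assumption based on the standard EvaluationPlugin, Naive, SplitMNIST and SimpleMLP APIs; import paths may differ between Avalanche versions, and the snippet is not part of the covered file.

# Hypothetical usage sketch; not part of cumulative_accuracies.py.
from torch.nn import CrossEntropyLoss
from torch.optim import SGD

from avalanche.benchmarks.classic import SplitMNIST
from avalanche.evaluation.metrics.cumulative_accuracies import (
    CumulativeAccuracyPluginMetric,
    CumulativeForgettingPluginMetric,
)
from avalanche.logging import InteractiveLogger
from avalanche.models import SimpleMLP
from avalanche.training.plugins import EvaluationPlugin
from avalanche.training.supervised import Naive

benchmark = SplitMNIST(n_experiences=5)  # class-incremental toy benchmark
model = SimpleMLP(num_classes=10)

# Both metrics default to reset_at="stream", emit_at="stream", mode="eval",
# so they emit one value per task id at the end of each eval stream.
eval_plugin = EvaluationPlugin(
    CumulativeAccuracyPluginMetric(),
    CumulativeForgettingPluginMetric(),
    loggers=[InteractiveLogger()],
)

strategy = Naive(
    model,
    SGD(model.parameters(), lr=0.01, momentum=0.9),
    CrossEntropyLoss(),
    train_mb_size=64,
    train_epochs=1,
    eval_mb_size=64,
    evaluator=eval_plugin,
)

for experience in benchmark.train_stream:
    strategy.train(experience)            # before_training_exp() records the split
    strategy.eval(benchmark.test_stream)  # update() and _package_result() run here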