
bethgelab / foolbox, build 8139141456
04 Mar 2024 11:03AM UTC coverage: 37.923% (-60.6%) from 98.477%
Pull Request #722: Fix guide compilation (merge 5663238db into 17e0e9b31)
1344 of 3544 relevant lines covered (37.92%), 0.38 hits per line

Source File: /foolbox/attacks/boundary_attack.py (file coverage: 22.05%)
from typing import Union, Tuple, Optional, Any
from typing_extensions import Literal
import numpy as np
import eagerpy as ep
import logging

from ..devutils import flatten
from ..devutils import atleast_kd

from ..types import Bounds

from ..models import Model

from ..criteria import Criterion

from ..distances import l2

from ..tensorboard import TensorBoard

from .blended_noise import LinearSearchBlendedUniformNoiseAttack

from .base import MinimizationAttack
from .base import T
from .base import get_criterion
from .base import get_is_adversarial
from .base import raise_if_kwargs
from .base import verify_input_bounds


class BoundaryAttack(MinimizationAttack):
    """A powerful adversarial attack that requires neither gradients
    nor probabilities.

    This is the reference implementation for the attack. [#Bren18]_

    Notes:
        Differences to the original reference implementation:
        * We do not perform internal operations with float64
        * The samples within a batch can currently influence each other a bit
        * We don't perform the additional convergence confirmation
        * The success rate tracking changed a bit
        * Some other changes due to batching and merged loops

    Args:
        init_attack : Attack to use to find starting points. Defaults to
            LinearSearchBlendedUniformNoiseAttack. Only used if starting_points is None.
        steps : Maximum number of steps to run. Might converge and stop before that.
        spherical_step : Initial step size for the orthogonal (spherical) step.
        source_step : Initial step size for the step towards the target.
        source_step_convergance : Sets the threshold of the stop criterion:
            if source_step becomes smaller than this value during the attack,
            the attack has converged and will stop.
        step_adaptation : Factor by which the step sizes are multiplied or divided.
        tensorboard : The log directory for TensorBoard summaries. If False, TensorBoard
            summaries will be disabled (default). If None, the logdir will be
            runs/CURRENT_DATETIME_HOSTNAME.
        update_stats_every_k : Check the spherical candidates and update the
            step-size statistics only every k-th step.

    References:
        .. [#Bren18] Wieland Brendel (*), Jonas Rauber (*), Matthias Bethge,
           "Decision-Based Adversarial Attacks: Reliable Attacks
           Against Black-Box Machine Learning Models",
           https://arxiv.org/abs/1712.04248
    """

    distance = l2

    def __init__(
        self,
        init_attack: Optional[MinimizationAttack] = None,
        steps: int = 25000,
        spherical_step: float = 1e-2,
        source_step: float = 1e-2,
        source_step_convergance: float = 1e-7,
        step_adaptation: float = 1.5,
        tensorboard: Union[Literal[False], None, str] = False,
        update_stats_every_k: int = 10,
    ):
        if init_attack is not None and not isinstance(init_attack, MinimizationAttack):
            raise NotImplementedError
        self.init_attack = init_attack
        self.steps = steps
        self.spherical_step = spherical_step
        self.source_step = source_step
        self.source_step_convergance = source_step_convergance
        self.step_adaptation = step_adaptation
        self.tensorboard = tensorboard
        self.update_stats_every_k = update_stats_every_k

    def run(
        self,
        model: Model,
        inputs: T,
        criterion: Union[Criterion, T],
        *,
        early_stop: Optional[float] = None,
        starting_points: Optional[T] = None,
        **kwargs: Any,
    ) -> T:
        raise_if_kwargs(kwargs)
        originals, restore_type = ep.astensor_(inputs)
        del inputs, kwargs

        verify_input_bounds(originals, model)

        criterion = get_criterion(criterion)
        is_adversarial = get_is_adversarial(criterion, model)

        if starting_points is None:
            init_attack: MinimizationAttack
            if self.init_attack is None:
                init_attack = LinearSearchBlendedUniformNoiseAttack(steps=50)
                logging.info(
                    f"Neither starting_points nor init_attack given. Falling"
                    f" back to {init_attack!r} for initialization."
                )
            else:
                init_attack = self.init_attack
            # TODO: use call and support all types of attacks (once early_stop is
            # possible in __call__)
            best_advs = init_attack.run(
                model, originals, criterion, early_stop=early_stop
            )
        else:
            best_advs = ep.astensor(starting_points)

        is_adv = is_adversarial(best_advs)
        if not is_adv.all():
            failed = is_adv.logical_not().float32().sum()
            if starting_points is None:
                raise ValueError(
                    f"init_attack failed for {failed} of {len(is_adv)} inputs"
                )
            else:
                raise ValueError(
                    f"{failed} of {len(is_adv)} starting_points are not adversarial"
                )
        del starting_points

        tb = TensorBoard(logdir=self.tensorboard)

        N = len(originals)
        ndim = originals.ndim
        spherical_steps = ep.ones(originals, N) * self.spherical_step
        source_steps = ep.ones(originals, N) * self.source_step

        tb.scalar("batchsize", N, 0)

        # create two queues for each sample to track success rates
        # (used to update the hyper parameters)
        stats_spherical_adversarial = ArrayQueue(maxlen=100, N=N)
        stats_step_adversarial = ArrayQueue(maxlen=30, N=N)

        bounds = model.bounds

        for step in range(1, self.steps + 1):
            converged = source_steps < self.source_step_convergance
            if converged.all():
                break  # pragma: no cover
            converged = atleast_kd(converged, ndim)

            # TODO: performance: ignore those that have converged
            # (we could select the non-converged ones, but we currently
            # cannot easily invert this in the end using EagerPy)

            unnormalized_source_directions = originals - best_advs
            source_norms = ep.norms.l2(flatten(unnormalized_source_directions), axis=-1)
            source_directions = unnormalized_source_directions / atleast_kd(
                source_norms, ndim
            )

            # only check spherical candidates every k steps
            check_spherical_and_update_stats = step % self.update_stats_every_k == 0

            candidates, spherical_candidates = draw_proposals(
                bounds,
                originals,
                best_advs,
                unnormalized_source_directions,
                source_directions,
                source_norms,
                spherical_steps,
                source_steps,
            )
            assert candidates.dtype == originals.dtype
            assert spherical_candidates.dtype == originals.dtype

            is_adv = is_adversarial(candidates)

            spherical_is_adv: Optional[ep.Tensor]
            if check_spherical_and_update_stats:
                spherical_is_adv = is_adversarial(spherical_candidates)
                stats_spherical_adversarial.append(spherical_is_adv)
                # TODO: algorithm: the original implementation ignores those samples
                # for which spherical is not adversarial and continues with the
                # next iteration -> we estimate different probabilities (conditional vs. unconditional)
                # TODO: thoughts: should we always track this because we compute it anyway
                stats_step_adversarial.append(is_adv)
            else:
                spherical_is_adv = None

            # in theory, we are closer per construction
            # but limited numerical precision might break this
            distances = ep.norms.l2(flatten(originals - candidates), axis=-1)
            closer = distances < source_norms
            is_best_adv = ep.logical_and(is_adv, closer)
            is_best_adv = atleast_kd(is_best_adv, ndim)

            cond = converged.logical_not().logical_and(is_best_adv)
            best_advs = ep.where(cond, candidates, best_advs)

            tb.probability("converged", converged, step)
            tb.scalar("updated_stats", check_spherical_and_update_stats, step)
            tb.histogram("norms", source_norms, step)
            tb.probability("is_adv", is_adv, step)
            if spherical_is_adv is not None:
                tb.probability("spherical_is_adv", spherical_is_adv, step)
            tb.histogram("candidates/distances", distances, step)
            tb.probability("candidates/closer", closer, step)
            tb.probability("candidates/is_best_adv", is_best_adv, step)
            tb.probability("new_best_adv_including_converged", is_best_adv, step)
            tb.probability("new_best_adv", cond, step)

            if check_spherical_and_update_stats:
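                # Step-size adaptation: once a queue is full, the tracked success
                # rates drive the hyperparameters. A spherical success rate above
                # 0.5 suggests the boundary is locally too linear (increase both
                # step sizes), below 0.2 too nonlinear (decrease both); a source-step
                # success rate above 0.25 increases source_steps, below 0.1 decreases it.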
                full = stats_spherical_adversarial.isfull()
                tb.probability("spherical_stats/full", full, step)
                if full.any():
                    probs = stats_spherical_adversarial.mean()
                    cond1 = ep.logical_and(probs > 0.5, full)
                    spherical_steps = ep.where(
                        cond1, spherical_steps * self.step_adaptation, spherical_steps
                    )
                    source_steps = ep.where(
                        cond1, source_steps * self.step_adaptation, source_steps
                    )
                    cond2 = ep.logical_and(probs < 0.2, full)
                    spherical_steps = ep.where(
                        cond2, spherical_steps / self.step_adaptation, spherical_steps
                    )
                    source_steps = ep.where(
                        cond2, source_steps / self.step_adaptation, source_steps
                    )
                    stats_spherical_adversarial.clear(ep.logical_or(cond1, cond2))
                    tb.conditional_mean(
                        "spherical_stats/isfull/success_rate/mean", probs, full, step
                    )
                    tb.probability_ratio(
                        "spherical_stats/isfull/too_linear", cond1, full, step
                    )
                    tb.probability_ratio(
                        "spherical_stats/isfull/too_nonlinear", cond2, full, step
                    )

                full = stats_step_adversarial.isfull()
                tb.probability("step_stats/full", full, step)
                if full.any():
                    probs = stats_step_adversarial.mean()
                    # TODO: algorithm: changed the two values because we are currently tracking p(source_step_success)
                    # instead of p(source_step_success | spherical_step_success) that was tracked before
                    cond1 = ep.logical_and(probs > 0.25, full)
                    source_steps = ep.where(
                        cond1, source_steps * self.step_adaptation, source_steps
                    )
                    cond2 = ep.logical_and(probs < 0.1, full)
                    source_steps = ep.where(
                        cond2, source_steps / self.step_adaptation, source_steps
                    )
                    stats_step_adversarial.clear(ep.logical_or(cond1, cond2))
                    tb.conditional_mean(
                        "step_stats/isfull/success_rate/mean", probs, full, step
                    )
                    tb.probability_ratio(
                        "step_stats/isfull/success_rate_too_high", cond1, full, step
                    )
                    tb.probability_ratio(
                        "step_stats/isfull/success_rate_too_low", cond2, full, step
                    )

            tb.histogram("spherical_step", spherical_steps, step)
            tb.histogram("source_step", source_steps, step)
        tb.close()
        return restore_type(best_advs)


class ArrayQueue:
    def __init__(self, maxlen: int, N: int):
        # we use NaN as an indicator for missing data
        self.data = np.full((maxlen, N), np.nan)
        self.next = 0
        # used to infer the correct framework because this class uses NumPy
        self.tensor: Optional[ep.Tensor] = None

    @property
    def maxlen(self) -> int:
        return int(self.data.shape[0])

    @property
    def N(self) -> int:
        return int(self.data.shape[1])

    def append(self, x: ep.Tensor) -> None:
        if self.tensor is None:
            self.tensor = x
        x = x.numpy()
        assert x.shape == (self.N,)
        self.data[self.next] = x
        self.next = (self.next + 1) % self.maxlen

    def clear(self, dims: ep.Tensor) -> None:
        if self.tensor is None:
            self.tensor = dims  # pragma: no cover
        dims = dims.numpy()
        assert dims.shape == (self.N,)
        assert dims.dtype == np.bool_
        self.data[:, dims] = np.nan

    def mean(self) -> ep.Tensor:
        assert self.tensor is not None
        result = np.nanmean(self.data, axis=0)
        return ep.from_numpy(self.tensor, result)

    def isfull(self) -> ep.Tensor:
        assert self.tensor is not None
        result = ~np.isnan(self.data).any(axis=0)
        return ep.from_numpy(self.tensor, result)

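# A minimal usage sketch of ArrayQueue (illustrative values): entries are written
# round-robin, mean() ignores the NaN slots that have not been filled yet, and
# isfull() is reported per column.
#
#     q = ArrayQueue(maxlen=3, N=2)
#     q.append(ep.astensor(np.array([1.0, 0.0])))
#     q.append(ep.astensor(np.array([1.0, 1.0])))
#     q.mean()    # ~[1.0, 0.5], the NaN row is ignored
#     q.isfull()  # [False, False] until a third row is appended
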
def draw_proposals(
    bounds: Bounds,
    originals: ep.Tensor,
    perturbed: ep.Tensor,
    unnormalized_source_directions: ep.Tensor,
    source_directions: ep.Tensor,
    source_norms: ep.Tensor,
    spherical_steps: ep.Tensor,
    source_steps: ep.Tensor,
) -> Tuple[ep.Tensor, ep.Tensor]:
    # remember the actual shape
    shape = originals.shape
    assert perturbed.shape == shape
    assert unnormalized_source_directions.shape == shape
    assert source_directions.shape == shape

    # flatten everything to (batch, size)
    originals = flatten(originals)
    perturbed = flatten(perturbed)
    unnormalized_source_directions = flatten(unnormalized_source_directions)
    source_directions = flatten(source_directions)
    N, D = originals.shape

    assert source_norms.shape == (N,)
    assert spherical_steps.shape == (N,)
    assert source_steps.shape == (N,)

    # draw from an iid Gaussian (we can share this across the whole batch)
    eta = ep.normal(perturbed, (D, 1))

    # make orthogonal (source_directions are normalized)
    eta = eta.T - ep.matmul(source_directions, eta) * source_directions
    assert eta.shape == (N, D)

    # rescale
    norms = ep.norms.l2(eta, axis=-1)
    assert norms.shape == (N,)
    eta = eta * atleast_kd(spherical_steps * source_norms / norms, eta.ndim)

    # project on the sphere using Pythagoras
    distances = atleast_kd((spherical_steps.square() + 1).sqrt(), eta.ndim)
    directions = eta - unnormalized_source_directions
    spherical_candidates = originals + directions / distances
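    # With d = ||originals - perturbed|| per sample, eta is orthogonal to the source
    # direction and has norm spherical_step * d, so ||eta - unnormalized_source_directions||
    # equals d * sqrt(spherical_step**2 + 1). Dividing by that factor keeps each
    # spherical candidate at distance d from its original (before the clipping below).
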
    # clip
    min_, max_ = bounds
    spherical_candidates = spherical_candidates.clip(min_, max_)

    # step towards the original inputs
    new_source_directions = originals - spherical_candidates
    assert new_source_directions.ndim == 2
    new_source_directions_norms = ep.norms.l2(flatten(new_source_directions), axis=-1)

    # length if spherical_candidates would be exactly on the sphere
    lengths = source_steps * source_norms

    # length including correction for numerical deviation from sphere
    lengths = lengths + new_source_directions_norms - source_norms

    # make sure the step size is positive
    lengths = ep.maximum(lengths, 0)

    # normalize the length
    lengths = lengths / new_source_directions_norms
    lengths = atleast_kd(lengths, new_source_directions.ndim)

    candidates = spherical_candidates + lengths * new_source_directions

    # clip
    candidates = candidates.clip(min_, max_)

    # restore shape
    candidates = candidates.reshape(shape)
    spherical_candidates = spherical_candidates.reshape(shape)
    return candidates, spherical_candidates
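For orientation, a typical invocation goes through foolbox's standard attack interface rather than calling run() directly. The following is a minimal sketch, assuming foolbox 3.x and a pretrained torchvision classifier with inputs in [0, 1]; the model, data, step count, and epsilon value are illustrative placeholders, not part of the file above.

    import foolbox as fb
    import torchvision.models as models

    model = models.resnet18(pretrained=True).eval()
    preprocessing = dict(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], axis=-3)
    fmodel = fb.PyTorchModel(model, bounds=(0, 1), preprocessing=preprocessing)
    images, labels = fb.utils.samples(fmodel, dataset="imagenet", batchsize=4)

    # decision-based attack: only the model's predicted classes are used
    attack = fb.attacks.BoundaryAttack(steps=5000)
    raw, clipped, is_adv = attack(fmodel, images, labels, epsilons=10.0)

Here is_adv flags the samples whose perturbation is both misclassified and within the given L2 budget.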