• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19669

30 Jan 2025 08:57AM UTC coverage: 88.58% (+0.002%) from 88.578%
#19669

Pull #11858

github

web-flow
Merge 29824e318 into 7153ff852
Pull Request #11858: core: updates the backoff range as per the A6 redefinition

33733 of 38082 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.09
/../xds/src/main/java/io/grpc/xds/FaultFilter.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static java.util.concurrent.TimeUnit.NANOSECONDS;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Supplier;
24
import com.google.common.base.Suppliers;
25
import com.google.common.util.concurrent.MoreExecutors;
26
import com.google.protobuf.Any;
27
import com.google.protobuf.InvalidProtocolBufferException;
28
import com.google.protobuf.Message;
29
import com.google.protobuf.util.Durations;
30
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
31
import io.envoyproxy.envoy.type.v3.FractionalPercent;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.Context;
37
import io.grpc.Deadline;
38
import io.grpc.ForwardingClientCall;
39
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
40
import io.grpc.LoadBalancer.PickSubchannelArgs;
41
import io.grpc.Metadata;
42
import io.grpc.MethodDescriptor;
43
import io.grpc.Status;
44
import io.grpc.Status.Code;
45
import io.grpc.internal.DelayedClientCall;
46
import io.grpc.internal.GrpcUtil;
47
import io.grpc.xds.FaultConfig.FaultAbort;
48
import io.grpc.xds.FaultConfig.FaultDelay;
49
import io.grpc.xds.Filter.ClientInterceptorBuilder;
50
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
51
import java.util.Locale;
52
import java.util.concurrent.Executor;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.ScheduledFuture;
55
import java.util.concurrent.TimeUnit;
56
import java.util.concurrent.atomic.AtomicLong;
57
import javax.annotation.Nullable;
58

59
/** HttpFault filter implementation. */
60
final class FaultFilter implements Filter, ClientInterceptorBuilder {
61

62
  /**
   * Shared filter instance, built with {@code ThreadSafeRandomImpl.instance} and a fresh
   * {@link AtomicLong} as the active-fault counter.
   */
  static final FaultFilter INSTANCE =
1✔
63
      new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong());
64
  /** Request header parsed as an integer number of milliseconds for header-driven delays. */
  @VisibleForTesting
65
  static final Metadata.Key<String> HEADER_DELAY_KEY =
1✔
66
      Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
67
  /**
   * Request header parsed as an integer numerator; it replaces the configured delay numerator
   * only when it is non-negative and smaller than the configured one.
   */
  @VisibleForTesting
68
  static final Metadata.Key<String> HEADER_DELAY_PERCENTAGE_KEY =
1✔
69
      Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
70
  /**
   * Request header parsed as an integer HTTP status for header-driven aborts; it is mapped to a
   * gRPC status via {@code GrpcUtil.httpStatusToGrpcStatus}.
   */
  @VisibleForTesting
71
  static final Metadata.Key<String> HEADER_ABORT_HTTP_STATUS_KEY =
1✔
72
      Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
73
  /**
   * Request header parsed as an integer gRPC status code for header-driven aborts; it is mapped
   * via {@code Status.fromCodeValue}.
   */
  @VisibleForTesting
74
  static final Metadata.Key<String> HEADER_ABORT_GRPC_STATUS_KEY =
1✔
75
      Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
76
  /**
   * Request header parsed as an integer numerator; it replaces the configured abort numerator
   * only when it is non-negative and smaller than the configured one.
   */
  @VisibleForTesting
77
  static final Metadata.Key<String> HEADER_ABORT_PERCENTAGE_KEY =
1✔
78
      Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
79
  /** The single proto type URL reported by {@code typeUrls()}. */
  static final String TYPE_URL =
80
      "type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault";
81

82
  // Source of the per-RPC draw in [0, 1_000_000) that is compared against the fault rate.
  private final ThreadSafeRandom random;
83
  // Number of faults currently in progress; incremented and decremented by DelayInjectedCall and
  // FailingClientCall, and compared against FaultConfig.maxActiveFaults() before injecting.
  private final AtomicLong activeFaultCounter;
84

85
  /**
   * Creates a filter that uses the supplied random source and active-fault counter.
   *
   * @param random source of the per-RPC draw that decides whether a fault is injected
   * @param activeFaultCounter counter tracking how many injected faults are in progress
   */
  @VisibleForTesting
  FaultFilter(ThreadSafeRandom random, AtomicLong activeFaultCounter) {
    this.activeFaultCounter = activeFaultCounter;
    this.random = random;
  }
1✔
90

91
  @Override
92
  public String[] typeUrls() {
93
    return new String[] { TYPE_URL };
1✔
94
  }
95

96
  @Override
97
  public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
98
    HTTPFault httpFaultProto;
99
    if (!(rawProtoMessage instanceof Any)) {
1✔
100
      return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
×
101
    }
102
    Any anyMessage = (Any) rawProtoMessage;
1✔
103
    try {
104
      httpFaultProto = anyMessage.unpack(HTTPFault.class);
1✔
105
    } catch (InvalidProtocolBufferException e) {
×
106
      return ConfigOrError.fromError("Invalid proto: " + e);
×
107
    }
1✔
108
    return parseHttpFault(httpFaultProto);
1✔
109
  }
110

111
  private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
112
    FaultDelay faultDelay = null;
1✔
113
    FaultAbort faultAbort = null;
1✔
114
    if (httpFault.hasDelay()) {
1✔
115
      faultDelay = parseFaultDelay(httpFault.getDelay());
1✔
116
    }
117
    if (httpFault.hasAbort()) {
1✔
118
      ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
1✔
119
      if (faultAbortOrError.errorDetail != null) {
1✔
120
        return ConfigOrError.fromError(
×
121
            "HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);
122
      }
123
      faultAbort = faultAbortOrError.config;
1✔
124
    }
125
    Integer maxActiveFaults = null;
1✔
126
    if (httpFault.hasMaxActiveFaults()) {
1✔
127
      maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
1✔
128
      if (maxActiveFaults < 0) {
1✔
129
        maxActiveFaults = Integer.MAX_VALUE;
×
130
      }
131
    }
132
    return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
1✔
133
  }
134

135
  private static FaultDelay parseFaultDelay(
136
      io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
137
    FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
1✔
138
    if (faultDelay.hasHeaderDelay()) {
1✔
139
      return FaultDelay.forHeader(percent);
×
140
    }
141
    return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
1✔
142
  }
143

144
  @VisibleForTesting
145
  static ConfigOrError<FaultAbort> parseFaultAbort(
146
      io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
147
    FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
1✔
148
    switch (faultAbort.getErrorTypeCase()) {
1✔
149
      case HEADER_ABORT:
150
        return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
1✔
151
      case HTTP_STATUS:
152
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
153
            GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
1✔
154
      case GRPC_STATUS:
155
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
156
            Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
1✔
157
      case ERRORTYPE_NOT_SET:
158
      default:
159
        return ConfigOrError.fromError(
×
160
            "Unknown error type case: " + faultAbort.getErrorTypeCase());
×
161
    }
162
  }
163

164
  private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
165
    switch (proto.getDenominator()) {
1✔
166
      case HUNDRED:
167
        return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
1✔
168
      case TEN_THOUSAND:
169
        return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
1✔
170
      case MILLION:
171
        return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
1✔
172
      case UNRECOGNIZED:
173
      default:
174
        throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
×
175
    }
176
  }
177

178
  @Override
179
  public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
180
    return parseFilterConfig(rawProtoMessage);
1✔
181
  }
182

183
  @Nullable
184
  @Override
185
  public ClientInterceptor buildClientInterceptor(
186
      FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args,
187
      final ScheduledExecutorService scheduler) {
188
    checkNotNull(config, "config");
1✔
189
    if (overrideConfig != null) {
1✔
190
      config = overrideConfig;
1✔
191
    }
192
    FaultConfig faultConfig = (FaultConfig) config;
1✔
193

194
    final class FaultInjectionInterceptor implements ClientInterceptor {
1✔
195
      @Override
196
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
197
          final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
198
          final Channel next) {
199
        boolean checkFault = false;
1✔
200
        if (faultConfig.maxActiveFaults() == null
1✔
201
            || activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
1✔
202
          checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;
1✔
203
        }
204
        if (!checkFault) {
1✔
205
          return next.newCall(method, callOptions);
1✔
206
        }
207
        final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
1✔
208
          private ClientCall<ReqT, RespT> delegate;
209

210
          @Override
211
          protected ClientCall<ReqT, RespT> delegate() {
212
            return delegate;
1✔
213
          }
214

215
          @Override
216
          public void start(Listener<RespT> listener, Metadata headers) {
217
            Executor callExecutor = callOptions.getExecutor();
1✔
218
            if (callExecutor == null) { // This should never happen in practice because
1✔
219
              // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
220
              // a callExecutor.
221
              // TODO(https://github.com/grpc/grpc-java/issues/7868)
222
              callExecutor = MoreExecutors.directExecutor();
1✔
223
            }
224

225
            Long delayNanos;
226
            Status abortStatus = null;
1✔
227
            if (faultConfig.faultDelay() != null) {
1✔
228
              delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
1✔
229
            } else {
230
              delayNanos = null;
1✔
231
            }
232
            if (faultConfig.faultAbort() != null) {
1✔
233
              abortStatus = getAbortStatusWithDescription(
1✔
234
                  determineFaultAbortStatus(faultConfig.faultAbort(), headers));
1✔
235
            }
236

237
            Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
238
            if (abortStatus != null) {
1✔
239
              callSupplier = Suppliers.ofInstance(
1✔
240
                  new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));
241
            } else {
242
              callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
1✔
243
                @Override
244
                public ClientCall<ReqT, RespT> get() {
245
                  return next.newCall(method, callOptions);
1✔
246
                }
247
              };
248
            }
249
            if (delayNanos == null) {
1✔
250
              delegate = callSupplier.get();
1✔
251
              delegate().start(listener, headers);
1✔
252
              return;
1✔
253
            }
254

255
            delegate = new DelayInjectedCall<>(
1✔
256
                delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
1✔
257

258
            Listener<RespT> finalListener =
1✔
259
                new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
260
                  @Override
261
                  public void onClose(Status status, Metadata trailers) {
262
                    if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
1✔
263
                      // TODO(zdapeng:) check effective deadline locally, and
264
                      //   do the following only if the local deadline is exceeded.
265
                      //   (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
266
                      //   injected delay does not contribute to the error, because the request is
267
                      //   only sent out after the delay. There could be a race between local and
268
                      //   remote, but it is rather rare.)
269
                      String description = String.format(
1✔
270
                          Locale.US,
271
                          "Deadline exceeded after up to %d ns of fault-injected delay",
272
                          delayNanos);
273
                      if (status.getDescription() != null) {
1✔
274
                        description = description + ": " + status.getDescription();
1✔
275
                      }
276
                      status = Status.DEADLINE_EXCEEDED
1✔
277
                          .withDescription(description).withCause(status.getCause());
1✔
278
                      // Replace trailers to prevent mixing sources of status and trailers.
279
                      trailers = new Metadata();
1✔
280
                    }
281
                    delegate().onClose(status, trailers);
1✔
282
                  }
1✔
283
                };
284
            delegate().start(finalListener, headers);
1✔
285
          }
1✔
286
        }
287

288
        return new DeadlineInsightForwardingCall();
1✔
289
      }
290
    }
291

292
    return new FaultInjectionInterceptor();
1✔
293
  }
294

295
  private static Status getAbortStatusWithDescription(Status abortStatus) {
296
    Status finalAbortStatus = null;
1✔
297
    if (abortStatus != null) {
1✔
298
      String abortDesc = "RPC terminated due to fault injection";
1✔
299
      if (abortStatus.getDescription() != null) {
1✔
300
        abortDesc = abortDesc + ": " + abortStatus.getDescription();
1✔
301
      }
302
      finalAbortStatus = abortStatus.withDescription(abortDesc);
1✔
303
    }
304
    return finalAbortStatus;
1✔
305
  }
306

307
  @Nullable
308
  private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
309
    Long delayNanos;
310
    FaultConfig.FractionalPercent fractionalPercent = faultDelay.percent();
1✔
311
    if (faultDelay.headerDelay()) {
1✔
312
      try {
313
        int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY));
1✔
314
        delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
1✔
315
        String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY);
1✔
316
        if (delayPercentageStr != null) {
1✔
317
          int delayPercentage = Integer.parseInt(delayPercentageStr);
1✔
318
          if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) {
1✔
319
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
320
                delayPercentage, fractionalPercent.denominatorType());
1✔
321
          }
322
        }
323
      } catch (NumberFormatException e) {
1✔
324
        return null; // treated as header_delay not applicable
1✔
325
      }
1✔
326
    } else {
327
      delayNanos = faultDelay.delayNanos();
1✔
328
    }
329
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
330
      return null;
1✔
331
    }
332
    return delayNanos;
1✔
333
  }
334

335
  @Nullable
336
  private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) {
337
    Status abortStatus = null;
1✔
338
    FaultConfig.FractionalPercent fractionalPercent = faultAbort.percent();
1✔
339
    if (faultAbort.headerAbort()) {
1✔
340
      try {
341
        String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY);
1✔
342
        if (grpcCodeStr != null) {
1✔
343
          int grpcCode = Integer.parseInt(grpcCodeStr);
1✔
344
          abortStatus = Status.fromCodeValue(grpcCode);
1✔
345
        }
346
        String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY);
1✔
347
        if (httpCodeStr != null) {
1✔
348
          int httpCode = Integer.parseInt(httpCodeStr);
1✔
349
          abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode);
1✔
350
        }
351
        String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY);
1✔
352
        if (abortPercentageStr != null) {
1✔
353
          int abortPercentage =
1✔
354
              Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY));
1✔
355
          if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) {
1✔
356
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
357
                abortPercentage, fractionalPercent.denominatorType());
1✔
358
          }
359
        }
360
      } catch (NumberFormatException e) {
×
361
        return null; // treated as header_abort not applicable
×
362
      }
1✔
363
    } else {
364
      abortStatus = faultAbort.status();
1✔
365
    }
366
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
367
      return null;
1✔
368
    }
369
    return abortStatus;
1✔
370
  }
371

372
  private static int getRatePerMillion(FaultConfig.FractionalPercent percent) {
373
    int numerator = percent.numerator();
1✔
374
    FaultConfig.FractionalPercent.DenominatorType type = percent.denominatorType();
1✔
375
    switch (type) {
1✔
376
      case TEN_THOUSAND:
377
        numerator *= 100;
×
378
        break;
×
379
      case HUNDRED:
380
        numerator *= 10_000;
1✔
381
        break;
1✔
382
      case MILLION:
383
      default:
384
        break;
385
    }
386
    if (numerator > 1_000_000 || numerator < 0) {
1✔
387
      numerator = 1_000_000;
×
388
    }
389
    return numerator;
1✔
390
  }
391

392
  /** A {@link DelayedClientCall} with a fixed delay. */
393
  private final class DelayInjectedCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
394
    final Object lock = new Object();
1✔
395
    ScheduledFuture<?> delayTask;
396
    boolean cancelled;
397

398
    DelayInjectedCall(
399
        long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler,
400
        @Nullable Deadline deadline,
401
        final Supplier<? extends ClientCall<ReqT, RespT>> callSupplier) {
1✔
402
      super(callExecutor, scheduler, deadline);
1✔
403
      activeFaultCounter.incrementAndGet();
1✔
404
      ScheduledFuture<?> task = scheduler.schedule(
1✔
405
          new Runnable() {
1✔
406
            @Override
407
            public void run() {
408
              synchronized (lock) {
1✔
409
                if (!cancelled) {
1✔
410
                  activeFaultCounter.decrementAndGet();
1✔
411
                }
412
              }
1✔
413
              Runnable toRun = setCall(callSupplier.get());
1✔
414
              if (toRun != null) {
1✔
415
                toRun.run();
1✔
416
              }
417
            }
1✔
418
          },
419
          delayNanos,
420
          NANOSECONDS);
421
      synchronized (lock) {
1✔
422
        if (!cancelled) {
1✔
423
          delayTask = task;
1✔
424
          return;
1✔
425
        }
426
      }
×
427
      task.cancel(false);
×
428
    }
×
429

430
    @Override
431
    protected void callCancelled() {
432
      ScheduledFuture<?> savedDelayTask;
433
      synchronized (lock) {
1✔
434
        cancelled = true;
1✔
435
        activeFaultCounter.decrementAndGet();
1✔
436
        savedDelayTask = delayTask;
1✔
437
      }
1✔
438
      if (savedDelayTask != null) {
1✔
439
        savedDelayTask.cancel(false);
1✔
440
      }
441
    }
1✔
442
  }
443

444
  /** An implementation of {@link ClientCall} that fails when started. */
445
  private final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
446
    final Status error;
447
    final Executor callExecutor;
448
    final Context context;
449

450
    FailingClientCall(Status error, Executor callExecutor) {
1✔
451
      this.error = error;
1✔
452
      this.callExecutor = callExecutor;
1✔
453
      this.context = Context.current();
1✔
454
    }
1✔
455

456
    @Override
457
    public void start(final ClientCall.Listener<RespT> listener, Metadata headers) {
458
      activeFaultCounter.incrementAndGet();
1✔
459
      callExecutor.execute(
1✔
460
          new Runnable() {
1✔
461
            @Override
462
            public void run() {
463
              Context previous = context.attach();
1✔
464
              try {
465
                listener.onClose(error, new Metadata());
1✔
466
                activeFaultCounter.decrementAndGet();
1✔
467
              } finally {
468
                context.detach(previous);
1✔
469
              }
470
            }
1✔
471
          });
472
    }
1✔
473

474
    @Override
475
    public void request(int numMessages) {}
×
476

477
    @Override
478
    public void cancel(String message, Throwable cause) {}
×
479

480
    @Override
481
    public void halfClose() {}
×
482

483
    @Override
484
    public void sendMessage(ReqT message) {}
×
485
  }
486
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc