• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19677

05 Feb 2025 09:05AM CUT coverage: 88.604% (+0.03%) from 88.578%
#19677

Pull #11858

github

web-flow
Merge dad68ffd5 into ea3f644ee
Pull Request #11858: core: updates the backoff range as per the A6 redefinition

33773 of 38117 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.09
/../xds/src/main/java/io/grpc/xds/FaultFilter.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static java.util.concurrent.TimeUnit.NANOSECONDS;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Supplier;
24
import com.google.common.base.Suppliers;
25
import com.google.common.util.concurrent.MoreExecutors;
26
import com.google.protobuf.Any;
27
import com.google.protobuf.InvalidProtocolBufferException;
28
import com.google.protobuf.Message;
29
import com.google.protobuf.util.Durations;
30
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
31
import io.envoyproxy.envoy.type.v3.FractionalPercent;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.Context;
37
import io.grpc.Deadline;
38
import io.grpc.ForwardingClientCall;
39
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.Status;
43
import io.grpc.Status.Code;
44
import io.grpc.internal.DelayedClientCall;
45
import io.grpc.internal.GrpcUtil;
46
import io.grpc.xds.FaultConfig.FaultAbort;
47
import io.grpc.xds.FaultConfig.FaultDelay;
48
import io.grpc.xds.Filter.ClientInterceptorBuilder;
49
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
50
import java.util.Locale;
51
import java.util.concurrent.Executor;
52
import java.util.concurrent.ScheduledExecutorService;
53
import java.util.concurrent.ScheduledFuture;
54
import java.util.concurrent.TimeUnit;
55
import java.util.concurrent.atomic.AtomicLong;
56
import javax.annotation.Nullable;
57

58
/** HttpFault filter implementation. */
59
final class FaultFilter implements Filter, ClientInterceptorBuilder {
60

61
  /**
   * Shared filter instance, built with {@code ThreadSafeRandomImpl.instance} and a fresh
   * {@link AtomicLong} as the active-fault counter.
   */
  static final FaultFilter INSTANCE =
1✔
62
      new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong());
63
  /**
   * Request header read by {@code determineFaultDelayNanos}; its value is parsed as an integer
   * number of milliseconds of delay to inject.
   */
  @VisibleForTesting
64
  static final Metadata.Key<String> HEADER_DELAY_KEY =
1✔
65
      Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
66
  /**
   * Request header read by {@code determineFaultDelayNanos}; an integer numerator that replaces
   * the configured delay numerator only when it is non-negative and smaller than it.
   */
  @VisibleForTesting
67
  static final Metadata.Key<String> HEADER_DELAY_PERCENTAGE_KEY =
1✔
68
      Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
69
  /**
   * Request header read by {@code determineFaultAbortStatus}; parsed as an integer HTTP status
   * and mapped to a gRPC status with {@code GrpcUtil.httpStatusToGrpcStatus}.
   */
  @VisibleForTesting
70
  static final Metadata.Key<String> HEADER_ABORT_HTTP_STATUS_KEY =
1✔
71
      Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
72
  /**
   * Request header read by {@code determineFaultAbortStatus}; parsed as an integer gRPC status
   * code and converted with {@code Status.fromCodeValue}.
   */
  @VisibleForTesting
73
  static final Metadata.Key<String> HEADER_ABORT_GRPC_STATUS_KEY =
1✔
74
      Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
75
  /**
   * Request header read by {@code determineFaultAbortStatus}; an integer numerator that replaces
   * the configured abort numerator only when it is non-negative and smaller than it.
   */
  @VisibleForTesting
76
  static final Metadata.Key<String> HEADER_ABORT_PERCENTAGE_KEY =
1✔
77
      Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
78
  /** Protobuf type URL of the HTTPFault config; the only entry returned by {@code typeUrls()}. */
  static final String TYPE_URL =
79
      "type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault";
80

81
  // Source of the per-RPC samples (nextInt(1_000_000)) that are compared against the configured
  // delay/abort rates.
  private final ThreadSafeRandom random;
82
  // Number of faults currently in flight: incremented and decremented by DelayInjectedCall and
  // FailingClientCall, and compared with FaultConfig.maxActiveFaults() in interceptCall.
  private final AtomicLong activeFaultCounter;
83

84
  /**
   * Creates a filter that uses the given random source and active-fault counter. Both are stored
   * as-is; no validation or copying is performed.
   */
  @VisibleForTesting
85
  FaultFilter(ThreadSafeRandom random, AtomicLong activeFaultCounter) {
1✔
86
    this.random = random;
1✔
87
    this.activeFaultCounter = activeFaultCounter;
1✔
88
  }
1✔
89

90
  @Override
91
  public String[] typeUrls() {
92
    return new String[] { TYPE_URL };
1✔
93
  }
94

95
  @Override
96
  public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
97
    HTTPFault httpFaultProto;
98
    if (!(rawProtoMessage instanceof Any)) {
1✔
99
      return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
×
100
    }
101
    Any anyMessage = (Any) rawProtoMessage;
1✔
102
    try {
103
      httpFaultProto = anyMessage.unpack(HTTPFault.class);
1✔
104
    } catch (InvalidProtocolBufferException e) {
×
105
      return ConfigOrError.fromError("Invalid proto: " + e);
×
106
    }
1✔
107
    return parseHttpFault(httpFaultProto);
1✔
108
  }
109

110
  private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
111
    FaultDelay faultDelay = null;
1✔
112
    FaultAbort faultAbort = null;
1✔
113
    if (httpFault.hasDelay()) {
1✔
114
      faultDelay = parseFaultDelay(httpFault.getDelay());
1✔
115
    }
116
    if (httpFault.hasAbort()) {
1✔
117
      ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
1✔
118
      if (faultAbortOrError.errorDetail != null) {
1✔
119
        return ConfigOrError.fromError(
×
120
            "HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);
121
      }
122
      faultAbort = faultAbortOrError.config;
1✔
123
    }
124
    Integer maxActiveFaults = null;
1✔
125
    if (httpFault.hasMaxActiveFaults()) {
1✔
126
      maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
1✔
127
      if (maxActiveFaults < 0) {
1✔
128
        maxActiveFaults = Integer.MAX_VALUE;
×
129
      }
130
    }
131
    return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
1✔
132
  }
133

134
  private static FaultDelay parseFaultDelay(
135
      io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
136
    FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
1✔
137
    if (faultDelay.hasHeaderDelay()) {
1✔
138
      return FaultDelay.forHeader(percent);
×
139
    }
140
    return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
1✔
141
  }
142

143
  @VisibleForTesting
144
  static ConfigOrError<FaultAbort> parseFaultAbort(
145
      io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
146
    FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
1✔
147
    switch (faultAbort.getErrorTypeCase()) {
1✔
148
      case HEADER_ABORT:
149
        return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
1✔
150
      case HTTP_STATUS:
151
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
152
            GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
1✔
153
      case GRPC_STATUS:
154
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
155
            Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
1✔
156
      case ERRORTYPE_NOT_SET:
157
      default:
158
        return ConfigOrError.fromError(
×
159
            "Unknown error type case: " + faultAbort.getErrorTypeCase());
×
160
    }
161
  }
162

163
  private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
164
    switch (proto.getDenominator()) {
1✔
165
      case HUNDRED:
166
        return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
1✔
167
      case TEN_THOUSAND:
168
        return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
1✔
169
      case MILLION:
170
        return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
1✔
171
      case UNRECOGNIZED:
172
      default:
173
        throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
×
174
    }
175
  }
176

177
  /**
   * Parses an override config. Override configs are parsed exactly like top-level configs: this
   * simply delegates to {@code parseFilterConfig}.
   */
  @Override
178
  public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
179
    return parseFilterConfig(rawProtoMessage);
1✔
180
  }
181

182
  @Nullable
183
  @Override
184
  public ClientInterceptor buildClientInterceptor(
185
      FilterConfig config, @Nullable FilterConfig overrideConfig,
186
      final ScheduledExecutorService scheduler) {
187
    checkNotNull(config, "config");
1✔
188
    if (overrideConfig != null) {
1✔
189
      config = overrideConfig;
1✔
190
    }
191
    FaultConfig faultConfig = (FaultConfig) config;
1✔
192

193
    final class FaultInjectionInterceptor implements ClientInterceptor {
1✔
194
      @Override
195
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
196
          final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
197
          final Channel next) {
198
        boolean checkFault = false;
1✔
199
        if (faultConfig.maxActiveFaults() == null
1✔
200
            || activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
1✔
201
          checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;
1✔
202
        }
203
        if (!checkFault) {
1✔
204
          return next.newCall(method, callOptions);
1✔
205
        }
206
        final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
1✔
207
          private ClientCall<ReqT, RespT> delegate;
208

209
          @Override
210
          protected ClientCall<ReqT, RespT> delegate() {
211
            return delegate;
1✔
212
          }
213

214
          @Override
215
          public void start(Listener<RespT> listener, Metadata headers) {
216
            Executor callExecutor = callOptions.getExecutor();
1✔
217
            if (callExecutor == null) { // This should never happen in practice because
1✔
218
              // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
219
              // a callExecutor.
220
              // TODO(https://github.com/grpc/grpc-java/issues/7868)
221
              callExecutor = MoreExecutors.directExecutor();
1✔
222
            }
223

224
            Long delayNanos;
225
            Status abortStatus = null;
1✔
226
            if (faultConfig.faultDelay() != null) {
1✔
227
              delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
1✔
228
            } else {
229
              delayNanos = null;
1✔
230
            }
231
            if (faultConfig.faultAbort() != null) {
1✔
232
              abortStatus = getAbortStatusWithDescription(
1✔
233
                  determineFaultAbortStatus(faultConfig.faultAbort(), headers));
1✔
234
            }
235

236
            Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
237
            if (abortStatus != null) {
1✔
238
              callSupplier = Suppliers.ofInstance(
1✔
239
                  new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));
240
            } else {
241
              callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
1✔
242
                @Override
243
                public ClientCall<ReqT, RespT> get() {
244
                  return next.newCall(method, callOptions);
1✔
245
                }
246
              };
247
            }
248
            if (delayNanos == null) {
1✔
249
              delegate = callSupplier.get();
1✔
250
              delegate().start(listener, headers);
1✔
251
              return;
1✔
252
            }
253

254
            delegate = new DelayInjectedCall<>(
1✔
255
                delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
1✔
256

257
            Listener<RespT> finalListener =
1✔
258
                new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
259
                  @Override
260
                  public void onClose(Status status, Metadata trailers) {
261
                    if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
1✔
262
                      // TODO(zdapeng:) check effective deadline locally, and
263
                      //   do the following only if the local deadline is exceeded.
264
                      //   (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
265
                      //   injected delay does not contribute to the error, because the request is
266
                      //   only sent out after the delay. There could be a race between local and
267
                      //   remote, but it is rather rare.)
268
                      String description = String.format(
1✔
269
                          Locale.US,
270
                          "Deadline exceeded after up to %d ns of fault-injected delay",
271
                          delayNanos);
272
                      if (status.getDescription() != null) {
1✔
273
                        description = description + ": " + status.getDescription();
1✔
274
                      }
275
                      status = Status.DEADLINE_EXCEEDED
1✔
276
                          .withDescription(description).withCause(status.getCause());
1✔
277
                      // Replace trailers to prevent mixing sources of status and trailers.
278
                      trailers = new Metadata();
1✔
279
                    }
280
                    delegate().onClose(status, trailers);
1✔
281
                  }
1✔
282
                };
283
            delegate().start(finalListener, headers);
1✔
284
          }
1✔
285
        }
286

287
        return new DeadlineInsightForwardingCall();
1✔
288
      }
289
    }
290

291
    return new FaultInjectionInterceptor();
1✔
292
  }
293

294
  private static Status getAbortStatusWithDescription(Status abortStatus) {
295
    Status finalAbortStatus = null;
1✔
296
    if (abortStatus != null) {
1✔
297
      String abortDesc = "RPC terminated due to fault injection";
1✔
298
      if (abortStatus.getDescription() != null) {
1✔
299
        abortDesc = abortDesc + ": " + abortStatus.getDescription();
1✔
300
      }
301
      finalAbortStatus = abortStatus.withDescription(abortDesc);
1✔
302
    }
303
    return finalAbortStatus;
1✔
304
  }
305

306
  @Nullable
307
  private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
308
    Long delayNanos;
309
    FaultConfig.FractionalPercent fractionalPercent = faultDelay.percent();
1✔
310
    if (faultDelay.headerDelay()) {
1✔
311
      try {
312
        int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY));
1✔
313
        delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
1✔
314
        String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY);
1✔
315
        if (delayPercentageStr != null) {
1✔
316
          int delayPercentage = Integer.parseInt(delayPercentageStr);
1✔
317
          if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) {
1✔
318
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
319
                delayPercentage, fractionalPercent.denominatorType());
1✔
320
          }
321
        }
322
      } catch (NumberFormatException e) {
1✔
323
        return null; // treated as header_delay not applicable
1✔
324
      }
1✔
325
    } else {
326
      delayNanos = faultDelay.delayNanos();
1✔
327
    }
328
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
329
      return null;
1✔
330
    }
331
    return delayNanos;
1✔
332
  }
333

334
  @Nullable
335
  private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) {
336
    Status abortStatus = null;
1✔
337
    FaultConfig.FractionalPercent fractionalPercent = faultAbort.percent();
1✔
338
    if (faultAbort.headerAbort()) {
1✔
339
      try {
340
        String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY);
1✔
341
        if (grpcCodeStr != null) {
1✔
342
          int grpcCode = Integer.parseInt(grpcCodeStr);
1✔
343
          abortStatus = Status.fromCodeValue(grpcCode);
1✔
344
        }
345
        String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY);
1✔
346
        if (httpCodeStr != null) {
1✔
347
          int httpCode = Integer.parseInt(httpCodeStr);
1✔
348
          abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode);
1✔
349
        }
350
        String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY);
1✔
351
        if (abortPercentageStr != null) {
1✔
352
          int abortPercentage =
1✔
353
              Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY));
1✔
354
          if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) {
1✔
355
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
356
                abortPercentage, fractionalPercent.denominatorType());
1✔
357
          }
358
        }
359
      } catch (NumberFormatException e) {
×
360
        return null; // treated as header_abort not applicable
×
361
      }
1✔
362
    } else {
363
      abortStatus = faultAbort.status();
1✔
364
    }
365
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
366
      return null;
1✔
367
    }
368
    return abortStatus;
1✔
369
  }
370

371
  private static int getRatePerMillion(FaultConfig.FractionalPercent percent) {
372
    int numerator = percent.numerator();
1✔
373
    FaultConfig.FractionalPercent.DenominatorType type = percent.denominatorType();
1✔
374
    switch (type) {
1✔
375
      case TEN_THOUSAND:
376
        numerator *= 100;
×
377
        break;
×
378
      case HUNDRED:
379
        numerator *= 10_000;
1✔
380
        break;
1✔
381
      case MILLION:
382
      default:
383
        break;
384
    }
385
    if (numerator > 1_000_000 || numerator < 0) {
1✔
386
      numerator = 1_000_000;
×
387
    }
388
    return numerator;
1✔
389
  }
390

391
  /** A {@link DelayedClientCall} with a fixed delay. */
392
  private final class DelayInjectedCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
393
    final Object lock = new Object();
1✔
394
    ScheduledFuture<?> delayTask;
395
    boolean cancelled;
396

397
    DelayInjectedCall(
398
        long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler,
399
        @Nullable Deadline deadline,
400
        final Supplier<? extends ClientCall<ReqT, RespT>> callSupplier) {
1✔
401
      super(callExecutor, scheduler, deadline);
1✔
402
      activeFaultCounter.incrementAndGet();
1✔
403
      ScheduledFuture<?> task = scheduler.schedule(
1✔
404
          new Runnable() {
1✔
405
            @Override
406
            public void run() {
407
              synchronized (lock) {
1✔
408
                if (!cancelled) {
1✔
409
                  activeFaultCounter.decrementAndGet();
1✔
410
                }
411
              }
1✔
412
              Runnable toRun = setCall(callSupplier.get());
1✔
413
              if (toRun != null) {
1✔
414
                toRun.run();
1✔
415
              }
416
            }
1✔
417
          },
418
          delayNanos,
419
          NANOSECONDS);
420
      synchronized (lock) {
1✔
421
        if (!cancelled) {
1✔
422
          delayTask = task;
1✔
423
          return;
1✔
424
        }
425
      }
×
426
      task.cancel(false);
×
427
    }
×
428

429
    @Override
430
    protected void callCancelled() {
431
      ScheduledFuture<?> savedDelayTask;
432
      synchronized (lock) {
1✔
433
        cancelled = true;
1✔
434
        activeFaultCounter.decrementAndGet();
1✔
435
        savedDelayTask = delayTask;
1✔
436
      }
1✔
437
      if (savedDelayTask != null) {
1✔
438
        savedDelayTask.cancel(false);
1✔
439
      }
440
    }
1✔
441
  }
442

443
  /** An implementation of {@link ClientCall} that fails when started. */
444
  private final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
445
    final Status error;
446
    final Executor callExecutor;
447
    final Context context;
448

449
    FailingClientCall(Status error, Executor callExecutor) {
1✔
450
      this.error = error;
1✔
451
      this.callExecutor = callExecutor;
1✔
452
      this.context = Context.current();
1✔
453
    }
1✔
454

455
    @Override
456
    public void start(final ClientCall.Listener<RespT> listener, Metadata headers) {
457
      activeFaultCounter.incrementAndGet();
1✔
458
      callExecutor.execute(
1✔
459
          new Runnable() {
1✔
460
            @Override
461
            public void run() {
462
              Context previous = context.attach();
1✔
463
              try {
464
                listener.onClose(error, new Metadata());
1✔
465
                activeFaultCounter.decrementAndGet();
1✔
466
              } finally {
467
                context.detach(previous);
1✔
468
              }
469
            }
1✔
470
          });
471
    }
1✔
472

473
    @Override
474
    public void request(int numMessages) {}
×
475

476
    @Override
477
    public void cancel(String message, Throwable cause) {}
×
478

479
    @Override
480
    public void halfClose() {}
×
481

482
    @Override
483
    public void sendMessage(ReqT message) {}
×
484
  }
485
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc