• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19667

29 Jan 2025 10:21PM UTC coverage: 88.579% (+0.001%) from 88.578%
#19667

push

github

ejona86
xds: Allow FaultFilter's interceptor to be reused

This is the only usage of PickSubchannelArgs when creating a filter's
ClientInterceptor, and a follow-up commit will remove the argument and
actually reuse the interceptors. Other filter's interceptors can
already be reused.

There doesn't seem to be any significant loss of legibility by making
FaultFilter a more ordinary interceptor, but the change does cause the
ForwardingClientCall to be present when faultDelay is configured,
independent of whether the fault delay ends up being triggered.

Reusing interceptors will move more state management out of the RPC path
which will be more relevant with RLQS.

33729 of 38078 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.09
/../xds/src/main/java/io/grpc/xds/FaultFilter.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static java.util.concurrent.TimeUnit.NANOSECONDS;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Supplier;
24
import com.google.common.base.Suppliers;
25
import com.google.common.util.concurrent.MoreExecutors;
26
import com.google.protobuf.Any;
27
import com.google.protobuf.InvalidProtocolBufferException;
28
import com.google.protobuf.Message;
29
import com.google.protobuf.util.Durations;
30
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
31
import io.envoyproxy.envoy.type.v3.FractionalPercent;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.Context;
37
import io.grpc.Deadline;
38
import io.grpc.ForwardingClientCall;
39
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
40
import io.grpc.LoadBalancer.PickSubchannelArgs;
41
import io.grpc.Metadata;
42
import io.grpc.MethodDescriptor;
43
import io.grpc.Status;
44
import io.grpc.Status.Code;
45
import io.grpc.internal.DelayedClientCall;
46
import io.grpc.internal.GrpcUtil;
47
import io.grpc.xds.FaultConfig.FaultAbort;
48
import io.grpc.xds.FaultConfig.FaultDelay;
49
import io.grpc.xds.Filter.ClientInterceptorBuilder;
50
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
51
import java.util.Locale;
52
import java.util.concurrent.Executor;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.ScheduledFuture;
55
import java.util.concurrent.TimeUnit;
56
import java.util.concurrent.atomic.AtomicLong;
57
import javax.annotation.Nullable;
58

59
/** HttpFault filter implementation. */
60
final class FaultFilter implements Filter, ClientInterceptorBuilder {
61

62
  static final FaultFilter INSTANCE =
1✔
63
      new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong());
64
  @VisibleForTesting
65
  static final Metadata.Key<String> HEADER_DELAY_KEY =
1✔
66
      Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
67
  @VisibleForTesting
68
  static final Metadata.Key<String> HEADER_DELAY_PERCENTAGE_KEY =
1✔
69
      Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
70
  @VisibleForTesting
71
  static final Metadata.Key<String> HEADER_ABORT_HTTP_STATUS_KEY =
1✔
72
      Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
73
  @VisibleForTesting
74
  static final Metadata.Key<String> HEADER_ABORT_GRPC_STATUS_KEY =
1✔
75
      Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
76
  @VisibleForTesting
77
  static final Metadata.Key<String> HEADER_ABORT_PERCENTAGE_KEY =
1✔
78
      Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
79
  static final String TYPE_URL =
80
      "type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault";
81

82
  private final ThreadSafeRandom random;
83
  private final AtomicLong activeFaultCounter;
84

85
  @VisibleForTesting
86
  FaultFilter(ThreadSafeRandom random, AtomicLong activeFaultCounter) {
1✔
87
    this.random = random;
1✔
88
    this.activeFaultCounter = activeFaultCounter;
1✔
89
  }
1✔
90

91
  @Override
92
  public String[] typeUrls() {
93
    return new String[] { TYPE_URL };
1✔
94
  }
95

96
  @Override
97
  public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
98
    HTTPFault httpFaultProto;
99
    if (!(rawProtoMessage instanceof Any)) {
1✔
100
      return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
×
101
    }
102
    Any anyMessage = (Any) rawProtoMessage;
1✔
103
    try {
104
      httpFaultProto = anyMessage.unpack(HTTPFault.class);
1✔
105
    } catch (InvalidProtocolBufferException e) {
×
106
      return ConfigOrError.fromError("Invalid proto: " + e);
×
107
    }
1✔
108
    return parseHttpFault(httpFaultProto);
1✔
109
  }
110

111
  private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
112
    FaultDelay faultDelay = null;
1✔
113
    FaultAbort faultAbort = null;
1✔
114
    if (httpFault.hasDelay()) {
1✔
115
      faultDelay = parseFaultDelay(httpFault.getDelay());
1✔
116
    }
117
    if (httpFault.hasAbort()) {
1✔
118
      ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
1✔
119
      if (faultAbortOrError.errorDetail != null) {
1✔
120
        return ConfigOrError.fromError(
×
121
            "HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);
122
      }
123
      faultAbort = faultAbortOrError.config;
1✔
124
    }
125
    Integer maxActiveFaults = null;
1✔
126
    if (httpFault.hasMaxActiveFaults()) {
1✔
127
      maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
1✔
128
      if (maxActiveFaults < 0) {
1✔
129
        maxActiveFaults = Integer.MAX_VALUE;
×
130
      }
131
    }
132
    return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
1✔
133
  }
134

135
  private static FaultDelay parseFaultDelay(
136
      io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
137
    FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
1✔
138
    if (faultDelay.hasHeaderDelay()) {
1✔
139
      return FaultDelay.forHeader(percent);
×
140
    }
141
    return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
1✔
142
  }
143

144
  @VisibleForTesting
145
  static ConfigOrError<FaultAbort> parseFaultAbort(
146
      io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
147
    FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
1✔
148
    switch (faultAbort.getErrorTypeCase()) {
1✔
149
      case HEADER_ABORT:
150
        return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
1✔
151
      case HTTP_STATUS:
152
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
153
            GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
1✔
154
      case GRPC_STATUS:
155
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
156
            Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
1✔
157
      case ERRORTYPE_NOT_SET:
158
      default:
159
        return ConfigOrError.fromError(
×
160
            "Unknown error type case: " + faultAbort.getErrorTypeCase());
×
161
    }
162
  }
163

164
  private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
165
    switch (proto.getDenominator()) {
1✔
166
      case HUNDRED:
167
        return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
1✔
168
      case TEN_THOUSAND:
169
        return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
1✔
170
      case MILLION:
171
        return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
1✔
172
      case UNRECOGNIZED:
173
      default:
174
        throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
×
175
    }
176
  }
177

178
  @Override
179
  public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
180
    return parseFilterConfig(rawProtoMessage);
1✔
181
  }
182

183
  @Nullable
184
  @Override
185
  public ClientInterceptor buildClientInterceptor(
186
      FilterConfig config, @Nullable FilterConfig overrideConfig, PickSubchannelArgs args,
187
      final ScheduledExecutorService scheduler) {
188
    checkNotNull(config, "config");
1✔
189
    if (overrideConfig != null) {
1✔
190
      config = overrideConfig;
1✔
191
    }
192
    FaultConfig faultConfig = (FaultConfig) config;
1✔
193

194
    final class FaultInjectionInterceptor implements ClientInterceptor {
1✔
195
      @Override
196
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
197
          final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
198
          final Channel next) {
199
        boolean checkFault = false;
1✔
200
        if (faultConfig.maxActiveFaults() == null
1✔
201
            || activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
1✔
202
          checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;
1✔
203
        }
204
        if (!checkFault) {
1✔
205
          return next.newCall(method, callOptions);
1✔
206
        }
207
        final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
1✔
208
          private ClientCall<ReqT, RespT> delegate;
209

210
          @Override
211
          protected ClientCall<ReqT, RespT> delegate() {
212
            return delegate;
1✔
213
          }
214

215
          @Override
216
          public void start(Listener<RespT> listener, Metadata headers) {
217
            Executor callExecutor = callOptions.getExecutor();
1✔
218
            if (callExecutor == null) { // This should never happen in practice because
1✔
219
              // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
220
              // a callExecutor.
221
              // TODO(https://github.com/grpc/grpc-java/issues/7868)
222
              callExecutor = MoreExecutors.directExecutor();
1✔
223
            }
224

225
            Long delayNanos;
226
            Status abortStatus = null;
1✔
227
            if (faultConfig.faultDelay() != null) {
1✔
228
              delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
1✔
229
            } else {
230
              delayNanos = null;
1✔
231
            }
232
            if (faultConfig.faultAbort() != null) {
1✔
233
              abortStatus = getAbortStatusWithDescription(
1✔
234
                  determineFaultAbortStatus(faultConfig.faultAbort(), headers));
1✔
235
            }
236

237
            Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
238
            if (abortStatus != null) {
1✔
239
              callSupplier = Suppliers.ofInstance(
1✔
240
                  new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));
241
            } else {
242
              callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
1✔
243
                @Override
244
                public ClientCall<ReqT, RespT> get() {
245
                  return next.newCall(method, callOptions);
1✔
246
                }
247
              };
248
            }
249
            if (delayNanos == null) {
1✔
250
              delegate = callSupplier.get();
1✔
251
              delegate().start(listener, headers);
1✔
252
              return;
1✔
253
            }
254

255
            delegate = new DelayInjectedCall<>(
1✔
256
                delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
1✔
257

258
            Listener<RespT> finalListener =
1✔
259
                new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
260
                  @Override
261
                  public void onClose(Status status, Metadata trailers) {
262
                    if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
1✔
263
                      // TODO(zdapeng:) check effective deadline locally, and
264
                      //   do the following only if the local deadline is exceeded.
265
                      //   (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
266
                      //   injected delay does not contribute to the error, because the request is
267
                      //   only sent out after the delay. There could be a race between local and
268
                      //   remote, but it is rather rare.)
269
                      String description = String.format(
1✔
270
                          Locale.US,
271
                          "Deadline exceeded after up to %d ns of fault-injected delay",
272
                          delayNanos);
273
                      if (status.getDescription() != null) {
1✔
274
                        description = description + ": " + status.getDescription();
1✔
275
                      }
276
                      status = Status.DEADLINE_EXCEEDED
1✔
277
                          .withDescription(description).withCause(status.getCause());
1✔
278
                      // Replace trailers to prevent mixing sources of status and trailers.
279
                      trailers = new Metadata();
1✔
280
                    }
281
                    delegate().onClose(status, trailers);
1✔
282
                  }
1✔
283
                };
284
            delegate().start(finalListener, headers);
1✔
285
          }
1✔
286
        }
287

288
        return new DeadlineInsightForwardingCall();
1✔
289
      }
290
    }
291

292
    return new FaultInjectionInterceptor();
1✔
293
  }
294

295
  private static Status getAbortStatusWithDescription(Status abortStatus) {
296
    Status finalAbortStatus = null;
1✔
297
    if (abortStatus != null) {
1✔
298
      String abortDesc = "RPC terminated due to fault injection";
1✔
299
      if (abortStatus.getDescription() != null) {
1✔
300
        abortDesc = abortDesc + ": " + abortStatus.getDescription();
1✔
301
      }
302
      finalAbortStatus = abortStatus.withDescription(abortDesc);
1✔
303
    }
304
    return finalAbortStatus;
1✔
305
  }
306

307
  @Nullable
308
  private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
309
    Long delayNanos;
310
    FaultConfig.FractionalPercent fractionalPercent = faultDelay.percent();
1✔
311
    if (faultDelay.headerDelay()) {
1✔
312
      try {
313
        int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY));
1✔
314
        delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
1✔
315
        String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY);
1✔
316
        if (delayPercentageStr != null) {
1✔
317
          int delayPercentage = Integer.parseInt(delayPercentageStr);
1✔
318
          if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) {
1✔
319
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
320
                delayPercentage, fractionalPercent.denominatorType());
1✔
321
          }
322
        }
323
      } catch (NumberFormatException e) {
1✔
324
        return null; // treated as header_delay not applicable
1✔
325
      }
1✔
326
    } else {
327
      delayNanos = faultDelay.delayNanos();
1✔
328
    }
329
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
330
      return null;
1✔
331
    }
332
    return delayNanos;
1✔
333
  }
334

335
  @Nullable
336
  private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) {
337
    Status abortStatus = null;
1✔
338
    FaultConfig.FractionalPercent fractionalPercent = faultAbort.percent();
1✔
339
    if (faultAbort.headerAbort()) {
1✔
340
      try {
341
        String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY);
1✔
342
        if (grpcCodeStr != null) {
1✔
343
          int grpcCode = Integer.parseInt(grpcCodeStr);
1✔
344
          abortStatus = Status.fromCodeValue(grpcCode);
1✔
345
        }
346
        String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY);
1✔
347
        if (httpCodeStr != null) {
1✔
348
          int httpCode = Integer.parseInt(httpCodeStr);
1✔
349
          abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode);
1✔
350
        }
351
        String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY);
1✔
352
        if (abortPercentageStr != null) {
1✔
353
          int abortPercentage =
1✔
354
              Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY));
1✔
355
          if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) {
1✔
356
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
357
                abortPercentage, fractionalPercent.denominatorType());
1✔
358
          }
359
        }
360
      } catch (NumberFormatException e) {
×
361
        return null; // treated as header_abort not applicable
×
362
      }
1✔
363
    } else {
364
      abortStatus = faultAbort.status();
1✔
365
    }
366
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
367
      return null;
1✔
368
    }
369
    return abortStatus;
1✔
370
  }
371

372
  private static int getRatePerMillion(FaultConfig.FractionalPercent percent) {
373
    int numerator = percent.numerator();
1✔
374
    FaultConfig.FractionalPercent.DenominatorType type = percent.denominatorType();
1✔
375
    switch (type) {
1✔
376
      case TEN_THOUSAND:
377
        numerator *= 100;
×
378
        break;
×
379
      case HUNDRED:
380
        numerator *= 10_000;
1✔
381
        break;
1✔
382
      case MILLION:
383
      default:
384
        break;
385
    }
386
    if (numerator > 1_000_000 || numerator < 0) {
1✔
387
      numerator = 1_000_000;
×
388
    }
389
    return numerator;
1✔
390
  }
391

392
  /** A {@link DelayedClientCall} with a fixed delay. */
393
  private final class DelayInjectedCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
394
    final Object lock = new Object();
1✔
395
    ScheduledFuture<?> delayTask;
396
    boolean cancelled;
397

398
    DelayInjectedCall(
399
        long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler,
400
        @Nullable Deadline deadline,
401
        final Supplier<? extends ClientCall<ReqT, RespT>> callSupplier) {
1✔
402
      super(callExecutor, scheduler, deadline);
1✔
403
      activeFaultCounter.incrementAndGet();
1✔
404
      ScheduledFuture<?> task = scheduler.schedule(
1✔
405
          new Runnable() {
1✔
406
            @Override
407
            public void run() {
408
              synchronized (lock) {
1✔
409
                if (!cancelled) {
1✔
410
                  activeFaultCounter.decrementAndGet();
1✔
411
                }
412
              }
1✔
413
              Runnable toRun = setCall(callSupplier.get());
1✔
414
              if (toRun != null) {
1✔
415
                toRun.run();
1✔
416
              }
417
            }
1✔
418
          },
419
          delayNanos,
420
          NANOSECONDS);
421
      synchronized (lock) {
1✔
422
        if (!cancelled) {
1✔
423
          delayTask = task;
1✔
424
          return;
1✔
425
        }
426
      }
×
427
      task.cancel(false);
×
428
    }
×
429

430
    @Override
431
    protected void callCancelled() {
432
      ScheduledFuture<?> savedDelayTask;
433
      synchronized (lock) {
1✔
434
        cancelled = true;
1✔
435
        activeFaultCounter.decrementAndGet();
1✔
436
        savedDelayTask = delayTask;
1✔
437
      }
1✔
438
      if (savedDelayTask != null) {
1✔
439
        savedDelayTask.cancel(false);
1✔
440
      }
441
    }
1✔
442
  }
443

444
  /** An implementation of {@link ClientCall} that fails when started. */
445
  private final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
446
    final Status error;
447
    final Executor callExecutor;
448
    final Context context;
449

450
    FailingClientCall(Status error, Executor callExecutor) {
1✔
451
      this.error = error;
1✔
452
      this.callExecutor = callExecutor;
1✔
453
      this.context = Context.current();
1✔
454
    }
1✔
455

456
    @Override
457
    public void start(final ClientCall.Listener<RespT> listener, Metadata headers) {
458
      activeFaultCounter.incrementAndGet();
1✔
459
      callExecutor.execute(
1✔
460
          new Runnable() {
1✔
461
            @Override
462
            public void run() {
463
              Context previous = context.attach();
1✔
464
              try {
465
                listener.onClose(error, new Metadata());
1✔
466
                activeFaultCounter.decrementAndGet();
1✔
467
              } finally {
468
                context.detach(previous);
1✔
469
              }
470
            }
1✔
471
          });
472
    }
1✔
473

474
    @Override
475
    public void request(int numMessages) {}
×
476

477
    @Override
478
    public void cancel(String message, Throwable cause) {}
×
479

480
    @Override
481
    public void halfClose() {}
×
482

483
    @Override
484
    public void sendMessage(ReqT message) {}
×
485
  }
486
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc