• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19671

30 Jan 2025 08:43PM UTC coverage: 88.581% (-0.01%) from 88.595%
#19671

push

github

web-flow
xds: Reuse filter interceptors across RPCs

This moves the interceptor creation from the ConfigSelector to the
resource update handling.

The code structure changes will make adding support for filter
lifecycles (for RLQS) a bit easier. The filter lifecycles will allow
filters to share state across interceptors, and constructing all the
interceptors on a single thread will mean filters won't need to be
thread-safe (although the interceptors they create still must be).

33760 of 38112 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.09
/../xds/src/main/java/io/grpc/xds/FaultFilter.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static java.util.concurrent.TimeUnit.NANOSECONDS;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Supplier;
24
import com.google.common.base.Suppliers;
25
import com.google.common.util.concurrent.MoreExecutors;
26
import com.google.protobuf.Any;
27
import com.google.protobuf.InvalidProtocolBufferException;
28
import com.google.protobuf.Message;
29
import com.google.protobuf.util.Durations;
30
import io.envoyproxy.envoy.extensions.filters.http.fault.v3.HTTPFault;
31
import io.envoyproxy.envoy.type.v3.FractionalPercent;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.Context;
37
import io.grpc.Deadline;
38
import io.grpc.ForwardingClientCall;
39
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.Status;
43
import io.grpc.Status.Code;
44
import io.grpc.internal.DelayedClientCall;
45
import io.grpc.internal.GrpcUtil;
46
import io.grpc.xds.FaultConfig.FaultAbort;
47
import io.grpc.xds.FaultConfig.FaultDelay;
48
import io.grpc.xds.Filter.ClientInterceptorBuilder;
49
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
50
import java.util.Locale;
51
import java.util.concurrent.Executor;
52
import java.util.concurrent.ScheduledExecutorService;
53
import java.util.concurrent.ScheduledFuture;
54
import java.util.concurrent.TimeUnit;
55
import java.util.concurrent.atomic.AtomicLong;
56
import javax.annotation.Nullable;
57

58
/** HttpFault filter implementation. */
59
final class FaultFilter implements Filter, ClientInterceptorBuilder {
60

61
  static final FaultFilter INSTANCE =
1✔
62
      new FaultFilter(ThreadSafeRandomImpl.instance, new AtomicLong());
63
  @VisibleForTesting
64
  static final Metadata.Key<String> HEADER_DELAY_KEY =
1✔
65
      Metadata.Key.of("x-envoy-fault-delay-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
66
  @VisibleForTesting
67
  static final Metadata.Key<String> HEADER_DELAY_PERCENTAGE_KEY =
1✔
68
      Metadata.Key.of("x-envoy-fault-delay-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
69
  @VisibleForTesting
70
  static final Metadata.Key<String> HEADER_ABORT_HTTP_STATUS_KEY =
1✔
71
      Metadata.Key.of("x-envoy-fault-abort-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
72
  @VisibleForTesting
73
  static final Metadata.Key<String> HEADER_ABORT_GRPC_STATUS_KEY =
1✔
74
      Metadata.Key.of("x-envoy-fault-abort-grpc-request", Metadata.ASCII_STRING_MARSHALLER);
1✔
75
  @VisibleForTesting
76
  static final Metadata.Key<String> HEADER_ABORT_PERCENTAGE_KEY =
1✔
77
      Metadata.Key.of("x-envoy-fault-abort-request-percentage", Metadata.ASCII_STRING_MARSHALLER);
1✔
78
  static final String TYPE_URL =
79
      "type.googleapis.com/envoy.extensions.filters.http.fault.v3.HTTPFault";
80

81
  private final ThreadSafeRandom random;
82
  private final AtomicLong activeFaultCounter;
83

84
  @VisibleForTesting
85
  FaultFilter(ThreadSafeRandom random, AtomicLong activeFaultCounter) {
1✔
86
    this.random = random;
1✔
87
    this.activeFaultCounter = activeFaultCounter;
1✔
88
  }
1✔
89

90
  @Override
91
  public String[] typeUrls() {
92
    return new String[] { TYPE_URL };
1✔
93
  }
94

95
  @Override
96
  public ConfigOrError<FaultConfig> parseFilterConfig(Message rawProtoMessage) {
97
    HTTPFault httpFaultProto;
98
    if (!(rawProtoMessage instanceof Any)) {
1✔
99
      return ConfigOrError.fromError("Invalid config type: " + rawProtoMessage.getClass());
×
100
    }
101
    Any anyMessage = (Any) rawProtoMessage;
1✔
102
    try {
103
      httpFaultProto = anyMessage.unpack(HTTPFault.class);
1✔
104
    } catch (InvalidProtocolBufferException e) {
×
105
      return ConfigOrError.fromError("Invalid proto: " + e);
×
106
    }
1✔
107
    return parseHttpFault(httpFaultProto);
1✔
108
  }
109

110
  private static ConfigOrError<FaultConfig> parseHttpFault(HTTPFault httpFault) {
111
    FaultDelay faultDelay = null;
1✔
112
    FaultAbort faultAbort = null;
1✔
113
    if (httpFault.hasDelay()) {
1✔
114
      faultDelay = parseFaultDelay(httpFault.getDelay());
1✔
115
    }
116
    if (httpFault.hasAbort()) {
1✔
117
      ConfigOrError<FaultAbort> faultAbortOrError = parseFaultAbort(httpFault.getAbort());
1✔
118
      if (faultAbortOrError.errorDetail != null) {
1✔
119
        return ConfigOrError.fromError(
×
120
            "HttpFault contains invalid FaultAbort: " + faultAbortOrError.errorDetail);
121
      }
122
      faultAbort = faultAbortOrError.config;
1✔
123
    }
124
    Integer maxActiveFaults = null;
1✔
125
    if (httpFault.hasMaxActiveFaults()) {
1✔
126
      maxActiveFaults = httpFault.getMaxActiveFaults().getValue();
1✔
127
      if (maxActiveFaults < 0) {
1✔
128
        maxActiveFaults = Integer.MAX_VALUE;
×
129
      }
130
    }
131
    return ConfigOrError.fromConfig(FaultConfig.create(faultDelay, faultAbort, maxActiveFaults));
1✔
132
  }
133

134
  private static FaultDelay parseFaultDelay(
135
      io.envoyproxy.envoy.extensions.filters.common.fault.v3.FaultDelay faultDelay) {
136
    FaultConfig.FractionalPercent percent = parsePercent(faultDelay.getPercentage());
1✔
137
    if (faultDelay.hasHeaderDelay()) {
1✔
138
      return FaultDelay.forHeader(percent);
×
139
    }
140
    return FaultDelay.forFixedDelay(Durations.toNanos(faultDelay.getFixedDelay()), percent);
1✔
141
  }
142

143
  @VisibleForTesting
144
  static ConfigOrError<FaultAbort> parseFaultAbort(
145
      io.envoyproxy.envoy.extensions.filters.http.fault.v3.FaultAbort faultAbort) {
146
    FaultConfig.FractionalPercent percent = parsePercent(faultAbort.getPercentage());
1✔
147
    switch (faultAbort.getErrorTypeCase()) {
1✔
148
      case HEADER_ABORT:
149
        return ConfigOrError.fromConfig(FaultAbort.forHeader(percent));
1✔
150
      case HTTP_STATUS:
151
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
152
            GrpcUtil.httpStatusToGrpcStatus(faultAbort.getHttpStatus()), percent));
1✔
153
      case GRPC_STATUS:
154
        return ConfigOrError.fromConfig(FaultAbort.forStatus(
1✔
155
            Status.fromCodeValue(faultAbort.getGrpcStatus()), percent));
1✔
156
      case ERRORTYPE_NOT_SET:
157
      default:
158
        return ConfigOrError.fromError(
×
159
            "Unknown error type case: " + faultAbort.getErrorTypeCase());
×
160
    }
161
  }
162

163
  private static FaultConfig.FractionalPercent parsePercent(FractionalPercent proto) {
164
    switch (proto.getDenominator()) {
1✔
165
      case HUNDRED:
166
        return FaultConfig.FractionalPercent.perHundred(proto.getNumerator());
1✔
167
      case TEN_THOUSAND:
168
        return FaultConfig.FractionalPercent.perTenThousand(proto.getNumerator());
1✔
169
      case MILLION:
170
        return FaultConfig.FractionalPercent.perMillion(proto.getNumerator());
1✔
171
      case UNRECOGNIZED:
172
      default:
173
        throw new IllegalArgumentException("Unknown denominator type: " + proto.getDenominator());
×
174
    }
175
  }
176

177
  @Override
178
  public ConfigOrError<FaultConfig> parseFilterConfigOverride(Message rawProtoMessage) {
179
    return parseFilterConfig(rawProtoMessage);
1✔
180
  }
181

182
  @Nullable
183
  @Override
184
  public ClientInterceptor buildClientInterceptor(
185
      FilterConfig config, @Nullable FilterConfig overrideConfig,
186
      final ScheduledExecutorService scheduler) {
187
    checkNotNull(config, "config");
1✔
188
    if (overrideConfig != null) {
1✔
189
      config = overrideConfig;
1✔
190
    }
191
    FaultConfig faultConfig = (FaultConfig) config;
1✔
192

193
    final class FaultInjectionInterceptor implements ClientInterceptor {
1✔
194
      @Override
195
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
196
          final MethodDescriptor<ReqT, RespT> method, final CallOptions callOptions,
197
          final Channel next) {
198
        boolean checkFault = false;
1✔
199
        if (faultConfig.maxActiveFaults() == null
1✔
200
            || activeFaultCounter.get() < faultConfig.maxActiveFaults()) {
1✔
201
          checkFault = faultConfig.faultDelay() != null || faultConfig.faultAbort() != null;
1✔
202
        }
203
        if (!checkFault) {
1✔
204
          return next.newCall(method, callOptions);
1✔
205
        }
206
        final class DeadlineInsightForwardingCall extends ForwardingClientCall<ReqT, RespT> {
1✔
207
          private ClientCall<ReqT, RespT> delegate;
208

209
          @Override
210
          protected ClientCall<ReqT, RespT> delegate() {
211
            return delegate;
1✔
212
          }
213

214
          @Override
215
          public void start(Listener<RespT> listener, Metadata headers) {
216
            Executor callExecutor = callOptions.getExecutor();
1✔
217
            if (callExecutor == null) { // This should never happen in practice because
1✔
218
              // ManagedChannelImpl.ConfigSelectingClientCall always provides CallOptions with
219
              // a callExecutor.
220
              // TODO(https://github.com/grpc/grpc-java/issues/7868)
221
              callExecutor = MoreExecutors.directExecutor();
1✔
222
            }
223

224
            Long delayNanos;
225
            Status abortStatus = null;
1✔
226
            if (faultConfig.faultDelay() != null) {
1✔
227
              delayNanos = determineFaultDelayNanos(faultConfig.faultDelay(), headers);
1✔
228
            } else {
229
              delayNanos = null;
1✔
230
            }
231
            if (faultConfig.faultAbort() != null) {
1✔
232
              abortStatus = getAbortStatusWithDescription(
1✔
233
                  determineFaultAbortStatus(faultConfig.faultAbort(), headers));
1✔
234
            }
235

236
            Supplier<? extends ClientCall<ReqT, RespT>> callSupplier;
237
            if (abortStatus != null) {
1✔
238
              callSupplier = Suppliers.ofInstance(
1✔
239
                  new FailingClientCall<ReqT, RespT>(abortStatus, callExecutor));
240
            } else {
241
              callSupplier = new Supplier<ClientCall<ReqT, RespT>>() {
1✔
242
                @Override
243
                public ClientCall<ReqT, RespT> get() {
244
                  return next.newCall(method, callOptions);
1✔
245
                }
246
              };
247
            }
248
            if (delayNanos == null) {
1✔
249
              delegate = callSupplier.get();
1✔
250
              delegate().start(listener, headers);
1✔
251
              return;
1✔
252
            }
253

254
            delegate = new DelayInjectedCall<>(
1✔
255
                delayNanos, callExecutor, scheduler, callOptions.getDeadline(), callSupplier);
1✔
256

257
            Listener<RespT> finalListener =
1✔
258
                new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
259
                  @Override
260
                  public void onClose(Status status, Metadata trailers) {
261
                    if (status.getCode().equals(Code.DEADLINE_EXCEEDED)) {
1✔
262
                      // TODO(zdapeng:) check effective deadline locally, and
263
                      //   do the following only if the local deadline is exceeded.
264
                      //   (If the server sends DEADLINE_EXCEEDED for its own deadline, then the
265
                      //   injected delay does not contribute to the error, because the request is
266
                      //   only sent out after the delay. There could be a race between local and
267
                      //   remote, but it is rather rare.)
268
                      String description = String.format(
1✔
269
                          Locale.US,
270
                          "Deadline exceeded after up to %d ns of fault-injected delay",
271
                          delayNanos);
272
                      if (status.getDescription() != null) {
1✔
273
                        description = description + ": " + status.getDescription();
1✔
274
                      }
275
                      status = Status.DEADLINE_EXCEEDED
1✔
276
                          .withDescription(description).withCause(status.getCause());
1✔
277
                      // Replace trailers to prevent mixing sources of status and trailers.
278
                      trailers = new Metadata();
1✔
279
                    }
280
                    delegate().onClose(status, trailers);
1✔
281
                  }
1✔
282
                };
283
            delegate().start(finalListener, headers);
1✔
284
          }
1✔
285
        }
286

287
        return new DeadlineInsightForwardingCall();
1✔
288
      }
289
    }
290

291
    return new FaultInjectionInterceptor();
1✔
292
  }
293

294
  private static Status getAbortStatusWithDescription(Status abortStatus) {
295
    Status finalAbortStatus = null;
1✔
296
    if (abortStatus != null) {
1✔
297
      String abortDesc = "RPC terminated due to fault injection";
1✔
298
      if (abortStatus.getDescription() != null) {
1✔
299
        abortDesc = abortDesc + ": " + abortStatus.getDescription();
1✔
300
      }
301
      finalAbortStatus = abortStatus.withDescription(abortDesc);
1✔
302
    }
303
    return finalAbortStatus;
1✔
304
  }
305

306
  @Nullable
307
  private Long determineFaultDelayNanos(FaultDelay faultDelay, Metadata headers) {
308
    Long delayNanos;
309
    FaultConfig.FractionalPercent fractionalPercent = faultDelay.percent();
1✔
310
    if (faultDelay.headerDelay()) {
1✔
311
      try {
312
        int delayMillis = Integer.parseInt(headers.get(HEADER_DELAY_KEY));
1✔
313
        delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
1✔
314
        String delayPercentageStr = headers.get(HEADER_DELAY_PERCENTAGE_KEY);
1✔
315
        if (delayPercentageStr != null) {
1✔
316
          int delayPercentage = Integer.parseInt(delayPercentageStr);
1✔
317
          if (delayPercentage >= 0 && delayPercentage < fractionalPercent.numerator()) {
1✔
318
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
319
                delayPercentage, fractionalPercent.denominatorType());
1✔
320
          }
321
        }
322
      } catch (NumberFormatException e) {
1✔
323
        return null; // treated as header_delay not applicable
1✔
324
      }
1✔
325
    } else {
326
      delayNanos = faultDelay.delayNanos();
1✔
327
    }
328
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
329
      return null;
1✔
330
    }
331
    return delayNanos;
1✔
332
  }
333

334
  @Nullable
335
  private Status determineFaultAbortStatus(FaultAbort faultAbort, Metadata headers) {
336
    Status abortStatus = null;
1✔
337
    FaultConfig.FractionalPercent fractionalPercent = faultAbort.percent();
1✔
338
    if (faultAbort.headerAbort()) {
1✔
339
      try {
340
        String grpcCodeStr = headers.get(HEADER_ABORT_GRPC_STATUS_KEY);
1✔
341
        if (grpcCodeStr != null) {
1✔
342
          int grpcCode = Integer.parseInt(grpcCodeStr);
1✔
343
          abortStatus = Status.fromCodeValue(grpcCode);
1✔
344
        }
345
        String httpCodeStr = headers.get(HEADER_ABORT_HTTP_STATUS_KEY);
1✔
346
        if (httpCodeStr != null) {
1✔
347
          int httpCode = Integer.parseInt(httpCodeStr);
1✔
348
          abortStatus = GrpcUtil.httpStatusToGrpcStatus(httpCode);
1✔
349
        }
350
        String abortPercentageStr = headers.get(HEADER_ABORT_PERCENTAGE_KEY);
1✔
351
        if (abortPercentageStr != null) {
1✔
352
          int abortPercentage =
1✔
353
              Integer.parseInt(headers.get(HEADER_ABORT_PERCENTAGE_KEY));
1✔
354
          if (abortPercentage >= 0 && abortPercentage < fractionalPercent.numerator()) {
1✔
355
            fractionalPercent = FaultConfig.FractionalPercent.create(
1✔
356
                abortPercentage, fractionalPercent.denominatorType());
1✔
357
          }
358
        }
359
      } catch (NumberFormatException e) {
×
360
        return null; // treated as header_abort not applicable
×
361
      }
1✔
362
    } else {
363
      abortStatus = faultAbort.status();
1✔
364
    }
365
    if (random.nextInt(1_000_000) >= getRatePerMillion(fractionalPercent)) {
1✔
366
      return null;
1✔
367
    }
368
    return abortStatus;
1✔
369
  }
370

371
  private static int getRatePerMillion(FaultConfig.FractionalPercent percent) {
372
    int numerator = percent.numerator();
1✔
373
    FaultConfig.FractionalPercent.DenominatorType type = percent.denominatorType();
1✔
374
    switch (type) {
1✔
375
      case TEN_THOUSAND:
376
        numerator *= 100;
×
377
        break;
×
378
      case HUNDRED:
379
        numerator *= 10_000;
1✔
380
        break;
1✔
381
      case MILLION:
382
      default:
383
        break;
384
    }
385
    if (numerator > 1_000_000 || numerator < 0) {
1✔
386
      numerator = 1_000_000;
×
387
    }
388
    return numerator;
1✔
389
  }
390

391
  /** A {@link DelayedClientCall} with a fixed delay. */
392
  private final class DelayInjectedCall<ReqT, RespT> extends DelayedClientCall<ReqT, RespT> {
393
    final Object lock = new Object();
1✔
394
    ScheduledFuture<?> delayTask;
395
    boolean cancelled;
396

397
    DelayInjectedCall(
398
        long delayNanos, Executor callExecutor, ScheduledExecutorService scheduler,
399
        @Nullable Deadline deadline,
400
        final Supplier<? extends ClientCall<ReqT, RespT>> callSupplier) {
1✔
401
      super(callExecutor, scheduler, deadline);
1✔
402
      activeFaultCounter.incrementAndGet();
1✔
403
      ScheduledFuture<?> task = scheduler.schedule(
1✔
404
          new Runnable() {
1✔
405
            @Override
406
            public void run() {
407
              synchronized (lock) {
1✔
408
                if (!cancelled) {
1✔
409
                  activeFaultCounter.decrementAndGet();
1✔
410
                }
411
              }
1✔
412
              Runnable toRun = setCall(callSupplier.get());
1✔
413
              if (toRun != null) {
1✔
414
                toRun.run();
1✔
415
              }
416
            }
1✔
417
          },
418
          delayNanos,
419
          NANOSECONDS);
420
      synchronized (lock) {
1✔
421
        if (!cancelled) {
1✔
422
          delayTask = task;
1✔
423
          return;
1✔
424
        }
425
      }
×
426
      task.cancel(false);
×
427
    }
×
428

429
    @Override
430
    protected void callCancelled() {
431
      ScheduledFuture<?> savedDelayTask;
432
      synchronized (lock) {
1✔
433
        cancelled = true;
1✔
434
        activeFaultCounter.decrementAndGet();
1✔
435
        savedDelayTask = delayTask;
1✔
436
      }
1✔
437
      if (savedDelayTask != null) {
1✔
438
        savedDelayTask.cancel(false);
1✔
439
      }
440
    }
1✔
441
  }
442

443
  /** An implementation of {@link ClientCall} that fails when started. */
444
  private final class FailingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
445
    final Status error;
446
    final Executor callExecutor;
447
    final Context context;
448

449
    FailingClientCall(Status error, Executor callExecutor) {
1✔
450
      this.error = error;
1✔
451
      this.callExecutor = callExecutor;
1✔
452
      this.context = Context.current();
1✔
453
    }
1✔
454

455
    @Override
456
    public void start(final ClientCall.Listener<RespT> listener, Metadata headers) {
457
      activeFaultCounter.incrementAndGet();
1✔
458
      callExecutor.execute(
1✔
459
          new Runnable() {
1✔
460
            @Override
461
            public void run() {
462
              Context previous = context.attach();
1✔
463
              try {
464
                listener.onClose(error, new Metadata());
1✔
465
                activeFaultCounter.decrementAndGet();
1✔
466
              } finally {
467
                context.detach(previous);
1✔
468
              }
469
            }
1✔
470
          });
471
    }
1✔
472

473
    @Override
474
    public void request(int numMessages) {}
×
475

476
    @Override
477
    public void cancel(String message, Throwable cause) {}
×
478

479
    @Override
480
    public void halfClose() {}
×
481

482
    @Override
483
    public void sendMessage(ReqT message) {}
×
484
  }
485
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc