• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20276

11 May 2026 08:24AM UTC coverage: 88.821% (-0.02%) from 88.838%
#20276

push

github

web-flow
Allow injecting bootstrap info into xDS Filter API for config parsing (#12724)

Extend the xDS Filter API to support injecting bootstrap information
into filters during configuration parsing. This allows filters to access
context information (e.g., allowed gRPC services) from the resource loading
layer during configuration validation and parsing.

- Update `Filter.Provider.parseFilterConfig` and `parseFilterConfigOverride`
  to accept a `FilterContext` parameter.
- Introduce `BootstrapInfoGrpcServiceContextProvider` to encapsulate
  bootstrap info for context resolution.
- Update `XdsListenerResource` and `XdsRouteConfigureResource` to
  construct and pass `FilterContext` during configuration parsing.
- Update sub-filters (`FaultFilter`, `RbacFilter`, `GcpAuthenticationFilter`,
  `RouterFilter`) to match the updated `FilterContext` signature.

Known Gaps & Limitations:
1. **MetricHolder**: Propagation of `MetricHolder` is not yet supported by
   this approach; support is planned for a later phase.
2. **NameResolverRegistry**: Propagation is deferred for consistency.
   While it could be passed from `XdsNameResolver` on the client side, there is
   no equivalent mechanism on the server side. To ensure consistent
   behavior, `DefaultRegistry` is used when validating schemes and creating channels.

36254 of 40817 relevant lines covered (88.82%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.06
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Stopwatch;
24
import com.google.common.base.Supplier;
25
import com.google.protobuf.Any;
26
import com.google.rpc.Code;
27
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
28
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
30
import io.grpc.InternalLogId;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.SynchronizationContext.ScheduledHandle;
35
import io.grpc.internal.BackoffPolicy;
36
import io.grpc.xds.client.Bootstrapper.ServerInfo;
37
import io.grpc.xds.client.EnvoyProtoData.Node;
38
import io.grpc.xds.client.XdsClient.ProcessingTracker;
39
import io.grpc.xds.client.XdsClient.ResourceStore;
40
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
43
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
44
import java.util.Collection;
45
import java.util.Collections;
46
import java.util.HashMap;
47
import java.util.HashSet;
48
import java.util.List;
49
import java.util.Map;
50
import java.util.Set;
51
import java.util.concurrent.ScheduledExecutorService;
52
import java.util.concurrent.TimeUnit;
53
import javax.annotation.Nullable;
54

55
/**
 * Manages the ADS (Aggregated Discovery Service) RPC stream to a single xDS management server
 * on behalf of an XdsClient implementation, encapsulating the stream-level details of the xDS
 * protocol.
 */
59
final class ControlPlaneClient {
60

61
  private final SynchronizationContext syncContext;
62
  private final InternalLogId logId;
63
  private final XdsLogger logger;
64
  private final ServerInfo serverInfo;
65
  private final XdsTransport xdsTransport;
66
  private final XdsResponseHandler xdsResponseHandler;
67
  private final ResourceStore resourceStore;
68
  private final ScheduledExecutorService timeService;
69
  private final BackoffPolicy.Provider backoffPolicyProvider;
70
  private final Stopwatch stopwatch;
71
  private final Node bootstrapNode;
72

73
  // Last successfully applied version_info for each resource type. Starts with empty string.
74
  // A version_info is used to update management server with client's most recent knowledge of
75
  // resources.
76
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
77

78
  private boolean shutdown;
79
  private boolean inError;
80

81
  @Nullable
82
  private AdsStream adsStream;
83
  @Nullable
84
  private BackoffPolicy retryBackoffPolicy;
85
  @Nullable
86
  private ScheduledHandle rpcRetryTimer;
87
  private final MessagePrettyPrinter messagePrinter;
88

89
  /**
   * Creates an entity that manages ADS RPCs over a single channel.
   *
   * <p>Every argument is required: a {@code null} value fails fast with a
   * {@link NullPointerException} whose message names the offending parameter.
   *
   * @param xdsTransport transport used to create the ADS streaming call; shut down together with
   *     this client
   * @param serverInfo the management server this client talks to; its target is part of the
   *     log id
   * @param bootstrapNode node identity attached to every DiscoveryRequest
   * @param xdsResponseHandler receives resource responses and stream-closed notifications
   * @param resourceStore source of the currently subscribed resource types and names
   * @param timeService executor on which the stream retry timer is scheduled
   * @param syncContext context on which stream callbacks and shutdown are executed
   * @param backoffPolicyProvider supplies the backoff sequence used between stream restarts
   * @param stopwatchSupplier supplies the stopwatch that measures time since the stream started
   * @param messagePrinter renders protocol messages for debug logging
   */
  ControlPlaneClient(
      XdsTransport xdsTransport,
      ServerInfo serverInfo,
      Node bootstrapNode,
      XdsResponseHandler xdsResponseHandler,
      ResourceStore resourceStore,
      ScheduledExecutorService timeService,
      SynchronizationContext syncContext,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      MessagePrettyPrinter messagePrinter) {
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
    // The label matches the parameter name so a null argument is reported accurately.
    this.resourceStore = checkNotNull(resourceStore, "resourceStore");
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
    this.timeService = checkNotNull(timeService, "timeService");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
    logId = InternalLogId.allocate("xds-cp-client", serverInfo.target());
    logger = XdsLogger.withLogId(logId);
    logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
116

117
  void shutdown() {
118
    syncContext.execute(new Runnable() {
1✔
119
      @Override
120
      public void run() {
121
        shutdown = true;
1✔
122
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
123
        if (adsStream != null) {
1✔
124
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
125
        }
126
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
127
          rpcRetryTimer.cancel();
1✔
128
        }
129
        xdsTransport.shutdown();
1✔
130
      }
1✔
131
    });
132
  }
1✔
133

134
  @Override
135
  public String toString() {
136
    return logId.toString();
×
137
  }
138

139
  public ServerInfo getServerInfo() {
140
    return serverInfo;
1✔
141
  }
142

143
  /**
144
   * Updates the resource subscription for the given resource type.
145
   */
146
  // Must be synchronized.
147
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
148
    if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
149
      return;
1✔
150
    }
151
    if (adsStream == null) {
1✔
152
      startRpcStream();
×
153
      // when the stream becomes ready, it will send the discovery requests
154
      return;
×
155
    }
156

157
    // We will do the rest of the method as part of the readyHandler when the stream is ready.
158
    if (!isConnected()) {
1✔
159
      return;
1✔
160
    }
161

162
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
163
    if (resources == null && !adsStream.sentTypes.contains(resourceType)) {
1✔
164
      // No subscription for this type on this server, and we have never sent a DiscoveryRequest
165
      // of this type on the current stream — the server has no subscription state to clear.
166
      //
167
      // Per the ResourceStore contract in XdsClient.java, a null return means "no subscription";
168
      // an empty collection means wildcard subscription, which is a real subscription and must
169
      // not be skipped here.
170
      //
171
      // We track sent types per-stream rather than gating on `versions` because `versions` is
172
      // only populated on ACK. If a watch is canceled after the initial DiscoveryRequest goes
173
      // out but before any response is ACKed, `versions` would still have no entry for the
174
      // type, and gating on it would suppress the empty unsubscribe — leaving the server with
175
      // a stale subscription until the stream resets.
176
      //
177
      // Without this skip, sendDiscoveryRequests() iterates over every globally-subscribed
178
      // resource type when a stream becomes ready and emits an empty DiscoveryRequest for types
179
      // that have no subscription on this server. Per A47 (xDS Federation) servers may be
180
      // authority-specific (e.g. an EDS-only control plane) and reject DiscoveryRequests for
181
      // types they do not handle, tearing down the stream.
182
      //
183
      // Mirrors grpc-go's behavior in
184
      // internal/xds/clients/xdsclient/ads_stream.go:sendExisting, which skips types with no
185
      // subscription.
186
      return;
1✔
187
    }
188
    if (resources == null) {
1✔
189
      resources = Collections.emptyList();
1✔
190
    }
191
    adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
192
    resourceStore.startMissingResourceTimers(resources, resourceType);
1✔
193

194
    if (resources.isEmpty()) {
1✔
195
      // The resource type no longer has subscribing resources; clean up references to it, except
196
      // for nonces. If the resource type becomes used again the control plane can ignore requests
197
      // for old/missing nonces. Old type's nonces are dropped when the ADS stream is restarted.
198
      versions.remove(resourceType);
1✔
199
    }
200
  }
1✔
201

202
  /**
203
   * Accepts the update for the given resource type by updating the latest resource version
204
   * and sends an ACK request to the management server.
205
   */
206
  // Must be synchronized.
207
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
208
    versions.put(type, versionInfo);
1✔
209
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
210
        type.typeName(), nonce, versionInfo);
1✔
211
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
212
    if (resources == null) {
1✔
213
      resources = Collections.emptyList();
×
214
    }
215
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
216
  }
1✔
217

218
  /**
219
   * Rejects the update for the given resource type and sends an NACK request (request with last
220
   * accepted version) to the management server.
221
   */
222
  // Must be synchronized.
223
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
224
    String versionInfo = versions.getOrDefault(type, "");
1✔
225
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
226
        type.typeName(), nonce, versionInfo);
1✔
227
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
228
    if (resources == null) {
1✔
229
      resources = Collections.emptyList();
×
230
    }
231
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
232
  }
1✔
233

234
  // Must be synchronized.
235
  boolean isReady() {
236
    return adsStream != null && adsStream.call != null
1✔
237
        && adsStream.call.isReady() && !adsStream.closed;
1✔
238
  }
239

240
  boolean isConnected() {
241
    return adsStream != null && adsStream.sentInitialRequest;
1✔
242
  }
243

244
  /**
245
   * Used for identifying whether or not when getting a control plane for authority that this
246
   * control plane should be skipped over if there is a fallback.
247
   *
248
   * <p>Also used by metric to consider this control plane to not be "active".
249
   *
250
   * <p>A ControlPlaneClient is considered to be in error during the time from when an
251
   * {@link AdsStream} closed without having received a response to the time an AdsStream does
252
   * receive a response.
253
   */
254
  boolean isInError() {
255
    return inError;
1✔
256
  }
257

258

259
  /**
260
   * Cleans up outstanding rpcRetryTimer if present, since we are communicating.
261
   * If we haven't sent the initial discovery request for this RPC stream, we will delegate to
262
   * xdsResponseHandler (in practice XdsClientImpl) to do any initialization for a new active
263
   * stream such as starting timers.  We then send the initial discovery request.
264
   */
265
  // Must be synchronized.
266
  void readyHandler(boolean shouldSendInitialRequest) {
267
    if (shouldSendInitialRequest) {
1✔
268
      sendDiscoveryRequests();
1✔
269
    }
270
  }
1✔
271

272
  /**
273
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
274
   * xDS protocol communication.
275
   */
276
  // Must be synchronized.
277
  private void startRpcStream() {
278
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
279

280
    if (rpcRetryTimer != null) {
1✔
281
      rpcRetryTimer.cancel();
1✔
282
      rpcRetryTimer = null;
1✔
283
    }
284

285
    adsStream = new AdsStream();
1✔
286
    adsStream.start();
1✔
287
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
288
    stopwatch.reset().start();
1✔
289
  }
1✔
290

291
  void sendDiscoveryRequests() {
292
    if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
293
      return;
×
294
    }
295

296
    if (adsStream == null) {
1✔
297
      startRpcStream();
1✔
298
      // when the stream becomes ready, it will send the discovery requests
299
      return;
1✔
300
    }
301

302
    if (isConnected()) {
1✔
303
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
304
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
305

306
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
307
        adjustResourceSubscription(type);
1✔
308
      }
1✔
309
    }
310
  }
1✔
311

312
  @VisibleForTesting
313
  public final class RpcRetryTask implements Runnable {
1✔
314
    @Override
315
    public void run() {
316
      logger.log(XdsLogLevel.DEBUG, "Retry timeout. Restart ADS stream {0}", logId);
1✔
317
      if (shutdown) {
1✔
318
        return;
×
319
      }
320

321
      startRpcStream();
1✔
322

323
      // handling CPC management is triggered in readyHandler
324
    }
1✔
325
  }
326

327
  @VisibleForTesting
328
  @Nullable
329
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
330
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
331
  }
332

333
  private class AdsStream implements XdsTransportFactory.EventHandler<DiscoveryResponse> {
334
    private boolean responseReceived;
335
    private boolean sentInitialRequest;
336
    private boolean closed;
337
    // Response nonce for the most recently received discovery responses of each resource type URL.
338
    // Client initiated requests start response nonce with empty string.
339
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
340
    // used for management server to identify which response the client is ACKing/NACking.
341
    // To avoid confusion, client-initiated requests will always use the nonce in
342
    // most recently received responses of each resource type. Nonces are never deleted from the
343
    // map; nonces are only discarded once the stream closes because xds_protocol says "the
344
    // management server should not send a DiscoveryResponse for any DiscoveryRequest that has a
345
    // stale nonce."
346
    private final Map<String, String> respNonces = new HashMap<>();
1✔
347
    // Resource types for which a DiscoveryRequest has been sent on this stream. Used by
348
    // adjustResourceSubscription() to decide whether an empty unsubscribe must be sent on the
349
    // wire: the server only has subscription state to clear for types we have actually sent a
350
    // request for on this stream. Cleared implicitly when the stream is replaced.
351
    private final Set<XdsResourceType<?>> sentTypes = new HashSet<>();
1✔
352
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
353
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
1✔
354
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
1✔
355

356
    private AdsStream() {
1✔
357
      this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
1✔
358
          methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
1✔
359
    }
1✔
360

361
    void start() {
362
      call.start(this);
1✔
363
    }
1✔
364

365
    /**
366
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
367
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
368
     * client-initiated discovery requests, use {@link
369
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
370
     */
371
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
372
                              Collection<String> resources, String nonce,
373
                              @Nullable String errorDetail) {
374
      DiscoveryRequest.Builder builder =
375
          DiscoveryRequest.newBuilder()
1✔
376
              .setVersionInfo(versionInfo)
1✔
377
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
378
              .addAllResourceNames(resources)
1✔
379
              .setTypeUrl(type.typeUrl())
1✔
380
              .setResponseNonce(nonce);
1✔
381
      if (errorDetail != null) {
1✔
382
        com.google.rpc.Status error =
383
            com.google.rpc.Status.newBuilder()
1✔
384
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
385
                .setMessage(errorDetail)
1✔
386
                .build();
1✔
387
        builder.setErrorDetail(error);
1✔
388
      }
389
      DiscoveryRequest request = builder.build();
1✔
390
      call.sendMessage(request);
1✔
391
      sentTypes.add(type);
1✔
392
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
393
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
394
      }
395
    }
1✔
396

397
    /**
398
     * Sends a client-initiated discovery request.
399
     */
400
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
401
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
402
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
403
          respNonces.getOrDefault(type.typeUrl(), ""), null);
1✔
404
    }
1✔
405

406
    @Override
407
    public void onReady() {
408
      syncContext.execute(() -> {
1✔
409
        if (!isReady()) {
1✔
410
          logger.log(XdsLogLevel.DEBUG,
×
411
              "ADS stream ready handler called, but not ready {0}", logId);
×
412
          return;
×
413
        }
414

415
        logger.log(XdsLogLevel.DEBUG, "ADS stream ready {0}", logId);
1✔
416

417
        boolean hadSentInitialRequest = sentInitialRequest;
1✔
418
        sentInitialRequest = true;
1✔
419
        readyHandler(!hadSentInitialRequest);
1✔
420
      });
1✔
421
    }
1✔
422

423
    @Override
424
    public void onRecvMessage(DiscoveryResponse response) {
425
      syncContext.execute(new Runnable() {
1✔
426
        @Override
427
        public void run() {
428
          if (closed) {
1✔
429
            return;
1✔
430
          }
431
          boolean isFirstResponse = !responseReceived;
1✔
432
          responseReceived = true;
1✔
433
          inError = false;
1✔
434
          respNonces.put(response.getTypeUrl(), response.getNonce());
1✔
435

436
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
437
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
438
            logger.log(
×
439
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
440
                messagePrinter.print(response));
×
441
          }
442
          if (type == null) {
1✔
443
            logger.log(
1✔
444
                XdsLogLevel.WARNING,
445
                "Ignore an unknown type of DiscoveryResponse: {0}",
446
                response.getTypeUrl());
1✔
447

448
            call.startRecvMessage();
1✔
449
            return;
1✔
450
          }
451
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
452
              response.getNonce(), isFirstResponse);
1✔
453
        }
1✔
454
      });
455
    }
1✔
456

457
    @Override
458
    public void onStatusReceived(final Status status) {
459
      syncContext.execute(() -> {
1✔
460
        handleRpcStreamClosed(status);
1✔
461
      });
1✔
462
    }
1✔
463

464
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
465
                                 String nonce, boolean isFirstResponse) {
466
      checkNotNull(type, "type");
1✔
467

468
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
469
          () -> call.startRecvMessage(), syncContext);
1✔
470
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
471
          isFirstResponse, processingTracker);
472
      processingTracker.onComplete();
1✔
473
    }
1✔
474

475
    private void handleRpcStreamClosed(Status status) {
476
      if (closed) {
1✔
477
        return;
1✔
478
      }
479

480
      if (responseReceived || retryBackoffPolicy == null) {
1✔
481
        // Reset the backoff sequence if had received a response, or backoff sequence
482
        // has never been initialized.
483
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
484
        stopwatch.reset();
1✔
485
      }
486

487
      Status newStatus = status;
1✔
488
      if (responseReceived) {
1✔
489
        // A closed ADS stream after a successful response is not considered an error. Servers may
490
        // close streams for various reasons during normal operation, such as load balancing or
491
        // underlying connection hitting its max connection age limit (see gRFC A9).
492
        if (!status.isOk()) {
1✔
493
          newStatus = Status.OK;
1✔
494
          logger.log(XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
1✔
495
              + "response was received, so this will not be treated as an error. Cause: {2}",
496
              status.getCode(), status.getDescription(), status.getCause());
1✔
497
        } else {
498
          logger.log(XdsLogLevel.DEBUG,
1✔
499
              "ADS stream closed by server after a response was received");
500
        }
501
      } else {
502
        // If the ADS stream is closed without ever having received a response from the server, then
503
        // the XdsClient should consider that a connectivity error (see gRFC A57).
504
        inError = true;
1✔
505
        if (status.isOk()) {
1✔
506
          newStatus = Status.UNAVAILABLE.withDescription(
1✔
507
              "ADS stream closed with OK before receiving a response");
508
        }
509
        logger.log(
1✔
510
            XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
511
            newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
1✔
512
      }
513

514
      close(newStatus.asException());
1✔
515

516
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
517
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
518
      // concurrently with the stopwatch and schedule.
519
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
520
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
521
      rpcRetryTimer =
1✔
522
          syncContext.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
523

524
      xdsResponseHandler.handleStreamClosed(newStatus, !responseReceived);
1✔
525
    }
1✔
526

527
    private void close(Exception error) {
528
      if (closed) {
1✔
529
        return;
×
530
      }
531
      closed = true;
1✔
532
      cleanUp();
1✔
533
      call.sendError(error);
1✔
534
    }
1✔
535

536
    private void cleanUp() {
537
      if (adsStream == this) {
1✔
538
        adsStream = null;
1✔
539
      }
540
    }
1✔
541
  }
542

543
  @VisibleForTesting
544
  static class FailingXdsTransport implements XdsTransport {
545
    Status error;
546

547
    public FailingXdsTransport(Status error) {
1✔
548
      this.error = error;
1✔
549
    }
1✔
550

551
    @Override
552
    public <ReqT, RespT> StreamingCall<ReqT, RespT>
553
        createStreamingCall(String fullMethodName,
554
                            MethodDescriptor.Marshaller<ReqT> reqMarshaller,
555
                            MethodDescriptor.Marshaller<RespT> respMarshaller) {
556
      return new FailingXdsStreamingCall<>();
1✔
557
    }
558

559
    @Override
560
    public void shutdown() {
561
      // no-op
562
    }
1✔
563

564
    private class FailingXdsStreamingCall<ReqT, RespT> implements StreamingCall<ReqT, RespT> {
1✔
565

566
      @Override
567
      public void start(XdsTransportFactory.EventHandler<RespT> eventHandler) {
568
        eventHandler.onStatusReceived(error);
1✔
569
      }
1✔
570

571
      @Override
572
      public void sendMessage(ReqT message) {
573
        // no-op
574
      }
×
575

576
      @Override
577
      public void startRecvMessage() {
578
        // no-op
579
      }
×
580

581
      @Override
582
      public void sendError(Exception e) {
583
        // no-op
584
      }
1✔
585

586
      @Override
587
      public boolean isReady() {
588
        return false;
×
589
      }
590
    }
591
  }
592

593
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc