• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19628

07 Jan 2025 08:35PM UTC coverage: 88.561% (-0.008%) from 88.569%
#19628

push

github

ejona86
xds: Remember nonces for unknown types

If the control plane sends a resource type that the client doesn't currently
understand, the control plane will still expect the client to include that
response's nonce if the client later subscribes to the type.

This most easily happens when unsubscribing from the last resource of a type,
which is why 1cf1927d1 was insufficient.

33369 of 37679 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.26
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Stopwatch;
24
import com.google.common.base.Supplier;
25
import com.google.protobuf.Any;
26
import com.google.rpc.Code;
27
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
28
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
30
import io.grpc.InternalLogId;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.SynchronizationContext.ScheduledHandle;
35
import io.grpc.internal.BackoffPolicy;
36
import io.grpc.xds.client.Bootstrapper.ServerInfo;
37
import io.grpc.xds.client.EnvoyProtoData.Node;
38
import io.grpc.xds.client.XdsClient.ProcessingTracker;
39
import io.grpc.xds.client.XdsClient.ResourceStore;
40
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
43
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
44
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
45
import java.util.Collection;
46
import java.util.Collections;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Set;
52
import java.util.concurrent.ScheduledExecutorService;
53
import java.util.concurrent.TimeUnit;
54
import javax.annotation.Nullable;
55

56
/**
57
 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of
58
 * the xDS RPC stream.
59
 */
60
final class ControlPlaneClient {
61

62
  private final SynchronizationContext syncContext;
63
  private final InternalLogId logId;
64
  private final XdsLogger logger;
65
  private final ServerInfo serverInfo;
66
  private final XdsTransport xdsTransport;
67
  private final XdsResponseHandler xdsResponseHandler;
68
  private final ResourceStore resourceStore;
69
  private final ScheduledExecutorService timeService;
70
  private final BackoffPolicy.Provider backoffPolicyProvider;
71
  private final Stopwatch stopwatch;
72
  private final Node bootstrapNode;
73
  private final XdsClient xdsClient;
74

75
  // Last successfully applied version_info for each resource type. Starts with empty string.
76
  // A version_info is used to update management server with client's most recent knowledge of
77
  // resources.
78
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
79

80
  private boolean shutdown;
81
  private boolean streamClosedNoResponse;
82
  @Nullable
83
  private AdsStream adsStream;
84
  @Nullable
85
  private BackoffPolicy retryBackoffPolicy;
86
  @Nullable
87
  private ScheduledHandle rpcRetryTimer;
88
  private MessagePrettyPrinter messagePrinter;
89

90
  /** An entity that manages ADS RPCs over a single channel. */
91
  ControlPlaneClient(
92
      XdsTransport xdsTransport,
93
      ServerInfo serverInfo,
94
      Node bootstrapNode,
95
      XdsResponseHandler xdsResponseHandler,
96
      ResourceStore resourceStore,
97
      ScheduledExecutorService
98
      timeService,
99
      SynchronizationContext syncContext,
100
      BackoffPolicy.Provider backoffPolicyProvider,
101
      Supplier<Stopwatch> stopwatchSupplier,
102
      XdsClient xdsClient,
103
      MessagePrettyPrinter messagePrinter) {
1✔
104
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
1✔
105
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
1✔
106
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
1✔
107
    this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
1✔
108
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
1✔
109
    this.timeService = checkNotNull(timeService, "timeService");
1✔
110
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
111
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
112
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
113
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
1✔
114
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
115
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
1✔
116
    logger = XdsLogger.withLogId(logId);
1✔
117
    logger.log(XdsLogLevel.INFO, "Created");
1✔
118
  }
1✔
119

120
  void shutdown() {
121
    syncContext.execute(new Runnable() {
1✔
122
      @Override
123
      public void run() {
124
        shutdown = true;
1✔
125
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
126
        if (adsStream != null) {
1✔
127
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
128
        }
129
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
130
          rpcRetryTimer.cancel();
1✔
131
        }
132
        xdsTransport.shutdown();
1✔
133
      }
1✔
134
    });
135
  }
1✔
136

137
  @Override
138
  public String toString() {
139
    return logId.toString();
×
140
  }
141

142
  /**
143
   * Updates the resource subscription for the given resource type.
144
   */
145
  // Must be synchronized.
146
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
147
    if (isInBackoff()) {
1✔
148
      return;
1✔
149
    }
150
    if (adsStream == null) {
1✔
151
      startRpcStream();
1✔
152
    }
153
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
154
    if (resources == null) {
1✔
155
      resources = Collections.emptyList();
1✔
156
    }
157
    adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
158
    if (resources.isEmpty()) {
1✔
159
      // The resource type no longer has subscribing resources; clean up references to it, except
160
      // for nonces. If the resource type becomes used again the control plane can ignore requests
161
      // for old/missing nonces. Old type's nonces are dropped when the ADS stream is restarted.
162
      versions.remove(resourceType);
1✔
163
    }
164
  }
1✔
165

166
  /**
167
   * Accepts the update for the given resource type by updating the latest resource version
168
   * and sends an ACK request to the management server.
169
   */
170
  // Must be synchronized.
171
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
172
    versions.put(type, versionInfo);
1✔
173
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
174
        type.typeName(), nonce, versionInfo);
1✔
175
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
176
    if (resources == null) {
1✔
177
      resources = Collections.emptyList();
×
178
    }
179
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
180
  }
1✔
181

182
  /**
183
   * Rejects the update for the given resource type and sends an NACK request (request with last
184
   * accepted version) to the management server.
185
   */
186
  // Must be synchronized.
187
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
188
    String versionInfo = versions.getOrDefault(type, "");
1✔
189
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
190
        type.typeName(), nonce, versionInfo);
1✔
191
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
192
    if (resources == null) {
1✔
193
      resources = Collections.emptyList();
×
194
    }
195
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
196
  }
1✔
197

198
  /**
199
   * Returns {@code true} if the resource discovery is currently in backoff.
200
   */
201
  // Must be synchronized.
202
  boolean isInBackoff() {
203
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
204
  }
205

206
  // Must be synchronized.
207
  boolean isReady() {
208
    return adsStream != null && adsStream.call != null && adsStream.call.isReady();
1✔
209
  }
210

211
  /**
212
   * Starts a timer for each requested resource that hasn't been responded to and
213
   * has been waiting for the channel to get ready.
214
   */
215
  // Must be synchronized.
216
  void readyHandler() {
217
    if (!isReady()) {
1✔
218
      return;
×
219
    }
220

221
    if (isInBackoff()) {
1✔
222
      rpcRetryTimer.cancel();
×
223
      rpcRetryTimer = null;
×
224
    }
225

226
    xdsClient.startSubscriberTimersIfNeeded(serverInfo);
1✔
227
  }
1✔
228

229
  /**
230
   * Indicates whether there is an active ADS stream.
231
   *
232
   * <p>Return {@code true} when the {@code AdsStream} is created.
233
   * {@code false} when the ADS stream fails without a response. Resets to true
234
   * upon receiving the first response on a new ADS stream.
235
   */
236
  // Must be synchronized
237
  boolean hasWorkingAdsStream() {
238
    return !streamClosedNoResponse;
1✔
239
  }
240

241

242
  /**
243
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
244
   * xDS protocol communication.
245
   */
246
  // Must be synchronized.
247
  private void startRpcStream() {
248
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
249
    adsStream = new AdsStream();
1✔
250
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
251
    stopwatch.reset().start();
1✔
252
  }
1✔
253

254
  @VisibleForTesting
255
  public final class RpcRetryTask implements Runnable {
1✔
256
    @Override
257
    public void run() {
258
      if (shutdown) {
1✔
259
        return;
×
260
      }
261
      startRpcStream();
1✔
262
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
263
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
264
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
265
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
266
        if (resources != null) {
1✔
267
          adsStream.sendDiscoveryRequest(type, resources);
1✔
268
        }
269
      }
1✔
270
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
271
    }
1✔
272
  }
273

274
  @VisibleForTesting
275
  @Nullable
276
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
277
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
278
  }
279

280
  private class AdsStream implements EventHandler<DiscoveryResponse> {
281
    private boolean responseReceived;
282
    private boolean closed;
283
    // Response nonce for the most recently received discovery responses of each resource type URL.
284
    // Client initiated requests start response nonce with empty string.
285
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
286
    // used for management server to identify which response the client is ACKing/NACking.
287
    // To avoid confusion, client-initiated requests will always use the nonce in
288
    // most recently received responses of each resource type. Nonces are never deleted from the
289
    // map; nonces are only discarded once the stream closes because xds_protocol says "the
290
    // management server should not send a DiscoveryResponse for any DiscoveryRequest that has a
291
    // stale nonce."
292
    private final Map<String, String> respNonces = new HashMap<>();
1✔
293
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
294
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
1✔
295
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
1✔
296

297
    private AdsStream() {
1✔
298
      this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
1✔
299
          methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
1✔
300
      call.start(this);
1✔
301
    }
1✔
302

303
    /**
304
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
305
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
306
     * client-initiated discovery requests, use {@link
307
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
308
     */
309
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
310
                              Collection<String> resources, String nonce,
311
                              @Nullable String errorDetail) {
312
      DiscoveryRequest.Builder builder =
313
          DiscoveryRequest.newBuilder()
1✔
314
              .setVersionInfo(versionInfo)
1✔
315
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
316
              .addAllResourceNames(resources)
1✔
317
              .setTypeUrl(type.typeUrl())
1✔
318
              .setResponseNonce(nonce);
1✔
319
      if (errorDetail != null) {
1✔
320
        com.google.rpc.Status error =
321
            com.google.rpc.Status.newBuilder()
1✔
322
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
323
                .setMessage(errorDetail)
1✔
324
                .build();
1✔
325
        builder.setErrorDetail(error);
1✔
326
      }
327
      DiscoveryRequest request = builder.build();
1✔
328
      call.sendMessage(request);
1✔
329
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
330
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
331
      }
332
    }
1✔
333

334
    /**
335
     * Sends a client-initiated discovery request.
336
     */
337
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
338
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
339
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
340
          respNonces.getOrDefault(type.typeUrl(), ""), null);
1✔
341
    }
1✔
342

343
    @Override
344
    public void onReady() {
345
      syncContext.execute(ControlPlaneClient.this::readyHandler);
1✔
346
    }
1✔
347

348
    @Override
349
    public void onRecvMessage(DiscoveryResponse response) {
350
      syncContext.execute(new Runnable() {
1✔
351
        @Override
352
        public void run() {
353
          // Reset flag as message has been received on a stream
354
          streamClosedNoResponse = false;
1✔
355
          respNonces.put(response.getTypeUrl(), response.getNonce());
1✔
356
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
357
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
358
            logger.log(
×
359
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
360
                messagePrinter.print(response));
×
361
          }
362
          if (type == null) {
1✔
363
            logger.log(
1✔
364
                XdsLogLevel.WARNING,
365
                "Ignore an unknown type of DiscoveryResponse: {0}",
366
                response.getTypeUrl());
1✔
367

368
            call.startRecvMessage();
1✔
369
            return;
1✔
370
          }
371
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
372
              response.getNonce());
1✔
373
        }
1✔
374
      });
375
    }
1✔
376

377
    @Override
378
    public void onStatusReceived(final Status status) {
379
      syncContext.execute(() -> {
1✔
380
        handleRpcStreamClosed(status);
1✔
381
      });
1✔
382
    }
1✔
383

384
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
385
                                 String nonce) {
386
      checkNotNull(type, "type");
1✔
387
      if (closed) {
1✔
388
        return;
×
389
      }
390
      responseReceived = true;
1✔
391
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
392
          () -> call.startRecvMessage(), syncContext);
1✔
393
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
394
          processingTracker);
395
      processingTracker.onComplete();
1✔
396
    }
1✔
397

398
    private void handleRpcStreamClosed(Status status) {
399
      if (closed) {
1✔
400
        return;
1✔
401
      }
402

403
      if (responseReceived || retryBackoffPolicy == null) {
1✔
404
        // Reset the backoff sequence if had received a response, or backoff sequence
405
        // has never been initialized.
406
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
407
      }
408
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
409
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
410
      // concurrently with the stopwatch and schedule.
411
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
412
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
413
      rpcRetryTimer = syncContext.schedule(
1✔
414
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
415

416
      Status newStatus = status;
1✔
417
      if (responseReceived) {
1✔
418
        // A closed ADS stream after a successful response is not considered an error. Servers may
419
        // close streams for various reasons during normal operation, such as load balancing or
420
        // underlying connection hitting its max connection age limit  (see gRFC A9).
421
        if (!status.isOk()) {
1✔
422
          newStatus = Status.OK;
1✔
423
          logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
1✔
424
              + "response was received, so this will not be treated as an error. Cause: {2}",
425
              status.getCode(), status.getDescription(), status.getCause());
1✔
426
        } else {
427
          logger.log(XdsLogLevel.DEBUG,
1✔
428
              "ADS stream closed by server after a response was received");
429
        }
430
      } else {
431
        streamClosedNoResponse = true;
1✔
432
        // If the ADS stream is closed without ever having received a response from the server, then
433
        // the XdsClient should consider that a connectivity error (see gRFC A57).
434
        if (status.isOk()) {
1✔
435
          newStatus = Status.UNAVAILABLE.withDescription(
1✔
436
              "ADS stream closed with OK before receiving a response");
437
        }
438
        logger.log(
1✔
439
            XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
440
            newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
1✔
441
      }
442

443
      closed = true;
1✔
444
      xdsResponseHandler.handleStreamClosed(newStatus);
1✔
445
      cleanUp();
1✔
446

447
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
448
    }
1✔
449

450
    private void close(Exception error) {
451
      if (closed) {
1✔
452
        return;
×
453
      }
454
      closed = true;
1✔
455
      cleanUp();
1✔
456
      call.sendError(error);
1✔
457
    }
1✔
458

459
    private void cleanUp() {
460
      if (adsStream == this) {
1✔
461
        adsStream = null;
1✔
462
      }
463
    }
1✔
464
  }
465
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc