• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19629

07 Jan 2025 10:25PM CUT coverage: 84.602%. Remained the same
#19629

push

github

ejona86
xds: Remember nonces for unknown types

If the control plane sends a resource type the client doesn't understand
at the moment, the control plane will still expect the client to include
the nonce if the client subscribes to the type in the future.

This most easily happens when unsubscribing the last resource of a type,
which meant 1cf1927d1 was insufficient.

33844 of 40004 relevant lines covered (84.6%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.14
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Stopwatch;
24
import com.google.common.base.Supplier;
25
import com.google.protobuf.Any;
26
import com.google.rpc.Code;
27
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
28
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
30
import io.grpc.InternalLogId;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.SynchronizationContext.ScheduledHandle;
35
import io.grpc.internal.BackoffPolicy;
36
import io.grpc.xds.client.Bootstrapper.ServerInfo;
37
import io.grpc.xds.client.EnvoyProtoData.Node;
38
import io.grpc.xds.client.XdsClient.ProcessingTracker;
39
import io.grpc.xds.client.XdsClient.ResourceStore;
40
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
43
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
44
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
45
import java.util.Collection;
46
import java.util.Collections;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Set;
52
import java.util.concurrent.ScheduledExecutorService;
53
import java.util.concurrent.TimeUnit;
54
import javax.annotation.Nullable;
55

56
/**
57
 * Manages ADS RPCs over a single channel on behalf of an {@code XdsClient}, encapsulating the
58
 * layer abstraction of the xDS RPC stream.
59
 */
60
final class ControlPlaneClient {
61

62
  private final SynchronizationContext syncContext;
63
  private final InternalLogId logId;
64
  private final XdsLogger logger;
65
  private final ServerInfo serverInfo;
66
  private final XdsTransport xdsTransport;
67
  private final XdsResponseHandler xdsResponseHandler;
68
  private final ResourceStore resourceStore;
69
  private final ScheduledExecutorService timeService;
70
  private final BackoffPolicy.Provider backoffPolicyProvider;
71
  private final Stopwatch stopwatch;
72
  private final Node bootstrapNode;
73
  private final XdsClient xdsClient;
74

75
  // Last successfully applied version_info for each resource type. Starts with empty string.
76
  // A version_info is used to update management server with client's most recent knowledge of
77
  // resources.
78
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
79

80
  private boolean shutdown;
81
  @Nullable
82
  private AdsStream adsStream;
83
  @Nullable
84
  private BackoffPolicy retryBackoffPolicy;
85
  @Nullable
86
  private ScheduledHandle rpcRetryTimer;
87
  private MessagePrettyPrinter messagePrinter;
88

89
  /**
   * Creates an entity that manages ADS RPCs over a single channel.
   *
   * <p>Every argument is required: a {@link NullPointerException} whose message is the name of
   * the offending parameter is thrown for a {@code null} argument. The stopwatch is obtained
   * from {@code stopwatchSupplier} once, during construction. No ADS stream is opened here.
   */
  ControlPlaneClient(
      XdsTransport xdsTransport,
      ServerInfo serverInfo,
      Node bootstrapNode,
      XdsResponseHandler xdsResponseHandler,
      ResourceStore resourceStore,
      ScheduledExecutorService timeService,
      SynchronizationContext syncContext,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      XdsClient xdsClient,
      MessagePrettyPrinter messagePrinter) {
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
    // The label matches the parameter name so that the NPE message points at the real argument.
    this.resourceStore = checkNotNull(resourceStore, "resourceStore");
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
    this.timeService = checkNotNull(timeService, "timeService");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
    // The log id is derived from the server target so log lines identify the control plane.
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
    logger = XdsLogger.withLogId(logId);
    logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
118

119
  void shutdown() {
120
    syncContext.execute(new Runnable() {
1✔
121
      @Override
122
      public void run() {
123
        shutdown = true;
1✔
124
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
125
        if (adsStream != null) {
1✔
126
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
127
        }
128
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
129
          rpcRetryTimer.cancel();
1✔
130
        }
131
        xdsTransport.shutdown();
1✔
132
      }
1✔
133
    });
134
  }
1✔
135

136
  @Override
137
  public String toString() {
138
    return logId.toString();
×
139
  }
140

141
  /**
142
   * Updates the resource subscription for the given resource type.
143
   */
144
  // Must be synchronized.
145
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
146
    if (isInBackoff()) {
1✔
147
      return;
1✔
148
    }
149
    if (adsStream == null) {
1✔
150
      startRpcStream();
1✔
151
    }
152
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
153
    if (resources == null) {
1✔
154
      resources = Collections.emptyList();
1✔
155
    }
156
    adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
157
    if (resources.isEmpty()) {
1✔
158
      // The resource type no longer has subscribing resources; clean up references to it, except
159
      // for nonces. If the resource type becomes used again the control plane can ignore requests
160
      // for old/missing nonces. Old type's nonces are dropped when the ADS stream is restarted.
161
      versions.remove(resourceType);
1✔
162
    }
163
  }
1✔
164

165
  /**
166
   * Accepts the update for the given resource type by updating the latest resource version
167
   * and sends an ACK request to the management server.
168
   */
169
  // Must be synchronized.
170
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
171
    versions.put(type, versionInfo);
1✔
172
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
173
        type.typeName(), nonce, versionInfo);
1✔
174
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
175
    if (resources == null) {
1✔
176
      resources = Collections.emptyList();
×
177
    }
178
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
179
  }
1✔
180

181
  /**
182
   * Rejects the update for the given resource type and sends an NACK request (request with last
183
   * accepted version) to the management server.
184
   */
185
  // Must be synchronized.
186
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
187
    String versionInfo = versions.getOrDefault(type, "");
1✔
188
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
189
        type.typeName(), nonce, versionInfo);
1✔
190
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
191
    if (resources == null) {
1✔
192
      resources = Collections.emptyList();
×
193
    }
194
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
195
  }
1✔
196

197
  /**
198
   * Returns {@code true} if the resource discovery is currently in backoff.
199
   */
200
  // Must be synchronized.
201
  boolean isInBackoff() {
202
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
203
  }
204

205
  // Must be synchronized.
206
  boolean isReady() {
207
    return adsStream != null && adsStream.call != null && adsStream.call.isReady();
1✔
208
  }
209

210
  /**
211
   * Starts a timer for each requested resource that hasn't been responded to and
212
   * has been waiting for the channel to get ready.
213
   */
214
  // Must be synchronized.
215
  void readyHandler() {
216
    if (!isReady()) {
1✔
217
      return;
×
218
    }
219

220
    if (isInBackoff()) {
1✔
221
      rpcRetryTimer.cancel();
×
222
      rpcRetryTimer = null;
×
223
    }
224

225
    xdsClient.startSubscriberTimersIfNeeded(serverInfo);
1✔
226
  }
1✔
227

228
  /**
229
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
230
   * xDS protocol communication.
231
   */
232
  // Must be synchronized.
233
  private void startRpcStream() {
234
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
235
    adsStream = new AdsStream();
1✔
236
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
237
    stopwatch.reset().start();
1✔
238
  }
1✔
239

240
  @VisibleForTesting
241
  public final class RpcRetryTask implements Runnable {
1✔
242
    @Override
243
    public void run() {
244
      if (shutdown) {
1✔
245
        return;
×
246
      }
247
      startRpcStream();
1✔
248
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
249
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
250
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
251
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
252
        if (resources != null) {
1✔
253
          adsStream.sendDiscoveryRequest(type, resources);
1✔
254
        }
255
      }
1✔
256
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
257
    }
1✔
258
  }
259

260
  @VisibleForTesting
261
  @Nullable
262
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
263
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
264
  }
265

266
  private class AdsStream implements EventHandler<DiscoveryResponse> {
267
    private boolean responseReceived;
268
    private boolean closed;
269
    // Response nonce for the most recently received discovery responses of each resource type URL.
270
    // Client initiated requests start response nonce with empty string.
271
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
272
    // used for management server to identify which response the client is ACKing/NACking.
273
    // To avoid confusion, client-initiated requests will always use the nonce in
274
    // most recently received responses of each resource type. Nonces are never deleted from the
275
    // map; nonces are only discarded once the stream closes because xds_protocol says "the
276
    // management server should not send a DiscoveryResponse for any DiscoveryRequest that has a
277
    // stale nonce."
278
    private final Map<String, String> respNonces = new HashMap<>();
1✔
279
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
280
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
1✔
281
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
1✔
282

283
    /** Creates the streaming call for the ADS method on the transport and starts it. */
    private AdsStream() {
      String fullMethodName = methodDescriptor.getFullMethodName();
      this.call = xdsTransport.createStreamingCall(
          fullMethodName,
          methodDescriptor.getRequestMarshaller(),
          methodDescriptor.getResponseMarshaller());
      // This stream is the event handler for its own call.
      call.start(this);
    }
1✔
288

289
    /**
290
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
291
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
292
     * client-initiated discovery requests, use {@link
293
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
294
     */
295
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
296
                              Collection<String> resources, String nonce,
297
                              @Nullable String errorDetail) {
298
      DiscoveryRequest.Builder builder =
299
          DiscoveryRequest.newBuilder()
1✔
300
              .setVersionInfo(versionInfo)
1✔
301
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
302
              .addAllResourceNames(resources)
1✔
303
              .setTypeUrl(type.typeUrl())
1✔
304
              .setResponseNonce(nonce);
1✔
305
      if (errorDetail != null) {
1✔
306
        com.google.rpc.Status error =
307
            com.google.rpc.Status.newBuilder()
1✔
308
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
309
                .setMessage(errorDetail)
1✔
310
                .build();
1✔
311
        builder.setErrorDetail(error);
1✔
312
      }
313
      DiscoveryRequest request = builder.build();
1✔
314
      call.sendMessage(request);
1✔
315
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
316
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
317
      }
318
    }
1✔
319

320
    /**
321
     * Sends a client-initiated discovery request.
322
     */
323
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
324
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
325
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
326
          respNonces.getOrDefault(type.typeUrl(), ""), null);
1✔
327
    }
1✔
328

329
    @Override
330
    public void onReady() {
331
      syncContext.execute(ControlPlaneClient.this::readyHandler);
1✔
332
    }
1✔
333

334
    @Override
335
    public void onRecvMessage(DiscoveryResponse response) {
336
      syncContext.execute(new Runnable() {
1✔
337
        @Override
338
        public void run() {
339
          respNonces.put(response.getTypeUrl(), response.getNonce());
1✔
340
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
341
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
342
            logger.log(
×
343
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
344
                messagePrinter.print(response));
×
345
          }
346
          if (type == null) {
1✔
347
            logger.log(
1✔
348
                XdsLogLevel.WARNING,
349
                "Ignore an unknown type of DiscoveryResponse: {0}",
350
                response.getTypeUrl());
1✔
351

352
            call.startRecvMessage();
1✔
353
            return;
1✔
354
          }
355
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
356
              response.getNonce());
1✔
357
        }
1✔
358
      });
359
    }
1✔
360

361
    @Override
362
    public void onStatusReceived(final Status status) {
363
      syncContext.execute(() -> {
1✔
364
        handleRpcStreamClosed(status);
1✔
365
      });
1✔
366
    }
1✔
367

368
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
369
                                 String nonce) {
370
      checkNotNull(type, "type");
1✔
371
      if (closed) {
1✔
372
        return;
×
373
      }
374
      responseReceived = true;
1✔
375
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
376
          () -> call.startRecvMessage(), syncContext);
1✔
377
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
378
          processingTracker);
379
      processingTracker.onComplete();
1✔
380
    }
1✔
381

382
    private void handleRpcStreamClosed(Status status) {
383
      if (closed) {
1✔
384
        return;
1✔
385
      }
386

387
      if (responseReceived || retryBackoffPolicy == null) {
1✔
388
        // Reset the backoff sequence if had received a response, or backoff sequence
389
        // has never been initialized.
390
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
391
      }
392
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
393
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
394
      // concurrently with the stopwatch and schedule.
395
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
396
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
397
      rpcRetryTimer = syncContext.schedule(
1✔
398
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
399

400
      Status newStatus = status;
1✔
401
      if (responseReceived) {
1✔
402
        // A closed ADS stream after a successful response is not considered an error. Servers may
403
        // close streams for various reasons during normal operation, such as load balancing or
404
        // underlying connection hitting its max connection age limit  (see gRFC A9).
405
        if (!status.isOk()) {
1✔
406
          newStatus = Status.OK;
1✔
407
          logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
1✔
408
              + "response was received, so this will not be treated as an error. Cause: {2}",
409
              status.getCode(), status.getDescription(), status.getCause());
1✔
410
        } else {
411
          logger.log(XdsLogLevel.DEBUG,
1✔
412
              "ADS stream closed by server after a response was received");
413
        }
414
      } else {
415
        // If the ADS stream is closed without ever having received a response from the server, then
416
        // the XdsClient should consider that a connectivity error (see gRFC A57).
417
        if (status.isOk()) {
1✔
418
          newStatus = Status.UNAVAILABLE.withDescription(
1✔
419
              "ADS stream closed with OK before receiving a response");
420
        }
421
        logger.log(
1✔
422
            XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
423
            newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
1✔
424
      }
425

426
      closed = true;
1✔
427
      xdsResponseHandler.handleStreamClosed(newStatus);
1✔
428
      cleanUp();
1✔
429

430
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
431
    }
1✔
432

433
    private void close(Exception error) {
434
      if (closed) {
1✔
435
        return;
×
436
      }
437
      closed = true;
1✔
438
      cleanUp();
1✔
439
      call.sendError(error);
1✔
440
    }
1✔
441

442
    private void cleanUp() {
443
      if (adsStream == this) {
1✔
444
        adsStream = null;
1✔
445
      }
446
    }
1✔
447
  }
448
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc