• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19354

11 Jul 2024 06:45PM CUT coverage: 88.434% (-0.01%) from 88.448%
#19354

push

github

larry-safran
Bump version to 1.64.3-SNAPSHOT

32067 of 36261 relevant lines covered (88.43%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.76
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.Any;
27
import com.google.rpc.Code;
28
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
30
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
31
import io.grpc.InternalLogId;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.xds.client.Bootstrapper.ServerInfo;
38
import io.grpc.xds.client.EnvoyProtoData.Node;
39
import io.grpc.xds.client.XdsClient.ProcessingTracker;
40
import io.grpc.xds.client.XdsClient.ResourceStore;
41
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
42
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
43
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
44
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
45
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
46
import java.util.Collection;
47
import java.util.Collections;
48
import java.util.HashMap;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.TimeUnit;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of
59
 * the xDS RPC stream.
60
 */
61
final class ControlPlaneClient {
62

63
  public static final String CLOSED_BY_SERVER = "Closed by server";
64
  private final SynchronizationContext syncContext;
65
  private final InternalLogId logId;
66
  private final XdsLogger logger;
67
  private final ServerInfo serverInfo;
68
  private final XdsTransport xdsTransport;
69
  private final XdsResponseHandler xdsResponseHandler;
70
  private final ResourceStore resourceStore;
71
  private final ScheduledExecutorService timeService;
72
  private final BackoffPolicy.Provider backoffPolicyProvider;
73
  private final Stopwatch stopwatch;
74
  private final Node bootstrapNode;
75
  private final XdsClient xdsClient;
76

77
  // Last successfully applied version_info for each resource type. Starts with empty string.
78
  // A version_info is used to update management server with client's most recent knowledge of
79
  // resources.
80
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
81

82
  private boolean shutdown;
83
  @Nullable
84
  private AdsStream adsStream;
85
  @Nullable
86
  private BackoffPolicy retryBackoffPolicy;
87
  @Nullable
88
  private ScheduledHandle rpcRetryTimer;
89
  private MessagePrettyPrinter messagePrinter;
90

91
  /**
   * Creates an entity that manages ADS RPCs over a single channel.
   *
   * <p>The stopwatch is obtained from {@code stopwatchSupplier} here, and a log id is allocated
   * from the server's target. No RPC stream is started by the constructor.
   *
   * @throws NullPointerException if any argument is {@code null}; the message names the argument
   */
  ControlPlaneClient(
      XdsTransport xdsTransport,
      ServerInfo serverInfo,
      Node bootstrapNode,
      XdsResponseHandler xdsResponseHandler,
      ResourceStore resourceStore,
      ScheduledExecutorService timeService,
      SynchronizationContext syncContext,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      XdsClient xdsClient,
      MessagePrettyPrinter messagePrinter) {
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
    // The label must match the parameter name so a failure points at the right argument.
    this.resourceStore = checkNotNull(resourceStore, "resourceStore");
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
    this.timeService = checkNotNull(timeService, "timeService");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
    logger = XdsLogger.withLogId(logId);
    logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
120

121
  void shutdown() {
122
    syncContext.execute(new Runnable() {
1✔
123
      @Override
124
      public void run() {
125
        shutdown = true;
1✔
126
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
127
        if (adsStream != null) {
1✔
128
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
129
        }
130
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
131
          rpcRetryTimer.cancel();
1✔
132
        }
133
        xdsTransport.shutdown();
1✔
134
      }
1✔
135
    });
136
  }
1✔
137

138
  @Override
139
  public String toString() {
140
    return logId.toString();
×
141
  }
142

143
  /**
144
   * Updates the resource subscription for the given resource type.
145
   */
146
  // Must be synchronized.
147
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
148
    if (isInBackoff()) {
1✔
149
      return;
1✔
150
    }
151
    if (adsStream == null) {
1✔
152
      startRpcStream();
1✔
153
    }
154
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
155
    if (resources != null) {
1✔
156
      adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
157
    }
158
  }
1✔
159

160
  /**
161
   * Accepts the update for the given resource type by updating the latest resource version
162
   * and sends an ACK request to the management server.
163
   */
164
  // Must be synchronized.
165
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
166
    versions.put(type, versionInfo);
1✔
167
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
168
        type.typeName(), nonce, versionInfo);
1✔
169
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
170
    if (resources == null) {
1✔
171
      resources = Collections.emptyList();
×
172
    }
173
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
174
  }
1✔
175

176
  /**
177
   * Rejects the update for the given resource type and sends an NACK request (request with last
178
   * accepted version) to the management server.
179
   */
180
  // Must be synchronized.
181
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
182
    String versionInfo = versions.getOrDefault(type, "");
1✔
183
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
184
        type.typeName(), nonce, versionInfo);
1✔
185
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
186
    if (resources == null) {
1✔
187
      resources = Collections.emptyList();
×
188
    }
189
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
190
  }
1✔
191

192
  /**
193
   * Returns {@code true} if the resource discovery is currently in backoff.
194
   */
195
  // Must be synchronized.
196
  boolean isInBackoff() {
197
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
198
  }
199

200
  // Must be synchronized.
201
  boolean isReady() {
202
    return adsStream != null && adsStream.call != null && adsStream.call.isReady();
1✔
203
  }
204

205
  /**
206
   * Starts a timer for each requested resource that hasn't been responded to and
207
   * has been waiting for the channel to get ready.
208
   */
209
  // Must be synchronized.
210
  void readyHandler() {
211
    if (!isReady()) {
1✔
212
      return;
×
213
    }
214

215
    if (isInBackoff()) {
1✔
216
      rpcRetryTimer.cancel();
×
217
      rpcRetryTimer = null;
×
218
    }
219

220
    xdsClient.startSubscriberTimersIfNeeded(serverInfo);
1✔
221
  }
1✔
222

223
  /**
224
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
225
   * xDS protocol communication.
226
   */
227
  // Must be synchronized.
228
  private void startRpcStream() {
229
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
230
    adsStream = new AdsStream();
1✔
231
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
232
    stopwatch.reset().start();
1✔
233
  }
1✔
234

235
  @VisibleForTesting
236
  public final class RpcRetryTask implements Runnable {
1✔
237
    @Override
238
    public void run() {
239
      if (shutdown) {
1✔
240
        return;
×
241
      }
242
      startRpcStream();
1✔
243
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
244
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
245
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
246
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
247
        if (resources != null) {
1✔
248
          adsStream.sendDiscoveryRequest(type, resources);
1✔
249
        }
250
      }
1✔
251
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
252
    }
1✔
253
  }
254

255
  @VisibleForTesting
256
  @Nullable
257
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
258
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
259
  }
260

261
  private class AdsStream implements EventHandler<DiscoveryResponse> {
262
    // True once at least one discovery response has been handled on this stream.
    private boolean responseReceived;
    // True once this stream has been closed, either locally or by a received status.
    private boolean closed;
    // Response nonce for the most recently received discovery responses of each resource type.
    // Client initiated requests start response nonce with empty string.
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
    // used for management server to identify which response the client is ACKing/NACking.
    // To avoid confusion, client-initiated requests will always use the nonce in
    // most recently received responses of each resource type.
    private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
    // Declared as a field initializer so it is available when the constructor body runs.
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();

    /**
     * Creates the streaming call for the aggregated discovery method on the enclosing client's
     * transport and starts it with this stream as the event handler.
     */
    private AdsStream() {
      String fullMethodName = methodDescriptor.getFullMethodName();
      this.call = xdsTransport.createStreamingCall(
          fullMethodName,
          methodDescriptor.getRequestMarshaller(),
          methodDescriptor.getResponseMarshaller());
      call.start(this);
    }
1✔
280

281
    /**
282
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
283
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
284
     * client-initiated discovery requests, use {@link
285
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
286
     */
287
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
288
                              Collection<String> resources, String nonce,
289
                              @Nullable String errorDetail) {
290
      DiscoveryRequest.Builder builder =
291
          DiscoveryRequest.newBuilder()
1✔
292
              .setVersionInfo(versionInfo)
1✔
293
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
294
              .addAllResourceNames(resources)
1✔
295
              .setTypeUrl(type.typeUrl())
1✔
296
              .setResponseNonce(nonce);
1✔
297
      if (errorDetail != null) {
1✔
298
        com.google.rpc.Status error =
299
            com.google.rpc.Status.newBuilder()
1✔
300
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
301
                .setMessage(errorDetail)
1✔
302
                .build();
1✔
303
        builder.setErrorDetail(error);
1✔
304
      }
305
      DiscoveryRequest request = builder.build();
1✔
306
      call.sendMessage(request);
1✔
307
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
308
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
309
      }
310
    }
1✔
311

312
    /**
313
     * Sends a client-initiated discovery request.
314
     */
315
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
316
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
317
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
318
          respNonces.getOrDefault(type, ""), null);
1✔
319
    }
1✔
320

321
    @Override
322
    public void onReady() {
323
      syncContext.execute(ControlPlaneClient.this::readyHandler);
1✔
324
    }
1✔
325

326
    @Override
327
    public void onRecvMessage(DiscoveryResponse response) {
328
      syncContext.execute(new Runnable() {
1✔
329
        @Override
330
        public void run() {
331
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
332
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
333
            logger.log(
×
334
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
335
                messagePrinter.print(response));
×
336
          }
337
          if (type == null) {
1✔
338
            logger.log(
1✔
339
                XdsLogLevel.WARNING,
340
                "Ignore an unknown type of DiscoveryResponse: {0}",
341
                response.getTypeUrl());
1✔
342

343
            call.startRecvMessage();
1✔
344
            return;
1✔
345
          }
346
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
347
              response.getNonce());
1✔
348
        }
1✔
349
      });
350
    }
1✔
351

352
    @Override
353
    public void onStatusReceived(final Status status) {
354
      syncContext.execute(() -> {
1✔
355
        if (status.isOk()) {
1✔
356
          handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
×
357
        } else {
358
          handleRpcStreamClosed(status);
1✔
359
        }
360
      });
1✔
361
    }
1✔
362

363
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
364
                                 String nonce) {
365
      checkNotNull(type, "type");
1✔
366
      if (closed) {
1✔
367
        return;
×
368
      }
369
      responseReceived = true;
1✔
370
      respNonces.put(type, nonce);
1✔
371
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
372
          () -> call.startRecvMessage(), syncContext);
1✔
373
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
374
          processingTracker);
375
      processingTracker.onComplete();
1✔
376
    }
1✔
377

378
    private void handleRpcStreamClosed(Status error) {
379
      if (closed) {
1✔
380
        return;
1✔
381
      }
382

383
      if (responseReceived || retryBackoffPolicy == null) {
1✔
384
        // Reset the backoff sequence if had received a response, or backoff sequence
385
        // has never been initialized.
386
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
387
      }
388
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
389
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
390
      // concurrently with the stopwatch and schedule.
391
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
392
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
393
      rpcRetryTimer = syncContext.schedule(
1✔
394
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
395

396
      checkArgument(!error.isOk(), "unexpected OK status");
1✔
397
      String errorMsg = error.getDescription() != null
1✔
398
          && error.getDescription().equals(CLOSED_BY_SERVER)
1✔
399
              ? "ADS stream closed with status {0}: {1}. Cause: {2}"
×
400
              : "ADS stream failed with status {0}: {1}. Cause: {2}";
1✔
401
      logger.log(
1✔
402
          XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
1✔
403
      closed = true;
1✔
404
      xdsResponseHandler.handleStreamClosed(error);
1✔
405
      cleanUp();
1✔
406

407
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
408
    }
1✔
409

410
    private void close(Exception error) {
411
      if (closed) {
1✔
412
        return;
×
413
      }
414
      closed = true;
1✔
415
      cleanUp();
1✔
416
      call.sendError(error);
1✔
417
    }
1✔
418

419
    private void cleanUp() {
420
      if (adsStream == this) {
1✔
421
        adsStream = null;
1✔
422
      }
423
    }
1✔
424
  }
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc