• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19209

08 May 2024 05:24PM UTC coverage: 88.386% (-0.005%) from 88.391%
#19209

push

github

ejona86
api: Hide internal metric APIs

Some APIs were marked experimental but had internal APIs in their
surface. These were all changed to internal. And then the internal APIs
were mostly hidden from generated documentation.

All these APIs will eventually become public and maybe even stable. But
they need some iteration before we're ready for others to start using
them.

31582 of 35732 relevant lines covered (88.39%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.76
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.Any;
27
import com.google.rpc.Code;
28
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
30
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
31
import io.grpc.InternalLogId;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.xds.client.Bootstrapper.ServerInfo;
38
import io.grpc.xds.client.EnvoyProtoData.Node;
39
import io.grpc.xds.client.XdsClient.ProcessingTracker;
40
import io.grpc.xds.client.XdsClient.ResourceStore;
41
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
42
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
43
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
44
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
45
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
46
import java.util.Collection;
47
import java.util.Collections;
48
import java.util.HashMap;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.TimeUnit;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of
59
 * the xDS RPC stream.
60
 */
61
final class ControlPlaneClient {
62

63
  public static final String CLOSED_BY_SERVER = "Closed by server";
64
  private final SynchronizationContext syncContext;
65
  private final InternalLogId logId;
66
  private final XdsLogger logger;
67
  private final ServerInfo serverInfo;
68
  private final XdsTransport xdsTransport;
69
  private final XdsResponseHandler xdsResponseHandler;
70
  private final ResourceStore resourceStore;
71
  private final ScheduledExecutorService timeService;
72
  private final BackoffPolicy.Provider backoffPolicyProvider;
73
  private final Stopwatch stopwatch;
74
  private final Node bootstrapNode;
75
  private final XdsClient xdsClient;
76

77
  // Last successfully applied version_info for each resource type. Starts with empty string.
78
  // A version_info is used to update management server with client's most recent knowledge of
79
  // resources.
80
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
81

82
  private boolean shutdown;
83
  @Nullable
84
  private AdsStream adsStream;
85
  @Nullable
86
  private BackoffPolicy retryBackoffPolicy;
87
  @Nullable
88
  private ScheduledHandle rpcRetryTimer;
89
  private MessagePrettyPrinter messagePrinter;
90

91
  /** An entity that manages ADS RPCs over a single channel. */
92
  ControlPlaneClient(
93
      XdsTransport xdsTransport,
94
      ServerInfo serverInfo,
95
      Node bootstrapNode,
96
      XdsResponseHandler xdsResponseHandler,
97
      ResourceStore resourceStore,
98
      ScheduledExecutorService
99
      timeService,
100
      SynchronizationContext syncContext,
101
      BackoffPolicy.Provider backoffPolicyProvider,
102
      Supplier<Stopwatch> stopwatchSupplier,
103
      XdsClient xdsClient,
104
      MessagePrettyPrinter messagePrinter) {
1✔
105
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
1✔
106
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
1✔
107
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
1✔
108
    this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
1✔
109
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
1✔
110
    this.timeService = checkNotNull(timeService, "timeService");
1✔
111
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
112
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
113
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
114
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
1✔
115
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
116
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
1✔
117
    logger = XdsLogger.withLogId(logId);
1✔
118
    logger.log(XdsLogLevel.INFO, "Created");
1✔
119
  }
1✔
120

121
  void shutdown() {
122
    syncContext.execute(new Runnable() {
1✔
123
      @Override
124
      public void run() {
125
        shutdown = true;
1✔
126
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
127
        if (adsStream != null) {
1✔
128
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
129
        }
130
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
131
          rpcRetryTimer.cancel();
1✔
132
        }
133
        xdsTransport.shutdown();
1✔
134
      }
1✔
135
    });
136
  }
1✔
137

138
  @Override
139
  public String toString() {
140
    return logId.toString();
×
141
  }
142

143
  /**
144
   * Updates the resource subscription for the given resource type.
145
   */
146
  // Must be synchronized.
147
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
148
    if (isInBackoff()) {
1✔
149
      return;
1✔
150
    }
151
    if (adsStream == null) {
1✔
152
      startRpcStream();
1✔
153
    }
154
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
155
    if (resources != null) {
1✔
156
      adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
157
    }
158
  }
1✔
159

160
  /**
161
   * Accepts the update for the given resource type by updating the latest resource version
162
   * and sends an ACK request to the management server.
163
   */
164
  // Must be synchronized.
165
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
166
    versions.put(type, versionInfo);
1✔
167
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
168
        type.typeName(), nonce, versionInfo);
1✔
169
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
170
    if (resources == null) {
1✔
171
      resources = Collections.emptyList();
×
172
    }
173
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
174
  }
1✔
175

176
  /**
177
   * Rejects the update for the given resource type and sends an NACK request (request with last
178
   * accepted version) to the management server.
179
   */
180
  // Must be synchronized.
181
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
182
    String versionInfo = versions.getOrDefault(type, "");
1✔
183
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
184
        type.typeName(), nonce, versionInfo);
1✔
185
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
186
    if (resources == null) {
1✔
187
      resources = Collections.emptyList();
×
188
    }
189
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
190
  }
1✔
191

192
  /**
193
   * Returns {@code true} if the resource discovery is currently in backoff.
194
   */
195
  // Must be synchronized.
196
  boolean isInBackoff() {
197
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
198
  }
199

200
  // Must be synchronized.
201
  boolean isReady() {
202
    return adsStream != null && adsStream.call != null && adsStream.call.isReady();
1✔
203
  }
204

205
  /**
206
   * Starts a timer for each requested resource that hasn't been responded to and
207
   * has been waiting for the channel to get ready.
208
   */
209
  // Must be synchronized.
210
  void readyHandler() {
211
    if (!isReady()) {
1✔
212
      return;
×
213
    }
214

215
    if (isInBackoff()) {
1✔
216
      rpcRetryTimer.cancel();
×
217
      rpcRetryTimer = null;
×
218
    }
219

220
    xdsClient.startSubscriberTimersIfNeeded(serverInfo);
1✔
221
  }
1✔
222

223
  /**
224
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
225
   * xDS protocol communication.
226
   */
227
  // Must be synchronized.
228
  private void startRpcStream() {
229
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
230
    adsStream = new AdsStream();
1✔
231
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
232
    stopwatch.reset().start();
1✔
233
  }
1✔
234

235
  @VisibleForTesting
236
  public final class RpcRetryTask implements Runnable {
1✔
237
    @Override
238
    public void run() {
239
      if (shutdown) {
1✔
240
        return;
×
241
      }
242
      startRpcStream();
1✔
243
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
244
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
245
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
246
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
247
        if (resources != null) {
1✔
248
          adsStream.sendDiscoveryRequest(type, resources);
1✔
249
        }
250
      }
1✔
251
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
252
    }
1✔
253
  }
254

255
  @VisibleForTesting
256
  @Nullable
257
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
258
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
259
  }
260

261
  private class AdsStream implements EventHandler<DiscoveryResponse> {
262
    private boolean responseReceived;
263
    private boolean closed;
264
    // Response nonce for the most recently received discovery responses of each resource type.
265
    // Client initiated requests start response nonce with empty string.
266
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
267
    // used for management server to identify which response the client is ACKing/NACking.
268
    // To avoid confusion, client-initiated requests will always use the nonce in
269
    // most recently received responses of each resource type.
270
    private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
1✔
271
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
272
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
1✔
273
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
1✔
274

275
    private AdsStream() {
1✔
276
      this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
1✔
277
          methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
1✔
278
      call.start(this);
1✔
279
    }
1✔
280

281
    /**
282
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
283
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
284
     * client-initiated discovery requests, use {@link
285
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
286
     */
287
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
288
                              Collection<String> resources, String nonce,
289
                              @Nullable String errorDetail) {
290
      DiscoveryRequest.Builder builder =
291
          DiscoveryRequest.newBuilder()
1✔
292
              .setVersionInfo(versionInfo)
1✔
293
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
294
              .addAllResourceNames(resources)
1✔
295
              .setTypeUrl(type.typeUrl())
1✔
296
              .setResponseNonce(nonce);
1✔
297
      if (errorDetail != null) {
1✔
298
        com.google.rpc.Status error =
299
            com.google.rpc.Status.newBuilder()
1✔
300
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
301
                .setMessage(errorDetail)
1✔
302
                .build();
1✔
303
        builder.setErrorDetail(error);
1✔
304
      }
305
      DiscoveryRequest request = builder.build();
1✔
306
      call.sendMessage(request);
1✔
307
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
308
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
309
      }
310
    }
1✔
311

312
    /**
313
     * Sends a client-initiated discovery request.
314
     */
315
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
316
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
317
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
318
          respNonces.getOrDefault(type, ""), null);
1✔
319
    }
1✔
320

321
    @Override
322
    public void onReady() {
323
      syncContext.execute(ControlPlaneClient.this::readyHandler);
1✔
324
    }
1✔
325

326
    @Override
327
    public void onRecvMessage(DiscoveryResponse response) {
328
      syncContext.execute(new Runnable() {
1✔
329
        @Override
330
        public void run() {
331
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
332
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
333
            logger.log(
×
334
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
335
                messagePrinter.print(response));
×
336
          }
337
          if (type == null) {
1✔
338
            logger.log(
1✔
339
                XdsLogLevel.WARNING,
340
                "Ignore an unknown type of DiscoveryResponse: {0}",
341
                response.getTypeUrl());
1✔
342

343
            call.startRecvMessage();
1✔
344
            return;
1✔
345
          }
346
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
347
              response.getNonce());
1✔
348
        }
1✔
349
      });
350
    }
1✔
351

352
    @Override
353
    public void onStatusReceived(final Status status) {
354
      syncContext.execute(() -> {
1✔
355
        if (status.isOk()) {
1✔
356
          handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
×
357
        } else {
358
          handleRpcStreamClosed(status);
1✔
359
        }
360
      });
1✔
361
    }
1✔
362

363
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
364
                                 String nonce) {
365
      checkNotNull(type, "type");
1✔
366
      if (closed) {
1✔
367
        return;
×
368
      }
369
      responseReceived = true;
1✔
370
      respNonces.put(type, nonce);
1✔
371
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
372
          () -> call.startRecvMessage(), syncContext);
1✔
373
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
374
          processingTracker);
375
      processingTracker.onComplete();
1✔
376
    }
1✔
377

378
    private void handleRpcStreamClosed(Status error) {
379
      if (closed) {
1✔
380
        return;
1✔
381
      }
382

383
      if (responseReceived || retryBackoffPolicy == null) {
1✔
384
        // Reset the backoff sequence if had received a response, or backoff sequence
385
        // has never been initialized.
386
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
387
      }
388
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
389
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
390
      // concurrently with the stopwatch and schedule.
391
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
392
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
393
      rpcRetryTimer = syncContext.schedule(
1✔
394
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
395

396
      checkArgument(!error.isOk(), "unexpected OK status");
1✔
397
      String errorMsg = error.getDescription() != null
1✔
398
          && error.getDescription().equals(CLOSED_BY_SERVER)
1✔
399
              ? "ADS stream closed with status {0}: {1}. Cause: {2}"
×
400
              : "ADS stream failed with status {0}: {1}. Cause: {2}";
1✔
401
      logger.log(
1✔
402
          XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
1✔
403
      closed = true;
1✔
404
      xdsResponseHandler.handleStreamClosed(error);
1✔
405
      cleanUp();
1✔
406

407
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
408
    }
1✔
409

410
    private void close(Exception error) {
411
      if (closed) {
1✔
412
        return;
×
413
      }
414
      closed = true;
1✔
415
      cleanUp();
1✔
416
      call.sendError(error);
1✔
417
    }
1✔
418

419
    private void cleanUp() {
420
      if (adsStream == this) {
1✔
421
        adsStream = null;
1✔
422
      }
423
    }
1✔
424
  }
425
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc