• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19563

26 Nov 2024 12:47AM UTC coverage: 88.559% (-0.02%) from 88.582%
#19563

push

github

web-flow
xds: Add counter and gauge metrics (#11661)

Adds the following xDS client metrics defined in [A78](https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient).

Counters
- grpc.xds_client.server_failure
- grpc.xds_client.resource_updates_valid
- grpc.xds_client.resource_updates_invalid

Gauges
- grpc.xds_client.connected
- grpc.xds_client.resources

33368 of 37679 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.3
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Stopwatch;
24
import com.google.common.base.Supplier;
25
import com.google.protobuf.Any;
26
import com.google.rpc.Code;
27
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
28
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
30
import io.grpc.InternalLogId;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.SynchronizationContext.ScheduledHandle;
35
import io.grpc.internal.BackoffPolicy;
36
import io.grpc.xds.client.Bootstrapper.ServerInfo;
37
import io.grpc.xds.client.EnvoyProtoData.Node;
38
import io.grpc.xds.client.XdsClient.ProcessingTracker;
39
import io.grpc.xds.client.XdsClient.ResourceStore;
40
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
43
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
44
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
45
import java.util.Collection;
46
import java.util.Collections;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Set;
52
import java.util.concurrent.ScheduledExecutorService;
53
import java.util.concurrent.TimeUnit;
54
import javax.annotation.Nullable;
55

56
/**
57
 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of
58
 * the xDS RPC stream.
59
 */
60
final class ControlPlaneClient {
61

62
  private final SynchronizationContext syncContext;
63
  private final InternalLogId logId;
64
  private final XdsLogger logger;
65
  private final ServerInfo serverInfo;
66
  private final XdsTransport xdsTransport;
67
  private final XdsResponseHandler xdsResponseHandler;
68
  private final ResourceStore resourceStore;
69
  private final ScheduledExecutorService timeService;
70
  private final BackoffPolicy.Provider backoffPolicyProvider;
71
  private final Stopwatch stopwatch;
72
  private final Node bootstrapNode;
73
  private final XdsClient xdsClient;
74

75
  // Last successfully applied version_info for each resource type. Starts with empty string.
76
  // A version_info is used to update management server with client's most recent knowledge of
77
  // resources.
78
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
79

80
  private boolean shutdown;
81
  private boolean streamClosedNoResponse;
82
  @Nullable
83
  private AdsStream adsStream;
84
  @Nullable
85
  private BackoffPolicy retryBackoffPolicy;
86
  @Nullable
87
  private ScheduledHandle rpcRetryTimer;
88
  private MessagePrettyPrinter messagePrinter;
89

90
  /** An entity that manages ADS RPCs over a single channel. */
91
  ControlPlaneClient(
92
      XdsTransport xdsTransport,
93
      ServerInfo serverInfo,
94
      Node bootstrapNode,
95
      XdsResponseHandler xdsResponseHandler,
96
      ResourceStore resourceStore,
97
      ScheduledExecutorService
98
      timeService,
99
      SynchronizationContext syncContext,
100
      BackoffPolicy.Provider backoffPolicyProvider,
101
      Supplier<Stopwatch> stopwatchSupplier,
102
      XdsClient xdsClient,
103
      MessagePrettyPrinter messagePrinter) {
1✔
104
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
1✔
105
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
1✔
106
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
1✔
107
    this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
1✔
108
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
1✔
109
    this.timeService = checkNotNull(timeService, "timeService");
1✔
110
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
111
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
112
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
113
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
1✔
114
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
115
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
1✔
116
    logger = XdsLogger.withLogId(logId);
1✔
117
    logger.log(XdsLogLevel.INFO, "Created");
1✔
118
  }
1✔
119

120
  void shutdown() {
121
    syncContext.execute(new Runnable() {
1✔
122
      @Override
123
      public void run() {
124
        shutdown = true;
1✔
125
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
126
        if (adsStream != null) {
1✔
127
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
128
        }
129
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
130
          rpcRetryTimer.cancel();
1✔
131
        }
132
        xdsTransport.shutdown();
1✔
133
      }
1✔
134
    });
135
  }
1✔
136

137
  @Override
138
  public String toString() {
139
    return logId.toString();
×
140
  }
141

142
  /**
143
   * Updates the resource subscription for the given resource type.
144
   */
145
  // Must be synchronized.
146
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
147
    if (isInBackoff()) {
1✔
148
      return;
1✔
149
    }
150
    if (adsStream == null) {
1✔
151
      startRpcStream();
1✔
152
    }
153
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
154
    if (resources == null) {
1✔
155
      resources = Collections.emptyList();
1✔
156
    }
157
    adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
158
    if (resources.isEmpty()) {
1✔
159
      // The resource type no longer has subscribing resources; clean up references to it
160
      versions.remove(resourceType);
1✔
161
      adsStream.respNonces.remove(resourceType);
1✔
162
    }
163
  }
1✔
164

165
  /**
166
   * Accepts the update for the given resource type by updating the latest resource version
167
   * and sends an ACK request to the management server.
168
   */
169
  // Must be synchronized.
170
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
171
    versions.put(type, versionInfo);
1✔
172
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
173
        type.typeName(), nonce, versionInfo);
1✔
174
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
175
    if (resources == null) {
1✔
176
      resources = Collections.emptyList();
×
177
    }
178
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
179
  }
1✔
180

181
  /**
182
   * Rejects the update for the given resource type and sends an NACK request (request with last
183
   * accepted version) to the management server.
184
   */
185
  // Must be synchronized.
186
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
187
    String versionInfo = versions.getOrDefault(type, "");
1✔
188
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
189
        type.typeName(), nonce, versionInfo);
1✔
190
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
191
    if (resources == null) {
1✔
192
      resources = Collections.emptyList();
×
193
    }
194
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
195
  }
1✔
196

197
  /**
198
   * Returns {@code true} if the resource discovery is currently in backoff.
199
   */
200
  // Must be synchronized.
201
  boolean isInBackoff() {
202
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
203
  }
204

205
  // Must be synchronized.
206
  boolean isReady() {
207
    return adsStream != null && adsStream.call != null && adsStream.call.isReady();
1✔
208
  }
209

210
  /**
211
   * Starts a timer for each requested resource that hasn't been responded to and
212
   * has been waiting for the channel to get ready.
213
   */
214
  // Must be synchronized.
215
  void readyHandler() {
216
    if (!isReady()) {
1✔
217
      return;
×
218
    }
219

220
    if (isInBackoff()) {
1✔
221
      rpcRetryTimer.cancel();
×
222
      rpcRetryTimer = null;
×
223
    }
224

225
    xdsClient.startSubscriberTimersIfNeeded(serverInfo);
1✔
226
  }
1✔
227

228
  /**
229
   * Indicates whether there is an active ADS stream.
230
   *
231
   * <p>Return {@code true} when the {@code AdsStream} is created.
232
   * {@code false} when the ADS stream fails without a response. Resets to true
233
   * upon receiving the first response on a new ADS stream.
234
   */
235
  // Must be synchronized
236
  boolean hasWorkingAdsStream() {
237
    return !streamClosedNoResponse;
1✔
238
  }
239

240

241
  /**
242
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
243
   * xDS protocol communication.
244
   */
245
  // Must be synchronized.
246
  private void startRpcStream() {
247
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
248
    adsStream = new AdsStream();
1✔
249
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
250
    stopwatch.reset().start();
1✔
251
  }
1✔
252

253
  @VisibleForTesting
254
  public final class RpcRetryTask implements Runnable {
1✔
255
    @Override
256
    public void run() {
257
      if (shutdown) {
1✔
258
        return;
×
259
      }
260
      startRpcStream();
1✔
261
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
262
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
263
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
264
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
265
        if (resources != null) {
1✔
266
          adsStream.sendDiscoveryRequest(type, resources);
1✔
267
        }
268
      }
1✔
269
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
270
    }
1✔
271
  }
272

273
  @VisibleForTesting
274
  @Nullable
275
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
276
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
277
  }
278

279
  private class AdsStream implements EventHandler<DiscoveryResponse> {
280
    private boolean responseReceived;
281
    private boolean closed;
282
    // Response nonce for the most recently received discovery responses of each resource type.
283
    // Client initiated requests start response nonce with empty string.
284
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
285
    // used for management server to identify which response the client is ACKing/NACking.
286
    // To avoid confusion, client-initiated requests will always use the nonce in
287
    // most recently received responses of each resource type.
288
    private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
1✔
289
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
290
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
1✔
291
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
1✔
292

293
    private AdsStream() {
1✔
294
      this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
1✔
295
          methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
1✔
296
      call.start(this);
1✔
297
    }
1✔
298

299
    /**
300
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
301
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
302
     * client-initiated discovery requests, use {@link
303
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
304
     */
305
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
306
                              Collection<String> resources, String nonce,
307
                              @Nullable String errorDetail) {
308
      DiscoveryRequest.Builder builder =
309
          DiscoveryRequest.newBuilder()
1✔
310
              .setVersionInfo(versionInfo)
1✔
311
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
312
              .addAllResourceNames(resources)
1✔
313
              .setTypeUrl(type.typeUrl())
1✔
314
              .setResponseNonce(nonce);
1✔
315
      if (errorDetail != null) {
1✔
316
        com.google.rpc.Status error =
317
            com.google.rpc.Status.newBuilder()
1✔
318
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
319
                .setMessage(errorDetail)
1✔
320
                .build();
1✔
321
        builder.setErrorDetail(error);
1✔
322
      }
323
      DiscoveryRequest request = builder.build();
1✔
324
      call.sendMessage(request);
1✔
325
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
326
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
327
      }
328
    }
1✔
329

330
    /**
331
     * Sends a client-initiated discovery request.
332
     */
333
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
334
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
335
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
336
          respNonces.getOrDefault(type, ""), null);
1✔
337
    }
1✔
338

339
    @Override
340
    public void onReady() {
341
      syncContext.execute(ControlPlaneClient.this::readyHandler);
1✔
342
    }
1✔
343

344
    @Override
345
    public void onRecvMessage(DiscoveryResponse response) {
346
      syncContext.execute(new Runnable() {
1✔
347
        @Override
348
        public void run() {
349
          // Reset flag as message has been received on a stream
350
          streamClosedNoResponse = false;
1✔
351
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
352
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
353
            logger.log(
×
354
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
355
                messagePrinter.print(response));
×
356
          }
357
          if (type == null) {
1✔
358
            logger.log(
1✔
359
                XdsLogLevel.WARNING,
360
                "Ignore an unknown type of DiscoveryResponse: {0}",
361
                response.getTypeUrl());
1✔
362

363
            call.startRecvMessage();
1✔
364
            return;
1✔
365
          }
366
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
367
              response.getNonce());
1✔
368
        }
1✔
369
      });
370
    }
1✔
371

372
    @Override
373
    public void onStatusReceived(final Status status) {
374
      syncContext.execute(() -> {
1✔
375
        handleRpcStreamClosed(status);
1✔
376
      });
1✔
377
    }
1✔
378

379
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
380
                                 String nonce) {
381
      checkNotNull(type, "type");
1✔
382
      if (closed) {
1✔
383
        return;
×
384
      }
385
      responseReceived = true;
1✔
386
      respNonces.put(type, nonce);
1✔
387
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
388
          () -> call.startRecvMessage(), syncContext);
1✔
389
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
390
          processingTracker);
391
      processingTracker.onComplete();
1✔
392
    }
1✔
393

394
    private void handleRpcStreamClosed(Status status) {
395
      if (closed) {
1✔
396
        return;
1✔
397
      }
398

399
      if (responseReceived || retryBackoffPolicy == null) {
1✔
400
        // Reset the backoff sequence if had received a response, or backoff sequence
401
        // has never been initialized.
402
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
403
      }
404
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
405
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
406
      // concurrently with the stopwatch and schedule.
407
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
408
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
409
      rpcRetryTimer = syncContext.schedule(
1✔
410
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
411

412
      Status newStatus = status;
1✔
413
      if (responseReceived) {
1✔
414
        // A closed ADS stream after a successful response is not considered an error. Servers may
415
        // close streams for various reasons during normal operation, such as load balancing or
416
        // underlying connection hitting its max connection age limit  (see gRFC A9).
417
        if (!status.isOk()) {
1✔
418
          newStatus = Status.OK;
1✔
419
          logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
1✔
420
              + "response was received, so this will not be treated as an error. Cause: {2}",
421
              status.getCode(), status.getDescription(), status.getCause());
1✔
422
        } else {
423
          logger.log(XdsLogLevel.DEBUG,
1✔
424
              "ADS stream closed by server after a response was received");
425
        }
426
      } else {
427
        streamClosedNoResponse = true;
1✔
428
        // If the ADS stream is closed without ever having received a response from the server, then
429
        // the XdsClient should consider that a connectivity error (see gRFC A57).
430
        if (status.isOk()) {
1✔
431
          newStatus = Status.UNAVAILABLE.withDescription(
1✔
432
              "ADS stream closed with OK before receiving a response");
433
        }
434
        logger.log(
1✔
435
            XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
436
            newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
1✔
437
      }
438

439
      closed = true;
1✔
440
      xdsResponseHandler.handleStreamClosed(newStatus);
1✔
441
      cleanUp();
1✔
442

443
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
444
    }
1✔
445

446
    private void close(Exception error) {
447
      if (closed) {
1✔
448
        return;
×
449
      }
450
      closed = true;
1✔
451
      cleanUp();
1✔
452
      call.sendError(error);
1✔
453
    }
1✔
454

455
    private void cleanUp() {
456
      if (adsStream == this) {
1✔
457
        adsStream = null;
1✔
458
      }
459
    }
1✔
460
  }
461
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc