• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19571

27 Nov 2024 07:37PM UTC coverage: 88.578% (-0.004%) from 88.582%
#19571

push

github

web-flow
rls: Reduce RLS channel logging

The channel log is shared by many components and is poorly suited to
the noise of per-RPC events. This commit restricts RLS's use of the
logger to events no more frequent than cache entry events. This may still be
too frequent, but it should substantially improve the signal-to-noise ratio, and
we can do further rework as needed.

Many of the log entries were poor because they lacked context.
They did not even make clear that they came from RLS. The cache entry events now
regularly include the request key in the logs, allowing you to follow
events for specific keys. I would have preferred logging the hash code,
but formatting it with NumberFormat is awkward, and toString() may be acceptable
given its convenience.

This commit reverts much of eba699ad. Those logs have not proven
helpful, because they produce more output than can reasonably be stored.

33346 of 37646 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.3
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Stopwatch;
24
import com.google.common.base.Supplier;
25
import com.google.protobuf.Any;
26
import com.google.rpc.Code;
27
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
28
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
30
import io.grpc.InternalLogId;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.SynchronizationContext.ScheduledHandle;
35
import io.grpc.internal.BackoffPolicy;
36
import io.grpc.xds.client.Bootstrapper.ServerInfo;
37
import io.grpc.xds.client.EnvoyProtoData.Node;
38
import io.grpc.xds.client.XdsClient.ProcessingTracker;
39
import io.grpc.xds.client.XdsClient.ResourceStore;
40
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
43
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
44
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
45
import java.util.Collection;
46
import java.util.Collections;
47
import java.util.HashMap;
48
import java.util.HashSet;
49
import java.util.List;
50
import java.util.Map;
51
import java.util.Set;
52
import java.util.concurrent.ScheduledExecutorService;
53
import java.util.concurrent.TimeUnit;
54
import javax.annotation.Nullable;
55

56
/**
57
 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of
58
 * the xDS RPC stream.
59
 */
60
final class ControlPlaneClient {
61

62
  private final SynchronizationContext syncContext;
63
  private final InternalLogId logId;
64
  private final XdsLogger logger;
65
  private final ServerInfo serverInfo;
66
  private final XdsTransport xdsTransport;
67
  private final XdsResponseHandler xdsResponseHandler;
68
  private final ResourceStore resourceStore;
69
  private final ScheduledExecutorService timeService;
70
  private final BackoffPolicy.Provider backoffPolicyProvider;
71
  private final Stopwatch stopwatch;
72
  private final Node bootstrapNode;
73
  private final XdsClient xdsClient;
74

75
  // Last successfully applied version_info for each resource type. Starts with empty string.
76
  // A version_info is used to update management server with client's most recent knowledge of
77
  // resources.
78
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
79

80
  private boolean shutdown;
81
  private boolean streamClosedNoResponse;
82
  @Nullable
83
  private AdsStream adsStream;
84
  @Nullable
85
  private BackoffPolicy retryBackoffPolicy;
86
  @Nullable
87
  private ScheduledHandle rpcRetryTimer;
88
  private MessagePrettyPrinter messagePrinter;
89

90
  /** An entity that manages ADS RPCs over a single channel. */
91
  ControlPlaneClient(
92
      XdsTransport xdsTransport,
93
      ServerInfo serverInfo,
94
      Node bootstrapNode,
95
      XdsResponseHandler xdsResponseHandler,
96
      ResourceStore resourceStore,
97
      ScheduledExecutorService
98
      timeService,
99
      SynchronizationContext syncContext,
100
      BackoffPolicy.Provider backoffPolicyProvider,
101
      Supplier<Stopwatch> stopwatchSupplier,
102
      XdsClient xdsClient,
103
      MessagePrettyPrinter messagePrinter) {
1✔
104
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
1✔
105
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
1✔
106
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
1✔
107
    this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
1✔
108
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
1✔
109
    this.timeService = checkNotNull(timeService, "timeService");
1✔
110
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
111
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
112
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
113
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
1✔
114
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
115
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
1✔
116
    logger = XdsLogger.withLogId(logId);
1✔
117
    logger.log(XdsLogLevel.INFO, "Created");
1✔
118
  }
1✔
119

120
  void shutdown() {
121
    syncContext.execute(new Runnable() {
1✔
122
      @Override
123
      public void run() {
124
        shutdown = true;
1✔
125
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
126
        if (adsStream != null) {
1✔
127
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
128
        }
129
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
130
          rpcRetryTimer.cancel();
1✔
131
        }
132
        xdsTransport.shutdown();
1✔
133
      }
1✔
134
    });
135
  }
1✔
136

137
  @Override
138
  public String toString() {
139
    return logId.toString();
×
140
  }
141

142
  /**
143
   * Updates the resource subscription for the given resource type.
144
   */
145
  // Must be synchronized.
146
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
147
    if (isInBackoff()) {
1✔
148
      return;
1✔
149
    }
150
    if (adsStream == null) {
1✔
151
      startRpcStream();
1✔
152
    }
153
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
154
    if (resources == null) {
1✔
155
      resources = Collections.emptyList();
1✔
156
    }
157
    adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
158
    if (resources.isEmpty()) {
1✔
159
      // The resource type no longer has subscribing resources; clean up references to it
160
      versions.remove(resourceType);
1✔
161
      adsStream.respNonces.remove(resourceType);
1✔
162
    }
163
  }
1✔
164

165
  /**
166
   * Accepts the update for the given resource type by updating the latest resource version
167
   * and sends an ACK request to the management server.
168
   */
169
  // Must be synchronized.
170
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
171
    versions.put(type, versionInfo);
1✔
172
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
173
        type.typeName(), nonce, versionInfo);
1✔
174
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
175
    if (resources == null) {
1✔
176
      resources = Collections.emptyList();
×
177
    }
178
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
179
  }
1✔
180

181
  /**
182
   * Rejects the update for the given resource type and sends an NACK request (request with last
183
   * accepted version) to the management server.
184
   */
185
  // Must be synchronized.
186
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
187
    String versionInfo = versions.getOrDefault(type, "");
1✔
188
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
189
        type.typeName(), nonce, versionInfo);
1✔
190
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
191
    if (resources == null) {
1✔
192
      resources = Collections.emptyList();
×
193
    }
194
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
195
  }
1✔
196

197
  /**
198
   * Returns {@code true} if the resource discovery is currently in backoff.
199
   */
200
  // Must be synchronized.
201
  boolean isInBackoff() {
202
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
203
  }
204

205
  // Must be synchronized.
206
  boolean isReady() {
207
    return adsStream != null && adsStream.call != null && adsStream.call.isReady();
1✔
208
  }
209

210
  /**
211
   * Starts a timer for each requested resource that hasn't been responded to and
212
   * has been waiting for the channel to get ready.
213
   */
214
  // Must be synchronized.
215
  void readyHandler() {
216
    if (!isReady()) {
1✔
217
      return;
×
218
    }
219

220
    if (isInBackoff()) {
1✔
221
      rpcRetryTimer.cancel();
×
222
      rpcRetryTimer = null;
×
223
    }
224

225
    xdsClient.startSubscriberTimersIfNeeded(serverInfo);
1✔
226
  }
1✔
227

228
  /**
229
   * Indicates whether there is an active ADS stream.
230
   *
231
   * <p>Return {@code true} when the {@code AdsStream} is created.
232
   * {@code false} when the ADS stream fails without a response. Resets to true
233
   * upon receiving the first response on a new ADS stream.
234
   */
235
  // Must be synchronized
236
  boolean hasWorkingAdsStream() {
237
    return !streamClosedNoResponse;
1✔
238
  }
239

240

241
  /**
242
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
243
   * xDS protocol communication.
244
   */
245
  // Must be synchronized.
246
  private void startRpcStream() {
247
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
248
    adsStream = new AdsStream();
1✔
249
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
250
    stopwatch.reset().start();
1✔
251
  }
1✔
252

253
  @VisibleForTesting
254
  public final class RpcRetryTask implements Runnable {
1✔
255
    @Override
256
    public void run() {
257
      if (shutdown) {
1✔
258
        return;
×
259
      }
260
      startRpcStream();
1✔
261
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
262
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
263
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
264
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
265
        if (resources != null) {
1✔
266
          adsStream.sendDiscoveryRequest(type, resources);
1✔
267
        }
268
      }
1✔
269
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
270
    }
1✔
271
  }
272

273
  @VisibleForTesting
274
  @Nullable
275
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
276
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
277
  }
278

279
  private class AdsStream implements EventHandler<DiscoveryResponse> {
280
    private boolean responseReceived;
281
    private boolean closed;
282
    // Response nonce for the most recently received discovery responses of each resource type.
283
    // Client initiated requests start response nonce with empty string.
284
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
285
    // used for management server to identify which response the client is ACKing/NACking.
286
    // To avoid confusion, client-initiated requests will always use the nonce in
287
    // most recently received responses of each resource type.
288
    private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
1✔
289
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
290
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
1✔
291
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
1✔
292

293
    private AdsStream() {
1✔
294
      this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
1✔
295
          methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
1✔
296
      call.start(this);
1✔
297
    }
1✔
298

299
    /**
300
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
301
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
302
     * client-initiated discovery requests, use {@link
303
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
304
     */
305
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
306
                              Collection<String> resources, String nonce,
307
                              @Nullable String errorDetail) {
308
      DiscoveryRequest.Builder builder =
309
          DiscoveryRequest.newBuilder()
1✔
310
              .setVersionInfo(versionInfo)
1✔
311
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
312
              .addAllResourceNames(resources)
1✔
313
              .setTypeUrl(type.typeUrl())
1✔
314
              .setResponseNonce(nonce);
1✔
315
      if (errorDetail != null) {
1✔
316
        com.google.rpc.Status error =
317
            com.google.rpc.Status.newBuilder()
1✔
318
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
319
                .setMessage(errorDetail)
1✔
320
                .build();
1✔
321
        builder.setErrorDetail(error);
1✔
322
      }
323
      DiscoveryRequest request = builder.build();
1✔
324
      call.sendMessage(request);
1✔
325
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
326
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
327
      }
328
    }
1✔
329

330
    /**
331
     * Sends a client-initiated discovery request.
332
     */
333
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
334
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
335
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
336
          respNonces.getOrDefault(type, ""), null);
1✔
337
    }
1✔
338

339
    @Override
340
    public void onReady() {
341
      syncContext.execute(ControlPlaneClient.this::readyHandler);
1✔
342
    }
1✔
343

344
    @Override
345
    public void onRecvMessage(DiscoveryResponse response) {
346
      syncContext.execute(new Runnable() {
1✔
347
        @Override
348
        public void run() {
349
          // Reset flag as message has been received on a stream
350
          streamClosedNoResponse = false;
1✔
351
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
352
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
353
            logger.log(
×
354
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
355
                messagePrinter.print(response));
×
356
          }
357
          if (type == null) {
1✔
358
            logger.log(
1✔
359
                XdsLogLevel.WARNING,
360
                "Ignore an unknown type of DiscoveryResponse: {0}",
361
                response.getTypeUrl());
1✔
362

363
            call.startRecvMessage();
1✔
364
            return;
1✔
365
          }
366
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
367
              response.getNonce());
1✔
368
        }
1✔
369
      });
370
    }
1✔
371

372
    @Override
373
    public void onStatusReceived(final Status status) {
374
      syncContext.execute(() -> {
1✔
375
        handleRpcStreamClosed(status);
1✔
376
      });
1✔
377
    }
1✔
378

379
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
380
                                 String nonce) {
381
      checkNotNull(type, "type");
1✔
382
      if (closed) {
1✔
383
        return;
×
384
      }
385
      responseReceived = true;
1✔
386
      respNonces.put(type, nonce);
1✔
387
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
388
          () -> call.startRecvMessage(), syncContext);
1✔
389
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
390
          processingTracker);
391
      processingTracker.onComplete();
1✔
392
    }
1✔
393

394
    private void handleRpcStreamClosed(Status status) {
395
      if (closed) {
1✔
396
        return;
1✔
397
      }
398

399
      if (responseReceived || retryBackoffPolicy == null) {
1✔
400
        // Reset the backoff sequence if had received a response, or backoff sequence
401
        // has never been initialized.
402
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
403
      }
404
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
405
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
406
      // concurrently with the stopwatch and schedule.
407
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
408
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
409
      rpcRetryTimer = syncContext.schedule(
1✔
410
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
411

412
      Status newStatus = status;
1✔
413
      if (responseReceived) {
1✔
414
        // A closed ADS stream after a successful response is not considered an error. Servers may
415
        // close streams for various reasons during normal operation, such as load balancing or
416
        // underlying connection hitting its max connection age limit  (see gRFC A9).
417
        if (!status.isOk()) {
1✔
418
          newStatus = Status.OK;
1✔
419
          logger.log( XdsLogLevel.DEBUG, "ADS stream closed with error {0}: {1}. However, a "
1✔
420
              + "response was received, so this will not be treated as an error. Cause: {2}",
421
              status.getCode(), status.getDescription(), status.getCause());
1✔
422
        } else {
423
          logger.log(XdsLogLevel.DEBUG,
1✔
424
              "ADS stream closed by server after a response was received");
425
        }
426
      } else {
427
        streamClosedNoResponse = true;
1✔
428
        // If the ADS stream is closed without ever having received a response from the server, then
429
        // the XdsClient should consider that a connectivity error (see gRFC A57).
430
        if (status.isOk()) {
1✔
431
          newStatus = Status.UNAVAILABLE.withDescription(
1✔
432
              "ADS stream closed with OK before receiving a response");
433
        }
434
        logger.log(
1✔
435
            XdsLogLevel.ERROR, "ADS stream failed with status {0}: {1}. Cause: {2}",
436
            newStatus.getCode(), newStatus.getDescription(), newStatus.getCause());
1✔
437
      }
438

439
      closed = true;
1✔
440
      xdsResponseHandler.handleStreamClosed(newStatus);
1✔
441
      cleanUp();
1✔
442

443
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
444
    }
1✔
445

446
    private void close(Exception error) {
447
      if (closed) {
1✔
448
        return;
×
449
      }
450
      closed = true;
1✔
451
      cleanUp();
1✔
452
      call.sendError(error);
1✔
453
    }
1✔
454

455
    private void cleanUp() {
456
      if (adsStream == this) {
1✔
457
        adsStream = null;
1✔
458
      }
459
    }
1✔
460
  }
461
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc