• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19449

05 Sep 2024 10:32PM CUT coverage: 84.488% (-0.01%) from 84.498%
#19449

push

github

ejona86
core: touch() buffer when detach()ing

Detachable lets a buffer outlive its original lifetime. The new lifetime
is application-controlled. If the application fails to read/close the
stream, then the leak detector wouldn't make clear what code was
responsible for the buffer's lifetime. With this touch, we'll be able to
see detach() was called and thus know the application needs debugging.

Realized when looking at b/364531464, although I think the issue is
unrelated.

33251 of 39356 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.53
/../xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.Any;
27
import com.google.rpc.Code;
28
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
30
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
31
import io.grpc.InternalLogId;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.xds.client.Bootstrapper.ServerInfo;
38
import io.grpc.xds.client.EnvoyProtoData.Node;
39
import io.grpc.xds.client.XdsClient.ProcessingTracker;
40
import io.grpc.xds.client.XdsClient.ResourceStore;
41
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
42
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
43
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
44
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
45
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
46
import java.util.Collection;
47
import java.util.Collections;
48
import java.util.HashMap;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Set;
53
import java.util.concurrent.ScheduledExecutorService;
54
import java.util.concurrent.TimeUnit;
55
import javax.annotation.Nullable;
56

57
/**
58
 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of
59
 * the xDS RPC stream.
60
 */
61
final class ControlPlaneClient {
62

63
  public static final String CLOSED_BY_SERVER = "Closed by server";
64
  private final SynchronizationContext syncContext;
65
  private final InternalLogId logId;
66
  private final XdsLogger logger;
67
  private final ServerInfo serverInfo;
68
  private final XdsTransport xdsTransport;
69
  private final XdsResponseHandler xdsResponseHandler;
70
  private final ResourceStore resourceStore;
71
  private final ScheduledExecutorService timeService;
72
  private final BackoffPolicy.Provider backoffPolicyProvider;
73
  private final Stopwatch stopwatch;
74
  private final Node bootstrapNode;
75
  private final XdsClient xdsClient;
76

77
  // Last successfully applied version_info for each resource type. Starts with empty string.
78
  // A version_info is used to update management server with client's most recent knowledge of
79
  // resources.
80
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
81

82
  private boolean shutdown;
83
  @Nullable
84
  private AdsStream adsStream;
85
  @Nullable
86
  private BackoffPolicy retryBackoffPolicy;
87
  @Nullable
88
  private ScheduledHandle rpcRetryTimer;
89
  private MessagePrettyPrinter messagePrinter;
90

91
  /** An entity that manages ADS RPCs over a single channel. */
92
  ControlPlaneClient(
93
      XdsTransport xdsTransport,
94
      ServerInfo serverInfo,
95
      Node bootstrapNode,
96
      XdsResponseHandler xdsResponseHandler,
97
      ResourceStore resourceStore,
98
      ScheduledExecutorService
99
      timeService,
100
      SynchronizationContext syncContext,
101
      BackoffPolicy.Provider backoffPolicyProvider,
102
      Supplier<Stopwatch> stopwatchSupplier,
103
      XdsClient xdsClient,
104
      MessagePrettyPrinter messagePrinter) {
1✔
105
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
1✔
106
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
1✔
107
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
1✔
108
    this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
1✔
109
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
1✔
110
    this.timeService = checkNotNull(timeService, "timeService");
1✔
111
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
112
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
113
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
114
    this.messagePrinter = checkNotNull(messagePrinter, "messagePrinter");
1✔
115
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
116
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
1✔
117
    logger = XdsLogger.withLogId(logId);
1✔
118
    logger.log(XdsLogLevel.INFO, "Created");
1✔
119
  }
1✔
120

121
  void shutdown() {
122
    syncContext.execute(new Runnable() {
1✔
123
      @Override
124
      public void run() {
125
        shutdown = true;
1✔
126
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
127
        if (adsStream != null) {
1✔
128
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
129
        }
130
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
131
          rpcRetryTimer.cancel();
1✔
132
        }
133
        xdsTransport.shutdown();
1✔
134
      }
1✔
135
    });
136
  }
1✔
137

138
  @Override
139
  public String toString() {
140
    return logId.toString();
×
141
  }
142

143
  /**
144
   * Updates the resource subscription for the given resource type.
145
   */
146
  // Must be synchronized.
147
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
148
    if (isInBackoff()) {
1✔
149
      return;
1✔
150
    }
151
    if (adsStream == null) {
1✔
152
      startRpcStream();
1✔
153
    }
154
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
155
    if (resources == null) {
1✔
156
      resources = Collections.emptyList();
1✔
157
    }
158
    adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
159
    if (resources.isEmpty()) {
1✔
160
      // The resource type no longer has subscribing resources; clean up references to it
161
      versions.remove(resourceType);
1✔
162
      adsStream.respNonces.remove(resourceType);
1✔
163
    }
164
  }
1✔
165

166
  /**
167
   * Accepts the update for the given resource type by updating the latest resource version
168
   * and sends an ACK request to the management server.
169
   */
170
  // Must be synchronized.
171
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
172
    versions.put(type, versionInfo);
1✔
173
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
174
        type.typeName(), nonce, versionInfo);
1✔
175
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
176
    if (resources == null) {
1✔
177
      resources = Collections.emptyList();
×
178
    }
179
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
180
  }
1✔
181

182
  /**
183
   * Rejects the update for the given resource type and sends an NACK request (request with last
184
   * accepted version) to the management server.
185
   */
186
  // Must be synchronized.
187
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
188
    String versionInfo = versions.getOrDefault(type, "");
1✔
189
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
190
        type.typeName(), nonce, versionInfo);
1✔
191
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
192
    if (resources == null) {
1✔
193
      resources = Collections.emptyList();
×
194
    }
195
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
196
  }
1✔
197

198
  /**
199
   * Returns {@code true} if the resource discovery is currently in backoff.
200
   */
201
  // Must be synchronized.
202
  boolean isInBackoff() {
203
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
204
  }
205

206
  // Must be synchronized.
207
  boolean isReady() {
208
    return adsStream != null && adsStream.call != null && adsStream.call.isReady();
1✔
209
  }
210

211
  /**
212
   * Starts a timer for each requested resource that hasn't been responded to and
213
   * has been waiting for the channel to get ready.
214
   */
215
  // Must be synchronized.
216
  void readyHandler() {
217
    if (!isReady()) {
1✔
218
      return;
1✔
219
    }
220

221
    if (isInBackoff()) {
1✔
222
      rpcRetryTimer.cancel();
×
223
      rpcRetryTimer = null;
×
224
    }
225

226
    xdsClient.startSubscriberTimersIfNeeded(serverInfo);
1✔
227
  }
1✔
228

229
  /**
230
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
231
   * xDS protocol communication.
232
   */
233
  // Must be synchronized.
234
  private void startRpcStream() {
235
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
236
    adsStream = new AdsStream();
1✔
237
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
238
    stopwatch.reset().start();
1✔
239
  }
1✔
240

241
  @VisibleForTesting
242
  public final class RpcRetryTask implements Runnable {
1✔
243
    @Override
244
    public void run() {
245
      if (shutdown) {
1✔
246
        return;
×
247
      }
248
      startRpcStream();
1✔
249
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
250
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
251
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
252
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
253
        if (resources != null) {
1✔
254
          adsStream.sendDiscoveryRequest(type, resources);
1✔
255
        }
256
      }
1✔
257
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
258
    }
1✔
259
  }
260

261
  @VisibleForTesting
262
  @Nullable
263
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
264
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
265
  }
266

267
  private class AdsStream implements EventHandler<DiscoveryResponse> {
268
    private boolean responseReceived;
269
    private boolean closed;
270
    // Response nonce for the most recently received discovery responses of each resource type.
271
    // Client initiated requests start response nonce with empty string.
272
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
273
    // used for management server to identify which response the client is ACKing/NACking.
274
    // To avoid confusion, client-initiated requests will always use the nonce in
275
    // most recently received responses of each resource type.
276
    private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
1✔
277
    private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
278
    private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
1✔
279
        AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();
1✔
280

281
    private AdsStream() {
1✔
282
      this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
1✔
283
          methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
1✔
284
      call.start(this);
1✔
285
    }
1✔
286

287
    /**
288
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
289
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
290
     * client-initiated discovery requests, use {@link
291
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
292
     */
293
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
294
                              Collection<String> resources, String nonce,
295
                              @Nullable String errorDetail) {
296
      DiscoveryRequest.Builder builder =
297
          DiscoveryRequest.newBuilder()
1✔
298
              .setVersionInfo(versionInfo)
1✔
299
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
300
              .addAllResourceNames(resources)
1✔
301
              .setTypeUrl(type.typeUrl())
1✔
302
              .setResponseNonce(nonce);
1✔
303
      if (errorDetail != null) {
1✔
304
        com.google.rpc.Status error =
305
            com.google.rpc.Status.newBuilder()
1✔
306
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
307
                .setMessage(errorDetail)
1✔
308
                .build();
1✔
309
        builder.setErrorDetail(error);
1✔
310
      }
311
      DiscoveryRequest request = builder.build();
1✔
312
      call.sendMessage(request);
1✔
313
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
314
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", messagePrinter.print(request));
×
315
      }
316
    }
1✔
317

318
    /**
319
     * Sends a client-initiated discovery request.
320
     */
321
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
322
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
323
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
324
          respNonces.getOrDefault(type, ""), null);
1✔
325
    }
1✔
326

327
    @Override
328
    public void onReady() {
329
      syncContext.execute(ControlPlaneClient.this::readyHandler);
1✔
330
    }
1✔
331

332
    @Override
333
    public void onRecvMessage(DiscoveryResponse response) {
334
      syncContext.execute(new Runnable() {
1✔
335
        @Override
336
        public void run() {
337
          XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
338
          if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
339
            logger.log(
×
340
                XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
341
                messagePrinter.print(response));
×
342
          }
343
          if (type == null) {
1✔
344
            logger.log(
1✔
345
                XdsLogLevel.WARNING,
346
                "Ignore an unknown type of DiscoveryResponse: {0}",
347
                response.getTypeUrl());
1✔
348

349
            call.startRecvMessage();
1✔
350
            return;
1✔
351
          }
352
          handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
353
              response.getNonce());
1✔
354
        }
1✔
355
      });
356
    }
1✔
357

358
    @Override
359
    public void onStatusReceived(final Status status) {
360
      syncContext.execute(() -> {
1✔
361
        if (status.isOk()) {
1✔
362
          handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
×
363
        } else {
364
          handleRpcStreamClosed(status);
1✔
365
        }
366
      });
1✔
367
    }
1✔
368

369
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
370
                                 String nonce) {
371
      checkNotNull(type, "type");
1✔
372
      if (closed) {
1✔
373
        return;
×
374
      }
375
      responseReceived = true;
1✔
376
      respNonces.put(type, nonce);
1✔
377
      ProcessingTracker processingTracker = new ProcessingTracker(
1✔
378
          () -> call.startRecvMessage(), syncContext);
1✔
379
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
1✔
380
          processingTracker);
381
      processingTracker.onComplete();
1✔
382
    }
1✔
383

384
    private void handleRpcStreamClosed(Status error) {
385
      if (closed) {
1✔
386
        return;
1✔
387
      }
388

389
      if (responseReceived || retryBackoffPolicy == null) {
1✔
390
        // Reset the backoff sequence if had received a response, or backoff sequence
391
        // has never been initialized.
392
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
393
      }
394
      // FakeClock in tests isn't thread-safe. Schedule the retry timer before notifying callbacks
395
      // to avoid TSAN races, since tests may wait until callbacks are called but then would run
396
      // concurrently with the stopwatch and schedule.
397
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
398
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
399
      rpcRetryTimer = syncContext.schedule(
1✔
400
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
401

402
      checkArgument(!error.isOk(), "unexpected OK status");
1✔
403
      String errorMsg = error.getDescription() != null
1✔
404
          && error.getDescription().equals(CLOSED_BY_SERVER)
1✔
405
              ? "ADS stream closed with status {0}: {1}. Cause: {2}"
×
406
              : "ADS stream failed with status {0}: {1}. Cause: {2}";
1✔
407
      logger.log(
1✔
408
          XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
1✔
409
      closed = true;
1✔
410
      xdsResponseHandler.handleStreamClosed(error);
1✔
411
      cleanUp();
1✔
412

413
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
414
    }
1✔
415

416
    private void close(Exception error) {
417
      if (closed) {
1✔
418
        return;
×
419
      }
420
      closed = true;
1✔
421
      cleanUp();
1✔
422
      call.sendError(error);
1✔
423
    }
1✔
424

425
    private void cleanUp() {
426
      if (adsStream == this) {
1✔
427
        adsStream = null;
1✔
428
      }
429
    }
1✔
430
  }
431
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc