• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19449

05 Sep 2024 10:32 PM UTC — coverage: 84.488% (-0.01%) from 84.498%
#19449

push

github

ejona86
core: touch() buffer when detach()ing

Detachable lets a buffer outlive its original lifetime. The new lifetime
is application-controlled. If the application fails to read/close the
stream, then the leak detector wouldn't make clear what code was
responsible for the buffer's lifetime. With this touch, we'll be able to
see that detach() was called and thus know the application needs debugging.

Realized when looking at b/364531464, although I think the issue is
unrelated.

33251 of 39356 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.55
/../xds/src/main/java/io/grpc/xds/client/LoadReportClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.util.Durations;
27
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
28
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
29
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
30
import io.grpc.Internal;
31
import io.grpc.InternalLogId;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.xds.client.EnvoyProtoData.Node;
38
import io.grpc.xds.client.Stats.ClusterStats;
39
import io.grpc.xds.client.Stats.DroppedRequests;
40
import io.grpc.xds.client.Stats.UpstreamLocalityStats;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
43
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
44
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
45
import java.util.ArrayList;
46
import java.util.Collections;
47
import java.util.List;
48
import java.util.concurrent.ScheduledExecutorService;
49
import java.util.concurrent.TimeUnit;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * Client of xDS load reporting service based on LRS protocol, which reports load stats of
55
 * gRPC client's perspective to a management server.
56
 */
57
@Internal
58
public final class LoadReportClient {
59
  private final InternalLogId logId;
60
  private final XdsLogger logger;
61
  private final XdsTransport xdsTransport;
62
  private final Node node;
63
  private final SynchronizationContext syncContext;
64
  private final ScheduledExecutorService timerService;
65
  private final Stopwatch retryStopwatch;
66
  private final BackoffPolicy.Provider backoffPolicyProvider;
67
  @VisibleForTesting
68
  public final LoadStatsManager2 loadStatsManager;
69
  private boolean started;
70
  @Nullable
71
  private BackoffPolicy lrsRpcRetryPolicy;
72
  @Nullable
73
  private ScheduledHandle lrsRpcRetryTimer;
74
  @Nullable
75
  private LrsStream lrsStream;
76
  private static final MethodDescriptor<LoadStatsRequest, LoadStatsResponse> method =
1✔
77
      LoadReportingServiceGrpc.getStreamLoadStatsMethod();
1✔
78

79
  @VisibleForTesting
80
  public LoadReportClient(
81
      LoadStatsManager2 loadStatsManager,
82
      XdsTransport xdsTransport,
83
      Node node,
84
      SynchronizationContext syncContext,
85
      ScheduledExecutorService scheduledExecutorService,
86
      BackoffPolicy.Provider backoffPolicyProvider,
87
      Supplier<Stopwatch> stopwatchSupplier) {
1✔
88
    this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
1✔
89
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
1✔
90
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
91
    this.timerService = checkNotNull(scheduledExecutorService, "timeService");
1✔
92
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
93
    this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
94
    this.node = checkNotNull(node, "node").toBuilder()
1✔
95
        .addClientFeatures("envoy.lrs.supports_send_all_clusters").build();
1✔
96
    logId = InternalLogId.allocate("lrs-client", null);
1✔
97
    logger = XdsLogger.withLogId(logId);
1✔
98
    logger.log(XdsLogLevel.INFO, "Created");
1✔
99
  }
1✔
100

101
  @VisibleForTesting
102
  public boolean lrsStreamIsNull() {
103
    return lrsStream == null;
1✔
104
  }
105

106
  /**
107
   * Establishes load reporting communication and negotiates with traffic director to report load
108
   * stats periodically. Calling this method on an already started {@link LoadReportClient} is
109
   * no-op.
110
   */
111
  public void startLoadReporting() {
112
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
113
    if (started) {
1✔
114
      return;
×
115
    }
116
    started = true;
1✔
117
    logger.log(XdsLogLevel.INFO, "Starting load reporting RPC");
1✔
118
    startLrsRpc();
1✔
119
  }
1✔
120

121
  /**
122
   * Terminates load reporting. Calling this method on an already stopped
123
   * {@link LoadReportClient} is no-op.
124
   */
125
  public void stopLoadReporting() {
126
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
127
    if (!started) {
1✔
128
      return;
1✔
129
    }
130
    started = false;
1✔
131
    logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC");
1✔
132
    if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
1✔
133
      lrsRpcRetryTimer.cancel();
1✔
134
    }
135
    if (lrsStream != null) {
1✔
136
      lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
1✔
137
    }
138
    // Do not shut down channel as it is not owned by LrsClient.
139
  }
1✔
140

141
  private static class LoadReportingTask implements Runnable {
142
    private final LrsStream stream;
143

144
    LoadReportingTask(LrsStream stream) {
1✔
145
      this.stream = stream;
1✔
146
    }
1✔
147

148
    @Override
149
    public void run() {
150
      stream.sendLoadReport();
1✔
151
    }
1✔
152
  }
153

154
  private class LrsRpcRetryTask implements Runnable {
1✔
155

156
    @Override
157
    public void run() {
158
      startLrsRpc();
1✔
159
    }
1✔
160
  }
161

162
  private void startLrsRpc() {
163
    if (!started) {
1✔
164
      return;
×
165
    }
166
    checkState(lrsStream == null, "previous lbStream has not been cleared yet");
1✔
167
    retryStopwatch.reset().start();
1✔
168
    lrsStream = new LrsStream();
1✔
169
  }
1✔
170

171
  private final class LrsStream implements EventHandler<LoadStatsResponse> {
172
    boolean initialResponseReceived;
173
    boolean closed;
174
    long intervalNano = -1;
1✔
175
    boolean reportAllClusters;
176
    List<String> clusterNames;  // clusters to report loads for, if not report all.
177
    ScheduledHandle loadReportTimer;
178
    private final StreamingCall<LoadStatsRequest, LoadStatsResponse> call;
179

180
    LrsStream() {
1✔
181
      this.call = xdsTransport.createStreamingCall(method.getFullMethodName(),
1✔
182
          method.getRequestMarshaller(), method.getResponseMarshaller());
1✔
183
      call.start(this);
1✔
184
      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
1✔
185
      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
1✔
186
    }
1✔
187

188
    @Override
189
    public void onReady() {}
1✔
190

191
    @Override
192
    public void onRecvMessage(LoadStatsResponse response) {
193
      syncContext.execute(new Runnable() {
1✔
194
        @Override
195
        public void run() {
196
          logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
1✔
197
          handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
1✔
198
              Durations.toNanos(response.getLoadReportingInterval()));
1✔
199
          call.startRecvMessage();
1✔
200
        }
1✔
201
      });
202
    }
1✔
203

204
    @Override
205
    public void onStatusReceived(final Status status) {
206
      syncContext.execute(new Runnable() {
1✔
207
        @Override
208
        public void run() {
209
          if (status.isOk()) {
1✔
210
            handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
1✔
211
          } else {
212
            handleStreamClosed(status);
1✔
213
          }
214
        }
1✔
215
      });
216
    }
1✔
217

218
    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
219
      LoadStatsRequest.Builder requestBuilder =
220
          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
1✔
221
      for (ClusterStats stats : clusterStatsList) {
1✔
222
        requestBuilder.addClusterStats(buildClusterStats(stats));
1✔
223
      }
1✔
224
      LoadStatsRequest request = requestBuilder.build();
1✔
225
      call.sendMessage(request);
1✔
226
      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
1✔
227
    }
1✔
228

229
    void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
230
                           long loadReportIntervalNano) {
231
      if (closed) {
1✔
232
        return;
×
233
      }
234
      if (!initialResponseReceived) {
1✔
235
        logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
1✔
236
        initialResponseReceived = true;
1✔
237
      }
238
      reportAllClusters = sendAllClusters;
1✔
239
      if (reportAllClusters) {
1✔
240
        logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
1✔
241
      } else {
242
        logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters);
1✔
243
        clusterNames = clusters;
1✔
244
      }
245
      intervalNano = loadReportIntervalNano;
1✔
246
      logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano);
1✔
247
      scheduleNextLoadReport();
1✔
248
    }
1✔
249

250
    private void sendLoadReport() {
251
      if (closed) {
1✔
252
        return;
×
253
      }
254
      List<ClusterStats> clusterStatsList;
255
      if (reportAllClusters) {
1✔
256
        clusterStatsList = loadStatsManager.getAllClusterStatsReports();
1✔
257
      } else {
258
        clusterStatsList = new ArrayList<>();
1✔
259
        for (String name : clusterNames) {
1✔
260
          clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name));
1✔
261
        }
1✔
262
      }
263
      sendLoadStatsRequest(clusterStatsList);
1✔
264
      scheduleNextLoadReport();
1✔
265
    }
1✔
266

267
    private void scheduleNextLoadReport() {
268
      // Cancel pending load report and reschedule with updated load reporting interval.
269
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
270
        loadReportTimer.cancel();
1✔
271
        loadReportTimer = null;
1✔
272
      }
273
      if (intervalNano > 0) {
1✔
274
        loadReportTimer = syncContext.schedule(
1✔
275
            new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService);
1✔
276
      }
277
    }
1✔
278

279
    private void handleStreamClosed(Status status) {
280
      checkArgument(!status.isOk(), "unexpected OK status");
1✔
281
      if (closed) {
1✔
282
        return;
1✔
283
      }
284
      logger.log(
1✔
285
          XdsLogLevel.ERROR,
286
          "LRS stream closed with status {0}: {1}. Cause: {2}",
287
          status.getCode(), status.getDescription(), status.getCause());
1✔
288
      closed = true;
1✔
289
      cleanUp();
1✔
290

291
      if (initialResponseReceived || lrsRpcRetryPolicy == null) {
1✔
292
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
293
        // has never been initialized.
294
        lrsRpcRetryPolicy = backoffPolicyProvider.get();
1✔
295
      }
296
      // The back-off policy determines the interval between consecutive RPC upstarts, thus the
297
      // actual delay may be smaller than the value from the back-off policy, or even negative,
298
      // depending on how much time was spent in the previous RPC.
299
      long delayNanos =
1✔
300
          lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
301
      logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
1✔
302
      if (delayNanos <= 0) {
1✔
303
        startLrsRpc();
×
304
      } else {
305
        lrsRpcRetryTimer = syncContext.schedule(
1✔
306
            new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
307
      }
308
    }
1✔
309

310
    private void close(Exception error) {
311
      if (closed) {
1✔
312
        return;
×
313
      }
314
      closed = true;
1✔
315
      cleanUp();
1✔
316
      call.sendError(error);
1✔
317
    }
1✔
318

319
    private void cleanUp() {
320
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
321
        loadReportTimer.cancel();
1✔
322
        loadReportTimer = null;
1✔
323
      }
324
      if (lrsStream == this) {
1✔
325
        lrsStream = null;
1✔
326
      }
327
    }
1✔
328

329
    private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
330
        ClusterStats stats) {
331
      io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder =
332
          io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
1✔
333
              .setClusterName(stats.clusterName());
1✔
334
      if (stats.clusterServiceName() != null) {
1✔
335
        builder.setClusterServiceName(stats.clusterServiceName());
1✔
336
      }
337
      for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
1✔
338
        builder.addUpstreamLocalityStats(
1✔
339
            io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
1✔
340
                .setLocality(
1✔
341
                    io.envoyproxy.envoy.config.core.v3.Locality.newBuilder()
1✔
342
                        .setRegion(upstreamLocalityStats.locality().region())
1✔
343
                        .setZone(upstreamLocalityStats.locality().zone())
1✔
344
                        .setSubZone(upstreamLocalityStats.locality().subZone()))
1✔
345
            .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
1✔
346
            .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
1✔
347
            .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
1✔
348
            .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())
1✔
349
            .addAllLoadMetricStats(
1✔
350
                upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map(
1✔
351
                    e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder()
1✔
352
                        .setMetricName(e.getKey())
1✔
353
                        .setNumRequestsFinishedWithMetric(
1✔
354
                            e.getValue().numRequestsFinishedWithMetric())
1✔
355
                        .setTotalMetricValue(e.getValue().totalMetricValue())
1✔
356
                        .build())
1✔
357
                .collect(Collectors.toList())));
1✔
358
      }
1✔
359
      for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
1✔
360
        builder.addDroppedRequests(
1✔
361
            io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
1✔
362
                .setCategory(droppedRequests.category())
1✔
363
                .setDroppedCount(droppedRequests.droppedCount()));
1✔
364
      }
1✔
365
      return builder
1✔
366
          .setTotalDroppedRequests(stats.totalDroppedRequests())
1✔
367
          .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano()))
1✔
368
          .build();
1✔
369
    }
370
  }
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc