• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18727

pending completion
#18727

push

github-actions

ejona86
Restrict checkUpperBoundDeps to library dependencies

Guava 32.1.0 added Gradle Module Metadata to their artifact (they still
use Maven for their build). Because of that, we're now seeing less-normal
dependencies that confuse checkUpperBoundDeps. Obviously, 'null' is not
the version we were looking for.

We aren't upgrading to Guava 32.1.x yet, as there are still other problems.

```
Execution failed for task ':grpc-census:checkUpperBoundDeps'.
> Maven version skew: com.google.guava:guava-parent (32.1.1-jre != null) Bad version dependency path: [project ':grpc-census', com.google.guava:guava:32.1.1-android] Run './gradlew :grpc-census:dependencies --configuration runtimeClasspath' to diagnose
```

29148 of 33044 relevant lines covered (88.21%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.7
/../xds/src/main/java/io/grpc/xds/LoadReportClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.util.Durations;
27
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
28
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
29
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
30
import io.grpc.Channel;
31
import io.grpc.Context;
32
import io.grpc.InternalLogId;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.stub.StreamObserver;
38
import io.grpc.xds.EnvoyProtoData.Node;
39
import io.grpc.xds.Stats.ClusterStats;
40
import io.grpc.xds.Stats.DroppedRequests;
41
import io.grpc.xds.Stats.UpstreamLocalityStats;
42
import io.grpc.xds.XdsLogger.XdsLogLevel;
43
import java.util.ArrayList;
44
import java.util.Collections;
45
import java.util.List;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.stream.Collectors;
49
import javax.annotation.Nullable;
50

51
/**
52
 * Client of xDS load reporting service based on LRS protocol, which reports load stats of
53
 * gRPC client's perspective to a management server.
54
 */
55
final class LoadReportClient {
56
  private final InternalLogId logId;
57
  private final XdsLogger logger;
58
  private final Channel channel;
59
  private final Context context;
60
  private final Node node;
61
  private final SynchronizationContext syncContext;
62
  private final ScheduledExecutorService timerService;
63
  private final Stopwatch retryStopwatch;
64
  private final BackoffPolicy.Provider backoffPolicyProvider;
65
  @VisibleForTesting
66
  final LoadStatsManager2 loadStatsManager;
67

68
  private boolean started;
69
  @Nullable
70
  private BackoffPolicy lrsRpcRetryPolicy;
71
  @Nullable
72
  private ScheduledHandle lrsRpcRetryTimer;
73
  @Nullable
74
  @VisibleForTesting
75
  LrsStream lrsStream;
76

77
  LoadReportClient(
78
      LoadStatsManager2 loadStatsManager,
79
      Channel channel,
80
      Context context,
81
      Node node,
82
      SynchronizationContext syncContext,
83
      ScheduledExecutorService scheduledExecutorService,
84
      BackoffPolicy.Provider backoffPolicyProvider,
85
      Supplier<Stopwatch> stopwatchSupplier) {
1✔
86
    this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
1✔
87
    this.channel = checkNotNull(channel, "xdsChannel");
1✔
88
    this.context = checkNotNull(context, "context");
1✔
89
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
90
    this.timerService = checkNotNull(scheduledExecutorService, "timeService");
1✔
91
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
92
    this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
93
    this.node = checkNotNull(node, "node").toBuilder()
1✔
94
        .addClientFeatures("envoy.lrs.supports_send_all_clusters").build();
1✔
95
    logId = InternalLogId.allocate("lrs-client", null);
1✔
96
    logger = XdsLogger.withLogId(logId);
1✔
97
    logger.log(XdsLogLevel.INFO, "Created");
1✔
98
  }
1✔
99

100
  /**
101
   * Establishes load reporting communication and negotiates with traffic director to report load
102
   * stats periodically. Calling this method on an already started {@link LoadReportClient} is
103
   * no-op.
104
   */
105
  void startLoadReporting() {
106
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
107
    if (started) {
1✔
108
      return;
×
109
    }
110
    started = true;
1✔
111
    logger.log(XdsLogLevel.INFO, "Starting load reporting RPC");
1✔
112
    startLrsRpc();
1✔
113
  }
1✔
114

115
  /**
116
   * Terminates load reporting. Calling this method on an already stopped
117
   * {@link LoadReportClient} is no-op.
118
   */
119
  void stopLoadReporting() {
120
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
121
    if (!started) {
1✔
122
      return;
1✔
123
    }
124
    started = false;
1✔
125
    logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC");
1✔
126
    if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
1✔
127
      lrsRpcRetryTimer.cancel();
1✔
128
    }
129
    if (lrsStream != null) {
1✔
130
      lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
1✔
131
    }
132
    // Do not shutdown channel as it is not owned by LrsClient.
133
  }
1✔
134

135
  @VisibleForTesting
136
  static class LoadReportingTask implements Runnable {
137
    private final LrsStream stream;
138

139
    LoadReportingTask(LrsStream stream) {
1✔
140
      this.stream = stream;
1✔
141
    }
1✔
142

143
    @Override
144
    public void run() {
145
      stream.sendLoadReport();
1✔
146
    }
1✔
147
  }
148

149
  @VisibleForTesting
150
  class LrsRpcRetryTask implements Runnable {
1✔
151

152
    @Override
153
    public void run() {
154
      startLrsRpc();
1✔
155
    }
1✔
156
  }
157

158
  private void startLrsRpc() {
159
    if (!started) {
1✔
160
      return;
×
161
    }
162
    checkState(lrsStream == null, "previous lbStream has not been cleared yet");
1✔
163
    lrsStream = new LrsStream();
1✔
164
    retryStopwatch.reset().start();
1✔
165
    Context prevContext = context.attach();
1✔
166
    try {
167
      lrsStream.start();
1✔
168
    } finally {
169
      context.detach(prevContext);
1✔
170
    }
171
  }
1✔
172

173
  private final class LrsStream {
1✔
174
    boolean initialResponseReceived;
175
    boolean closed;
176
    long intervalNano = -1;
1✔
177
    boolean reportAllClusters;
178
    List<String> clusterNames;  // clusters to report loads for, if not report all.
179
    ScheduledHandle loadReportTimer;
180
    StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
181

182
    void start() {
183
      StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
1✔
184
          new StreamObserver<LoadStatsResponse>() {
1✔
185
            @Override
186
            public void onNext(final LoadStatsResponse response) {
187
              syncContext.execute(new Runnable() {
1✔
188
                @Override
189
                public void run() {
190
                  logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
1✔
191
                  handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
1✔
192
                      Durations.toNanos(response.getLoadReportingInterval()));
1✔
193
                }
1✔
194
              });
195
            }
1✔
196

197
            @Override
198
            public void onError(final Throwable t) {
199
              syncContext.execute(new Runnable() {
1✔
200
                @Override
201
                public void run() {
202
                  handleRpcError(t);
1✔
203
                }
1✔
204
              });
205
            }
1✔
206

207
            @Override
208
            public void onCompleted() {
209
              syncContext.execute(new Runnable() {
1✔
210
                @Override
211
                public void run() {
212
                  handleRpcCompleted();
1✔
213
                }
1✔
214
              });
215
            }
1✔
216
          };
217
      lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
1✔
218
          .streamLoadStats(lrsResponseReaderV3);
1✔
219
      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
1✔
220
      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
1✔
221
    }
1✔
222

223
    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
224
      LoadStatsRequest.Builder requestBuilder =
225
          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
1✔
226
      for (ClusterStats stats : clusterStatsList) {
1✔
227
        requestBuilder.addClusterStats(buildClusterStats(stats));
1✔
228
      }
1✔
229
      LoadStatsRequest request = requestBuilder.build();
1✔
230
      lrsRequestWriterV3.onNext(request);
1✔
231
      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
1✔
232
    }
1✔
233

234
    void sendError(Exception error) {
235
      lrsRequestWriterV3.onError(error);
1✔
236
    }
1✔
237

238
    void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
239
                           long loadReportIntervalNano) {
240
      if (closed) {
1✔
241
        return;
×
242
      }
243
      if (!initialResponseReceived) {
1✔
244
        logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
1✔
245
        initialResponseReceived = true;
1✔
246
      }
247
      reportAllClusters = sendAllClusters;
1✔
248
      if (reportAllClusters) {
1✔
249
        logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
1✔
250
      } else {
251
        logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters);
1✔
252
        clusterNames = clusters;
1✔
253
      }
254
      intervalNano = loadReportIntervalNano;
1✔
255
      logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano);
1✔
256
      scheduleNextLoadReport();
1✔
257
    }
1✔
258

259
    void handleRpcError(Throwable t) {
260
      handleStreamClosed(Status.fromThrowable(t));
1✔
261
    }
1✔
262

263
    void handleRpcCompleted() {
264
      handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
1✔
265
    }
1✔
266

267
    private void sendLoadReport() {
268
      if (closed) {
1✔
269
        return;
×
270
      }
271
      List<ClusterStats> clusterStatsList;
272
      if (reportAllClusters) {
1✔
273
        clusterStatsList = loadStatsManager.getAllClusterStatsReports();
1✔
274
      } else {
275
        clusterStatsList = new ArrayList<>();
1✔
276
        for (String name : clusterNames) {
1✔
277
          clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name));
1✔
278
        }
1✔
279
      }
280
      sendLoadStatsRequest(clusterStatsList);
1✔
281
      scheduleNextLoadReport();
1✔
282
    }
1✔
283

284
    private void scheduleNextLoadReport() {
285
      // Cancel pending load report and reschedule with updated load reporting interval.
286
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
287
        loadReportTimer.cancel();
1✔
288
        loadReportTimer = null;
1✔
289
      }
290
      if (intervalNano > 0) {
1✔
291
        loadReportTimer = syncContext.schedule(
1✔
292
            new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService);
1✔
293
      }
294
    }
1✔
295

296
    private void handleStreamClosed(Status status) {
297
      checkArgument(!status.isOk(), "unexpected OK status");
1✔
298
      if (closed) {
1✔
299
        return;
1✔
300
      }
301
      logger.log(
1✔
302
          XdsLogLevel.ERROR,
303
          "LRS stream closed with status {0}: {1}. Cause: {2}",
304
          status.getCode(), status.getDescription(), status.getCause());
1✔
305
      closed = true;
1✔
306
      cleanUp();
1✔
307

308
      if (initialResponseReceived || lrsRpcRetryPolicy == null) {
1✔
309
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
310
        // has never been initialized.
311
        lrsRpcRetryPolicy = backoffPolicyProvider.get();
1✔
312
      }
313
      // The back-off policy determines the interval between consecutive RPC upstarts, thus the
314
      // actual delay may be smaller than the value from the back-off policy, or even negative,
315
      // depending how much time was spent in the previous RPC.
316
      long delayNanos =
1✔
317
          lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
318
      logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
1✔
319
      if (delayNanos <= 0) {
1✔
320
        startLrsRpc();
×
321
      } else {
322
        lrsRpcRetryTimer = syncContext.schedule(
1✔
323
            new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
324
      }
325
    }
1✔
326

327
    private void close(Exception error) {
328
      if (closed) {
1✔
329
        return;
×
330
      }
331
      closed = true;
1✔
332
      cleanUp();
1✔
333
      sendError(error);
1✔
334
    }
1✔
335

336
    private void cleanUp() {
337
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
338
        loadReportTimer.cancel();
1✔
339
        loadReportTimer = null;
1✔
340
      }
341
      if (lrsStream == this) {
1✔
342
        lrsStream = null;
1✔
343
      }
344
    }
1✔
345

346
    private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
347
        ClusterStats stats) {
348
      io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder =
349
          io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
1✔
350
              .setClusterName(stats.clusterName());
1✔
351
      if (stats.clusterServiceName() != null) {
1✔
352
        builder.setClusterServiceName(stats.clusterServiceName());
1✔
353
      }
354
      for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
1✔
355
        builder.addUpstreamLocalityStats(
1✔
356
            io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
1✔
357
                .setLocality(
1✔
358
                    io.envoyproxy.envoy.config.core.v3.Locality.newBuilder()
1✔
359
                        .setRegion(upstreamLocalityStats.locality().region())
1✔
360
                        .setZone(upstreamLocalityStats.locality().zone())
1✔
361
                        .setSubZone(upstreamLocalityStats.locality().subZone()))
1✔
362
            .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
1✔
363
            .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
1✔
364
            .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
1✔
365
            .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())
1✔
366
            .addAllLoadMetricStats(
1✔
367
                upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map(
1✔
368
                    e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder()
1✔
369
                        .setMetricName(e.getKey())
1✔
370
                        .setNumRequestsFinishedWithMetric(
1✔
371
                            e.getValue().numRequestsFinishedWithMetric())
1✔
372
                        .setTotalMetricValue(e.getValue().totalMetricValue())
1✔
373
                        .build())
1✔
374
                .collect(Collectors.toList())));
1✔
375
      }
1✔
376
      for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
1✔
377
        builder.addDroppedRequests(
1✔
378
            io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
1✔
379
                .setCategory(droppedRequests.category())
1✔
380
                .setDroppedCount(droppedRequests.droppedCount()));
1✔
381
      }
1✔
382
      return builder
1✔
383
          .setTotalDroppedRequests(stats.totalDroppedRequests())
1✔
384
          .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano()))
1✔
385
          .build();
1✔
386
    }
387
  }
388
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc