• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18696

pending completion
#18696

push

github-actions

web-flow
core: ManagedChannelImpl to always use RetryingNameResolver (#10328)

ManagedChannelImpl did not make sure to use a RetryingNameResolver if
authority was not overridden. This was not a problem for DNS name
resolution as the DNS name resolver factory explicitly returns a
RetryingNameResolver. For polling name resolvers that do not do this in
their factories (like the grpclb name resolver) this meant not having retry
at all.

30555 of 34636 relevant lines covered (88.22%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.53
/../xds/src/main/java/io/grpc/xds/LoadReportClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.util.Durations;
27
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
28
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
29
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
30
import io.grpc.Channel;
31
import io.grpc.Context;
32
import io.grpc.InternalLogId;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.stub.StreamObserver;
38
import io.grpc.xds.EnvoyProtoData.Node;
39
import io.grpc.xds.Stats.ClusterStats;
40
import io.grpc.xds.Stats.DroppedRequests;
41
import io.grpc.xds.Stats.UpstreamLocalityStats;
42
import io.grpc.xds.XdsLogger.XdsLogLevel;
43
import java.util.ArrayList;
44
import java.util.Collections;
45
import java.util.List;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import javax.annotation.Nullable;
49

50
/**
51
 * Client of xDS load reporting service based on LRS protocol, which reports load stats of
52
 * gRPC client's perspective to a management server.
53
 */
54
final class LoadReportClient {
55
  private final InternalLogId logId;
56
  private final XdsLogger logger;
57
  private final Channel channel;
58
  private final Context context;
59
  private final Node node;
60
  private final SynchronizationContext syncContext;
61
  private final ScheduledExecutorService timerService;
62
  private final Stopwatch retryStopwatch;
63
  private final BackoffPolicy.Provider backoffPolicyProvider;
64
  @VisibleForTesting
65
  final LoadStatsManager2 loadStatsManager;
66

67
  private boolean started;
68
  @Nullable
69
  private BackoffPolicy lrsRpcRetryPolicy;
70
  @Nullable
71
  private ScheduledHandle lrsRpcRetryTimer;
72
  @Nullable
73
  @VisibleForTesting
74
  LrsStream lrsStream;
75

76
  LoadReportClient(
77
      LoadStatsManager2 loadStatsManager,
78
      Channel channel,
79
      Context context,
80
      Node node,
81
      SynchronizationContext syncContext,
82
      ScheduledExecutorService scheduledExecutorService,
83
      BackoffPolicy.Provider backoffPolicyProvider,
84
      Supplier<Stopwatch> stopwatchSupplier) {
1✔
85
    this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
1✔
86
    this.channel = checkNotNull(channel, "xdsChannel");
1✔
87
    this.context = checkNotNull(context, "context");
1✔
88
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
89
    this.timerService = checkNotNull(scheduledExecutorService, "timeService");
1✔
90
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
91
    this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
92
    this.node = checkNotNull(node, "node").toBuilder()
1✔
93
        .addClientFeatures("envoy.lrs.supports_send_all_clusters").build();
1✔
94
    logId = InternalLogId.allocate("lrs-client", null);
1✔
95
    logger = XdsLogger.withLogId(logId);
1✔
96
    logger.log(XdsLogLevel.INFO, "Created");
1✔
97
  }
1✔
98

99
  /**
100
   * Establishes load reporting communication and negotiates with traffic director to report load
101
   * stats periodically. Calling this method on an already started {@link LoadReportClient} is
102
   * no-op.
103
   */
104
  void startLoadReporting() {
105
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
106
    if (started) {
1✔
107
      return;
×
108
    }
109
    started = true;
1✔
110
    logger.log(XdsLogLevel.INFO, "Starting load reporting RPC");
1✔
111
    startLrsRpc();
1✔
112
  }
1✔
113

114
  /**
115
   * Terminates load reporting. Calling this method on an already stopped
116
   * {@link LoadReportClient} is no-op.
117
   */
118
  void stopLoadReporting() {
119
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
120
    if (!started) {
1✔
121
      return;
1✔
122
    }
123
    started = false;
1✔
124
    logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC");
1✔
125
    if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
1✔
126
      lrsRpcRetryTimer.cancel();
1✔
127
    }
128
    if (lrsStream != null) {
1✔
129
      lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
1✔
130
    }
131
    // Do not shutdown channel as it is not owned by LrsClient.
132
  }
1✔
133

134
  @VisibleForTesting
135
  static class LoadReportingTask implements Runnable {
136
    private final LrsStream stream;
137

138
    LoadReportingTask(LrsStream stream) {
1✔
139
      this.stream = stream;
1✔
140
    }
1✔
141

142
    @Override
143
    public void run() {
144
      stream.sendLoadReport();
1✔
145
    }
1✔
146
  }
147

148
  @VisibleForTesting
149
  class LrsRpcRetryTask implements Runnable {
1✔
150

151
    @Override
152
    public void run() {
153
      startLrsRpc();
1✔
154
    }
1✔
155
  }
156

157
  private void startLrsRpc() {
158
    if (!started) {
1✔
159
      return;
×
160
    }
161
    checkState(lrsStream == null, "previous lbStream has not been cleared yet");
1✔
162
    lrsStream = new LrsStream();
1✔
163
    retryStopwatch.reset().start();
1✔
164
    Context prevContext = context.attach();
1✔
165
    try {
166
      lrsStream.start();
1✔
167
    } finally {
168
      context.detach(prevContext);
1✔
169
    }
170
  }
1✔
171

172
  private final class LrsStream {
1✔
173
    boolean initialResponseReceived;
174
    boolean closed;
175
    long intervalNano = -1;
1✔
176
    boolean reportAllClusters;
177
    List<String> clusterNames;  // clusters to report loads for, if not report all.
178
    ScheduledHandle loadReportTimer;
179
    StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
180

181
    void start() {
182
      StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
1✔
183
          new StreamObserver<LoadStatsResponse>() {
1✔
184
            @Override
185
            public void onNext(final LoadStatsResponse response) {
186
              syncContext.execute(new Runnable() {
1✔
187
                @Override
188
                public void run() {
189
                  logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
1✔
190
                  handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
1✔
191
                      Durations.toNanos(response.getLoadReportingInterval()));
1✔
192
                }
1✔
193
              });
194
            }
1✔
195

196
            @Override
197
            public void onError(final Throwable t) {
198
              syncContext.execute(new Runnable() {
1✔
199
                @Override
200
                public void run() {
201
                  handleRpcError(t);
1✔
202
                }
1✔
203
              });
204
            }
1✔
205

206
            @Override
207
            public void onCompleted() {
208
              syncContext.execute(new Runnable() {
1✔
209
                @Override
210
                public void run() {
211
                  handleRpcCompleted();
1✔
212
                }
1✔
213
              });
214
            }
1✔
215
          };
216
      lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
1✔
217
          .streamLoadStats(lrsResponseReaderV3);
1✔
218
      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
1✔
219
      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
1✔
220
    }
1✔
221

222
    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
223
      LoadStatsRequest.Builder requestBuilder =
224
          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
1✔
225
      for (ClusterStats stats : clusterStatsList) {
1✔
226
        requestBuilder.addClusterStats(buildClusterStats(stats));
1✔
227
      }
1✔
228
      LoadStatsRequest request = requestBuilder.build();
1✔
229
      lrsRequestWriterV3.onNext(request);
1✔
230
      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
1✔
231
    }
1✔
232

233
    void sendError(Exception error) {
234
      lrsRequestWriterV3.onError(error);
1✔
235
    }
1✔
236

237
    void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
238
                           long loadReportIntervalNano) {
239
      if (closed) {
1✔
240
        return;
×
241
      }
242
      if (!initialResponseReceived) {
1✔
243
        logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
1✔
244
        initialResponseReceived = true;
1✔
245
      }
246
      reportAllClusters = sendAllClusters;
1✔
247
      if (reportAllClusters) {
1✔
248
        logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
1✔
249
      } else {
250
        logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters);
1✔
251
        clusterNames = clusters;
1✔
252
      }
253
      intervalNano = loadReportIntervalNano;
1✔
254
      logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano);
1✔
255
      scheduleNextLoadReport();
1✔
256
    }
1✔
257

258
    void handleRpcError(Throwable t) {
259
      handleStreamClosed(Status.fromThrowable(t));
1✔
260
    }
1✔
261

262
    void handleRpcCompleted() {
263
      handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
1✔
264
    }
1✔
265

266
    private void sendLoadReport() {
267
      if (closed) {
1✔
268
        return;
×
269
      }
270
      List<ClusterStats> clusterStatsList;
271
      if (reportAllClusters) {
1✔
272
        clusterStatsList = loadStatsManager.getAllClusterStatsReports();
1✔
273
      } else {
274
        clusterStatsList = new ArrayList<>();
1✔
275
        for (String name : clusterNames) {
1✔
276
          clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name));
1✔
277
        }
1✔
278
      }
279
      sendLoadStatsRequest(clusterStatsList);
1✔
280
      scheduleNextLoadReport();
1✔
281
    }
1✔
282

283
    private void scheduleNextLoadReport() {
284
      // Cancel pending load report and reschedule with updated load reporting interval.
285
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
286
        loadReportTimer.cancel();
1✔
287
        loadReportTimer = null;
1✔
288
      }
289
      if (intervalNano > 0) {
1✔
290
        loadReportTimer = syncContext.schedule(
1✔
291
            new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService);
1✔
292
      }
293
    }
1✔
294

295
    private void handleStreamClosed(Status status) {
296
      checkArgument(!status.isOk(), "unexpected OK status");
1✔
297
      if (closed) {
1✔
298
        return;
1✔
299
      }
300
      logger.log(
1✔
301
          XdsLogLevel.ERROR,
302
          "LRS stream closed with status {0}: {1}. Cause: {2}",
303
          status.getCode(), status.getDescription(), status.getCause());
1✔
304
      closed = true;
1✔
305
      cleanUp();
1✔
306

307
      if (initialResponseReceived || lrsRpcRetryPolicy == null) {
1✔
308
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
309
        // has never been initialized.
310
        lrsRpcRetryPolicy = backoffPolicyProvider.get();
1✔
311
      }
312
      // The back-off policy determines the interval between consecutive RPC upstarts, thus the
313
      // actual delay may be smaller than the value from the back-off policy, or even negative,
314
      // depending how much time was spent in the previous RPC.
315
      long delayNanos =
1✔
316
          lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
317
      logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
1✔
318
      if (delayNanos <= 0) {
1✔
319
        startLrsRpc();
×
320
      } else {
321
        lrsRpcRetryTimer = syncContext.schedule(
1✔
322
            new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
323
      }
324
    }
1✔
325

326
    private void close(Exception error) {
327
      if (closed) {
1✔
328
        return;
×
329
      }
330
      closed = true;
1✔
331
      cleanUp();
1✔
332
      sendError(error);
1✔
333
    }
1✔
334

335
    private void cleanUp() {
336
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
337
        loadReportTimer.cancel();
1✔
338
        loadReportTimer = null;
1✔
339
      }
340
      if (lrsStream == this) {
1✔
341
        lrsStream = null;
1✔
342
      }
343
    }
1✔
344

345
    private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
346
        ClusterStats stats) {
347
      io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder =
348
          io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
1✔
349
              .setClusterName(stats.clusterName());
1✔
350
      if (stats.clusterServiceName() != null) {
1✔
351
        builder.setClusterServiceName(stats.clusterServiceName());
1✔
352
      }
353
      for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
1✔
354
        builder.addUpstreamLocalityStats(
1✔
355
            io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
1✔
356
                .setLocality(
1✔
357
                    io.envoyproxy.envoy.config.core.v3.Locality.newBuilder()
1✔
358
                        .setRegion(upstreamLocalityStats.locality().region())
1✔
359
                        .setZone(upstreamLocalityStats.locality().zone())
1✔
360
                        .setSubZone(upstreamLocalityStats.locality().subZone()))
1✔
361
            .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
1✔
362
            .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
1✔
363
            .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
1✔
364
            .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()));
1✔
365
      }
1✔
366
      for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
1✔
367
        builder.addDroppedRequests(
1✔
368
            io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
1✔
369
                .setCategory(droppedRequests.category())
1✔
370
                .setDroppedCount(droppedRequests.droppedCount()));
1✔
371
      }
1✔
372
      return builder
1✔
373
          .setTotalDroppedRequests(stats.totalDroppedRequests())
1✔
374
          .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano()))
1✔
375
          .build();
1✔
376
    }
377
  }
378
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc