• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18744

pending completion
#18744

push

github-actions

web-flow
Revert "Release v1.57.0 (#10417)" (#10419)

This reverts commit d23e39e64.

30630 of 34705 relevant lines covered (88.26%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.7
/../xds/src/main/java/io/grpc/xds/LoadReportClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.util.Durations;
27
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
28
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
29
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
30
import io.grpc.Channel;
31
import io.grpc.Context;
32
import io.grpc.InternalLogId;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.stub.StreamObserver;
38
import io.grpc.xds.EnvoyProtoData.Node;
39
import io.grpc.xds.Stats.ClusterStats;
40
import io.grpc.xds.Stats.DroppedRequests;
41
import io.grpc.xds.Stats.UpstreamLocalityStats;
42
import io.grpc.xds.XdsLogger.XdsLogLevel;
43
import java.util.ArrayList;
44
import java.util.Collections;
45
import java.util.List;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import java.util.stream.Collectors;
49
import javax.annotation.Nullable;
50

51
/**
52
 * Client of xDS load reporting service based on LRS protocol, which reports load stats of
53
 * gRPC client's perspective to a management server.
54
 */
55
final class LoadReportClient {
56
  private final InternalLogId logId;
57
  private final XdsLogger logger;
58
  private final Channel channel;
59
  private final Context context;
60
  private final Node node;
61
  private final SynchronizationContext syncContext;
62
  private final ScheduledExecutorService timerService;
63
  private final Stopwatch retryStopwatch;
64
  private final BackoffPolicy.Provider backoffPolicyProvider;
65
  @VisibleForTesting
66
  final LoadStatsManager2 loadStatsManager;
67

68
  private boolean started;
69
  @Nullable
70
  private BackoffPolicy lrsRpcRetryPolicy;
71
  @Nullable
72
  private ScheduledHandle lrsRpcRetryTimer;
73
  @Nullable
74
  @VisibleForTesting
75
  LrsStream lrsStream;
76

77
  LoadReportClient(
78
      LoadStatsManager2 loadStatsManager,
79
      Channel channel,
80
      Context context,
81
      Node node,
82
      SynchronizationContext syncContext,
83
      ScheduledExecutorService scheduledExecutorService,
84
      BackoffPolicy.Provider backoffPolicyProvider,
85
      Supplier<Stopwatch> stopwatchSupplier) {
1✔
86
    this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
1✔
87
    this.channel = checkNotNull(channel, "xdsChannel");
1✔
88
    this.context = checkNotNull(context, "context");
1✔
89
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
90
    this.timerService = checkNotNull(scheduledExecutorService, "timeService");
1✔
91
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
92
    this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
93
    this.node = checkNotNull(node, "node").toBuilder()
1✔
94
        .addClientFeatures("envoy.lrs.supports_send_all_clusters").build();
1✔
95
    logId = InternalLogId.allocate("lrs-client", null);
1✔
96
    logger = XdsLogger.withLogId(logId);
1✔
97
    logger.log(XdsLogLevel.INFO, "Created");
1✔
98
  }
1✔
99

100
  /**
101
   * Establishes load reporting communication and negotiates with traffic director to report load
102
   * stats periodically. Calling this method on an already started {@link LoadReportClient} is
103
   * no-op.
104
   */
105
  void startLoadReporting() {
106
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
107
    if (started) {
1✔
108
      return;
×
109
    }
110
    started = true;
1✔
111
    logger.log(XdsLogLevel.INFO, "Starting load reporting RPC");
1✔
112
    startLrsRpc();
1✔
113
  }
1✔
114

115
  /**
116
   * Terminates load reporting. Calling this method on an already stopped
117
   * {@link LoadReportClient} is no-op.
118
   */
119
  void stopLoadReporting() {
120
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
121
    if (!started) {
1✔
122
      return;
1✔
123
    }
124
    started = false;
1✔
125
    logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC");
1✔
126
    if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
1✔
127
      lrsRpcRetryTimer.cancel();
1✔
128
    }
129
    if (lrsStream != null) {
1✔
130
      lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
1✔
131
    }
132
    // Do not shutdown channel as it is not owned by LrsClient.
133
  }
1✔
134

135
  @VisibleForTesting
136
  static class LoadReportingTask implements Runnable {
137
    private final LrsStream stream;
138

139
    LoadReportingTask(LrsStream stream) {
1✔
140
      this.stream = stream;
1✔
141
    }
1✔
142

143
    @Override
144
    public void run() {
145
      stream.sendLoadReport();
1✔
146
    }
1✔
147
  }
148

149
  @VisibleForTesting
150
  class LrsRpcRetryTask implements Runnable {
1✔
151

152
    @Override
153
    public void run() {
154
      startLrsRpc();
1✔
155
    }
1✔
156
  }
157

158
  private void startLrsRpc() {
159
    if (!started) {
1✔
160
      return;
×
161
    }
162
    checkState(lrsStream == null, "previous lbStream has not been cleared yet");
1✔
163
    lrsStream = new LrsStream();
1✔
164
    retryStopwatch.reset().start();
1✔
165
    Context prevContext = context.attach();
1✔
166
    try {
167
      lrsStream.start();
1✔
168
    } finally {
169
      context.detach(prevContext);
1✔
170
    }
171
  }
1✔
172

173
  private final class LrsStream {
1✔
174
    boolean initialResponseReceived;
175
    boolean closed;
176
    long intervalNano = -1;
1✔
177
    boolean reportAllClusters;
178
    List<String> clusterNames;  // clusters to report loads for, if not report all.
179
    ScheduledHandle loadReportTimer;
180
    StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
181

182
    void start() {
183
      StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
1✔
184
          new StreamObserver<LoadStatsResponse>() {
1✔
185
            @Override
186
            public void onNext(final LoadStatsResponse response) {
187
              syncContext.execute(new Runnable() {
1✔
188
                @Override
189
                public void run() {
190
                  logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
1✔
191
                  handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
1✔
192
                      Durations.toNanos(response.getLoadReportingInterval()));
1✔
193
                }
1✔
194
              });
195
            }
1✔
196

197
            @Override
198
            public void onError(final Throwable t) {
199
              syncContext.execute(new Runnable() {
1✔
200
                @Override
201
                public void run() {
202
                  handleRpcError(t);
1✔
203
                }
1✔
204
              });
205
            }
1✔
206

207
            @Override
208
            public void onCompleted() {
209
              syncContext.execute(new Runnable() {
1✔
210
                @Override
211
                public void run() {
212
                  handleRpcCompleted();
1✔
213
                }
1✔
214
              });
215
            }
1✔
216
          };
217
      lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
1✔
218
          .streamLoadStats(lrsResponseReaderV3);
1✔
219
      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
1✔
220
      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
1✔
221
    }
1✔
222

223
    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
224
      LoadStatsRequest.Builder requestBuilder =
225
          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
1✔
226
      for (ClusterStats stats : clusterStatsList) {
1✔
227
        requestBuilder.addClusterStats(buildClusterStats(stats));
1✔
228
      }
1✔
229
      LoadStatsRequest request = requestBuilder.build();
1✔
230
      lrsRequestWriterV3.onNext(request);
1✔
231
      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
1✔
232
    }
1✔
233

234
    void sendError(Exception error) {
235
      lrsRequestWriterV3.onError(error);
1✔
236
    }
1✔
237

238
    void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
239
                           long loadReportIntervalNano) {
240
      if (closed) {
1✔
241
        return;
×
242
      }
243
      if (!initialResponseReceived) {
1✔
244
        logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
1✔
245
        initialResponseReceived = true;
1✔
246
      }
247
      reportAllClusters = sendAllClusters;
1✔
248
      if (reportAllClusters) {
1✔
249
        logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
1✔
250
      } else {
251
        logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters);
1✔
252
        clusterNames = clusters;
1✔
253
      }
254
      intervalNano = loadReportIntervalNano;
1✔
255
      logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano);
1✔
256
      scheduleNextLoadReport();
1✔
257
    }
1✔
258

259
    void handleRpcError(Throwable t) {
260
      handleStreamClosed(Status.fromThrowable(t));
1✔
261
    }
1✔
262

263
    void handleRpcCompleted() {
264
      handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
1✔
265
    }
1✔
266

267
    private void sendLoadReport() {
268
      if (closed) {
1✔
269
        return;
×
270
      }
271
      List<ClusterStats> clusterStatsList;
272
      if (reportAllClusters) {
1✔
273
        clusterStatsList = loadStatsManager.getAllClusterStatsReports();
1✔
274
      } else {
275
        clusterStatsList = new ArrayList<>();
1✔
276
        for (String name : clusterNames) {
1✔
277
          clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name));
1✔
278
        }
1✔
279
      }
280
      sendLoadStatsRequest(clusterStatsList);
1✔
281
      scheduleNextLoadReport();
1✔
282
    }
1✔
283

284
    private void scheduleNextLoadReport() {
285
      // Cancel pending load report and reschedule with updated load reporting interval.
286
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
287
        loadReportTimer.cancel();
1✔
288
        loadReportTimer = null;
1✔
289
      }
290
      if (intervalNano > 0) {
1✔
291
        loadReportTimer = syncContext.schedule(
1✔
292
            new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService);
1✔
293
      }
294
    }
1✔
295

296
    private void handleStreamClosed(Status status) {
297
      checkArgument(!status.isOk(), "unexpected OK status");
1✔
298
      if (closed) {
1✔
299
        return;
1✔
300
      }
301
      logger.log(
1✔
302
          XdsLogLevel.ERROR,
303
          "LRS stream closed with status {0}: {1}. Cause: {2}",
304
          status.getCode(), status.getDescription(), status.getCause());
1✔
305
      closed = true;
1✔
306
      cleanUp();
1✔
307

308
      if (initialResponseReceived || lrsRpcRetryPolicy == null) {
1✔
309
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
310
        // has never been initialized.
311
        lrsRpcRetryPolicy = backoffPolicyProvider.get();
1✔
312
      }
313
      // The back-off policy determines the interval between consecutive RPC upstarts, thus the
314
      // actual delay may be smaller than the value from the back-off policy, or even negative,
315
      // depending how much time was spent in the previous RPC.
316
      long delayNanos =
1✔
317
          lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
318
      logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
1✔
319
      if (delayNanos <= 0) {
1✔
320
        startLrsRpc();
×
321
      } else {
322
        lrsRpcRetryTimer = syncContext.schedule(
1✔
323
            new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
324
      }
325
    }
1✔
326

327
    private void close(Exception error) {
328
      if (closed) {
1✔
329
        return;
×
330
      }
331
      closed = true;
1✔
332
      cleanUp();
1✔
333
      sendError(error);
1✔
334
    }
1✔
335

336
    private void cleanUp() {
337
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
338
        loadReportTimer.cancel();
1✔
339
        loadReportTimer = null;
1✔
340
      }
341
      if (lrsStream == this) {
1✔
342
        lrsStream = null;
1✔
343
      }
344
    }
1✔
345

346
    private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
347
        ClusterStats stats) {
348
      io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder =
349
          io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
1✔
350
              .setClusterName(stats.clusterName());
1✔
351
      if (stats.clusterServiceName() != null) {
1✔
352
        builder.setClusterServiceName(stats.clusterServiceName());
1✔
353
      }
354
      for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
1✔
355
        builder.addUpstreamLocalityStats(
1✔
356
            io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
1✔
357
                .setLocality(
1✔
358
                    io.envoyproxy.envoy.config.core.v3.Locality.newBuilder()
1✔
359
                        .setRegion(upstreamLocalityStats.locality().region())
1✔
360
                        .setZone(upstreamLocalityStats.locality().zone())
1✔
361
                        .setSubZone(upstreamLocalityStats.locality().subZone()))
1✔
362
            .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
1✔
363
            .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
1✔
364
            .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
1✔
365
            .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())
1✔
366
            .addAllLoadMetricStats(
1✔
367
                upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map(
1✔
368
                    e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder()
1✔
369
                        .setMetricName(e.getKey())
1✔
370
                        .setNumRequestsFinishedWithMetric(
1✔
371
                            e.getValue().numRequestsFinishedWithMetric())
1✔
372
                        .setTotalMetricValue(e.getValue().totalMetricValue())
1✔
373
                        .build())
1✔
374
                .collect(Collectors.toList())));
1✔
375
      }
1✔
376
      for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
1✔
377
        builder.addDroppedRequests(
1✔
378
            io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
1✔
379
                .setCategory(droppedRequests.category())
1✔
380
                .setDroppedCount(droppedRequests.droppedCount()));
1✔
381
      }
1✔
382
      return builder
1✔
383
          .setTotalDroppedRequests(stats.totalDroppedRequests())
1✔
384
          .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano()))
1✔
385
          .build();
1✔
386
    }
387
  }
388
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc