• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18957

19 Dec 2023 05:39PM UTC coverage: 88.134% (-0.003%) from 88.137%
#18957

push

github

web-flow
buildscripts: Use the Kokoro shared install lib from the new repo (#10757) (#10763)

Source: https://github.com/grpc/grpc/blob/4f7ead2c7/tools/internal_ci/linux/grpc_xds_k8s_install_test_driver.sh
New repo: https://github.com/grpc/psm-interop
New path: `.kokoro/psm_interop_kokoro_lib.sh`

30832 of 34983 relevant lines covered (88.13%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.53
/../xds/src/main/java/io/grpc/xds/LoadReportClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.util.Durations;
27
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
28
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
29
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
30
import io.grpc.Channel;
31
import io.grpc.Context;
32
import io.grpc.InternalLogId;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.stub.StreamObserver;
38
import io.grpc.xds.EnvoyProtoData.Node;
39
import io.grpc.xds.Stats.ClusterStats;
40
import io.grpc.xds.Stats.DroppedRequests;
41
import io.grpc.xds.Stats.UpstreamLocalityStats;
42
import io.grpc.xds.XdsLogger.XdsLogLevel;
43
import java.util.ArrayList;
44
import java.util.Collections;
45
import java.util.List;
46
import java.util.concurrent.ScheduledExecutorService;
47
import java.util.concurrent.TimeUnit;
48
import javax.annotation.Nullable;
49

50
/**
51
 * Client of xDS load reporting service based on LRS protocol, which reports load stats of
52
 * gRPC client's perspective to a management server.
53
 */
54
final class LoadReportClient {
55
  private final InternalLogId logId;
56
  private final XdsLogger logger;
57
  private final Channel channel;
58
  private final Context context;
59
  private final Node node;
60
  private final SynchronizationContext syncContext;
61
  private final ScheduledExecutorService timerService;
62
  private final Stopwatch retryStopwatch;
63
  private final BackoffPolicy.Provider backoffPolicyProvider;
64
  @VisibleForTesting
65
  final LoadStatsManager2 loadStatsManager;
66

67
  private boolean started;
68
  @Nullable
69
  private BackoffPolicy lrsRpcRetryPolicy;
70
  @Nullable
71
  private ScheduledHandle lrsRpcRetryTimer;
72
  @Nullable
73
  @VisibleForTesting
74
  LrsStream lrsStream;
75

76
  LoadReportClient(
77
      LoadStatsManager2 loadStatsManager,
78
      Channel channel,
79
      Context context,
80
      Node node,
81
      SynchronizationContext syncContext,
82
      ScheduledExecutorService scheduledExecutorService,
83
      BackoffPolicy.Provider backoffPolicyProvider,
84
      Supplier<Stopwatch> stopwatchSupplier) {
1✔
85
    this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
1✔
86
    this.channel = checkNotNull(channel, "xdsChannel");
1✔
87
    this.context = checkNotNull(context, "context");
1✔
88
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
89
    this.timerService = checkNotNull(scheduledExecutorService, "timeService");
1✔
90
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
91
    this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
1✔
92
    this.node = checkNotNull(node, "node").toBuilder()
1✔
93
        .addClientFeatures("envoy.lrs.supports_send_all_clusters").build();
1✔
94
    logId = InternalLogId.allocate("lrs-client", null);
1✔
95
    logger = XdsLogger.withLogId(logId);
1✔
96
    logger.log(XdsLogLevel.INFO, "Created");
1✔
97
  }
1✔
98

99
  /**
100
   * Establishes load reporting communication and negotiates with traffic director to report load
101
   * stats periodically. Calling this method on an already started {@link LoadReportClient} is
102
   * no-op.
103
   */
104
  void startLoadReporting() {
105
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
106
    if (started) {
1✔
107
      return;
×
108
    }
109
    started = true;
1✔
110
    logger.log(XdsLogLevel.INFO, "Starting load reporting RPC");
1✔
111
    startLrsRpc();
1✔
112
  }
1✔
113

114
  /**
115
   * Terminates load reporting. Calling this method on an already stopped
116
   * {@link LoadReportClient} is no-op.
117
   */
118
  void stopLoadReporting() {
119
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
120
    if (!started) {
1✔
121
      return;
1✔
122
    }
123
    started = false;
1✔
124
    logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC");
1✔
125
    if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
1✔
126
      lrsRpcRetryTimer.cancel();
1✔
127
    }
128
    if (lrsStream != null) {
1✔
129
      lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
1✔
130
    }
131
    // Do not shutdown channel as it is not owned by LrsClient.
132
  }
1✔
133

134
  @VisibleForTesting
135
  static class LoadReportingTask implements Runnable {
136
    private final LrsStream stream;
137

138
    LoadReportingTask(LrsStream stream) {
1✔
139
      this.stream = stream;
1✔
140
    }
1✔
141

142
    @Override
143
    public void run() {
144
      stream.sendLoadReport();
1✔
145
    }
1✔
146
  }
147

148
  @VisibleForTesting
149
  class LrsRpcRetryTask implements Runnable {
1✔
150

151
    @Override
152
    public void run() {
153
      startLrsRpc();
1✔
154
    }
1✔
155
  }
156

157
  private void startLrsRpc() {
158
    if (!started) {
1✔
159
      return;
×
160
    }
161
    checkState(lrsStream == null, "previous lbStream has not been cleared yet");
1✔
162
    lrsStream = new LrsStream();
1✔
163
    retryStopwatch.reset().start();
1✔
164
    Context prevContext = context.attach();
1✔
165
    try {
166
      lrsStream.start();
1✔
167
    } finally {
168
      context.detach(prevContext);
1✔
169
    }
170
  }
1✔
171

172
  private final class LrsStream {
1✔
173
    boolean initialResponseReceived;
174
    boolean closed;
175
    long intervalNano = -1;
1✔
176
    boolean reportAllClusters;
177
    List<String> clusterNames;  // clusters to report loads for, if not report all.
178
    ScheduledHandle loadReportTimer;
179
    StreamObserver<LoadStatsRequest> lrsRequestWriterV3;
180

181
    void start() {
182
      StreamObserver<LoadStatsResponse> lrsResponseReaderV3 =
1✔
183
          new StreamObserver<LoadStatsResponse>() {
1✔
184
            @Override
185
            public void onNext(final LoadStatsResponse response) {
186
              syncContext.execute(new Runnable() {
1✔
187
                @Override
188
                public void run() {
189
                  logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
1✔
190
                  handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
1✔
191
                      Durations.toNanos(response.getLoadReportingInterval()));
1✔
192
                }
1✔
193
              });
194
            }
1✔
195

196
            @Override
197
            public void onError(final Throwable t) {
198
              syncContext.execute(new Runnable() {
1✔
199
                @Override
200
                public void run() {
201
                  handleRpcError(t);
1✔
202
                }
1✔
203
              });
204
            }
1✔
205

206
            @Override
207
            public void onCompleted() {
208
              syncContext.execute(new Runnable() {
1✔
209
                @Override
210
                public void run() {
211
                  handleRpcCompleted();
1✔
212
                }
1✔
213
              });
214
            }
1✔
215
          };
216
      lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady()
1✔
217
          .streamLoadStats(lrsResponseReaderV3);
1✔
218
      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
1✔
219
      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
1✔
220
    }
1✔
221

222
    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
223
      LoadStatsRequest.Builder requestBuilder =
224
          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
1✔
225
      for (ClusterStats stats : clusterStatsList) {
1✔
226
        requestBuilder.addClusterStats(buildClusterStats(stats));
1✔
227
      }
1✔
228
      LoadStatsRequest request = requestBuilder.build();
1✔
229
      lrsRequestWriterV3.onNext(request);
1✔
230
      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
1✔
231
    }
1✔
232

233
    void sendError(Exception error) {
234
      lrsRequestWriterV3.onError(error);
1✔
235
    }
1✔
236

237
    void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
238
                           long loadReportIntervalNano) {
239
      if (closed) {
1✔
240
        return;
×
241
      }
242
      if (!initialResponseReceived) {
1✔
243
        logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
1✔
244
        initialResponseReceived = true;
1✔
245
      }
246
      reportAllClusters = sendAllClusters;
1✔
247
      if (reportAllClusters) {
1✔
248
        logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
1✔
249
      } else {
250
        logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters);
1✔
251
        clusterNames = clusters;
1✔
252
      }
253
      intervalNano = loadReportIntervalNano;
1✔
254
      logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano);
1✔
255
      scheduleNextLoadReport();
1✔
256
    }
1✔
257

258
    void handleRpcError(Throwable t) {
259
      handleStreamClosed(Status.fromThrowable(t));
1✔
260
    }
1✔
261

262
    void handleRpcCompleted() {
263
      handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
1✔
264
    }
1✔
265

266
    private void sendLoadReport() {
267
      if (closed) {
1✔
268
        return;
×
269
      }
270
      List<ClusterStats> clusterStatsList;
271
      if (reportAllClusters) {
1✔
272
        clusterStatsList = loadStatsManager.getAllClusterStatsReports();
1✔
273
      } else {
274
        clusterStatsList = new ArrayList<>();
1✔
275
        for (String name : clusterNames) {
1✔
276
          clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name));
1✔
277
        }
1✔
278
      }
279
      sendLoadStatsRequest(clusterStatsList);
1✔
280
      scheduleNextLoadReport();
1✔
281
    }
1✔
282

283
    private void scheduleNextLoadReport() {
284
      // Cancel pending load report and reschedule with updated load reporting interval.
285
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
286
        loadReportTimer.cancel();
1✔
287
        loadReportTimer = null;
1✔
288
      }
289
      if (intervalNano > 0) {
1✔
290
        loadReportTimer = syncContext.schedule(
1✔
291
            new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService);
1✔
292
      }
293
    }
1✔
294

295
    private void handleStreamClosed(Status status) {
296
      checkArgument(!status.isOk(), "unexpected OK status");
1✔
297
      if (closed) {
1✔
298
        return;
1✔
299
      }
300
      logger.log(
1✔
301
          XdsLogLevel.ERROR,
302
          "LRS stream closed with status {0}: {1}. Cause: {2}",
303
          status.getCode(), status.getDescription(), status.getCause());
1✔
304
      closed = true;
1✔
305
      cleanUp();
1✔
306

307
      if (initialResponseReceived || lrsRpcRetryPolicy == null) {
1✔
308
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
309
        // has never been initialized.
310
        lrsRpcRetryPolicy = backoffPolicyProvider.get();
1✔
311
      }
312
      // The back-off policy determines the interval between consecutive RPC upstarts, thus the
313
      // actual delay may be smaller than the value from the back-off policy, or even negative,
314
      // depending how much time was spent in the previous RPC.
315
      long delayNanos =
1✔
316
          lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
317
      logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
1✔
318
      if (delayNanos <= 0) {
1✔
319
        startLrsRpc();
×
320
      } else {
321
        lrsRpcRetryTimer = syncContext.schedule(
1✔
322
            new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
323
      }
324
    }
1✔
325

326
    private void close(Exception error) {
327
      if (closed) {
1✔
328
        return;
×
329
      }
330
      closed = true;
1✔
331
      cleanUp();
1✔
332
      sendError(error);
1✔
333
    }
1✔
334

335
    private void cleanUp() {
336
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
337
        loadReportTimer.cancel();
1✔
338
        loadReportTimer = null;
1✔
339
      }
340
      if (lrsStream == this) {
1✔
341
        lrsStream = null;
1✔
342
      }
343
    }
1✔
344

345
    private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
346
        ClusterStats stats) {
347
      io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder =
348
          io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
1✔
349
              .setClusterName(stats.clusterName());
1✔
350
      if (stats.clusterServiceName() != null) {
1✔
351
        builder.setClusterServiceName(stats.clusterServiceName());
1✔
352
      }
353
      for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
1✔
354
        builder.addUpstreamLocalityStats(
1✔
355
            io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
1✔
356
                .setLocality(
1✔
357
                    io.envoyproxy.envoy.config.core.v3.Locality.newBuilder()
1✔
358
                        .setRegion(upstreamLocalityStats.locality().region())
1✔
359
                        .setZone(upstreamLocalityStats.locality().zone())
1✔
360
                        .setSubZone(upstreamLocalityStats.locality().subZone()))
1✔
361
            .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
1✔
362
            .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
1✔
363
            .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
1✔
364
            .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests()));
1✔
365
      }
1✔
366
      for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
1✔
367
        builder.addDroppedRequests(
1✔
368
            io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
1✔
369
                .setCategory(droppedRequests.category())
1✔
370
                .setDroppedCount(droppedRequests.droppedCount()));
1✔
371
      }
1✔
372
      return builder
1✔
373
          .setTotalDroppedRequests(stats.totalDroppedRequests())
1✔
374
          .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano()))
1✔
375
          .build();
1✔
376
    }
377
  }
378
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc