• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19222

09 May 2024 05:48PM UTC coverage: 88.43% (+0.04%) from 88.388%
#19222

push

github

ejona86
opentelemetry: Missing locality should be empty string

From gRFC A78:

> If no locality information is available, the label will be set to the
> empty string.

31597 of 35731 relevant lines covered (88.43%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.55
/../xds/src/main/java/io/grpc/xds/client/LoadReportClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.util.Durations;
27
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
28
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
29
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
30
import io.grpc.Internal;
31
import io.grpc.InternalLogId;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import io.grpc.internal.BackoffPolicy;
37
import io.grpc.xds.client.EnvoyProtoData.Node;
38
import io.grpc.xds.client.Stats.ClusterStats;
39
import io.grpc.xds.client.Stats.DroppedRequests;
40
import io.grpc.xds.client.Stats.UpstreamLocalityStats;
41
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
42
import io.grpc.xds.client.XdsTransportFactory.EventHandler;
43
import io.grpc.xds.client.XdsTransportFactory.StreamingCall;
44
import io.grpc.xds.client.XdsTransportFactory.XdsTransport;
45
import java.util.ArrayList;
46
import java.util.Collections;
47
import java.util.List;
48
import java.util.concurrent.ScheduledExecutorService;
49
import java.util.concurrent.TimeUnit;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * Client of the xDS load reporting service based on the LRS protocol, which reports load stats
55
 * from the gRPC client's perspective to a management server.
56
 */
57
@Internal
58
public final class LoadReportClient {
59
  // Log id allocated in the constructor with the "lrs-client" tag.
  private final InternalLogId logId;
60
  // Logger bound to logId.
  private final XdsLogger logger;
61
  // Transport on which each LRS streaming call is created.
  private final XdsTransport xdsTransport;
62
  // Node sent in every LoadStatsRequest; the constructor adds an LRS client feature to it.
  private final Node node;
63
  // Stream callbacks and timers run in this context; start/stop must be called from within it.
  private final SynchronizationContext syncContext;
64
  // Executor handed to syncContext.schedule() for the report and retry timers.
  private final ScheduledExecutorService timerService;
65
  // Restarted on every RPC start; its elapsed time is subtracted from the retry backoff.
  private final Stopwatch retryStopwatch;
66
  // Supplies a new BackoffPolicy whenever the retry backoff sequence is (re)initialized.
  private final BackoffPolicy.Provider backoffPolicyProvider;
67
  // Source of the cluster stats reports that are sent to the server.
  @VisibleForTesting
68
  public final LoadStatsManager2 loadStatsManager;
69
  // Set by startLoadReporting() and cleared by stopLoadReporting().
  private boolean started;
70
  // Current retry backoff sequence; only assigned when a stream closes.
  @Nullable
71
  private BackoffPolicy lrsRpcRetryPolicy;
72
  // Handle of the most recently scheduled retry, if any.
  @Nullable
73
  private ScheduledHandle lrsRpcRetryTimer;
74
  // Current stream; reset to null by the stream's own cleanUp() when it closes.
  @Nullable
75
  private LrsStream lrsStream;
76
  // Method descriptor of the streaming load stats RPC used for every LRS call.
  private static final MethodDescriptor<LoadStatsRequest, LoadStatsResponse> method =
1✔
77
      LoadReportingServiceGrpc.getStreamLoadStatsMethod();
1✔
78

79
  /**
   * Creates a load report client. No RPC is started here; reporting begins with
   * {@link #startLoadReporting()}.
   *
   * @param loadStatsManager source of the cluster stats reports that are sent to the server
   * @param xdsTransport transport used to create the LRS streaming call
   * @param node node identity sent in every request; a copy with the
   *     {@code envoy.lrs.supports_send_all_clusters} client feature added is stored
   * @param syncContext synchronization context in which stream events and timers are run
   * @param scheduledExecutorService executor used to schedule the report and retry timers
   * @param backoffPolicyProvider provider of the backoff policy applied between stream retries
   * @param stopwatchSupplier supplier of the stopwatch measuring time since the last RPC start
   * @throws NullPointerException if any argument is null
   */
  @VisibleForTesting
  public LoadReportClient(
      LoadStatsManager2 loadStatsManager,
      XdsTransport xdsTransport,
      Node node,
      SynchronizationContext syncContext,
      ScheduledExecutorService scheduledExecutorService,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier) {
    this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager");
    this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    // The label names the actual parameter so a NullPointerException points at the right argument.
    this.timerService = checkNotNull(scheduledExecutorService, "scheduledExecutorService");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
    this.retryStopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
    // Store a copy of the node with the send-all-clusters LRS client feature added.
    this.node = checkNotNull(node, "node").toBuilder()
        .addClientFeatures("envoy.lrs.supports_send_all_clusters").build();
    logId = InternalLogId.allocate("lrs-client", null);
    logger = XdsLogger.withLogId(logId);
    logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
100

101
  @VisibleForTesting
102
  public boolean lrsStreamIsNull() {
103
    return lrsStream == null;
1✔
104
  }
105

106
  /**
107
   * Establishes load reporting communication and negotiates with traffic director to report load
108
   * stats periodically. Calling this method on an already started {@link LoadReportClient} is
109
   * no-op.
110
   */
111
  public void startLoadReporting() {
112
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
113
    if (started) {
1✔
114
      return;
×
115
    }
116
    started = true;
1✔
117
    logger.log(XdsLogLevel.INFO, "Starting load reporting RPC");
1✔
118
    startLrsRpc();
1✔
119
  }
1✔
120

121
  /**
122
   * Terminates load reporting. Calling this method on an already stopped
123
   * {@link LoadReportClient} is no-op.
124
   */
125
  public void stopLoadReporting() {
126
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
127
    if (!started) {
1✔
128
      return;
1✔
129
    }
130
    started = false;
1✔
131
    logger.log(XdsLogLevel.INFO, "Stopping load reporting RPC");
1✔
132
    if (lrsRpcRetryTimer != null && lrsRpcRetryTimer.isPending()) {
1✔
133
      lrsRpcRetryTimer.cancel();
1✔
134
    }
135
    if (lrsStream != null) {
1✔
136
      lrsStream.close(Status.CANCELLED.withDescription("stop load reporting").asException());
1✔
137
    }
138
    // Do not shut down channel as it is not owned by LrsClient.
139
  }
1✔
140

141
  private static class LoadReportingTask implements Runnable {
142
    private final LrsStream stream;
143

144
    LoadReportingTask(LrsStream stream) {
1✔
145
      this.stream = stream;
1✔
146
    }
1✔
147

148
    @Override
149
    public void run() {
150
      stream.sendLoadReport();
1✔
151
    }
1✔
152
  }
153

154
  private class LrsRpcRetryTask implements Runnable {
1✔
155

156
    @Override
157
    public void run() {
158
      startLrsRpc();
1✔
159
    }
1✔
160
  }
161

162
  private void startLrsRpc() {
163
    if (!started) {
1✔
164
      return;
×
165
    }
166
    checkState(lrsStream == null, "previous lbStream has not been cleared yet");
1✔
167
    retryStopwatch.reset().start();
1✔
168
    lrsStream = new LrsStream();
1✔
169
  }
1✔
170

171
  private final class LrsStream implements EventHandler<LoadStatsResponse> {
172
    // Set when the first response is handled; makes the retry backoff sequence restart on close.
    boolean initialResponseReceived;
173
    // Set when the stream is closed, either locally via close() or by a received status.
    boolean closed;
174
    // Load reporting interval from the latest response; reports are scheduled only when > 0.
    long intervalNano = -1;
1✔
175
    // Whether the latest response asked for loads of all clusters.
    boolean reportAllClusters;
176
    List<String> clusterNames;  // clusters to report loads for, if not report all.
177
    // Handle of the scheduled next load report, if any.
    ScheduledHandle loadReportTimer;
178
    // The streaming call backing this stream; created and started in the constructor.
    private final StreamingCall<LoadStatsRequest, LoadStatsResponse> call;
179

180
    LrsStream() {
1✔
181
      this.call = xdsTransport.createStreamingCall(method.getFullMethodName(),
1✔
182
          method.getRequestMarshaller(), method.getResponseMarshaller());
1✔
183
      call.start(this);
1✔
184
      logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request");
1✔
185
      sendLoadStatsRequest(Collections.<ClusterStats>emptyList());
1✔
186
    }
1✔
187

188
    @Override
189
    public void onReady() {}
1✔
190

191
    @Override
192
    public void onRecvMessage(LoadStatsResponse response) {
193
      syncContext.execute(new Runnable() {
1✔
194
        @Override
195
        public void run() {
196
          logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response);
1✔
197
          handleRpcResponse(response.getClustersList(), response.getSendAllClusters(),
1✔
198
              Durations.toNanos(response.getLoadReportingInterval()));
1✔
199
          call.startRecvMessage();
1✔
200
        }
1✔
201
      });
202
    }
1✔
203

204
    @Override
205
    public void onStatusReceived(final Status status) {
206
      syncContext.execute(new Runnable() {
1✔
207
        @Override
208
        public void run() {
209
          if (status.isOk()) {
1✔
210
            handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server"));
1✔
211
          } else {
212
            handleStreamClosed(status);
1✔
213
          }
214
        }
1✔
215
      });
216
    }
1✔
217

218
    void sendLoadStatsRequest(List<ClusterStats> clusterStatsList) {
219
      LoadStatsRequest.Builder requestBuilder =
220
          LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode());
1✔
221
      for (ClusterStats stats : clusterStatsList) {
1✔
222
        requestBuilder.addClusterStats(buildClusterStats(stats));
1✔
223
      }
1✔
224
      LoadStatsRequest request = requestBuilder.build();
1✔
225
      call.sendMessage(request);
1✔
226
      logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request);
1✔
227
    }
1✔
228

229
    void handleRpcResponse(List<String> clusters, boolean sendAllClusters,
230
                           long loadReportIntervalNano) {
231
      if (closed) {
1✔
232
        return;
×
233
      }
234
      if (!initialResponseReceived) {
1✔
235
        logger.log(XdsLogLevel.DEBUG, "Initial LRS response received");
1✔
236
        initialResponseReceived = true;
1✔
237
      }
238
      reportAllClusters = sendAllClusters;
1✔
239
      if (reportAllClusters) {
1✔
240
        logger.log(XdsLogLevel.INFO, "Report loads for all clusters");
1✔
241
      } else {
242
        logger.log(XdsLogLevel.INFO, "Report loads for clusters: ", clusters);
1✔
243
        clusterNames = clusters;
1✔
244
      }
245
      intervalNano = loadReportIntervalNano;
1✔
246
      logger.log(XdsLogLevel.INFO, "Update load reporting interval to {0} ns", intervalNano);
1✔
247
      scheduleNextLoadReport();
1✔
248
    }
1✔
249

250
    private void sendLoadReport() {
251
      if (closed) {
1✔
252
        return;
×
253
      }
254
      List<ClusterStats> clusterStatsList;
255
      if (reportAllClusters) {
1✔
256
        clusterStatsList = loadStatsManager.getAllClusterStatsReports();
1✔
257
      } else {
258
        clusterStatsList = new ArrayList<>();
1✔
259
        for (String name : clusterNames) {
1✔
260
          clusterStatsList.addAll(loadStatsManager.getClusterStatsReports(name));
1✔
261
        }
1✔
262
      }
263
      sendLoadStatsRequest(clusterStatsList);
1✔
264
      scheduleNextLoadReport();
1✔
265
    }
1✔
266

267
    private void scheduleNextLoadReport() {
268
      // Cancel pending load report and reschedule with updated load reporting interval.
269
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
270
        loadReportTimer.cancel();
1✔
271
        loadReportTimer = null;
1✔
272
      }
273
      if (intervalNano > 0) {
1✔
274
        loadReportTimer = syncContext.schedule(
1✔
275
            new LoadReportingTask(this), intervalNano, TimeUnit.NANOSECONDS, timerService);
1✔
276
      }
277
    }
1✔
278

279
    private void handleStreamClosed(Status status) {
280
      checkArgument(!status.isOk(), "unexpected OK status");
1✔
281
      if (closed) {
1✔
282
        return;
1✔
283
      }
284
      logger.log(
1✔
285
          XdsLogLevel.ERROR,
286
          "LRS stream closed with status {0}: {1}. Cause: {2}",
287
          status.getCode(), status.getDescription(), status.getCause());
1✔
288
      closed = true;
1✔
289
      cleanUp();
1✔
290

291
      if (initialResponseReceived || lrsRpcRetryPolicy == null) {
1✔
292
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
293
        // has never been initialized.
294
        lrsRpcRetryPolicy = backoffPolicyProvider.get();
1✔
295
      }
296
      // The back-off policy determines the interval between consecutive RPC upstarts, thus the
297
      // actual delay may be smaller than the value from the back-off policy, or even negative,
298
      // depending on how much time was spent in the previous RPC.
299
      long delayNanos =
1✔
300
          lrsRpcRetryPolicy.nextBackoffNanos() - retryStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
301
      logger.log(XdsLogLevel.INFO, "Retry LRS stream in {0} ns", delayNanos);
1✔
302
      if (delayNanos <= 0) {
1✔
303
        startLrsRpc();
×
304
      } else {
305
        lrsRpcRetryTimer = syncContext.schedule(
1✔
306
            new LrsRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
307
      }
308
    }
1✔
309

310
    private void close(Exception error) {
311
      if (closed) {
1✔
312
        return;
×
313
      }
314
      closed = true;
1✔
315
      cleanUp();
1✔
316
      call.sendError(error);
1✔
317
    }
1✔
318

319
    private void cleanUp() {
320
      if (loadReportTimer != null && loadReportTimer.isPending()) {
1✔
321
        loadReportTimer.cancel();
1✔
322
        loadReportTimer = null;
1✔
323
      }
324
      if (lrsStream == this) {
1✔
325
        lrsStream = null;
1✔
326
      }
327
    }
1✔
328

329
    private io.envoyproxy.envoy.config.endpoint.v3.ClusterStats buildClusterStats(
330
        ClusterStats stats) {
331
      io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.Builder builder =
332
          io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.newBuilder()
1✔
333
              .setClusterName(stats.clusterName());
1✔
334
      if (stats.clusterServiceName() != null) {
1✔
335
        builder.setClusterServiceName(stats.clusterServiceName());
1✔
336
      }
337
      for (UpstreamLocalityStats upstreamLocalityStats : stats.upstreamLocalityStatsList()) {
1✔
338
        builder.addUpstreamLocalityStats(
1✔
339
            io.envoyproxy.envoy.config.endpoint.v3.UpstreamLocalityStats.newBuilder()
1✔
340
                .setLocality(
1✔
341
                    io.envoyproxy.envoy.config.core.v3.Locality.newBuilder()
1✔
342
                        .setRegion(upstreamLocalityStats.locality().region())
1✔
343
                        .setZone(upstreamLocalityStats.locality().zone())
1✔
344
                        .setSubZone(upstreamLocalityStats.locality().subZone()))
1✔
345
            .setTotalSuccessfulRequests(upstreamLocalityStats.totalSuccessfulRequests())
1✔
346
            .setTotalErrorRequests(upstreamLocalityStats.totalErrorRequests())
1✔
347
            .setTotalRequestsInProgress(upstreamLocalityStats.totalRequestsInProgress())
1✔
348
            .setTotalIssuedRequests(upstreamLocalityStats.totalIssuedRequests())
1✔
349
            .addAllLoadMetricStats(
1✔
350
                upstreamLocalityStats.loadMetricStatsMap().entrySet().stream().map(
1✔
351
                    e -> io.envoyproxy.envoy.config.endpoint.v3.EndpointLoadMetricStats.newBuilder()
1✔
352
                        .setMetricName(e.getKey())
1✔
353
                        .setNumRequestsFinishedWithMetric(
1✔
354
                            e.getValue().numRequestsFinishedWithMetric())
1✔
355
                        .setTotalMetricValue(e.getValue().totalMetricValue())
1✔
356
                        .build())
1✔
357
                .collect(Collectors.toList())));
1✔
358
      }
1✔
359
      for (DroppedRequests droppedRequests : stats.droppedRequestsList()) {
1✔
360
        builder.addDroppedRequests(
1✔
361
            io.envoyproxy.envoy.config.endpoint.v3.ClusterStats.DroppedRequests.newBuilder()
1✔
362
                .setCategory(droppedRequests.category())
1✔
363
                .setDroppedCount(droppedRequests.droppedCount()));
1✔
364
      }
1✔
365
      return builder
1✔
366
          .setTotalDroppedRequests(stats.totalDroppedRequests())
1✔
367
          .setLoadReportInterval(Durations.fromNanos(stats.loadReportIntervalNano()))
1✔
368
          .build();
1✔
369
    }
370
  }
371
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc