• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18713

pending completion
#18713

push

github-actions

web-flow
services, xds, orca: LRS named metrics support (#10282)

Implements gRFC A64: xDS LRS Custom Metrics Support

30622 of 34704 relevant lines covered (88.24%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.32
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Strings;
25
import io.grpc.Attributes;
26
import io.grpc.ClientStreamTracer;
27
import io.grpc.ClientStreamTracer.StreamInfo;
28
import io.grpc.ConnectivityState;
29
import io.grpc.EquivalentAddressGroup;
30
import io.grpc.InternalLogId;
31
import io.grpc.LoadBalancer;
32
import io.grpc.Metadata;
33
import io.grpc.Status;
34
import io.grpc.internal.ForwardingClientStreamTracer;
35
import io.grpc.internal.ObjectPool;
36
import io.grpc.services.MetricReport;
37
import io.grpc.util.ForwardingLoadBalancerHelper;
38
import io.grpc.util.ForwardingSubchannel;
39
import io.grpc.util.GracefulSwitchLoadBalancer;
40
import io.grpc.xds.Bootstrapper.ServerInfo;
41
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
42
import io.grpc.xds.Endpoints.DropOverload;
43
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
44
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
45
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
46
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
47
import io.grpc.xds.XdsLogger.XdsLogLevel;
48
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
49
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
50
import io.grpc.xds.internal.security.SslContextProviderSupplier;
51
import io.grpc.xds.orca.OrcaPerRequestUtil;
52
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
53
import java.util.ArrayList;
54
import java.util.Collections;
55
import java.util.List;
56
import java.util.Objects;
57
import java.util.concurrent.atomic.AtomicLong;
58
import javax.annotation.Nullable;
59

60
/**
61
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
62
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
63
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
64
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
65
 * breakers.
66
 */
67
final class ClusterImplLoadBalancer extends LoadBalancer {
68

69
  @VisibleForTesting
70
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
71
  @VisibleForTesting
72
  static boolean enableCircuitBreaking =
1✔
73
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
74
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
75

76
  private static final Attributes.Key<ClusterLocalityStats> ATTR_CLUSTER_LOCALITY_STATS =
1✔
77
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats");
1✔
78

79
  private final XdsLogger logger;
80
  private final Helper helper;
81
  private final ThreadSafeRandom random;
82
  // The following fields are effectively final.
83
  private String cluster;
84
  @Nullable
85
  private String edsServiceName;
86
  private ObjectPool<XdsClient> xdsClientPool;
87
  private XdsClient xdsClient;
88
  private CallCounterProvider callCounterProvider;
89
  private ClusterDropStats dropStats;
90
  private ClusterImplLbHelper childLbHelper;
91
  private GracefulSwitchLoadBalancer childSwitchLb;
92

93
  /**
   * Creates a balancer that uses the shared {@link ThreadSafeRandomImpl} instance as its
   * source of randomness.
   *
   * @param helper the parent helper this balancer reports to
   */
  ClusterImplLoadBalancer(Helper helper) {
    this(helper, ThreadSafeRandomImpl.instance);
  }
1✔
96

97
  /**
   * Creates a balancer with an explicit randomness source and allocates its logger.
   *
   * @param helper the parent helper this balancer reports to; must not be null
   * @param random random number source consulted when evaluating drop policies; must not be null
   */
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
    this.helper = checkNotNull(helper, "helper");
    this.random = checkNotNull(random, "random");
    // The log id is derived from the channel authority exposed by the parent helper.
    logger = XdsLogger.withLogId(
        InternalLogId.allocate("cluster-impl-lb", helper.getAuthority()));
    logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
104

105
  @Override
106
  public boolean acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
107
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
108
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
109
    if (xdsClientPool == null) {
1✔
110
      xdsClientPool = attributes.get(InternalXdsAttributes.XDS_CLIENT_POOL);
1✔
111
      xdsClient = xdsClientPool.getObject();
1✔
112
    }
113
    if (callCounterProvider == null) {
1✔
114
      callCounterProvider = attributes.get(InternalXdsAttributes.CALL_COUNTER_PROVIDER);
1✔
115
    }
116
    ClusterImplConfig config =
1✔
117
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
118
    if (cluster == null) {
1✔
119
      cluster = config.cluster;
1✔
120
      edsServiceName = config.edsServiceName;
1✔
121
      childLbHelper = new ClusterImplLbHelper(
1✔
122
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
123
          config.lrsServerInfo);
124
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
125
      // Assume load report server does not change throughout cluster lifetime.
126
      if (config.lrsServerInfo != null) {
1✔
127
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
128
      }
129
    }
130
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
131
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
132
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
133

134
    childSwitchLb.switchTo(config.childPolicy.getProvider());
1✔
135
    childSwitchLb.handleResolvedAddresses(
1✔
136
        resolvedAddresses.toBuilder()
1✔
137
            .setAttributes(attributes)
1✔
138
            .setLoadBalancingPolicyConfig(config.childPolicy.getConfig())
1✔
139
            .build());
1✔
140
    return true;
1✔
141
  }
142

143
  @Override
144
  public void handleNameResolutionError(Status error) {
145
    if (childSwitchLb != null) {
1✔
146
      childSwitchLb.handleNameResolutionError(error);
1✔
147
    } else {
148
      helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker(error));
1✔
149
    }
150
  }
1✔
151

152
  @Override
153
  public void shutdown() {
154
    if (dropStats != null) {
1✔
155
      dropStats.release();
1✔
156
    }
157
    if (childSwitchLb != null) {
1✔
158
      childSwitchLb.shutdown();
1✔
159
      if (childLbHelper != null) {
1✔
160
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
161
        childLbHelper = null;
1✔
162
      }
163
    }
164
    if (xdsClient != null) {
1✔
165
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
166
    }
167
  }
1✔
168

169
  /**
170
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
171
   * or requests to endpoints in the cluster.
172
   */
173
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
174
    private final AtomicLong inFlights;
175
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
176
    private SubchannelPicker currentPicker = BUFFER_PICKER;
1✔
177
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
178
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
179
    @Nullable
180
    private SslContextProviderSupplier sslContextProviderSupplier;
181
    @Nullable
182
    private final ServerInfo lrsServerInfo;
183

184
    /**
     * Creates the helper handed to the child policy.
     *
     * @param inFlights counter of outstanding calls, compared against the concurrent request
     *     limit when picking; must not be null
     * @param lrsServerInfo load reporting server, or {@code null} when none is configured
     */
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
      this.lrsServerInfo = lrsServerInfo;
      this.inFlights = checkNotNull(inFlights, "inFlights");
    }
1✔
188

189
    @Override
190
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
191
      currentState = newState;
1✔
192
      currentPicker =  newPicker;
1✔
193
      SubchannelPicker picker =
1✔
194
          new RequestLimitingSubchannelPicker(newPicker, dropPolicies, maxConcurrentRequests);
195
      delegate().updateBalancingState(newState, picker);
1✔
196
    }
1✔
197

198
    @Override
199
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
200
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
201
      Locality locality = args.getAddresses().get(0).getAttributes().get(
1✔
202
          InternalXdsAttributes.ATTR_LOCALITY);  // all addresses should be in the same locality
203
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
204
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
205
      // In case of not (which really shouldn't), loads are aggregated under an empty locality.
206
      if (locality == null) {
1✔
207
        locality = Locality.create("", "", "");
×
208
      }
209
      final ClusterLocalityStats localityStats = lrsServerInfo == null ? null
1✔
210
          : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, edsServiceName, locality);
1✔
211
      Attributes attrs = args.getAttributes().toBuilder().set(
1✔
212
          ATTR_CLUSTER_LOCALITY_STATS, localityStats).build();
1✔
213
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build();
1✔
214
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
215

216
      return new ForwardingSubchannel() {
1✔
217
        @Override
218
        public void shutdown() {
219
          if (localityStats != null) {
1✔
220
            localityStats.release();
1✔
221
          }
222
          delegate().shutdown();
1✔
223
        }
1✔
224

225
        @Override
226
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
227
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
228
        }
1✔
229

230
        @Override
231
        protected Subchannel delegate() {
232
          return subchannel;
1✔
233
        }
234
      };
235
    }
236

237
    private List<EquivalentAddressGroup> withAdditionalAttributes(
238
        List<EquivalentAddressGroup> addresses) {
239
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
240
      for (EquivalentAddressGroup eag : addresses) {
1✔
241
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
242
            InternalXdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
243
        if (sslContextProviderSupplier != null) {
1✔
244
          attrBuilder.set(
1✔
245
              InternalXdsAttributes.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
246
              sslContextProviderSupplier);
247
        }
248
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
249
      }
1✔
250
      return newAddresses;
1✔
251
    }
252

253
    @Override
254
    protected Helper delegate()  {
255
      return helper;
1✔
256
    }
257

258
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
259
      if (!dropPolicies.equals(dropOverloads)) {
1✔
260
        dropPolicies = dropOverloads;
1✔
261
        updateBalancingState(currentState, currentPicker);
1✔
262
      }
263
    }
1✔
264

265
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
266
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
267
        return;
×
268
      }
269
      this.maxConcurrentRequests =
1✔
270
          maxConcurrentRequests != null
1✔
271
              ? maxConcurrentRequests
1✔
272
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
273
      updateBalancingState(currentState, currentPicker);
1✔
274
    }
1✔
275

276
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
277
      UpstreamTlsContext currentTlsContext =
278
          sslContextProviderSupplier != null
1✔
279
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
280
              : null;
1✔
281
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
282
        return;
1✔
283
      }
284
      if (sslContextProviderSupplier != null) {
1✔
285
        sslContextProviderSupplier.close();
1✔
286
      }
287
      sslContextProviderSupplier =
1✔
288
          tlsContext != null
1✔
289
              ? new SslContextProviderSupplier(tlsContext, xdsClient.getTlsContextManager())
1✔
290
              : null;
1✔
291
    }
1✔
292

293
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
294
      private final SubchannelPicker delegate;
295
      private final List<DropOverload> dropPolicies;
296
      private final long maxConcurrentRequests;
297

298
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
299
          List<DropOverload> dropPolicies, long maxConcurrentRequests) {
1✔
300
        this.delegate = delegate;
1✔
301
        this.dropPolicies = dropPolicies;
1✔
302
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
303
      }
1✔
304

305
      @Override
306
      public PickResult pickSubchannel(PickSubchannelArgs args) {
307
        for (DropOverload dropOverload : dropPolicies) {
1✔
308
          int rand = random.nextInt(1_000_000);
1✔
309
          if (rand < dropOverload.dropsPerMillion()) {
1✔
310
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
311
                dropOverload.category());
1✔
312
            if (dropStats != null) {
1✔
313
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
314
            }
315
            return PickResult.withDrop(
1✔
316
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
317
          }
318
        }
1✔
319
        final PickResult result = delegate.pickSubchannel(args);
1✔
320
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
321
          if (enableCircuitBreaking) {
1✔
322
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
323
              if (dropStats != null) {
1✔
324
                dropStats.recordDroppedRequest();
1✔
325
              }
326
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
327
                  "Cluster max concurrent requests limit exceeded"));
328
            }
329
          }
330
          final ClusterLocalityStats stats =
1✔
331
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS);
1✔
332
          if (stats != null) {
1✔
333
            ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
334
                stats, inFlights, result.getStreamTracerFactory());
1✔
335
            ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
336
                .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
337
            return PickResult.withSubchannel(result.getSubchannel(), orcaTracerFactory);
1✔
338
          }
339
        }
340
        return result;
1✔
341
      }
342

343
      @Override
344
      public String toString() {
345
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
346
      }
347
    }
348
  }
349

350
  private static final class CountingStreamTracerFactory extends
351
      ClientStreamTracer.Factory {
352
    private ClusterLocalityStats stats;
353
    private final AtomicLong inFlights;
354
    @Nullable
355
    private final ClientStreamTracer.Factory delegate;
356

357
    private CountingStreamTracerFactory(
358
        ClusterLocalityStats stats, AtomicLong inFlights,
359
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
360
      this.stats = checkNotNull(stats, "stats");
1✔
361
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
362
      this.delegate = delegate;
1✔
363
    }
1✔
364

365
    @Override
366
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
367
      stats.recordCallStarted();
1✔
368
      inFlights.incrementAndGet();
1✔
369
      if (delegate == null) {
1✔
370
        return new ClientStreamTracer() {
1✔
371
          @Override
372
          public void streamClosed(Status status) {
373
            stats.recordCallFinished(status);
1✔
374
            inFlights.decrementAndGet();
1✔
375
          }
1✔
376
        };
377
      }
378
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
379
      return new ForwardingClientStreamTracer() {
×
380
        @Override
381
        protected ClientStreamTracer delegate() {
382
          return delegatedTracer;
×
383
        }
384

385
        @Override
386
        public void streamClosed(Status status) {
387
          stats.recordCallFinished(status);
×
388
          inFlights.decrementAndGet();
×
389
          delegate().streamClosed(status);
×
390
        }
×
391
      };
392
    }
393
  }
394

395
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
396

397
    private final ClusterLocalityStats stats;
398

399
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
400
      this.stats = checkNotNull(stats, "stats");
1✔
401
    }
1✔
402

403
    /**
404
     * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
405
     * included in the snapshot for the LRS report sent to the LRS server.
406
     */
407
    @Override
408
    public void onLoadReport(MetricReport report) {
409
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
410
    }
1✔
411
  }
412
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc