• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20033

29 Oct 2025 04:43PM UTC coverage: 88.533% (-0.03%) from 88.561%
#20033

push

github

web-flow
xds,googleapis: Allow wrapping NameResolver to inject XdsClient (#12450)

Since there is no longer a single global XdsClient, it makes more sense
to allow things like the c2p name resolver to inject its own bootstrap
even if there is one defined in an environment variable.
GoogleCloudToProdNameResolver can now pass an XdsClient instance to
XdsNameResolver, and SharedXdsClientPoolProvider allows
GoogleCloudToProdNameResolver to choose the bootstrap for that one
specific target.

Since XdsNameResolver is no longer in control of the XdsClient pool the
XdsClient instance is now passed to ClusterImplLb. A channel will now
only access the global XdsClient pool exactly once: in the name
resolver.

BootstrapInfo is purposefully being shared across channels, as we really
want to share things like credentials which can have significant memory
use and may have caches which reduce I/O when shared. That is why
SharedXdsClientPoolProvider receives BootstrapInfo instead of
Map<String,?>.

Verifying BootstrapInfo.server() is not empty was moved from
SharedXdsClientPoolProvider to GrpcBootstrapperImpl to avoid
getOrCreate() throwing an exception in only that one case. It might make
sense to move that check to BootstrapperImpl, but that will need more
investigation.

A lot of tests needed updating because XdsClientPoolProvider is no
longer responsible for parsing the bootstrap, so we now need bootstraps
even if XdsClientPoolProvider will ignore it.

This also fixes a bug in GoogleCloudToProdNameResolver where it would
initialize the delegate even when it failed to create the bootstrap.
That would certainly cause all RPCs on the channel to fail because of
the missing bootstrap and it defeated the point of `succeeded == false`
and `refresh()` which was supposed to retry contacting the metadata
server.

The server tests were enhanced to give a useful error when
server.start() throws an exception, as otherwise the real error is lost.

b/442819521

34966 of 39495 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.04
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.xds.client.LoadStatsManager2.isEnabledOrcaLrsPropagation;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Strings;
25
import com.google.common.collect.ImmutableMap;
26
import com.google.protobuf.Struct;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.ClientStreamTracer.StreamInfo;
30
import io.grpc.ConnectivityState;
31
import io.grpc.ConnectivityStateInfo;
32
import io.grpc.EquivalentAddressGroup;
33
import io.grpc.InternalLogId;
34
import io.grpc.LoadBalancer;
35
import io.grpc.Metadata;
36
import io.grpc.NameResolver;
37
import io.grpc.Status;
38
import io.grpc.internal.ForwardingClientStreamTracer;
39
import io.grpc.internal.GrpcUtil;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingLoadBalancerHelper;
42
import io.grpc.util.ForwardingSubchannel;
43
import io.grpc.util.GracefulSwitchLoadBalancer;
44
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
45
import io.grpc.xds.Endpoints.DropOverload;
46
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
47
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
48
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
49
import io.grpc.xds.client.BackendMetricPropagation;
50
import io.grpc.xds.client.Bootstrapper.ServerInfo;
51
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
52
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
53
import io.grpc.xds.client.Locality;
54
import io.grpc.xds.client.XdsClient;
55
import io.grpc.xds.client.XdsLogger;
56
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
57
import io.grpc.xds.internal.XdsInternalAttributes;
58
import io.grpc.xds.internal.security.SecurityProtocolNegotiators;
59
import io.grpc.xds.internal.security.SslContextProviderSupplier;
60
import io.grpc.xds.orca.OrcaPerRequestUtil;
61
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
62
import java.util.ArrayList;
63
import java.util.Collections;
64
import java.util.List;
65
import java.util.Locale;
66
import java.util.Map;
67
import java.util.Objects;
68
import java.util.concurrent.atomic.AtomicLong;
69
import java.util.concurrent.atomic.AtomicReference;
70
import javax.annotation.Nullable;
71

72
/**
73
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
74
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
75
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
76
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
77
 * breakers.
78
 */
79
final class ClusterImplLoadBalancer extends LoadBalancer {
80

81
  @VisibleForTesting
82
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
83
  @VisibleForTesting
84
  static boolean enableCircuitBreaking =
1✔
85
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
86
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
87

88
  private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
1✔
89
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
1✔
90

91
  private final XdsLogger logger;
92
  private final Helper helper;
93
  private final ThreadSafeRandom random;
94
  // The following fields are effectively final.
95
  private String cluster;
96
  @Nullable
97
  private String edsServiceName;
98
  private XdsClient xdsClient;
99
  private CallCounterProvider callCounterProvider;
100
  private ClusterDropStats dropStats;
101
  private ClusterImplLbHelper childLbHelper;
102
  private GracefulSwitchLoadBalancer childSwitchLb;
103

104
  /**
   * Creates a balancer that uses {@code ThreadSafeRandomImpl.instance} as its randomness source.
   *
   * @param helper the channel-provided helper this balancer reports to
   */
  ClusterImplLoadBalancer(Helper helper) {
    this(helper, ThreadSafeRandomImpl.instance);
  }

  /**
   * Creates a balancer with an explicit randomness source.
   *
   * @param helper the channel-provided helper this balancer reports to; must not be null
   * @param random the randomness source stored for later use; must not be null
   */
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
    this.helper = checkNotNull(helper, "helper");
    this.random = checkNotNull(random, "random");
    // The log id is derived from the channel authority so log lines can be tied to a channel.
    this.logger = XdsLogger.withLogId(
        InternalLogId.allocate("cluster-impl-lb", helper.getAuthority()));
    this.logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
115

116
  @Override
117
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
118
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
119
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
120
    if (xdsClient == null) {
1✔
121
      xdsClient = checkNotNull(attributes.get(io.grpc.xds.XdsAttributes.XDS_CLIENT), "xdsClient");
1✔
122
    }
123
    if (callCounterProvider == null) {
1✔
124
      callCounterProvider = attributes.get(io.grpc.xds.XdsAttributes.CALL_COUNTER_PROVIDER);
1✔
125
    }
126

127
    ClusterImplConfig config =
1✔
128
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
129
    if (config == null) {
1✔
130
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
131
    }
132

133
    if (cluster == null) {
1✔
134
      cluster = config.cluster;
1✔
135
      edsServiceName = config.edsServiceName;
1✔
136
      childLbHelper = new ClusterImplLbHelper(
1✔
137
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
138
          config.lrsServerInfo);
139
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
140
      // Assume load report server does not change throughout cluster lifetime.
141
      if (config.lrsServerInfo != null) {
1✔
142
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
143
      }
144
    }
145

146
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
147
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
148
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
149
    childLbHelper.updateFilterMetadata(config.filterMetadata);
1✔
150
    childLbHelper.updateBackendMetricPropagation(config.backendMetricPropagation);
1✔
151

152
    childSwitchLb.handleResolvedAddresses(
1✔
153
        resolvedAddresses.toBuilder()
1✔
154
            .setAttributes(attributes.toBuilder()
1✔
155
              .set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
1✔
156
              .build())
1✔
157
            .setLoadBalancingPolicyConfig(config.childConfig)
1✔
158
            .build());
1✔
159
    return Status.OK;
1✔
160
  }
161

162
  @Override
163
  public void handleNameResolutionError(Status error) {
164
    if (childSwitchLb != null) {
1✔
165
      childSwitchLb.handleNameResolutionError(error);
1✔
166
    } else {
167
      helper.updateBalancingState(
1✔
168
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
169
    }
170
  }
1✔
171

172
  @Override
173
  public void requestConnection() {
174
    if (childSwitchLb != null) {
1✔
175
      childSwitchLb.requestConnection();
1✔
176
    }
177
  }
1✔
178

179
  @Override
180
  public void shutdown() {
181
    if (dropStats != null) {
1✔
182
      dropStats.release();
1✔
183
    }
184
    if (childSwitchLb != null) {
1✔
185
      childSwitchLb.shutdown();
1✔
186
      if (childLbHelper != null) {
1✔
187
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
188
        childLbHelper = null;
1✔
189
      }
190
    }
191
    xdsClient = null;
1✔
192
  }
1✔
193

194
  /**
195
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
196
   * or requests to endpoints in the cluster.
197
   */
198
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
199
    private final AtomicLong inFlights;
200
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
201
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
202
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
203
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
204
    @Nullable
205
    private SslContextProviderSupplier sslContextProviderSupplier;
206
    private Map<String, Struct> filterMetadata = ImmutableMap.of();
1✔
207
    @Nullable
208
    private final ServerInfo lrsServerInfo;
209
    @Nullable
210
    private BackendMetricPropagation backendMetricPropagation;
211

212
    /**
     * Creates the helper handed to the child policy.
     *
     * @param inFlights counter of in-flight requests compared against the concurrency limit;
     *     must not be null
     * @param lrsServerInfo the load reporting server, or null when no load is reported
     */
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
      this.lrsServerInfo = lrsServerInfo;
      this.inFlights = checkNotNull(inFlights, "inFlights");
    }
1✔
216

217
    @Override
218
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
219
      currentState = newState;
1✔
220
      currentPicker =  newPicker;
1✔
221
      SubchannelPicker picker = new RequestLimitingSubchannelPicker(
1✔
222
          newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
223
      delegate().updateBalancingState(newState, picker);
1✔
224
    }
1✔
225

226
    @Override
227
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
228
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
229
      // This value for  ClusterLocality is not recommended for general use.
230
      // Currently, we extract locality data from the first address, even before the subchannel is
231
      // READY.
232
      // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
233
      // might return the subchannel before it is READY. Typically, we wouldn't report load for such
234
      // selections because the channel will disregard the chosen (not-ready) subchannel.
235
      // However, we needed to ensure this case is handled.
236
      ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
1✔
237
          args.getAddresses().get(0).getAttributes());
1✔
238
      AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
1✔
239
          clusterLocality);
240
      Attributes.Builder attrsBuilder = args.getAttributes().toBuilder()
1✔
241
          .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference);
1✔
242
      if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)) {
1✔
243
        String hostname = args.getAddresses().get(0).getAttributes()
1✔
244
            .get(XdsInternalAttributes.ATTR_ADDRESS_NAME);
1✔
245
        if (hostname != null) {
1✔
246
          attrsBuilder.set(XdsInternalAttributes.ATTR_ADDRESS_NAME, hostname);
1✔
247
        }
248
      }
249
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
1✔
250
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
251

252
      return new ForwardingSubchannel() {
1✔
253
        @Override
254
        public void start(SubchannelStateListener listener) {
255
          delegate().start(new SubchannelStateListener() {
1✔
256
            @Override
257
            public void onSubchannelState(ConnectivityStateInfo newState) {
258
              // Do nothing if LB has been shutdown
259
              if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
1✔
260
                // Get locality based on the connected address attributes
261
                ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
1✔
262
                    subchannel.getConnectedAddressAttributes());
1✔
263
                ClusterLocality oldClusterLocality = localityAtomicReference
1✔
264
                    .getAndSet(updatedClusterLocality);
1✔
265
                oldClusterLocality.release();
1✔
266
              }
267
              listener.onSubchannelState(newState);
1✔
268
            }
1✔
269
          });
270
        }
1✔
271

272
        @Override
273
        public void shutdown() {
274
          localityAtomicReference.get().release();
1✔
275
          delegate().shutdown();
1✔
276
        }
1✔
277

278
        @Override
279
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
280
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
281
        }
1✔
282

283
        @Override
284
        protected Subchannel delegate() {
285
          return subchannel;
1✔
286
        }
287
      };
288
    }
289

290
    private List<EquivalentAddressGroup> withAdditionalAttributes(
291
        List<EquivalentAddressGroup> addresses) {
292
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
293
      for (EquivalentAddressGroup eag : addresses) {
1✔
294
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
295
            io.grpc.xds.XdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
296
        if (sslContextProviderSupplier != null) {
1✔
297
          attrBuilder.set(
1✔
298
              SecurityProtocolNegotiators.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
299
              sslContextProviderSupplier);
300
        }
301
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
302
      }
1✔
303
      return newAddresses;
1✔
304
    }
305

306
    private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
307
      Locality locality = addressAttributes.get(io.grpc.xds.XdsAttributes.ATTR_LOCALITY);
1✔
308
      String localityName = addressAttributes.get(EquivalentAddressGroup.ATTR_LOCALITY_NAME);
1✔
309

310
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
311
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
312
      // In case of not (which really shouldn't), loads are aggregated under an empty
313
      // locality.
314
      if (locality == null) {
1✔
315
        locality = Locality.create("", "", "");
×
316
        localityName = "";
×
317
      }
318

319
      final ClusterLocalityStats localityStats =
320
          (lrsServerInfo == null)
1✔
321
              ? null
1✔
322
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
323
                  edsServiceName, locality, backendMetricPropagation);
1✔
324

325
      return new ClusterLocality(localityStats, localityName);
1✔
326
    }
327

328
    @Override
329
    protected Helper delegate()  {
330
      return helper;
1✔
331
    }
332

333
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
334
      if (!dropPolicies.equals(dropOverloads)) {
1✔
335
        dropPolicies = dropOverloads;
1✔
336
        updateBalancingState(currentState, currentPicker);
1✔
337
      }
338
    }
1✔
339

340
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
341
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
342
        return;
×
343
      }
344
      this.maxConcurrentRequests =
1✔
345
          maxConcurrentRequests != null
1✔
346
              ? maxConcurrentRequests
1✔
347
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
348
      updateBalancingState(currentState, currentPicker);
1✔
349
    }
1✔
350

351
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
352
      UpstreamTlsContext currentTlsContext =
353
          sslContextProviderSupplier != null
1✔
354
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
355
              : null;
1✔
356
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
357
        return;
1✔
358
      }
359
      if (sslContextProviderSupplier != null) {
1✔
360
        sslContextProviderSupplier.close();
1✔
361
      }
362
      sslContextProviderSupplier =
1✔
363
          tlsContext != null
1✔
364
              ? new SslContextProviderSupplier(tlsContext,
1✔
365
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
366
              : null;
1✔
367
    }
1✔
368

369
    private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
370
      this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
371
    }
1✔
372

373
    private void updateBackendMetricPropagation(
374
        @Nullable BackendMetricPropagation backendMetricPropagation) {
375
      this.backendMetricPropagation = backendMetricPropagation;
1✔
376
    }
1✔
377

378
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
379
      private final SubchannelPicker delegate;
380
      private final List<DropOverload> dropPolicies;
381
      private final long maxConcurrentRequests;
382
      private final Map<String, Struct> filterMetadata;
383

384
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
385
          List<DropOverload> dropPolicies, long maxConcurrentRequests,
386
          Map<String, Struct> filterMetadata) {
1✔
387
        this.delegate = delegate;
1✔
388
        this.dropPolicies = dropPolicies;
1✔
389
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
390
        this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
1✔
391
      }
1✔
392

393
      @Override
394
      public PickResult pickSubchannel(PickSubchannelArgs args) {
395
        args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
1✔
396
            .accept(filterMetadata);
1✔
397
        args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
1✔
398
        for (DropOverload dropOverload : dropPolicies) {
1✔
399
          int rand = random.nextInt(1_000_000);
1✔
400
          if (rand < dropOverload.dropsPerMillion()) {
1✔
401
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
402
                dropOverload.category());
1✔
403
            if (dropStats != null) {
1✔
404
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
405
            }
406
            return PickResult.withDrop(
1✔
407
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
408
          }
409
        }
1✔
410
        PickResult result = delegate.pickSubchannel(args);
1✔
411
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
412
          if (enableCircuitBreaking) {
1✔
413
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
414
              if (dropStats != null) {
1✔
415
                dropStats.recordDroppedRequest();
1✔
416
              }
417
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
418
                  String.format(Locale.US, "Cluster max concurrent requests limit of %d exceeded",
1✔
419
                      maxConcurrentRequests)));
1✔
420
            }
421
          }
422
          final AtomicReference<ClusterLocality> clusterLocality =
1✔
423
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
1✔
424

425
          if (clusterLocality != null) {
1✔
426
            ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
1✔
427
            if (stats != null) {
1✔
428
              String localityName =
1✔
429
                  result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
1✔
430
                      .getClusterLocalityName();
1✔
431
              args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
432

433
              ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
434
                  stats, inFlights, result.getStreamTracerFactory());
1✔
435
              ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
436
                  .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
437
              result = PickResult.withSubchannel(result.getSubchannel(),
1✔
438
                  orcaTracerFactory);
439
            }
440
          }
441
          if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
1✔
442
              && args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY)) {
1✔
443
            result = PickResult.withSubchannel(result.getSubchannel(),
1✔
444
                result.getStreamTracerFactory(),
1✔
445
                result.getSubchannel().getAttributes().get(
1✔
446
                    XdsInternalAttributes.ATTR_ADDRESS_NAME));
447
          }
448
        }
449
        return result;
1✔
450
      }
451

452
      @Override
453
      public String toString() {
454
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
455
      }
456
    }
457
  }
458

459
  private static final class CountingStreamTracerFactory extends
460
      ClientStreamTracer.Factory {
461
    private final ClusterLocalityStats stats;
462
    private final AtomicLong inFlights;
463
    @Nullable
464
    private final ClientStreamTracer.Factory delegate;
465

466
    private CountingStreamTracerFactory(
467
        ClusterLocalityStats stats, AtomicLong inFlights,
468
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
469
      this.stats = checkNotNull(stats, "stats");
1✔
470
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
471
      this.delegate = delegate;
1✔
472
    }
1✔
473

474
    @Override
475
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
476
      stats.recordCallStarted();
1✔
477
      inFlights.incrementAndGet();
1✔
478
      if (delegate == null) {
1✔
479
        return new ClientStreamTracer() {
1✔
480
          @Override
481
          public void streamClosed(Status status) {
482
            stats.recordCallFinished(status);
1✔
483
            inFlights.decrementAndGet();
1✔
484
          }
1✔
485
        };
486
      }
487
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
488
      return new ForwardingClientStreamTracer() {
×
489
        @Override
490
        protected ClientStreamTracer delegate() {
491
          return delegatedTracer;
×
492
        }
493

494
        @Override
495
        public void streamClosed(Status status) {
496
          stats.recordCallFinished(status);
×
497
          inFlights.decrementAndGet();
×
498
          delegate().streamClosed(status);
×
499
        }
×
500
      };
501
    }
502
  }
503

504
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
505

506
    private final ClusterLocalityStats stats;
507

508
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
509
      this.stats = checkNotNull(stats, "stats");
1✔
510
    }
1✔
511

512
    /**
513
     * Copies ORCA metrics from {@link MetricReport} to {@link ClusterLocalityStats}
514
     * such that they are included in the snapshot for the LRS report sent to the LRS server.
515
     * This includes both top-level metrics (CPU, memory, application utilization) and named
516
     * metrics, filtered according to the backend metric propagation configuration.
517
     */
518
    @Override
519
    public void onLoadReport(MetricReport report) {
520
      if (isEnabledOrcaLrsPropagation) {
1✔
521
        stats.recordTopLevelMetrics(
1✔
522
            report.getCpuUtilization(),
1✔
523
            report.getMemoryUtilization(),
1✔
524
            report.getApplicationUtilization());
1✔
525
      }
526
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
527
    }
1✔
528
  }
529

530
  /**
531
   * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
532
   */
533
  static final class ClusterLocality {
534
    private final ClusterLocalityStats clusterLocalityStats;
535
    private final String clusterLocalityName;
536

537
    @VisibleForTesting
538
    ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
1✔
539
      this.clusterLocalityStats = localityStats;
1✔
540
      this.clusterLocalityName = localityName;
1✔
541
    }
1✔
542

543
    ClusterLocalityStats getClusterLocalityStats() {
544
      return clusterLocalityStats;
1✔
545
    }
546

547
    String getClusterLocalityName() {
548
      return clusterLocalityName;
1✔
549
    }
550

551
    @VisibleForTesting
552
    void release() {
553
      if (clusterLocalityStats != null) {
1✔
554
        clusterLocalityStats.release();
1✔
555
      }
556
    }
1✔
557
  }
558
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc