• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20175

20 Feb 2026 07:37AM UTC coverage: 88.707% (+0.001%) from 88.706%
#20175

push

github

web-flow
unwrap ForwardingSubchannel during Picks (#12658)

This PR ensures that Load Balancing (LB) policies unwrap
`ForwardingSubchannel` instances before returning them in a
`PickResult`.

**Rationale:** Currently, the identity of a subchannel is "awkward"
because decorators break object identity. This forces the core channel
to use internal workarounds like `getInternalSubchannel()` to find the
underlying implementation. Removing these wrappers during the pick
process is a critical prerequisite for deleting Subchannel Attributes.

By enforcing unwrapping, `ManagedChannelImpl` can rely on the fact that
a returned subchannel is the same instance it originally created. This
allows the channel to use strongly-typed fields for state management
(via "blind casting") rather than abusing attributes to re-discover
information that should already be known. This also paves the way for
the eventual removal of the `getInternalSubchannel()` internal API.

**New APIs:** To ensure we don't "drop data on the floor" during the
unwrapping process, this PR adds two new non-static APIs to PickResult:
- copyWithSubchannel()
- copyWithStreamTracerFactory()

Unlike static factory methods, these instance methods follow a
"copy-and-update" pattern that preserves all existing pick-level
metadata (such as authority overrides or drop status) while only
swapping the specific field required.

35450 of 39963 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.28
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.xds.client.LoadStatsManager2.isEnabledOrcaLrsPropagation;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import com.google.common.base.Strings;
25
import com.google.common.collect.ImmutableMap;
26
import com.google.protobuf.Struct;
27
import io.grpc.Attributes;
28
import io.grpc.ClientStreamTracer;
29
import io.grpc.ClientStreamTracer.StreamInfo;
30
import io.grpc.ConnectivityState;
31
import io.grpc.ConnectivityStateInfo;
32
import io.grpc.EquivalentAddressGroup;
33
import io.grpc.InternalLogId;
34
import io.grpc.LoadBalancer;
35
import io.grpc.Metadata;
36
import io.grpc.NameResolver;
37
import io.grpc.Status;
38
import io.grpc.internal.ForwardingClientStreamTracer;
39
import io.grpc.internal.GrpcUtil;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingLoadBalancerHelper;
42
import io.grpc.util.ForwardingSubchannel;
43
import io.grpc.util.GracefulSwitchLoadBalancer;
44
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
45
import io.grpc.xds.Endpoints.DropOverload;
46
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
47
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
48
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
49
import io.grpc.xds.client.BackendMetricPropagation;
50
import io.grpc.xds.client.Bootstrapper.ServerInfo;
51
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
52
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
53
import io.grpc.xds.client.Locality;
54
import io.grpc.xds.client.XdsClient;
55
import io.grpc.xds.client.XdsLogger;
56
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
57
import io.grpc.xds.internal.XdsInternalAttributes;
58
import io.grpc.xds.internal.security.SecurityProtocolNegotiators;
59
import io.grpc.xds.internal.security.SslContextProviderSupplier;
60
import io.grpc.xds.orca.OrcaPerRequestUtil;
61
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
62
import java.util.ArrayList;
63
import java.util.Collections;
64
import java.util.List;
65
import java.util.Locale;
66
import java.util.Map;
67
import java.util.Objects;
68
import java.util.concurrent.atomic.AtomicLong;
69
import java.util.concurrent.atomic.AtomicReference;
70
import javax.annotation.Nullable;
71

72
/**
73
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
74
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
75
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
76
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
77
 * breakers.
78
 */
79
final class ClusterImplLoadBalancer extends LoadBalancer {
80

81
  // Circuit-breaker limit used when the config does not supply a max-concurrent-requests value.
  @VisibleForTesting
82
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
83
  // Circuit breaking is enabled when GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING is unset or empty,
  // or when its value parses as true. Non-final so tests can toggle it.
  @VisibleForTesting
84
  static boolean enableCircuitBreaking =
1✔
85
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
86
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
87

88
  // Subchannel attribute carrying the mutable locality holder; written in createSubchannel and
  // read back by the picker to find the locality's load stats and name.
  private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
1✔
89
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
1✔
90
  // Subchannel attribute carrying the address name of the first address; only set when the
  // GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE flag is on, and passed to
  // PickResult.withSubchannel by the picker when auto host rewrite is requested for the call.
  @VisibleForTesting
91
  static final Attributes.Key<String> ATTR_SUBCHANNEL_ADDRESS_NAME =
1✔
92
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.addressName");
1✔
93

94
  private final XdsLogger logger;
95
  // Parent helper; the decorated child helper forwards to it.
  private final Helper helper;
96
  // Random source used by the picker for drop decisions.
  private final ThreadSafeRandom random;
97
  // The following fields are assigned once, during acceptResolvedAddresses, and are not
  // reassigned afterwards except that shutdown() resets xdsClient and childLbHelper to null.
98
  private String cluster;
99
  @Nullable
100
  private String edsServiceName;
101
  private XdsClient xdsClient;
102
  private CallCounterProvider callCounterProvider;
103
  // Only created when the config carries an LRS server; null otherwise.
  private ClusterDropStats dropStats;
104
  private ClusterImplLbHelper childLbHelper;
105
  private GracefulSwitchLoadBalancer childSwitchLb;
106

107
  /**
   * Creates a balancer that uses the shared {@link ThreadSafeRandomImpl} instance for drop
   * decisions.
   *
   * @param helper the parent helper this policy reports to
   */
  ClusterImplLoadBalancer(Helper helper) {
    this(helper, ThreadSafeRandomImpl.instance);
  }

  /**
   * Creates a balancer with an explicit random source.
   *
   * @param helper the parent helper this policy reports to; must not be null
   * @param random the random source used when evaluating drop policies; must not be null
   */
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
    this.helper = checkNotNull(helper, "helper");
    this.random = checkNotNull(random, "random");
    // The log id is derived from the channel authority so log lines can be tied to a channel.
    this.logger =
        XdsLogger.withLogId(InternalLogId.allocate("cluster-impl-lb", helper.getAuthority()));
    this.logger.log(XdsLogLevel.INFO, "Created");
  }
118

119
  @Override
120
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
121
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
122
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
123
    if (xdsClient == null) {
1✔
124
      xdsClient = checkNotNull(attributes.get(io.grpc.xds.XdsAttributes.XDS_CLIENT), "xdsClient");
1✔
125
    }
126
    if (callCounterProvider == null) {
1✔
127
      callCounterProvider = attributes.get(io.grpc.xds.XdsAttributes.CALL_COUNTER_PROVIDER);
1✔
128
    }
129

130
    ClusterImplConfig config =
1✔
131
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
132
    if (config == null) {
1✔
133
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
134
    }
135

136
    if (cluster == null) {
1✔
137
      cluster = config.cluster;
1✔
138
      edsServiceName = config.edsServiceName;
1✔
139
      childLbHelper = new ClusterImplLbHelper(
1✔
140
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
141
          config.lrsServerInfo);
142
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
143
      // Assume load report server does not change throughout cluster lifetime.
144
      if (config.lrsServerInfo != null) {
1✔
145
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
146
      }
147
    }
148

149
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
150
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
151
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
152
    childLbHelper.updateFilterMetadata(config.filterMetadata);
1✔
153
    childLbHelper.updateBackendMetricPropagation(config.backendMetricPropagation);
1✔
154

155
    childSwitchLb.handleResolvedAddresses(
1✔
156
        resolvedAddresses.toBuilder()
1✔
157
            .setAttributes(attributes.toBuilder()
1✔
158
              .set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
1✔
159
              .build())
1✔
160
            .setLoadBalancingPolicyConfig(config.childConfig)
1✔
161
            .build());
1✔
162
    return Status.OK;
1✔
163
  }
164

165
  @Override
166
  public void handleNameResolutionError(Status error) {
167
    if (childSwitchLb != null) {
1✔
168
      childSwitchLb.handleNameResolutionError(error);
1✔
169
    } else {
170
      helper.updateBalancingState(
1✔
171
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
172
    }
173
  }
1✔
174

175
  @Override
176
  public void requestConnection() {
177
    if (childSwitchLb != null) {
1✔
178
      childSwitchLb.requestConnection();
1✔
179
    }
180
  }
1✔
181

182
  @Override
183
  public void shutdown() {
184
    if (dropStats != null) {
1✔
185
      dropStats.release();
1✔
186
    }
187
    if (childSwitchLb != null) {
1✔
188
      childSwitchLb.shutdown();
1✔
189
      if (childLbHelper != null) {
1✔
190
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
191
        childLbHelper = null;
1✔
192
      }
193
    }
194
    xdsClient = null;
1✔
195
  }
1✔
196

197
  /**
198
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
199
   * or requests to endpoints in the cluster.
200
   */
201
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
202
    // Shared in-flight request counter obtained from the call-counter provider; compared against
    // maxConcurrentRequests by the picker and adjusted by CountingStreamTracerFactory.
    private final AtomicLong inFlights;
203
    // Last state and picker reported by the child policy; replayed when a setting changes so a
    // fresh wrapping picker can be published.
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
204
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
205
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
206
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
207
    // Null when the cluster has no TLS context configured; attached to every address otherwise.
    @Nullable
208
    private SslContextProviderSupplier sslContextProviderSupplier;
209
    private Map<String, Struct> filterMetadata = ImmutableMap.of();
1✔
210
    // Null disables per-locality load stats: createClusterLocalityFromAttributes then builds
    // ClusterLocality instances without stats.
    @Nullable
211
    private final ServerInfo lrsServerInfo;
212
    // Passed through to XdsClient.addClusterLocalityStats when locality stats are created.
    @Nullable
213
    private BackendMetricPropagation backendMetricPropagation;
214

215
    /**
     * Creates the decorated helper for one cluster.
     *
     * @param inFlights shared counter of in-flight requests; must not be null
     * @param lrsServerInfo load reporting server, or null when load reporting is not configured
     */
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
      this.lrsServerInfo = lrsServerInfo;
      this.inFlights = checkNotNull(inFlights, "inFlights");
    }
219

220
    @Override
221
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
222
      currentState = newState;
1✔
223
      currentPicker =  newPicker;
1✔
224
      SubchannelPicker picker = new RequestLimitingSubchannelPicker(
1✔
225
          newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
226
      delegate().updateBalancingState(newState, picker);
1✔
227
    }
1✔
228

229
    @Override
230
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
231
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
232
      // This value for  ClusterLocality is not recommended for general use.
233
      // Currently, we extract locality data from the first address, even before the subchannel is
234
      // READY.
235
      // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
236
      // might return the subchannel before it is READY. Typically, we wouldn't report load for such
237
      // selections because the channel will disregard the chosen (not-ready) subchannel.
238
      // However, we needed to ensure this case is handled.
239
      ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
1✔
240
          args.getAddresses().get(0).getAttributes());
1✔
241
      AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
1✔
242
          clusterLocality);
243
      Attributes.Builder attrsBuilder = args.getAttributes().toBuilder()
1✔
244
          .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference);
1✔
245
      if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)) {
1✔
246
        String hostname = args.getAddresses().get(0).getAttributes()
1✔
247
            .get(XdsInternalAttributes.ATTR_ADDRESS_NAME);
1✔
248
        if (hostname != null) {
1✔
249
          attrsBuilder.set(ATTR_SUBCHANNEL_ADDRESS_NAME, hostname);
1✔
250
        }
251
      }
252
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
1✔
253
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
254

255
      return new ClusterImplSubchannel(subchannel, localityAtomicReference);
1✔
256
    }
257

258
    private final class ClusterImplSubchannel extends ForwardingSubchannel {
259
      private final Subchannel delegate;
260
      private final AtomicReference<ClusterLocality> localityAtomicReference;
261

262
      private ClusterImplSubchannel(
263
          Subchannel delegate, AtomicReference<ClusterLocality> localityAtomicReference) {
1✔
264
        this.delegate = delegate;
1✔
265
        this.localityAtomicReference = localityAtomicReference;
1✔
266
      }
1✔
267

268
      @Override
269
      public void start(SubchannelStateListener listener) {
270
        delegate().start(
1✔
271
            new SubchannelStateListener() {
1✔
272
              @Override
273
              public void onSubchannelState(ConnectivityStateInfo newState) {
274
                // Do nothing if LB has been shutdown
275
                if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
1✔
276
                  // Get locality based on the connected address attributes
277
                  ClusterLocality updatedClusterLocality =
1✔
278
                      createClusterLocalityFromAttributes(
1✔
279
                          delegate.getConnectedAddressAttributes());
1✔
280
                  ClusterLocality oldClusterLocality =
1✔
281
                      localityAtomicReference.getAndSet(updatedClusterLocality);
1✔
282
                  oldClusterLocality.release();
1✔
283
                }
284
                listener.onSubchannelState(newState);
1✔
285
              }
1✔
286
            });
287
      }
1✔
288

289
      @Override
290
      public void shutdown() {
291
        localityAtomicReference.get().release();
1✔
292
        delegate().shutdown();
1✔
293
      }
1✔
294

295
      @Override
296
      public void updateAddresses(List<EquivalentAddressGroup> addresses) {
297
        delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
298
      }
1✔
299

300
      @Override
301
      protected Subchannel delegate() {
302
        return delegate;
1✔
303
      }
304
    }
305

306
    private List<EquivalentAddressGroup> withAdditionalAttributes(
307
        List<EquivalentAddressGroup> addresses) {
308
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
309
      for (EquivalentAddressGroup eag : addresses) {
1✔
310
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
311
            io.grpc.xds.XdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
312
        if (sslContextProviderSupplier != null) {
1✔
313
          attrBuilder.set(
1✔
314
              SecurityProtocolNegotiators.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
315
              sslContextProviderSupplier);
316
        }
317
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
318
      }
1✔
319
      return newAddresses;
1✔
320
    }
321

322
    private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
323
      Locality locality = addressAttributes.get(io.grpc.xds.XdsAttributes.ATTR_LOCALITY);
1✔
324
      String localityName = addressAttributes.get(EquivalentAddressGroup.ATTR_LOCALITY_NAME);
1✔
325

326
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
327
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
328
      // In case of not (which really shouldn't), loads are aggregated under an empty
329
      // locality.
330
      if (locality == null) {
1✔
331
        locality = Locality.create("", "", "");
×
332
        localityName = "";
×
333
      }
334

335
      final ClusterLocalityStats localityStats =
336
          (lrsServerInfo == null)
1✔
337
              ? null
1✔
338
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
339
                  edsServiceName, locality, backendMetricPropagation);
1✔
340

341
      return new ClusterLocality(localityStats, localityName);
1✔
342
    }
343

344
    @Override
345
    protected Helper delegate()  {
346
      return helper;
1✔
347
    }
348

349
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
350
      if (!dropPolicies.equals(dropOverloads)) {
1✔
351
        dropPolicies = dropOverloads;
1✔
352
        updateBalancingState(currentState, currentPicker);
1✔
353
      }
354
    }
1✔
355

356
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
357
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
358
        return;
×
359
      }
360
      this.maxConcurrentRequests =
1✔
361
          maxConcurrentRequests != null
1✔
362
              ? maxConcurrentRequests
1✔
363
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
364
      updateBalancingState(currentState, currentPicker);
1✔
365
    }
1✔
366

367
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
368
      UpstreamTlsContext currentTlsContext =
369
          sslContextProviderSupplier != null
1✔
370
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
371
              : null;
1✔
372
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
373
        return;
1✔
374
      }
375
      if (sslContextProviderSupplier != null) {
1✔
376
        sslContextProviderSupplier.close();
1✔
377
      }
378
      sslContextProviderSupplier =
1✔
379
          tlsContext != null
1✔
380
              ? new SslContextProviderSupplier(tlsContext,
1✔
381
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
382
              : null;
1✔
383
    }
1✔
384

385
    private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
386
      this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
387
    }
1✔
388

389
    private void updateBackendMetricPropagation(
390
        @Nullable BackendMetricPropagation backendMetricPropagation) {
391
      this.backendMetricPropagation = backendMetricPropagation;
1✔
392
    }
1✔
393

394
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
395
      private final SubchannelPicker delegate;
396
      private final List<DropOverload> dropPolicies;
397
      private final long maxConcurrentRequests;
398
      private final Map<String, Struct> filterMetadata;
399

400
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
401
          List<DropOverload> dropPolicies, long maxConcurrentRequests,
402
          Map<String, Struct> filterMetadata) {
1✔
403
        this.delegate = delegate;
1✔
404
        this.dropPolicies = dropPolicies;
1✔
405
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
406
        this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
1✔
407
      }
1✔
408

409
      @Override
410
      public PickResult pickSubchannel(PickSubchannelArgs args) {
411
        args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
1✔
412
            .accept(filterMetadata);
1✔
413
        args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
1✔
414
        for (DropOverload dropOverload : dropPolicies) {
1✔
415
          int rand = random.nextInt(1_000_000);
1✔
416
          if (rand < dropOverload.dropsPerMillion()) {
1✔
417
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
418
                dropOverload.category());
1✔
419
            if (dropStats != null) {
1✔
420
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
421
            }
422
            return PickResult.withDrop(
1✔
423
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
424
          }
425
        }
1✔
426
        PickResult result = delegate.pickSubchannel(args);
1✔
427
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
428
          Subchannel subchannel = result.getSubchannel();
1✔
429
          if (subchannel instanceof ClusterImplLbHelper.ClusterImplSubchannel) {
1✔
430
            subchannel = ((ClusterImplLbHelper.ClusterImplSubchannel) subchannel).delegate();
1✔
431
            result = result.copyWithSubchannel(subchannel);
1✔
432
          }
433
          if (enableCircuitBreaking) {
1✔
434
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
435
              if (dropStats != null) {
1✔
436
                dropStats.recordDroppedRequest();
1✔
437
              }
438
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
439
                  String.format(Locale.US, "Cluster max concurrent requests limit of %d exceeded",
1✔
440
                      maxConcurrentRequests)));
1✔
441
            }
442
          }
443
          final AtomicReference<ClusterLocality> clusterLocality =
1✔
444
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
1✔
445

446
          if (clusterLocality != null) {
1✔
447
            ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
1✔
448
            if (stats != null) {
1✔
449
              String localityName =
1✔
450
                  result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
1✔
451
                      .getClusterLocalityName();
1✔
452
              args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
453

454
              ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
455
                  stats, inFlights, result.getStreamTracerFactory());
1✔
456
              ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
457
                  .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
458
              result = result.copyWithStreamTracerFactory(orcaTracerFactory);
1✔
459
            }
460
          }
461
          if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
1✔
462
              && args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY)) {
1✔
463
            result = PickResult.withSubchannel(result.getSubchannel(),
1✔
464
                result.getStreamTracerFactory(),
1✔
465
                result.getSubchannel().getAttributes().get(ATTR_SUBCHANNEL_ADDRESS_NAME));
1✔
466
          }
467
        }
468
        return result;
1✔
469
      }
470

471
      @Override
472
      public String toString() {
473
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
474
      }
475
    }
476
  }
477

478
  private static final class CountingStreamTracerFactory extends
479
      ClientStreamTracer.Factory {
480
    private final ClusterLocalityStats stats;
481
    private final AtomicLong inFlights;
482
    @Nullable
483
    private final ClientStreamTracer.Factory delegate;
484

485
    private CountingStreamTracerFactory(
486
        ClusterLocalityStats stats, AtomicLong inFlights,
487
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
488
      this.stats = checkNotNull(stats, "stats");
1✔
489
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
490
      this.delegate = delegate;
1✔
491
    }
1✔
492

493
    @Override
494
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
495
      stats.recordCallStarted();
1✔
496
      inFlights.incrementAndGet();
1✔
497
      if (delegate == null) {
1✔
498
        return new ClientStreamTracer() {
1✔
499
          @Override
500
          public void streamClosed(Status status) {
501
            stats.recordCallFinished(status);
1✔
502
            inFlights.decrementAndGet();
1✔
503
          }
1✔
504
        };
505
      }
506
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
507
      return new ForwardingClientStreamTracer() {
×
508
        @Override
509
        protected ClientStreamTracer delegate() {
510
          return delegatedTracer;
×
511
        }
512

513
        @Override
514
        public void streamClosed(Status status) {
515
          stats.recordCallFinished(status);
×
516
          inFlights.decrementAndGet();
×
517
          delegate().streamClosed(status);
×
518
        }
×
519
      };
520
    }
521
  }
522

523
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
524

525
    private final ClusterLocalityStats stats;
526

527
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
528
      this.stats = checkNotNull(stats, "stats");
1✔
529
    }
1✔
530

531
    /**
532
     * Copies ORCA metrics from {@link MetricReport} to {@link ClusterLocalityStats}
533
     * such that they are included in the snapshot for the LRS report sent to the LRS server.
534
     * This includes both top-level metrics (CPU, memory, application utilization) and named
535
     * metrics, filtered according to the backend metric propagation configuration.
536
     */
537
    @Override
538
    public void onLoadReport(MetricReport report) {
539
      if (isEnabledOrcaLrsPropagation) {
1✔
540
        stats.recordTopLevelMetrics(
1✔
541
            report.getCpuUtilization(),
1✔
542
            report.getMemoryUtilization(),
1✔
543
            report.getApplicationUtilization());
1✔
544
      }
545
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
546
    }
1✔
547
  }
548

549
  /**
550
   * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
551
   */
552
  static final class ClusterLocality {
553
    private final ClusterLocalityStats clusterLocalityStats;
554
    private final String clusterLocalityName;
555

556
    @VisibleForTesting
557
    ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
1✔
558
      this.clusterLocalityStats = localityStats;
1✔
559
      this.clusterLocalityName = localityName;
1✔
560
    }
1✔
561

562
    ClusterLocalityStats getClusterLocalityStats() {
563
      return clusterLocalityStats;
1✔
564
    }
565

566
    String getClusterLocalityName() {
567
      return clusterLocalityName;
1✔
568
    }
569

570
    @VisibleForTesting
571
    void release() {
572
      if (clusterLocalityStats != null) {
1✔
573
        clusterLocalityStats.release();
1✔
574
      }
575
    }
1✔
576
  }
577
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc