• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20002

29 Sep 2025 04:21PM UTC coverage: 88.592% (+0.02%) from 88.575%
#20002

push

github

web-flow
xds: xDS based SNI setting and SAN validation (#12378)

When using xDS credentials, configure the SNI for the TLS handshake via
xDS rather than using the channel authority as the SNI, and allow SAN
validation to use the SNI that was sent when so instructed via xDS.

Implements A101.

34877 of 39368 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.94
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.base.Strings;
24
import com.google.common.collect.ImmutableMap;
25
import com.google.protobuf.Struct;
26
import io.grpc.Attributes;
27
import io.grpc.ClientStreamTracer;
28
import io.grpc.ClientStreamTracer.StreamInfo;
29
import io.grpc.ConnectivityState;
30
import io.grpc.ConnectivityStateInfo;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.InternalLogId;
33
import io.grpc.LoadBalancer;
34
import io.grpc.Metadata;
35
import io.grpc.NameResolver;
36
import io.grpc.Status;
37
import io.grpc.internal.ForwardingClientStreamTracer;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.internal.ObjectPool;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingLoadBalancerHelper;
42
import io.grpc.util.ForwardingSubchannel;
43
import io.grpc.util.GracefulSwitchLoadBalancer;
44
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
45
import io.grpc.xds.Endpoints.DropOverload;
46
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
47
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
48
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
49
import io.grpc.xds.client.Bootstrapper.ServerInfo;
50
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
51
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
52
import io.grpc.xds.client.Locality;
53
import io.grpc.xds.client.XdsClient;
54
import io.grpc.xds.client.XdsLogger;
55
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
56
import io.grpc.xds.internal.XdsInternalAttributes;
57
import io.grpc.xds.internal.security.SecurityProtocolNegotiators;
58
import io.grpc.xds.internal.security.SslContextProviderSupplier;
59
import io.grpc.xds.orca.OrcaPerRequestUtil;
60
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
61
import java.util.ArrayList;
62
import java.util.Collections;
63
import java.util.List;
64
import java.util.Locale;
65
import java.util.Map;
66
import java.util.Objects;
67
import java.util.concurrent.atomic.AtomicLong;
68
import java.util.concurrent.atomic.AtomicReference;
69
import javax.annotation.Nullable;
70

71
/**
72
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
73
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
74
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
75
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
76
 * breakers.
77
 */
78
final class ClusterImplLoadBalancer extends LoadBalancer {
79

80
  // Concurrent-request limit applied by the helper when the cluster config carries no explicit
  // maxConcurrentRequests value.
  @VisibleForTesting
81
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
82
  // Circuit breaking is on when GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING is unset/empty or parses
  // to true. The picker consults this flag before enforcing the in-flight request limit.
  // NOTE(review): non-final, presumably so tests can toggle it -- confirm.
  @VisibleForTesting
83
  static boolean enableCircuitBreaking =
1✔
84
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
85
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
86

87
  // Subchannel attribute holding the mutable reference to the subchannel's ClusterLocality. It is
  // set in createSubchannel() and read back by the picker when a subchannel is selected.
  private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
1✔
88
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
1✔
89

90
  // Logger tagged with this balancer's log id; created in the constructor.
  private final XdsLogger logger;
91
  // Helper supplied by the parent; the child helper delegates to it.
  private final Helper helper;
92
  // Source of randomness used by the picker for drop decisions.
  private final ThreadSafeRandom random;
93
  // The following fields are assigned during the first acceptResolvedAddresses() call and are not
  // reassigned by later updates. Exceptions: shutdown() overwrites xdsClient with the value
  // returned by the pool and clears childLbHelper.
94
  private String cluster;
95
  @Nullable
96
  private String edsServiceName;
97
  // Pool read from the resolution attributes; xdsClient is obtained from it and returned to it.
  private ObjectPool<XdsClient> xdsClientPool;
98
  private XdsClient xdsClient;
99
  private CallCounterProvider callCounterProvider;
100
  // Only created when the config carries an LRS server; null otherwise.
  private ClusterDropStats dropStats;
101
  private ClusterImplLbHelper childLbHelper;
102
  private GracefulSwitchLoadBalancer childSwitchLb;
103

104
  /**
   * Creates a balancer that uses {@link ThreadSafeRandomImpl#instance} as its source of
   * randomness.
   *
   * @param helper helper provided by the parent; must not be null
   */
  ClusterImplLoadBalancer(Helper helper) {
    this(helper, ThreadSafeRandomImpl.instance);
  }

  /**
   * Creates a balancer with an explicit source of randomness and logs its creation.
   *
   * @param helper helper provided by the parent; must not be null
   * @param random random number source used for drop decisions; must not be null
   */
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
    this.helper = checkNotNull(helper, "helper");
    this.random = checkNotNull(random, "random");
    // The log id is derived from the channel authority reported by the helper.
    logger =
        XdsLogger.withLogId(InternalLogId.allocate("cluster-impl-lb", helper.getAuthority()));
    logger.log(XdsLogLevel.INFO, "Created");
  }
115

116
  @Override
117
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
118
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
119
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
120
    if (xdsClientPool == null) {
1✔
121
      xdsClientPool = attributes.get(io.grpc.xds.XdsAttributes.XDS_CLIENT_POOL);
1✔
122
      assert xdsClientPool != null;
1✔
123
      xdsClient = xdsClientPool.getObject();
1✔
124
    }
125
    if (callCounterProvider == null) {
1✔
126
      callCounterProvider = attributes.get(io.grpc.xds.XdsAttributes.CALL_COUNTER_PROVIDER);
1✔
127
    }
128

129
    ClusterImplConfig config =
1✔
130
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
131
    if (config == null) {
1✔
132
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
133
    }
134

135
    if (cluster == null) {
1✔
136
      cluster = config.cluster;
1✔
137
      edsServiceName = config.edsServiceName;
1✔
138
      childLbHelper = new ClusterImplLbHelper(
1✔
139
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
140
          config.lrsServerInfo);
141
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
142
      // Assume load report server does not change throughout cluster lifetime.
143
      if (config.lrsServerInfo != null) {
1✔
144
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
145
      }
146
    }
147

148
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
149
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
150
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
151
    childLbHelper.updateFilterMetadata(config.filterMetadata);
1✔
152

153
    childSwitchLb.handleResolvedAddresses(
1✔
154
        resolvedAddresses.toBuilder()
1✔
155
            .setAttributes(attributes.toBuilder()
1✔
156
              .set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
1✔
157
              .build())
1✔
158
            .setLoadBalancingPolicyConfig(config.childConfig)
1✔
159
            .build());
1✔
160
    return Status.OK;
1✔
161
  }
162

163
  @Override
164
  public void handleNameResolutionError(Status error) {
165
    if (childSwitchLb != null) {
1✔
166
      childSwitchLb.handleNameResolutionError(error);
1✔
167
    } else {
168
      helper.updateBalancingState(
1✔
169
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
170
    }
171
  }
1✔
172

173
  @Override
174
  public void requestConnection() {
175
    if (childSwitchLb != null) {
1✔
176
      childSwitchLb.requestConnection();
1✔
177
    }
178
  }
1✔
179

180
  @Override
181
  public void shutdown() {
182
    if (dropStats != null) {
1✔
183
      dropStats.release();
1✔
184
    }
185
    if (childSwitchLb != null) {
1✔
186
      childSwitchLb.shutdown();
1✔
187
      if (childLbHelper != null) {
1✔
188
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
189
        childLbHelper = null;
1✔
190
      }
191
    }
192
    if (xdsClient != null) {
1✔
193
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
194
    }
195
  }
1✔
196

197
  /**
198
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
199
   * or requests to endpoints in the cluster.
200
   */
201
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
202
    // Counter of in-flight requests; compared against maxConcurrentRequests by the picker and
    // adjusted by the stream tracers created for each pick.
    private final AtomicLong inFlights;
203
    // Last state and picker reported by the child, kept so the wrapped picker can be re-published
    // when a drop policy or request limit changes.
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
204
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
205
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
206
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
207
    // Null when no TLS context is configured; attached to address attributes when present.
    @Nullable
208
    private SslContextProviderSupplier sslContextProviderSupplier;
209
    private Map<String, Struct> filterMetadata = ImmutableMap.of();
1✔
210
    // Null when load reporting is not configured; locality stats are then not created.
    @Nullable
211
    private final ServerInfo lrsServerInfo;
212

213
    /**
     * Creates the helper.
     *
     * @param inFlights counter of in-flight requests; must not be null
     * @param lrsServerInfo load reporting server, or null when load reporting is not configured
     */
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
      checkNotNull(inFlights, "inFlights");
      this.inFlights = inFlights;
      this.lrsServerInfo = lrsServerInfo;
    }
217

218
    @Override
219
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
220
      currentState = newState;
1✔
221
      currentPicker =  newPicker;
1✔
222
      SubchannelPicker picker = new RequestLimitingSubchannelPicker(
1✔
223
          newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
224
      delegate().updateBalancingState(newState, picker);
1✔
225
    }
1✔
226

227
    @Override
228
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
229
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
230
      // This value for  ClusterLocality is not recommended for general use.
231
      // Currently, we extract locality data from the first address, even before the subchannel is
232
      // READY.
233
      // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
234
      // might return the subchannel before it is READY. Typically, we wouldn't report load for such
235
      // selections because the channel will disregard the chosen (not-ready) subchannel.
236
      // However, we needed to ensure this case is handled.
237
      ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
1✔
238
          args.getAddresses().get(0).getAttributes());
1✔
239
      AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
1✔
240
          clusterLocality);
241
      Attributes.Builder attrsBuilder = args.getAttributes().toBuilder()
1✔
242
          .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference);
1✔
243
      if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)) {
1✔
244
        String hostname = args.getAddresses().get(0).getAttributes()
1✔
245
            .get(XdsInternalAttributes.ATTR_ADDRESS_NAME);
1✔
246
        if (hostname != null) {
1✔
247
          attrsBuilder.set(XdsInternalAttributes.ATTR_ADDRESS_NAME, hostname);
1✔
248
        }
249
      }
250
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
1✔
251
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
252

253
      return new ForwardingSubchannel() {
1✔
254
        @Override
255
        public void start(SubchannelStateListener listener) {
256
          delegate().start(new SubchannelStateListener() {
1✔
257
            @Override
258
            public void onSubchannelState(ConnectivityStateInfo newState) {
259
              // Do nothing if LB has been shutdown
260
              if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
1✔
261
                // Get locality based on the connected address attributes
262
                ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
1✔
263
                    subchannel.getConnectedAddressAttributes());
1✔
264
                ClusterLocality oldClusterLocality = localityAtomicReference
1✔
265
                    .getAndSet(updatedClusterLocality);
1✔
266
                oldClusterLocality.release();
1✔
267
              }
268
              listener.onSubchannelState(newState);
1✔
269
            }
1✔
270
          });
271
        }
1✔
272

273
        @Override
274
        public void shutdown() {
275
          localityAtomicReference.get().release();
1✔
276
          delegate().shutdown();
1✔
277
        }
1✔
278

279
        @Override
280
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
281
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
282
        }
1✔
283

284
        @Override
285
        protected Subchannel delegate() {
286
          return subchannel;
1✔
287
        }
288
      };
289
    }
290

291
    private List<EquivalentAddressGroup> withAdditionalAttributes(
292
        List<EquivalentAddressGroup> addresses) {
293
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
294
      for (EquivalentAddressGroup eag : addresses) {
1✔
295
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
296
            io.grpc.xds.XdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
297
        if (sslContextProviderSupplier != null) {
1✔
298
          attrBuilder.set(
1✔
299
              SecurityProtocolNegotiators.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
300
              sslContextProviderSupplier);
301
        }
302
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
303
      }
1✔
304
      return newAddresses;
1✔
305
    }
306

307
    private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
308
      Locality locality = addressAttributes.get(io.grpc.xds.XdsAttributes.ATTR_LOCALITY);
1✔
309
      String localityName = addressAttributes.get(EquivalentAddressGroup.ATTR_LOCALITY_NAME);
1✔
310

311
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
312
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
313
      // In case of not (which really shouldn't), loads are aggregated under an empty
314
      // locality.
315
      if (locality == null) {
1✔
316
        locality = Locality.create("", "", "");
×
317
        localityName = "";
×
318
      }
319

320
      final ClusterLocalityStats localityStats =
321
          (lrsServerInfo == null)
1✔
322
              ? null
1✔
323
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
324
                  edsServiceName, locality);
1✔
325

326
      return new ClusterLocality(localityStats, localityName);
1✔
327
    }
328

329
    @Override
330
    protected Helper delegate()  {
331
      return helper;
1✔
332
    }
333

334
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
335
      if (!dropPolicies.equals(dropOverloads)) {
1✔
336
        dropPolicies = dropOverloads;
1✔
337
        updateBalancingState(currentState, currentPicker);
1✔
338
      }
339
    }
1✔
340

341
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
342
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
343
        return;
×
344
      }
345
      this.maxConcurrentRequests =
1✔
346
          maxConcurrentRequests != null
1✔
347
              ? maxConcurrentRequests
1✔
348
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
349
      updateBalancingState(currentState, currentPicker);
1✔
350
    }
1✔
351

352
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
353
      UpstreamTlsContext currentTlsContext =
354
          sslContextProviderSupplier != null
1✔
355
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
356
              : null;
1✔
357
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
358
        return;
1✔
359
      }
360
      if (sslContextProviderSupplier != null) {
1✔
361
        sslContextProviderSupplier.close();
1✔
362
      }
363
      sslContextProviderSupplier =
1✔
364
          tlsContext != null
1✔
365
              ? new SslContextProviderSupplier(tlsContext,
1✔
366
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
367
              : null;
1✔
368
    }
1✔
369

370
    private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
371
      this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
372
    }
1✔
373

374
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
375
      private final SubchannelPicker delegate;
376
      private final List<DropOverload> dropPolicies;
377
      private final long maxConcurrentRequests;
378
      private final Map<String, Struct> filterMetadata;
379

380
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
381
          List<DropOverload> dropPolicies, long maxConcurrentRequests,
382
          Map<String, Struct> filterMetadata) {
1✔
383
        this.delegate = delegate;
1✔
384
        this.dropPolicies = dropPolicies;
1✔
385
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
386
        this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
1✔
387
      }
1✔
388

389
      @Override
390
      public PickResult pickSubchannel(PickSubchannelArgs args) {
391
        args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
1✔
392
            .accept(filterMetadata);
1✔
393
        args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
1✔
394
        for (DropOverload dropOverload : dropPolicies) {
1✔
395
          int rand = random.nextInt(1_000_000);
1✔
396
          if (rand < dropOverload.dropsPerMillion()) {
1✔
397
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
398
                dropOverload.category());
1✔
399
            if (dropStats != null) {
1✔
400
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
401
            }
402
            return PickResult.withDrop(
1✔
403
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
404
          }
405
        }
1✔
406
        PickResult result = delegate.pickSubchannel(args);
1✔
407
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
408
          if (enableCircuitBreaking) {
1✔
409
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
410
              if (dropStats != null) {
1✔
411
                dropStats.recordDroppedRequest();
1✔
412
              }
413
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
414
                  String.format(Locale.US, "Cluster max concurrent requests limit of %d exceeded",
1✔
415
                      maxConcurrentRequests)));
1✔
416
            }
417
          }
418
          final AtomicReference<ClusterLocality> clusterLocality =
1✔
419
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
1✔
420

421
          if (clusterLocality != null) {
1✔
422
            ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
1✔
423
            if (stats != null) {
1✔
424
              String localityName =
1✔
425
                  result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
1✔
426
                      .getClusterLocalityName();
1✔
427
              args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
428

429
              ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
430
                  stats, inFlights, result.getStreamTracerFactory());
1✔
431
              ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
432
                  .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
433
              result = PickResult.withSubchannel(result.getSubchannel(),
1✔
434
                  orcaTracerFactory);
435
            }
436
          }
437
          if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
1✔
438
              && args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY)) {
1✔
439
            result = PickResult.withSubchannel(result.getSubchannel(),
1✔
440
                result.getStreamTracerFactory(),
1✔
441
                result.getSubchannel().getAttributes().get(
1✔
442
                    XdsInternalAttributes.ATTR_ADDRESS_NAME));
443
          }
444
        }
445
        return result;
1✔
446
      }
447

448
      @Override
449
      public String toString() {
450
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
451
      }
452
    }
453
  }
454

455
  private static final class CountingStreamTracerFactory extends
456
      ClientStreamTracer.Factory {
457
    private final ClusterLocalityStats stats;
458
    private final AtomicLong inFlights;
459
    @Nullable
460
    private final ClientStreamTracer.Factory delegate;
461

462
    private CountingStreamTracerFactory(
463
        ClusterLocalityStats stats, AtomicLong inFlights,
464
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
465
      this.stats = checkNotNull(stats, "stats");
1✔
466
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
467
      this.delegate = delegate;
1✔
468
    }
1✔
469

470
    @Override
471
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
472
      stats.recordCallStarted();
1✔
473
      inFlights.incrementAndGet();
1✔
474
      if (delegate == null) {
1✔
475
        return new ClientStreamTracer() {
1✔
476
          @Override
477
          public void streamClosed(Status status) {
478
            stats.recordCallFinished(status);
1✔
479
            inFlights.decrementAndGet();
1✔
480
          }
1✔
481
        };
482
      }
483
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
484
      return new ForwardingClientStreamTracer() {
×
485
        @Override
486
        protected ClientStreamTracer delegate() {
487
          return delegatedTracer;
×
488
        }
489

490
        @Override
491
        public void streamClosed(Status status) {
492
          stats.recordCallFinished(status);
×
493
          inFlights.decrementAndGet();
×
494
          delegate().streamClosed(status);
×
495
        }
×
496
      };
497
    }
498
  }
499

500
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
501

502
    private final ClusterLocalityStats stats;
503

504
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
505
      this.stats = checkNotNull(stats, "stats");
1✔
506
    }
1✔
507

508
    /**
509
     * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
510
     * included in the snapshot for the LRS report sent to the LRS server.
511
     */
512
    @Override
513
    public void onLoadReport(MetricReport report) {
514
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
515
    }
1✔
516
  }
517

518
  /**
519
   * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
520
   */
521
  static final class ClusterLocality {
522
    private final ClusterLocalityStats clusterLocalityStats;
523
    private final String clusterLocalityName;
524

525
    @VisibleForTesting
526
    ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
1✔
527
      this.clusterLocalityStats = localityStats;
1✔
528
      this.clusterLocalityName = localityName;
1✔
529
    }
1✔
530

531
    ClusterLocalityStats getClusterLocalityStats() {
532
      return clusterLocalityStats;
1✔
533
    }
534

535
    String getClusterLocalityName() {
536
      return clusterLocalityName;
1✔
537
    }
538

539
    @VisibleForTesting
540
    void release() {
541
      if (clusterLocalityStats != null) {
1✔
542
        clusterLocalityStats.release();
1✔
543
      }
544
    }
1✔
545
  }
546
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc