• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19785

21 Apr 2025 01:17PM UTC coverage: 88.591% (-0.008%) from 88.599%
#19785

push

github

web-flow
Implement grpc.lb.backend_service optional label

This completes gRFC A89. 7162d2d66 and fc86084df had already implemented
the LB plumbing for the optional label on RPC metrics. This observes the
value in OpenTelemetry and adds it to WRR metrics as well.

https://github.com/grpc/proposal/blob/master/A89-backend-service-metric-label.md

34747 of 39222 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.94
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.base.Strings;
24
import com.google.common.collect.ImmutableMap;
25
import com.google.protobuf.Struct;
26
import io.grpc.Attributes;
27
import io.grpc.ClientStreamTracer;
28
import io.grpc.ClientStreamTracer.StreamInfo;
29
import io.grpc.ConnectivityState;
30
import io.grpc.ConnectivityStateInfo;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.InternalLogId;
33
import io.grpc.LoadBalancer;
34
import io.grpc.Metadata;
35
import io.grpc.NameResolver;
36
import io.grpc.Status;
37
import io.grpc.internal.ForwardingClientStreamTracer;
38
import io.grpc.internal.GrpcUtil;
39
import io.grpc.internal.ObjectPool;
40
import io.grpc.services.MetricReport;
41
import io.grpc.util.ForwardingLoadBalancerHelper;
42
import io.grpc.util.ForwardingSubchannel;
43
import io.grpc.util.GracefulSwitchLoadBalancer;
44
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
45
import io.grpc.xds.Endpoints.DropOverload;
46
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
47
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
48
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
49
import io.grpc.xds.client.Bootstrapper.ServerInfo;
50
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
51
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
52
import io.grpc.xds.client.Locality;
53
import io.grpc.xds.client.XdsClient;
54
import io.grpc.xds.client.XdsLogger;
55
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
56
import io.grpc.xds.internal.security.SecurityProtocolNegotiators;
57
import io.grpc.xds.internal.security.SslContextProviderSupplier;
58
import io.grpc.xds.orca.OrcaPerRequestUtil;
59
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
60
import java.util.ArrayList;
61
import java.util.Collections;
62
import java.util.List;
63
import java.util.Locale;
64
import java.util.Map;
65
import java.util.Objects;
66
import java.util.concurrent.atomic.AtomicLong;
67
import java.util.concurrent.atomic.AtomicReference;
68
import javax.annotation.Nullable;
69

70
/**
71
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
72
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
73
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
74
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
75
 * breakers.
76
 */
77
final class ClusterImplLoadBalancer extends LoadBalancer {
78

79
  @VisibleForTesting
80
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
81
  @VisibleForTesting
82
  static boolean enableCircuitBreaking =
1✔
83
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
84
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
85

86
  private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
1✔
87
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
1✔
88

89
  private final XdsLogger logger;
90
  private final Helper helper;
91
  private final ThreadSafeRandom random;
92
  // The following fields are effectively final.
93
  private String cluster;
94
  @Nullable
95
  private String edsServiceName;
96
  private ObjectPool<XdsClient> xdsClientPool;
97
  private XdsClient xdsClient;
98
  private CallCounterProvider callCounterProvider;
99
  private ClusterDropStats dropStats;
100
  private ClusterImplLbHelper childLbHelper;
101
  private GracefulSwitchLoadBalancer childSwitchLb;
102

103
  ClusterImplLoadBalancer(Helper helper) {
104
    this(helper, ThreadSafeRandomImpl.instance);
1✔
105
  }
1✔
106

107
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
1✔
108
    this.helper = checkNotNull(helper, "helper");
1✔
109
    this.random = checkNotNull(random, "random");
1✔
110
    InternalLogId logId = InternalLogId.allocate("cluster-impl-lb", helper.getAuthority());
1✔
111
    logger = XdsLogger.withLogId(logId);
1✔
112
    logger.log(XdsLogLevel.INFO, "Created");
1✔
113
  }
1✔
114

115
  @Override
116
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
117
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
118
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
119
    if (xdsClientPool == null) {
1✔
120
      xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
1✔
121
      assert xdsClientPool != null;
1✔
122
      xdsClient = xdsClientPool.getObject();
1✔
123
    }
124
    if (callCounterProvider == null) {
1✔
125
      callCounterProvider = attributes.get(XdsAttributes.CALL_COUNTER_PROVIDER);
1✔
126
    }
127

128
    ClusterImplConfig config =
1✔
129
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
130
    if (config == null) {
1✔
131
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
132
    }
133

134
    if (cluster == null) {
1✔
135
      cluster = config.cluster;
1✔
136
      edsServiceName = config.edsServiceName;
1✔
137
      childLbHelper = new ClusterImplLbHelper(
1✔
138
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
139
          config.lrsServerInfo);
140
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
141
      // Assume load report server does not change throughout cluster lifetime.
142
      if (config.lrsServerInfo != null) {
1✔
143
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
144
      }
145
    }
146

147
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
148
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
149
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
150
    childLbHelper.updateFilterMetadata(config.filterMetadata);
1✔
151

152
    childSwitchLb.handleResolvedAddresses(
1✔
153
        resolvedAddresses.toBuilder()
1✔
154
            .setAttributes(attributes.toBuilder()
1✔
155
              .set(NameResolver.ATTR_BACKEND_SERVICE, cluster)
1✔
156
              .build())
1✔
157
            .setLoadBalancingPolicyConfig(config.childConfig)
1✔
158
            .build());
1✔
159
    return Status.OK;
1✔
160
  }
161

162
  @Override
163
  public void handleNameResolutionError(Status error) {
164
    if (childSwitchLb != null) {
1✔
165
      childSwitchLb.handleNameResolutionError(error);
1✔
166
    } else {
167
      helper.updateBalancingState(
1✔
168
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
169
    }
170
  }
1✔
171

172
  @Override
173
  public void requestConnection() {
174
    if (childSwitchLb != null) {
1✔
175
      childSwitchLb.requestConnection();
1✔
176
    }
177
  }
1✔
178

179
  @Override
180
  public void shutdown() {
181
    if (dropStats != null) {
1✔
182
      dropStats.release();
1✔
183
    }
184
    if (childSwitchLb != null) {
1✔
185
      childSwitchLb.shutdown();
1✔
186
      if (childLbHelper != null) {
1✔
187
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
188
        childLbHelper = null;
1✔
189
      }
190
    }
191
    if (xdsClient != null) {
1✔
192
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
193
    }
194
  }
1✔
195

196
  /**
197
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
198
   * or requests to endpoints in the cluster.
199
   */
200
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
201
    private final AtomicLong inFlights;
202
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
203
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
204
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
205
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
206
    @Nullable
207
    private SslContextProviderSupplier sslContextProviderSupplier;
208
    private Map<String, Struct> filterMetadata = ImmutableMap.of();
1✔
209
    @Nullable
210
    private final ServerInfo lrsServerInfo;
211

212
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
1✔
213
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
214
      this.lrsServerInfo = lrsServerInfo;
1✔
215
    }
1✔
216

217
    @Override
218
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
219
      currentState = newState;
1✔
220
      currentPicker =  newPicker;
1✔
221
      SubchannelPicker picker = new RequestLimitingSubchannelPicker(
1✔
222
          newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
223
      delegate().updateBalancingState(newState, picker);
1✔
224
    }
1✔
225

226
    @Override
227
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
228
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
229
      // This value for  ClusterLocality is not recommended for general use.
230
      // Currently, we extract locality data from the first address, even before the subchannel is
231
      // READY.
232
      // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
233
      // might return the subchannel before it is READY. Typically, we wouldn't report load for such
234
      // selections because the channel will disregard the chosen (not-ready) subchannel.
235
      // However, we needed to ensure this case is handled.
236
      ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
1✔
237
          args.getAddresses().get(0).getAttributes());
1✔
238
      AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
1✔
239
          clusterLocality);
240
      Attributes.Builder attrsBuilder = args.getAttributes().toBuilder()
1✔
241
          .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference);
1✔
242
      if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)) {
1✔
243
        String hostname = args.getAddresses().get(0).getAttributes()
1✔
244
            .get(XdsAttributes.ATTR_ADDRESS_NAME);
1✔
245
        if (hostname != null) {
1✔
246
          attrsBuilder.set(XdsAttributes.ATTR_ADDRESS_NAME, hostname);
1✔
247
        }
248
      }
249
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
1✔
250
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
251

252
      return new ForwardingSubchannel() {
1✔
253
        @Override
254
        public void start(SubchannelStateListener listener) {
255
          delegate().start(new SubchannelStateListener() {
1✔
256
            @Override
257
            public void onSubchannelState(ConnectivityStateInfo newState) {
258
              // Do nothing if LB has been shutdown
259
              if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
1✔
260
                // Get locality based on the connected address attributes
261
                ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
1✔
262
                    subchannel.getConnectedAddressAttributes());
1✔
263
                ClusterLocality oldClusterLocality = localityAtomicReference
1✔
264
                    .getAndSet(updatedClusterLocality);
1✔
265
                oldClusterLocality.release();
1✔
266
              }
267
              listener.onSubchannelState(newState);
1✔
268
            }
1✔
269
          });
270
        }
1✔
271

272
        @Override
273
        public void shutdown() {
274
          localityAtomicReference.get().release();
1✔
275
          delegate().shutdown();
1✔
276
        }
1✔
277

278
        @Override
279
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
280
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
281
        }
1✔
282

283
        @Override
284
        protected Subchannel delegate() {
285
          return subchannel;
1✔
286
        }
287
      };
288
    }
289

290
    private List<EquivalentAddressGroup> withAdditionalAttributes(
291
        List<EquivalentAddressGroup> addresses) {
292
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
293
      for (EquivalentAddressGroup eag : addresses) {
1✔
294
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
295
            XdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
296
        if (sslContextProviderSupplier != null) {
1✔
297
          attrBuilder.set(
1✔
298
              SecurityProtocolNegotiators.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
299
              sslContextProviderSupplier);
300
        }
301
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
302
      }
1✔
303
      return newAddresses;
1✔
304
    }
305

306
    private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
307
      Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY);
1✔
308
      String localityName = addressAttributes.get(XdsAttributes.ATTR_LOCALITY_NAME);
1✔
309

310
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
311
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
312
      // In case of not (which really shouldn't), loads are aggregated under an empty
313
      // locality.
314
      if (locality == null) {
1✔
315
        locality = Locality.create("", "", "");
×
316
        localityName = "";
×
317
      }
318

319
      final ClusterLocalityStats localityStats =
320
          (lrsServerInfo == null)
1✔
321
              ? null
1✔
322
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
323
                  edsServiceName, locality);
1✔
324

325
      return new ClusterLocality(localityStats, localityName);
1✔
326
    }
327

328
    @Override
329
    protected Helper delegate()  {
330
      return helper;
1✔
331
    }
332

333
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
334
      if (!dropPolicies.equals(dropOverloads)) {
1✔
335
        dropPolicies = dropOverloads;
1✔
336
        updateBalancingState(currentState, currentPicker);
1✔
337
      }
338
    }
1✔
339

340
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
341
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
342
        return;
×
343
      }
344
      this.maxConcurrentRequests =
1✔
345
          maxConcurrentRequests != null
1✔
346
              ? maxConcurrentRequests
1✔
347
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
348
      updateBalancingState(currentState, currentPicker);
1✔
349
    }
1✔
350

351
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
352
      UpstreamTlsContext currentTlsContext =
353
          sslContextProviderSupplier != null
1✔
354
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
355
              : null;
1✔
356
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
357
        return;
1✔
358
      }
359
      if (sslContextProviderSupplier != null) {
1✔
360
        sslContextProviderSupplier.close();
1✔
361
      }
362
      sslContextProviderSupplier =
1✔
363
          tlsContext != null
1✔
364
              ? new SslContextProviderSupplier(tlsContext,
1✔
365
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
366
              : null;
1✔
367
    }
1✔
368

369
    private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
370
      this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
371
    }
1✔
372

373
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
374
      private final SubchannelPicker delegate;
375
      private final List<DropOverload> dropPolicies;
376
      private final long maxConcurrentRequests;
377
      private final Map<String, Struct> filterMetadata;
378

379
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
380
          List<DropOverload> dropPolicies, long maxConcurrentRequests,
381
          Map<String, Struct> filterMetadata) {
1✔
382
        this.delegate = delegate;
1✔
383
        this.dropPolicies = dropPolicies;
1✔
384
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
385
        this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
1✔
386
      }
1✔
387

388
      @Override
389
      public PickResult pickSubchannel(PickSubchannelArgs args) {
390
        args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
1✔
391
            .accept(filterMetadata);
1✔
392
        args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
1✔
393
        for (DropOverload dropOverload : dropPolicies) {
1✔
394
          int rand = random.nextInt(1_000_000);
1✔
395
          if (rand < dropOverload.dropsPerMillion()) {
1✔
396
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
397
                dropOverload.category());
1✔
398
            if (dropStats != null) {
1✔
399
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
400
            }
401
            return PickResult.withDrop(
1✔
402
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
403
          }
404
        }
1✔
405
        PickResult result = delegate.pickSubchannel(args);
1✔
406
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
407
          if (enableCircuitBreaking) {
1✔
408
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
409
              if (dropStats != null) {
1✔
410
                dropStats.recordDroppedRequest();
1✔
411
              }
412
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
413
                  String.format(Locale.US, "Cluster max concurrent requests limit of %d exceeded",
1✔
414
                      maxConcurrentRequests)));
1✔
415
            }
416
          }
417
          final AtomicReference<ClusterLocality> clusterLocality =
1✔
418
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
1✔
419

420
          if (clusterLocality != null) {
1✔
421
            ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
1✔
422
            if (stats != null) {
1✔
423
              String localityName =
1✔
424
                  result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
1✔
425
                      .getClusterLocalityName();
1✔
426
              args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
427

428
              ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
429
                  stats, inFlights, result.getStreamTracerFactory());
1✔
430
              ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
431
                  .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
432
              result = PickResult.withSubchannel(result.getSubchannel(),
1✔
433
                  orcaTracerFactory);
434
            }
435
          }
436
          if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
1✔
437
              && args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY)) {
1✔
438
            result = PickResult.withSubchannel(result.getSubchannel(),
1✔
439
                result.getStreamTracerFactory(),
1✔
440
                result.getSubchannel().getAttributes().get(
1✔
441
                    XdsAttributes.ATTR_ADDRESS_NAME));
442
          }
443
        }
444
        return result;
1✔
445
      }
446

447
      @Override
448
      public String toString() {
449
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
450
      }
451
    }
452
  }
453

454
  private static final class CountingStreamTracerFactory extends
455
      ClientStreamTracer.Factory {
456
    private final ClusterLocalityStats stats;
457
    private final AtomicLong inFlights;
458
    @Nullable
459
    private final ClientStreamTracer.Factory delegate;
460

461
    private CountingStreamTracerFactory(
462
        ClusterLocalityStats stats, AtomicLong inFlights,
463
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
464
      this.stats = checkNotNull(stats, "stats");
1✔
465
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
466
      this.delegate = delegate;
1✔
467
    }
1✔
468

469
    @Override
470
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
471
      stats.recordCallStarted();
1✔
472
      inFlights.incrementAndGet();
1✔
473
      if (delegate == null) {
1✔
474
        return new ClientStreamTracer() {
1✔
475
          @Override
476
          public void streamClosed(Status status) {
477
            stats.recordCallFinished(status);
1✔
478
            inFlights.decrementAndGet();
1✔
479
          }
1✔
480
        };
481
      }
482
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
483
      return new ForwardingClientStreamTracer() {
×
484
        @Override
485
        protected ClientStreamTracer delegate() {
486
          return delegatedTracer;
×
487
        }
488

489
        @Override
490
        public void streamClosed(Status status) {
491
          stats.recordCallFinished(status);
×
492
          inFlights.decrementAndGet();
×
493
          delegate().streamClosed(status);
×
494
        }
×
495
      };
496
    }
497
  }
498

499
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
500

501
    private final ClusterLocalityStats stats;
502

503
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
504
      this.stats = checkNotNull(stats, "stats");
1✔
505
    }
1✔
506

507
    /**
508
     * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
509
     * included in the snapshot for the LRS report sent to the LRS server.
510
     */
511
    @Override
512
    public void onLoadReport(MetricReport report) {
513
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
514
    }
1✔
515
  }
516

517
  /**
518
   * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
519
   */
520
  static final class ClusterLocality {
521
    private final ClusterLocalityStats clusterLocalityStats;
522
    private final String clusterLocalityName;
523

524
    @VisibleForTesting
525
    ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
1✔
526
      this.clusterLocalityStats = localityStats;
1✔
527
      this.clusterLocalityName = localityName;
1✔
528
    }
1✔
529

530
    ClusterLocalityStats getClusterLocalityStats() {
531
      return clusterLocalityStats;
1✔
532
    }
533

534
    String getClusterLocalityName() {
535
      return clusterLocalityName;
1✔
536
    }
537

538
    @VisibleForTesting
539
    void release() {
540
      if (clusterLocalityStats != null) {
1✔
541
        clusterLocalityStats.release();
1✔
542
      }
543
    }
1✔
544
  }
545
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc