• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19638

13 Jan 2025 09:30PM CUT coverage: 88.563% (+0.02%) from 88.544%
#19638

push

github

ejona86
xds: Pass grpc.xds.cluster label to tracer

This is in service to gRFC A89. Since the gRFC isn't finalized this
purposefully doesn't really do anything yet. The grpc-opentelemetry
change to use this optional label will be done after the gRFC is merged.
grpc-opentelemetry currently has a hard-coded list (one entry) of labels
that it looks for, and this label will need to be added.

b/356167676

33683 of 38033 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.85
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.base.Strings;
24
import com.google.common.collect.ImmutableMap;
25
import com.google.protobuf.Struct;
26
import io.grpc.Attributes;
27
import io.grpc.ClientStreamTracer;
28
import io.grpc.ClientStreamTracer.StreamInfo;
29
import io.grpc.ConnectivityState;
30
import io.grpc.ConnectivityStateInfo;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.InternalLogId;
33
import io.grpc.LoadBalancer;
34
import io.grpc.Metadata;
35
import io.grpc.Status;
36
import io.grpc.internal.ForwardingClientStreamTracer;
37
import io.grpc.internal.GrpcUtil;
38
import io.grpc.internal.ObjectPool;
39
import io.grpc.services.MetricReport;
40
import io.grpc.util.ForwardingLoadBalancerHelper;
41
import io.grpc.util.ForwardingSubchannel;
42
import io.grpc.util.GracefulSwitchLoadBalancer;
43
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
44
import io.grpc.xds.Endpoints.DropOverload;
45
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
46
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
47
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
48
import io.grpc.xds.client.Bootstrapper.ServerInfo;
49
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
50
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
51
import io.grpc.xds.client.Locality;
52
import io.grpc.xds.client.XdsClient;
53
import io.grpc.xds.client.XdsLogger;
54
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
55
import io.grpc.xds.internal.security.SecurityProtocolNegotiators;
56
import io.grpc.xds.internal.security.SslContextProviderSupplier;
57
import io.grpc.xds.orca.OrcaPerRequestUtil;
58
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
59
import java.util.ArrayList;
60
import java.util.Collections;
61
import java.util.List;
62
import java.util.Map;
63
import java.util.Objects;
64
import java.util.concurrent.atomic.AtomicLong;
65
import java.util.concurrent.atomic.AtomicReference;
66
import javax.annotation.Nullable;
67

68
/**
69
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
70
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
71
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
72
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
73
 * breakers.
74
 */
75
final class ClusterImplLoadBalancer extends LoadBalancer {
76

77
  @VisibleForTesting
78
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
79
  @VisibleForTesting
80
  static boolean enableCircuitBreaking =
1✔
81
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
82
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
83

84
  private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
1✔
85
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
1✔
86

87
  private final XdsLogger logger;
88
  private final Helper helper;
89
  private final ThreadSafeRandom random;
90
  // The following fields are effectively final.
91
  private String cluster;
92
  @Nullable
93
  private String edsServiceName;
94
  private ObjectPool<XdsClient> xdsClientPool;
95
  private XdsClient xdsClient;
96
  private CallCounterProvider callCounterProvider;
97
  private ClusterDropStats dropStats;
98
  private ClusterImplLbHelper childLbHelper;
99
  private GracefulSwitchLoadBalancer childSwitchLb;
100

101
  /**
   * Creates a balancer that uses the shared {@link ThreadSafeRandomImpl} instance as its
   * randomness source.
   *
   * @param helper the parent helper; must not be null
   */
  ClusterImplLoadBalancer(Helper helper) {
    this(helper, ThreadSafeRandomImpl.instance);
  }

  /**
   * Creates a balancer with an explicit randomness source.
   *
   * @param helper the parent helper; must not be null
   * @param random the random source consulted when evaluating drop policies; must not be null
   */
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
    this.helper = checkNotNull(helper, "helper");
    this.random = checkNotNull(random, "random");
    logger = XdsLogger.withLogId(
        InternalLogId.allocate("cluster-impl-lb", helper.getAuthority()));
    logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
112

113
  @Override
114
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
115
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
116
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
117
    if (xdsClientPool == null) {
1✔
118
      xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
1✔
119
      assert xdsClientPool != null;
1✔
120
      xdsClient = xdsClientPool.getObject();
1✔
121
    }
122
    if (callCounterProvider == null) {
1✔
123
      callCounterProvider = attributes.get(XdsAttributes.CALL_COUNTER_PROVIDER);
1✔
124
    }
125

126
    ClusterImplConfig config =
1✔
127
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
128
    if (config == null) {
1✔
129
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
130
    }
131

132
    if (cluster == null) {
1✔
133
      cluster = config.cluster;
1✔
134
      edsServiceName = config.edsServiceName;
1✔
135
      childLbHelper = new ClusterImplLbHelper(
1✔
136
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
137
          config.lrsServerInfo);
138
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
139
      // Assume load report server does not change throughout cluster lifetime.
140
      if (config.lrsServerInfo != null) {
1✔
141
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
142
      }
143
    }
144

145
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
146
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
147
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
148
    childLbHelper.updateFilterMetadata(config.filterMetadata);
1✔
149

150
    childSwitchLb.handleResolvedAddresses(
1✔
151
        resolvedAddresses.toBuilder()
1✔
152
            .setAttributes(attributes)
1✔
153
            .setLoadBalancingPolicyConfig(config.childConfig)
1✔
154
            .build());
1✔
155
    return Status.OK;
1✔
156
  }
157

158
  @Override
159
  public void handleNameResolutionError(Status error) {
160
    if (childSwitchLb != null) {
1✔
161
      childSwitchLb.handleNameResolutionError(error);
1✔
162
    } else {
163
      helper.updateBalancingState(
1✔
164
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
165
    }
166
  }
1✔
167

168
  @Override
169
  public void requestConnection() {
170
    if (childSwitchLb != null) {
1✔
171
      childSwitchLb.requestConnection();
1✔
172
    }
173
  }
1✔
174

175
  @Override
176
  public void shutdown() {
177
    if (dropStats != null) {
1✔
178
      dropStats.release();
1✔
179
    }
180
    if (childSwitchLb != null) {
1✔
181
      childSwitchLb.shutdown();
1✔
182
      if (childLbHelper != null) {
1✔
183
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
184
        childLbHelper = null;
1✔
185
      }
186
    }
187
    if (xdsClient != null) {
1✔
188
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
189
    }
190
  }
1✔
191

192
  /**
193
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
194
   * or requests to endpoints in the cluster.
195
   */
196
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
197
    private final AtomicLong inFlights;
198
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
199
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
200
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
201
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
202
    @Nullable
203
    private SslContextProviderSupplier sslContextProviderSupplier;
204
    private Map<String, Struct> filterMetadata = ImmutableMap.of();
1✔
205
    @Nullable
206
    private final ServerInfo lrsServerInfo;
207

208
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
1✔
209
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
210
      this.lrsServerInfo = lrsServerInfo;
1✔
211
    }
1✔
212

213
    @Override
214
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
215
      currentState = newState;
1✔
216
      currentPicker =  newPicker;
1✔
217
      SubchannelPicker picker = new RequestLimitingSubchannelPicker(
1✔
218
          newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
219
      delegate().updateBalancingState(newState, picker);
1✔
220
    }
1✔
221

222
    @Override
223
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
224
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
225
      // This value for  ClusterLocality is not recommended for general use.
226
      // Currently, we extract locality data from the first address, even before the subchannel is
227
      // READY.
228
      // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
229
      // might return the subchannel before it is READY. Typically, we wouldn't report load for such
230
      // selections because the channel will disregard the chosen (not-ready) subchannel.
231
      // However, we needed to ensure this case is handled.
232
      ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
1✔
233
          args.getAddresses().get(0).getAttributes());
1✔
234
      AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
1✔
235
          clusterLocality);
236
      Attributes.Builder attrsBuilder = args.getAttributes().toBuilder()
1✔
237
          .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference);
1✔
238
      if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)) {
1✔
239
        String hostname = args.getAddresses().get(0).getAttributes()
1✔
240
            .get(XdsAttributes.ATTR_ADDRESS_NAME);
1✔
241
        if (hostname != null) {
1✔
242
          attrsBuilder.set(XdsAttributes.ATTR_ADDRESS_NAME, hostname);
1✔
243
        }
244
      }
245
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
1✔
246
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
247

248
      return new ForwardingSubchannel() {
1✔
249
        @Override
250
        public void start(SubchannelStateListener listener) {
251
          delegate().start(new SubchannelStateListener() {
1✔
252
            @Override
253
            public void onSubchannelState(ConnectivityStateInfo newState) {
254
              // Do nothing if LB has been shutdown
255
              if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
1✔
256
                // Get locality based on the connected address attributes
257
                ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
1✔
258
                    subchannel.getConnectedAddressAttributes());
1✔
259
                ClusterLocality oldClusterLocality = localityAtomicReference
1✔
260
                    .getAndSet(updatedClusterLocality);
1✔
261
                oldClusterLocality.release();
1✔
262
              }
263
              listener.onSubchannelState(newState);
1✔
264
            }
1✔
265
          });
266
        }
1✔
267

268
        @Override
269
        public void shutdown() {
270
          localityAtomicReference.get().release();
1✔
271
          delegate().shutdown();
1✔
272
        }
1✔
273

274
        @Override
275
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
276
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
277
        }
1✔
278

279
        @Override
280
        protected Subchannel delegate() {
281
          return subchannel;
1✔
282
        }
283
      };
284
    }
285

286
    private List<EquivalentAddressGroup> withAdditionalAttributes(
287
        List<EquivalentAddressGroup> addresses) {
288
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
289
      for (EquivalentAddressGroup eag : addresses) {
1✔
290
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
291
            XdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
292
        if (sslContextProviderSupplier != null) {
1✔
293
          attrBuilder.set(
1✔
294
              SecurityProtocolNegotiators.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
295
              sslContextProviderSupplier);
296
        }
297
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
298
      }
1✔
299
      return newAddresses;
1✔
300
    }
301

302
    private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
303
      Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY);
1✔
304
      String localityName = addressAttributes.get(XdsAttributes.ATTR_LOCALITY_NAME);
1✔
305

306
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
307
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
308
      // In case of not (which really shouldn't), loads are aggregated under an empty
309
      // locality.
310
      if (locality == null) {
1✔
311
        locality = Locality.create("", "", "");
×
312
        localityName = "";
×
313
      }
314

315
      final ClusterLocalityStats localityStats =
316
          (lrsServerInfo == null)
1✔
317
              ? null
1✔
318
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
319
                  edsServiceName, locality);
1✔
320

321
      return new ClusterLocality(localityStats, localityName);
1✔
322
    }
323

324
    @Override
325
    protected Helper delegate()  {
326
      return helper;
1✔
327
    }
328

329
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
330
      if (!dropPolicies.equals(dropOverloads)) {
1✔
331
        dropPolicies = dropOverloads;
1✔
332
        updateBalancingState(currentState, currentPicker);
1✔
333
      }
334
    }
1✔
335

336
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
337
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
338
        return;
×
339
      }
340
      this.maxConcurrentRequests =
1✔
341
          maxConcurrentRequests != null
1✔
342
              ? maxConcurrentRequests
1✔
343
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
344
      updateBalancingState(currentState, currentPicker);
1✔
345
    }
1✔
346

347
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
348
      UpstreamTlsContext currentTlsContext =
349
          sslContextProviderSupplier != null
1✔
350
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
351
              : null;
1✔
352
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
353
        return;
1✔
354
      }
355
      if (sslContextProviderSupplier != null) {
1✔
356
        sslContextProviderSupplier.close();
1✔
357
      }
358
      sslContextProviderSupplier =
1✔
359
          tlsContext != null
1✔
360
              ? new SslContextProviderSupplier(tlsContext,
1✔
361
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
362
              : null;
1✔
363
    }
1✔
364

365
    private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
366
      this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
367
    }
1✔
368

369
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
370
      private final SubchannelPicker delegate;
371
      private final List<DropOverload> dropPolicies;
372
      private final long maxConcurrentRequests;
373
      private final Map<String, Struct> filterMetadata;
374

375
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
376
          List<DropOverload> dropPolicies, long maxConcurrentRequests,
377
          Map<String, Struct> filterMetadata) {
1✔
378
        this.delegate = delegate;
1✔
379
        this.dropPolicies = dropPolicies;
1✔
380
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
381
        this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
1✔
382
      }
1✔
383

384
      @Override
385
      public PickResult pickSubchannel(PickSubchannelArgs args) {
386
        args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
1✔
387
            .accept(filterMetadata);
1✔
388
        args.getPickDetailsConsumer().addOptionalLabel("grpc.xds.cluster", cluster);
1✔
389
        for (DropOverload dropOverload : dropPolicies) {
1✔
390
          int rand = random.nextInt(1_000_000);
1✔
391
          if (rand < dropOverload.dropsPerMillion()) {
1✔
392
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
393
                dropOverload.category());
1✔
394
            if (dropStats != null) {
1✔
395
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
396
            }
397
            return PickResult.withDrop(
1✔
398
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
399
          }
400
        }
1✔
401
        PickResult result = delegate.pickSubchannel(args);
1✔
402
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
403
          if (enableCircuitBreaking) {
1✔
404
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
405
              if (dropStats != null) {
1✔
406
                dropStats.recordDroppedRequest();
1✔
407
              }
408
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
409
                  "Cluster max concurrent requests limit exceeded"));
410
            }
411
          }
412
          final AtomicReference<ClusterLocality> clusterLocality =
1✔
413
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
1✔
414

415
          if (clusterLocality != null) {
1✔
416
            ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
1✔
417
            if (stats != null) {
1✔
418
              String localityName =
1✔
419
                  result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
1✔
420
                      .getClusterLocalityName();
1✔
421
              args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
422

423
              ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
424
                  stats, inFlights, result.getStreamTracerFactory());
1✔
425
              ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
426
                  .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
427
              result = PickResult.withSubchannel(result.getSubchannel(),
1✔
428
                  orcaTracerFactory);
429
            }
430
          }
431
          if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
1✔
432
              && args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY)) {
1✔
433
            result = PickResult.withSubchannel(result.getSubchannel(),
1✔
434
                result.getStreamTracerFactory(),
1✔
435
                result.getSubchannel().getAttributes().get(
1✔
436
                    XdsAttributes.ATTR_ADDRESS_NAME));
437
          }
438
        }
439
        return result;
1✔
440
      }
441

442
      @Override
443
      public String toString() {
444
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
445
      }
446
    }
447
  }
448

449
  private static final class CountingStreamTracerFactory extends
450
      ClientStreamTracer.Factory {
451
    private final ClusterLocalityStats stats;
452
    private final AtomicLong inFlights;
453
    @Nullable
454
    private final ClientStreamTracer.Factory delegate;
455

456
    private CountingStreamTracerFactory(
457
        ClusterLocalityStats stats, AtomicLong inFlights,
458
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
459
      this.stats = checkNotNull(stats, "stats");
1✔
460
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
461
      this.delegate = delegate;
1✔
462
    }
1✔
463

464
    @Override
465
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
466
      stats.recordCallStarted();
1✔
467
      inFlights.incrementAndGet();
1✔
468
      if (delegate == null) {
1✔
469
        return new ClientStreamTracer() {
1✔
470
          @Override
471
          public void streamClosed(Status status) {
472
            stats.recordCallFinished(status);
1✔
473
            inFlights.decrementAndGet();
1✔
474
          }
1✔
475
        };
476
      }
477
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
478
      return new ForwardingClientStreamTracer() {
×
479
        @Override
480
        protected ClientStreamTracer delegate() {
481
          return delegatedTracer;
×
482
        }
483

484
        @Override
485
        public void streamClosed(Status status) {
486
          stats.recordCallFinished(status);
×
487
          inFlights.decrementAndGet();
×
488
          delegate().streamClosed(status);
×
489
        }
×
490
      };
491
    }
492
  }
493

494
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
495

496
    private final ClusterLocalityStats stats;
497

498
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
499
      this.stats = checkNotNull(stats, "stats");
1✔
500
    }
1✔
501

502
    /**
503
     * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
504
     * included in the snapshot for the LRS report sent to the LRS server.
505
     */
506
    @Override
507
    public void onLoadReport(MetricReport report) {
508
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
509
    }
1✔
510
  }
511

512
  /**
513
   * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
514
   */
515
  static final class ClusterLocality {
516
    private final ClusterLocalityStats clusterLocalityStats;
517
    private final String clusterLocalityName;
518

519
    @VisibleForTesting
520
    ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
1✔
521
      this.clusterLocalityStats = localityStats;
1✔
522
      this.clusterLocalityName = localityName;
1✔
523
    }
1✔
524

525
    ClusterLocalityStats getClusterLocalityStats() {
526
      return clusterLocalityStats;
1✔
527
    }
528

529
    String getClusterLocalityName() {
530
      return clusterLocalityName;
1✔
531
    }
532

533
    @VisibleForTesting
534
    void release() {
535
      if (clusterLocalityStats != null) {
1✔
536
        clusterLocalityStats.release();
1✔
537
      }
538
    }
1✔
539
  }
540
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc