• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19658

23 Jan 2025 04:10PM CUT coverage: 88.574% (-0.007%) from 88.581%
#19658

push

github

web-flow
xds: Include max concurrent request limit in the error status for concurre… (#11845)

Include max concurrent request limit in the error status for concurrent connections limit exceeded

33720 of 38070 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.89
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.base.Strings;
24
import com.google.common.collect.ImmutableMap;
25
import com.google.protobuf.Struct;
26
import io.grpc.Attributes;
27
import io.grpc.ClientStreamTracer;
28
import io.grpc.ClientStreamTracer.StreamInfo;
29
import io.grpc.ConnectivityState;
30
import io.grpc.ConnectivityStateInfo;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.InternalLogId;
33
import io.grpc.LoadBalancer;
34
import io.grpc.Metadata;
35
import io.grpc.Status;
36
import io.grpc.internal.ForwardingClientStreamTracer;
37
import io.grpc.internal.GrpcUtil;
38
import io.grpc.internal.ObjectPool;
39
import io.grpc.services.MetricReport;
40
import io.grpc.util.ForwardingLoadBalancerHelper;
41
import io.grpc.util.ForwardingSubchannel;
42
import io.grpc.util.GracefulSwitchLoadBalancer;
43
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
44
import io.grpc.xds.Endpoints.DropOverload;
45
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
46
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
47
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
48
import io.grpc.xds.client.Bootstrapper.ServerInfo;
49
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
50
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
51
import io.grpc.xds.client.Locality;
52
import io.grpc.xds.client.XdsClient;
53
import io.grpc.xds.client.XdsLogger;
54
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
55
import io.grpc.xds.internal.security.SecurityProtocolNegotiators;
56
import io.grpc.xds.internal.security.SslContextProviderSupplier;
57
import io.grpc.xds.orca.OrcaPerRequestUtil;
58
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
59
import java.util.ArrayList;
60
import java.util.Collections;
61
import java.util.List;
62
import java.util.Locale;
63
import java.util.Map;
64
import java.util.Objects;
65
import java.util.concurrent.atomic.AtomicLong;
66
import java.util.concurrent.atomic.AtomicReference;
67
import javax.annotation.Nullable;
68

69
/**
70
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
71
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
72
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
73
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
74
 * breakers.
75
 */
76
final class ClusterImplLoadBalancer extends LoadBalancer {
77

78
  @VisibleForTesting
79
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
80
  @VisibleForTesting
81
  static boolean enableCircuitBreaking =
1✔
82
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
83
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
84

85
  private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
1✔
86
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
1✔
87

88
  private final XdsLogger logger;
89
  private final Helper helper;
90
  private final ThreadSafeRandom random;
91
  // The following fields are effectively final.
92
  private String cluster;
93
  @Nullable
94
  private String edsServiceName;
95
  private ObjectPool<XdsClient> xdsClientPool;
96
  private XdsClient xdsClient;
97
  private CallCounterProvider callCounterProvider;
98
  private ClusterDropStats dropStats;
99
  private ClusterImplLbHelper childLbHelper;
100
  private GracefulSwitchLoadBalancer childSwitchLb;
101

102
  ClusterImplLoadBalancer(Helper helper) {
103
    this(helper, ThreadSafeRandomImpl.instance);
1✔
104
  }
1✔
105

106
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
1✔
107
    this.helper = checkNotNull(helper, "helper");
1✔
108
    this.random = checkNotNull(random, "random");
1✔
109
    InternalLogId logId = InternalLogId.allocate("cluster-impl-lb", helper.getAuthority());
1✔
110
    logger = XdsLogger.withLogId(logId);
1✔
111
    logger.log(XdsLogLevel.INFO, "Created");
1✔
112
  }
1✔
113

114
  @Override
115
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
116
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
117
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
118
    if (xdsClientPool == null) {
1✔
119
      xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
1✔
120
      assert xdsClientPool != null;
1✔
121
      xdsClient = xdsClientPool.getObject();
1✔
122
    }
123
    if (callCounterProvider == null) {
1✔
124
      callCounterProvider = attributes.get(XdsAttributes.CALL_COUNTER_PROVIDER);
1✔
125
    }
126

127
    ClusterImplConfig config =
1✔
128
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
129
    if (config == null) {
1✔
130
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
131
    }
132

133
    if (cluster == null) {
1✔
134
      cluster = config.cluster;
1✔
135
      edsServiceName = config.edsServiceName;
1✔
136
      childLbHelper = new ClusterImplLbHelper(
1✔
137
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
138
          config.lrsServerInfo);
139
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
140
      // Assume load report server does not change throughout cluster lifetime.
141
      if (config.lrsServerInfo != null) {
1✔
142
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
143
      }
144
    }
145

146
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
147
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
148
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
149
    childLbHelper.updateFilterMetadata(config.filterMetadata);
1✔
150

151
    childSwitchLb.handleResolvedAddresses(
1✔
152
        resolvedAddresses.toBuilder()
1✔
153
            .setAttributes(attributes)
1✔
154
            .setLoadBalancingPolicyConfig(config.childConfig)
1✔
155
            .build());
1✔
156
    return Status.OK;
1✔
157
  }
158

159
  @Override
160
  public void handleNameResolutionError(Status error) {
161
    if (childSwitchLb != null) {
1✔
162
      childSwitchLb.handleNameResolutionError(error);
1✔
163
    } else {
164
      helper.updateBalancingState(
1✔
165
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
166
    }
167
  }
1✔
168

169
  @Override
170
  public void requestConnection() {
171
    if (childSwitchLb != null) {
1✔
172
      childSwitchLb.requestConnection();
1✔
173
    }
174
  }
1✔
175

176
  @Override
177
  public void shutdown() {
178
    if (dropStats != null) {
1✔
179
      dropStats.release();
1✔
180
    }
181
    if (childSwitchLb != null) {
1✔
182
      childSwitchLb.shutdown();
1✔
183
      if (childLbHelper != null) {
1✔
184
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
185
        childLbHelper = null;
1✔
186
      }
187
    }
188
    if (xdsClient != null) {
1✔
189
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
190
    }
191
  }
1✔
192

193
  /**
194
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
195
   * or requests to endpoints in the cluster.
196
   */
197
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
198
    private final AtomicLong inFlights;
199
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
200
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
201
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
202
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
203
    @Nullable
204
    private SslContextProviderSupplier sslContextProviderSupplier;
205
    private Map<String, Struct> filterMetadata = ImmutableMap.of();
1✔
206
    @Nullable
207
    private final ServerInfo lrsServerInfo;
208

209
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
1✔
210
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
211
      this.lrsServerInfo = lrsServerInfo;
1✔
212
    }
1✔
213

214
    @Override
215
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
216
      currentState = newState;
1✔
217
      currentPicker =  newPicker;
1✔
218
      SubchannelPicker picker = new RequestLimitingSubchannelPicker(
1✔
219
          newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
220
      delegate().updateBalancingState(newState, picker);
1✔
221
    }
1✔
222

223
    @Override
224
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
225
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
226
      // This value for  ClusterLocality is not recommended for general use.
227
      // Currently, we extract locality data from the first address, even before the subchannel is
228
      // READY.
229
      // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
230
      // might return the subchannel before it is READY. Typically, we wouldn't report load for such
231
      // selections because the channel will disregard the chosen (not-ready) subchannel.
232
      // However, we needed to ensure this case is handled.
233
      ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
1✔
234
          args.getAddresses().get(0).getAttributes());
1✔
235
      AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
1✔
236
          clusterLocality);
237
      Attributes.Builder attrsBuilder = args.getAttributes().toBuilder()
1✔
238
          .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference);
1✔
239
      if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)) {
1✔
240
        String hostname = args.getAddresses().get(0).getAttributes()
1✔
241
            .get(XdsAttributes.ATTR_ADDRESS_NAME);
1✔
242
        if (hostname != null) {
1✔
243
          attrsBuilder.set(XdsAttributes.ATTR_ADDRESS_NAME, hostname);
1✔
244
        }
245
      }
246
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
1✔
247
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
248

249
      return new ForwardingSubchannel() {
1✔
250
        @Override
251
        public void start(SubchannelStateListener listener) {
252
          delegate().start(new SubchannelStateListener() {
1✔
253
            @Override
254
            public void onSubchannelState(ConnectivityStateInfo newState) {
255
              // Do nothing if LB has been shutdown
256
              if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
1✔
257
                // Get locality based on the connected address attributes
258
                ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
1✔
259
                    subchannel.getConnectedAddressAttributes());
1✔
260
                ClusterLocality oldClusterLocality = localityAtomicReference
1✔
261
                    .getAndSet(updatedClusterLocality);
1✔
262
                oldClusterLocality.release();
1✔
263
              }
264
              listener.onSubchannelState(newState);
1✔
265
            }
1✔
266
          });
267
        }
1✔
268

269
        @Override
270
        public void shutdown() {
271
          localityAtomicReference.get().release();
1✔
272
          delegate().shutdown();
1✔
273
        }
1✔
274

275
        @Override
276
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
277
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
278
        }
1✔
279

280
        @Override
281
        protected Subchannel delegate() {
282
          return subchannel;
1✔
283
        }
284
      };
285
    }
286

287
    private List<EquivalentAddressGroup> withAdditionalAttributes(
288
        List<EquivalentAddressGroup> addresses) {
289
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
290
      for (EquivalentAddressGroup eag : addresses) {
1✔
291
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
292
            XdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
293
        if (sslContextProviderSupplier != null) {
1✔
294
          attrBuilder.set(
1✔
295
              SecurityProtocolNegotiators.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
296
              sslContextProviderSupplier);
297
        }
298
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
299
      }
1✔
300
      return newAddresses;
1✔
301
    }
302

303
    private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
304
      Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY);
1✔
305
      String localityName = addressAttributes.get(XdsAttributes.ATTR_LOCALITY_NAME);
1✔
306

307
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
308
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
309
      // In case of not (which really shouldn't), loads are aggregated under an empty
310
      // locality.
311
      if (locality == null) {
1✔
312
        locality = Locality.create("", "", "");
×
313
        localityName = "";
×
314
      }
315

316
      final ClusterLocalityStats localityStats =
317
          (lrsServerInfo == null)
1✔
318
              ? null
1✔
319
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
320
                  edsServiceName, locality);
1✔
321

322
      return new ClusterLocality(localityStats, localityName);
1✔
323
    }
324

325
    @Override
326
    protected Helper delegate()  {
327
      return helper;
1✔
328
    }
329

330
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
331
      if (!dropPolicies.equals(dropOverloads)) {
1✔
332
        dropPolicies = dropOverloads;
1✔
333
        updateBalancingState(currentState, currentPicker);
1✔
334
      }
335
    }
1✔
336

337
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
338
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
339
        return;
×
340
      }
341
      this.maxConcurrentRequests =
1✔
342
          maxConcurrentRequests != null
1✔
343
              ? maxConcurrentRequests
1✔
344
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
345
      updateBalancingState(currentState, currentPicker);
1✔
346
    }
1✔
347

348
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
349
      UpstreamTlsContext currentTlsContext =
350
          sslContextProviderSupplier != null
1✔
351
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
352
              : null;
1✔
353
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
354
        return;
1✔
355
      }
356
      if (sslContextProviderSupplier != null) {
1✔
357
        sslContextProviderSupplier.close();
1✔
358
      }
359
      sslContextProviderSupplier =
1✔
360
          tlsContext != null
1✔
361
              ? new SslContextProviderSupplier(tlsContext,
1✔
362
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
363
              : null;
1✔
364
    }
1✔
365

366
    private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
367
      this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
368
    }
1✔
369

370
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
371
      private final SubchannelPicker delegate;
372
      private final List<DropOverload> dropPolicies;
373
      private final long maxConcurrentRequests;
374
      private final Map<String, Struct> filterMetadata;
375

376
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
377
          List<DropOverload> dropPolicies, long maxConcurrentRequests,
378
          Map<String, Struct> filterMetadata) {
1✔
379
        this.delegate = delegate;
1✔
380
        this.dropPolicies = dropPolicies;
1✔
381
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
382
        this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
1✔
383
      }
1✔
384

385
      @Override
386
      public PickResult pickSubchannel(PickSubchannelArgs args) {
387
        args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
1✔
388
            .accept(filterMetadata);
1✔
389
        args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.backend_service", cluster);
1✔
390
        for (DropOverload dropOverload : dropPolicies) {
1✔
391
          int rand = random.nextInt(1_000_000);
1✔
392
          if (rand < dropOverload.dropsPerMillion()) {
1✔
393
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
394
                dropOverload.category());
1✔
395
            if (dropStats != null) {
1✔
396
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
397
            }
398
            return PickResult.withDrop(
1✔
399
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
400
          }
401
        }
1✔
402
        PickResult result = delegate.pickSubchannel(args);
1✔
403
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
404
          if (enableCircuitBreaking) {
1✔
405
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
406
              if (dropStats != null) {
1✔
407
                dropStats.recordDroppedRequest();
1✔
408
              }
409
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
410
                  String.format(Locale.US, "Cluster max concurrent requests limit of %d exceeded",
1✔
411
                      maxConcurrentRequests)));
1✔
412
            }
413
          }
414
          final AtomicReference<ClusterLocality> clusterLocality =
1✔
415
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
1✔
416

417
          if (clusterLocality != null) {
1✔
418
            ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
1✔
419
            if (stats != null) {
1✔
420
              String localityName =
1✔
421
                  result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
1✔
422
                      .getClusterLocalityName();
1✔
423
              args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
424

425
              ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
426
                  stats, inFlights, result.getStreamTracerFactory());
1✔
427
              ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
428
                  .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
429
              result = PickResult.withSubchannel(result.getSubchannel(),
1✔
430
                  orcaTracerFactory);
431
            }
432
          }
433
          if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
1✔
434
              && args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY)) {
1✔
435
            result = PickResult.withSubchannel(result.getSubchannel(),
1✔
436
                result.getStreamTracerFactory(),
1✔
437
                result.getSubchannel().getAttributes().get(
1✔
438
                    XdsAttributes.ATTR_ADDRESS_NAME));
439
          }
440
        }
441
        return result;
1✔
442
      }
443

444
      @Override
445
      public String toString() {
446
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
447
      }
448
    }
449
  }
450

451
  private static final class CountingStreamTracerFactory extends
452
      ClientStreamTracer.Factory {
453
    private final ClusterLocalityStats stats;
454
    private final AtomicLong inFlights;
455
    @Nullable
456
    private final ClientStreamTracer.Factory delegate;
457

458
    private CountingStreamTracerFactory(
459
        ClusterLocalityStats stats, AtomicLong inFlights,
460
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
461
      this.stats = checkNotNull(stats, "stats");
1✔
462
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
463
      this.delegate = delegate;
1✔
464
    }
1✔
465

466
    @Override
467
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
468
      stats.recordCallStarted();
1✔
469
      inFlights.incrementAndGet();
1✔
470
      if (delegate == null) {
1✔
471
        return new ClientStreamTracer() {
1✔
472
          @Override
473
          public void streamClosed(Status status) {
474
            stats.recordCallFinished(status);
1✔
475
            inFlights.decrementAndGet();
1✔
476
          }
1✔
477
        };
478
      }
479
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
480
      return new ForwardingClientStreamTracer() {
×
481
        @Override
482
        protected ClientStreamTracer delegate() {
483
          return delegatedTracer;
×
484
        }
485

486
        @Override
487
        public void streamClosed(Status status) {
488
          stats.recordCallFinished(status);
×
489
          inFlights.decrementAndGet();
×
490
          delegate().streamClosed(status);
×
491
        }
×
492
      };
493
    }
494
  }
495

496
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
497

498
    private final ClusterLocalityStats stats;
499

500
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
501
      this.stats = checkNotNull(stats, "stats");
1✔
502
    }
1✔
503

504
    /**
505
     * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
506
     * included in the snapshot for the LRS report sent to the LRS server.
507
     */
508
    @Override
509
    public void onLoadReport(MetricReport report) {
510
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
511
    }
1✔
512
  }
513

514
  /**
515
   * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
516
   */
517
  static final class ClusterLocality {
518
    private final ClusterLocalityStats clusterLocalityStats;
519
    private final String clusterLocalityName;
520

521
    @VisibleForTesting
522
    ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
1✔
523
      this.clusterLocalityStats = localityStats;
1✔
524
      this.clusterLocalityName = localityName;
1✔
525
    }
1✔
526

527
    ClusterLocalityStats getClusterLocalityStats() {
528
      return clusterLocalityStats;
1✔
529
    }
530

531
    String getClusterLocalityName() {
532
      return clusterLocalityName;
1✔
533
    }
534

535
    @VisibleForTesting
536
    void release() {
537
      if (clusterLocalityStats != null) {
1✔
538
        clusterLocalityStats.release();
1✔
539
      }
540
    }
1✔
541
  }
542
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc