• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19619

04 Jan 2025 12:01AM UTC coverage: 88.558% (+0.008%) from 88.55%
#19619

push

github

ejona86
xds: Avoid depending on io.grpc.xds.Internal* classes

Internal* classes should generally be accessors that are used outside of
the package/project. Only one attribute was used outside of xds, so
leave only that one attribute in InternalXdsAttributes. One attribute
was used by the internal.security package, so move the definition to the
same package to reduce the circular dependencies.

33621 of 37965 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.83
/../xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.base.Strings;
24
import com.google.common.collect.ImmutableMap;
25
import com.google.protobuf.Struct;
26
import io.grpc.Attributes;
27
import io.grpc.ClientStreamTracer;
28
import io.grpc.ClientStreamTracer.StreamInfo;
29
import io.grpc.ConnectivityState;
30
import io.grpc.ConnectivityStateInfo;
31
import io.grpc.EquivalentAddressGroup;
32
import io.grpc.InternalLogId;
33
import io.grpc.LoadBalancer;
34
import io.grpc.Metadata;
35
import io.grpc.Status;
36
import io.grpc.internal.ForwardingClientStreamTracer;
37
import io.grpc.internal.GrpcUtil;
38
import io.grpc.internal.ObjectPool;
39
import io.grpc.services.MetricReport;
40
import io.grpc.util.ForwardingLoadBalancerHelper;
41
import io.grpc.util.ForwardingSubchannel;
42
import io.grpc.util.GracefulSwitchLoadBalancer;
43
import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig;
44
import io.grpc.xds.Endpoints.DropOverload;
45
import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext;
46
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
47
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
48
import io.grpc.xds.client.Bootstrapper.ServerInfo;
49
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats;
50
import io.grpc.xds.client.LoadStatsManager2.ClusterLocalityStats;
51
import io.grpc.xds.client.Locality;
52
import io.grpc.xds.client.XdsClient;
53
import io.grpc.xds.client.XdsLogger;
54
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
55
import io.grpc.xds.internal.security.SecurityProtocolNegotiators;
56
import io.grpc.xds.internal.security.SslContextProviderSupplier;
57
import io.grpc.xds.orca.OrcaPerRequestUtil;
58
import io.grpc.xds.orca.OrcaPerRequestUtil.OrcaPerRequestReportListener;
59
import java.util.ArrayList;
60
import java.util.Collections;
61
import java.util.List;
62
import java.util.Map;
63
import java.util.Objects;
64
import java.util.concurrent.atomic.AtomicLong;
65
import java.util.concurrent.atomic.AtomicReference;
66
import javax.annotation.Nullable;
67

68
/**
69
 * Load balancer for cluster_impl_experimental LB policy. This LB policy is the child LB policy of
70
 * the priority_experimental LB policy and the parent LB policy of the weighted_target_experimental
71
 * LB policy in the xDS load balancing hierarchy. This LB policy applies cluster-level
72
 * configurations to requests sent to the corresponding cluster, such as drop policies, circuit
73
 * breakers.
74
 */
75
final class ClusterImplLoadBalancer extends LoadBalancer {
76

77
  @VisibleForTesting
78
  static final long DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS = 1024L;
79
  @VisibleForTesting
80
  static boolean enableCircuitBreaking =
1✔
81
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"))
1✔
82
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING"));
1✔
83

84
  private static final Attributes.Key<AtomicReference<ClusterLocality>> ATTR_CLUSTER_LOCALITY =
1✔
85
      Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocality");
1✔
86

87
  private final XdsLogger logger;
88
  private final Helper helper;
89
  private final ThreadSafeRandom random;
90
  // The following fields are effectively final.
91
  private String cluster;
92
  @Nullable
93
  private String edsServiceName;
94
  private ObjectPool<XdsClient> xdsClientPool;
95
  private XdsClient xdsClient;
96
  private CallCounterProvider callCounterProvider;
97
  private ClusterDropStats dropStats;
98
  private ClusterImplLbHelper childLbHelper;
99
  private GracefulSwitchLoadBalancer childSwitchLb;
100

101
  ClusterImplLoadBalancer(Helper helper) {
102
    this(helper, ThreadSafeRandomImpl.instance);
1✔
103
  }
1✔
104

105
  ClusterImplLoadBalancer(Helper helper, ThreadSafeRandom random) {
1✔
106
    this.helper = checkNotNull(helper, "helper");
1✔
107
    this.random = checkNotNull(random, "random");
1✔
108
    InternalLogId logId = InternalLogId.allocate("cluster-impl-lb", helper.getAuthority());
1✔
109
    logger = XdsLogger.withLogId(logId);
1✔
110
    logger.log(XdsLogLevel.INFO, "Created");
1✔
111
  }
1✔
112

113
  @Override
114
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
115
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
116
    Attributes attributes = resolvedAddresses.getAttributes();
1✔
117
    if (xdsClientPool == null) {
1✔
118
      xdsClientPool = attributes.get(XdsAttributes.XDS_CLIENT_POOL);
1✔
119
      assert xdsClientPool != null;
1✔
120
      xdsClient = xdsClientPool.getObject();
1✔
121
    }
122
    if (callCounterProvider == null) {
1✔
123
      callCounterProvider = attributes.get(XdsAttributes.CALL_COUNTER_PROVIDER);
1✔
124
    }
125

126
    ClusterImplConfig config =
1✔
127
        (ClusterImplConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
128
    if (config == null) {
1✔
129
      return Status.INTERNAL.withDescription("No cluster configuration found");
×
130
    }
131

132
    if (cluster == null) {
1✔
133
      cluster = config.cluster;
1✔
134
      edsServiceName = config.edsServiceName;
1✔
135
      childLbHelper = new ClusterImplLbHelper(
1✔
136
          callCounterProvider.getOrCreate(config.cluster, config.edsServiceName),
1✔
137
          config.lrsServerInfo);
138
      childSwitchLb = new GracefulSwitchLoadBalancer(childLbHelper);
1✔
139
      // Assume load report server does not change throughout cluster lifetime.
140
      if (config.lrsServerInfo != null) {
1✔
141
        dropStats = xdsClient.addClusterDropStats(config.lrsServerInfo, cluster, edsServiceName);
1✔
142
      }
143
    }
144

145
    childLbHelper.updateDropPolicies(config.dropCategories);
1✔
146
    childLbHelper.updateMaxConcurrentRequests(config.maxConcurrentRequests);
1✔
147
    childLbHelper.updateSslContextProviderSupplier(config.tlsContext);
1✔
148
    childLbHelper.updateFilterMetadata(config.filterMetadata);
1✔
149

150
    childSwitchLb.handleResolvedAddresses(
1✔
151
        resolvedAddresses.toBuilder()
1✔
152
            .setAttributes(attributes)
1✔
153
            .setLoadBalancingPolicyConfig(config.childConfig)
1✔
154
            .build());
1✔
155
    return Status.OK;
1✔
156
  }
157

158
  @Override
159
  public void handleNameResolutionError(Status error) {
160
    if (childSwitchLb != null) {
1✔
161
      childSwitchLb.handleNameResolutionError(error);
1✔
162
    } else {
163
      helper.updateBalancingState(
1✔
164
          ConnectivityState.TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
165
    }
166
  }
1✔
167

168
  @Override
169
  public void requestConnection() {
170
    if (childSwitchLb != null) {
1✔
171
      childSwitchLb.requestConnection();
1✔
172
    }
173
  }
1✔
174

175
  @Override
176
  public void shutdown() {
177
    if (dropStats != null) {
1✔
178
      dropStats.release();
1✔
179
    }
180
    if (childSwitchLb != null) {
1✔
181
      childSwitchLb.shutdown();
1✔
182
      if (childLbHelper != null) {
1✔
183
        childLbHelper.updateSslContextProviderSupplier(null);
1✔
184
        childLbHelper = null;
1✔
185
      }
186
    }
187
    if (xdsClient != null) {
1✔
188
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
189
    }
190
  }
1✔
191

192
  /**
193
   * A decorated {@link LoadBalancer.Helper} that applies configurations for connections
194
   * or requests to endpoints in the cluster.
195
   */
196
  private final class ClusterImplLbHelper extends ForwardingLoadBalancerHelper {
197
    private final AtomicLong inFlights;
198
    private ConnectivityState currentState = ConnectivityState.IDLE;
1✔
199
    private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
200
    private List<DropOverload> dropPolicies = Collections.emptyList();
1✔
201
    private long maxConcurrentRequests = DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
202
    @Nullable
203
    private SslContextProviderSupplier sslContextProviderSupplier;
204
    private Map<String, Struct> filterMetadata = ImmutableMap.of();
1✔
205
    @Nullable
206
    private final ServerInfo lrsServerInfo;
207

208
    private ClusterImplLbHelper(AtomicLong inFlights, @Nullable ServerInfo lrsServerInfo) {
1✔
209
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
210
      this.lrsServerInfo = lrsServerInfo;
1✔
211
    }
1✔
212

213
    @Override
214
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
215
      currentState = newState;
1✔
216
      currentPicker =  newPicker;
1✔
217
      SubchannelPicker picker = new RequestLimitingSubchannelPicker(
1✔
218
          newPicker, dropPolicies, maxConcurrentRequests, filterMetadata);
219
      delegate().updateBalancingState(newState, picker);
1✔
220
    }
1✔
221

222
    @Override
223
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
224
      List<EquivalentAddressGroup> addresses = withAdditionalAttributes(args.getAddresses());
1✔
225
      // This value for  ClusterLocality is not recommended for general use.
226
      // Currently, we extract locality data from the first address, even before the subchannel is
227
      // READY.
228
      // This is mainly to accommodate scenarios where a Load Balancing API (like "pick first")
229
      // might return the subchannel before it is READY. Typically, we wouldn't report load for such
230
      // selections because the channel will disregard the chosen (not-ready) subchannel.
231
      // However, we needed to ensure this case is handled.
232
      ClusterLocality clusterLocality = createClusterLocalityFromAttributes(
1✔
233
          args.getAddresses().get(0).getAttributes());
1✔
234
      AtomicReference<ClusterLocality> localityAtomicReference = new AtomicReference<>(
1✔
235
          clusterLocality);
236
      Attributes.Builder attrsBuilder = args.getAttributes().toBuilder()
1✔
237
          .set(ATTR_CLUSTER_LOCALITY, localityAtomicReference);
1✔
238
      if (GrpcUtil.getFlag("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)) {
1✔
239
        String hostname = args.getAddresses().get(0).getAttributes()
1✔
240
            .get(XdsAttributes.ATTR_ADDRESS_NAME);
1✔
241
        if (hostname != null) {
1✔
242
          attrsBuilder.set(XdsAttributes.ATTR_ADDRESS_NAME, hostname);
1✔
243
        }
244
      }
245
      args = args.toBuilder().setAddresses(addresses).setAttributes(attrsBuilder.build()).build();
1✔
246
      final Subchannel subchannel = delegate().createSubchannel(args);
1✔
247

248
      return new ForwardingSubchannel() {
1✔
249
        @Override
250
        public void start(SubchannelStateListener listener) {
251
          delegate().start(new SubchannelStateListener() {
1✔
252
            @Override
253
            public void onSubchannelState(ConnectivityStateInfo newState) {
254
              // Do nothing if LB has been shutdown
255
              if (xdsClient != null && newState.getState().equals(ConnectivityState.READY)) {
1✔
256
                // Get locality based on the connected address attributes
257
                ClusterLocality updatedClusterLocality = createClusterLocalityFromAttributes(
1✔
258
                    subchannel.getConnectedAddressAttributes());
1✔
259
                ClusterLocality oldClusterLocality = localityAtomicReference
1✔
260
                    .getAndSet(updatedClusterLocality);
1✔
261
                oldClusterLocality.release();
1✔
262
              }
263
              listener.onSubchannelState(newState);
1✔
264
            }
1✔
265
          });
266
        }
1✔
267

268
        @Override
269
        public void shutdown() {
270
          localityAtomicReference.get().release();
1✔
271
          delegate().shutdown();
1✔
272
        }
1✔
273

274
        @Override
275
        public void updateAddresses(List<EquivalentAddressGroup> addresses) {
276
          delegate().updateAddresses(withAdditionalAttributes(addresses));
1✔
277
        }
1✔
278

279
        @Override
280
        protected Subchannel delegate() {
281
          return subchannel;
1✔
282
        }
283
      };
284
    }
285

286
    private List<EquivalentAddressGroup> withAdditionalAttributes(
287
        List<EquivalentAddressGroup> addresses) {
288
      List<EquivalentAddressGroup> newAddresses = new ArrayList<>();
1✔
289
      for (EquivalentAddressGroup eag : addresses) {
1✔
290
        Attributes.Builder attrBuilder = eag.getAttributes().toBuilder().set(
1✔
291
            XdsAttributes.ATTR_CLUSTER_NAME, cluster);
1✔
292
        if (sslContextProviderSupplier != null) {
1✔
293
          attrBuilder.set(
1✔
294
              SecurityProtocolNegotiators.ATTR_SSL_CONTEXT_PROVIDER_SUPPLIER,
295
              sslContextProviderSupplier);
296
        }
297
        newAddresses.add(new EquivalentAddressGroup(eag.getAddresses(), attrBuilder.build()));
1✔
298
      }
1✔
299
      return newAddresses;
1✔
300
    }
301

302
    private ClusterLocality createClusterLocalityFromAttributes(Attributes addressAttributes) {
303
      Locality locality = addressAttributes.get(XdsAttributes.ATTR_LOCALITY);
1✔
304
      String localityName = addressAttributes.get(XdsAttributes.ATTR_LOCALITY_NAME);
1✔
305

306
      // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain
307
      // attributes with its locality, including endpoints in LOGICAL_DNS clusters.
308
      // In case of not (which really shouldn't), loads are aggregated under an empty
309
      // locality.
310
      if (locality == null) {
1✔
311
        locality = Locality.create("", "", "");
×
312
        localityName = "";
×
313
      }
314

315
      final ClusterLocalityStats localityStats =
316
          (lrsServerInfo == null)
1✔
317
              ? null
1✔
318
              : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster,
1✔
319
                  edsServiceName, locality);
1✔
320

321
      return new ClusterLocality(localityStats, localityName);
1✔
322
    }
323

324
    @Override
325
    protected Helper delegate()  {
326
      return helper;
1✔
327
    }
328

329
    private void updateDropPolicies(List<DropOverload> dropOverloads) {
330
      if (!dropPolicies.equals(dropOverloads)) {
1✔
331
        dropPolicies = dropOverloads;
1✔
332
        updateBalancingState(currentState, currentPicker);
1✔
333
      }
334
    }
1✔
335

336
    private void updateMaxConcurrentRequests(@Nullable Long maxConcurrentRequests) {
337
      if (Objects.equals(this.maxConcurrentRequests, maxConcurrentRequests)) {
1✔
338
        return;
×
339
      }
340
      this.maxConcurrentRequests =
1✔
341
          maxConcurrentRequests != null
1✔
342
              ? maxConcurrentRequests
1✔
343
              : DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS;
1✔
344
      updateBalancingState(currentState, currentPicker);
1✔
345
    }
1✔
346

347
    private void updateSslContextProviderSupplier(@Nullable UpstreamTlsContext tlsContext) {
348
      UpstreamTlsContext currentTlsContext =
349
          sslContextProviderSupplier != null
1✔
350
              ? (UpstreamTlsContext)sslContextProviderSupplier.getTlsContext()
1✔
351
              : null;
1✔
352
      if (Objects.equals(currentTlsContext,  tlsContext)) {
1✔
353
        return;
1✔
354
      }
355
      if (sslContextProviderSupplier != null) {
1✔
356
        sslContextProviderSupplier.close();
1✔
357
      }
358
      sslContextProviderSupplier =
1✔
359
          tlsContext != null
1✔
360
              ? new SslContextProviderSupplier(tlsContext,
1✔
361
                                               (TlsContextManager) xdsClient.getSecurityConfig())
1✔
362
              : null;
1✔
363
    }
1✔
364

365
    private void updateFilterMetadata(Map<String, Struct> filterMetadata) {
366
      this.filterMetadata = ImmutableMap.copyOf(filterMetadata);
1✔
367
    }
1✔
368

369
    private class RequestLimitingSubchannelPicker extends SubchannelPicker {
370
      private final SubchannelPicker delegate;
371
      private final List<DropOverload> dropPolicies;
372
      private final long maxConcurrentRequests;
373
      private final Map<String, Struct> filterMetadata;
374

375
      private RequestLimitingSubchannelPicker(SubchannelPicker delegate,
376
          List<DropOverload> dropPolicies, long maxConcurrentRequests,
377
          Map<String, Struct> filterMetadata) {
1✔
378
        this.delegate = delegate;
1✔
379
        this.dropPolicies = dropPolicies;
1✔
380
        this.maxConcurrentRequests = maxConcurrentRequests;
1✔
381
        this.filterMetadata = checkNotNull(filterMetadata, "filterMetadata");
1✔
382
      }
1✔
383

384
      @Override
385
      public PickResult pickSubchannel(PickSubchannelArgs args) {
386
        args.getCallOptions().getOption(ClusterImplLoadBalancerProvider.FILTER_METADATA_CONSUMER)
1✔
387
            .accept(filterMetadata);
1✔
388
        for (DropOverload dropOverload : dropPolicies) {
1✔
389
          int rand = random.nextInt(1_000_000);
1✔
390
          if (rand < dropOverload.dropsPerMillion()) {
1✔
391
            logger.log(XdsLogLevel.INFO, "Drop request with category: {0}",
1✔
392
                dropOverload.category());
1✔
393
            if (dropStats != null) {
1✔
394
              dropStats.recordDroppedRequest(dropOverload.category());
1✔
395
            }
396
            return PickResult.withDrop(
1✔
397
                Status.UNAVAILABLE.withDescription("Dropped: " + dropOverload.category()));
1✔
398
          }
399
        }
1✔
400
        PickResult result = delegate.pickSubchannel(args);
1✔
401
        if (result.getStatus().isOk() && result.getSubchannel() != null) {
1✔
402
          if (enableCircuitBreaking) {
1✔
403
            if (inFlights.get() >= maxConcurrentRequests) {
1✔
404
              if (dropStats != null) {
1✔
405
                dropStats.recordDroppedRequest();
1✔
406
              }
407
              return PickResult.withDrop(Status.UNAVAILABLE.withDescription(
1✔
408
                  "Cluster max concurrent requests limit exceeded"));
409
            }
410
          }
411
          final AtomicReference<ClusterLocality> clusterLocality =
1✔
412
              result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY);
1✔
413

414
          if (clusterLocality != null) {
1✔
415
            ClusterLocalityStats stats = clusterLocality.get().getClusterLocalityStats();
1✔
416
            if (stats != null) {
1✔
417
              String localityName =
1✔
418
                  result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY).get()
1✔
419
                      .getClusterLocalityName();
1✔
420
              args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName);
1✔
421

422
              ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory(
1✔
423
                  stats, inFlights, result.getStreamTracerFactory());
1✔
424
              ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance()
1✔
425
                  .newOrcaClientStreamTracerFactory(tracerFactory, new OrcaPerRpcListener(stats));
1✔
426
              result = PickResult.withSubchannel(result.getSubchannel(),
1✔
427
                  orcaTracerFactory);
428
            }
429
          }
430
          if (args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY) != null
1✔
431
              && args.getCallOptions().getOption(XdsNameResolver.AUTO_HOST_REWRITE_KEY)) {
1✔
432
            result = PickResult.withSubchannel(result.getSubchannel(),
1✔
433
                result.getStreamTracerFactory(),
1✔
434
                result.getSubchannel().getAttributes().get(
1✔
435
                    XdsAttributes.ATTR_ADDRESS_NAME));
436
          }
437
        }
438
        return result;
1✔
439
      }
440

441
      @Override
442
      public String toString() {
443
        return MoreObjects.toStringHelper(this).add("delegate", delegate).toString();
×
444
      }
445
    }
446
  }
447

448
  private static final class CountingStreamTracerFactory extends
449
      ClientStreamTracer.Factory {
450
    private final ClusterLocalityStats stats;
451
    private final AtomicLong inFlights;
452
    @Nullable
453
    private final ClientStreamTracer.Factory delegate;
454

455
    private CountingStreamTracerFactory(
456
        ClusterLocalityStats stats, AtomicLong inFlights,
457
        @Nullable ClientStreamTracer.Factory delegate) {
1✔
458
      this.stats = checkNotNull(stats, "stats");
1✔
459
      this.inFlights = checkNotNull(inFlights, "inFlights");
1✔
460
      this.delegate = delegate;
1✔
461
    }
1✔
462

463
    @Override
464
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
465
      stats.recordCallStarted();
1✔
466
      inFlights.incrementAndGet();
1✔
467
      if (delegate == null) {
1✔
468
        return new ClientStreamTracer() {
1✔
469
          @Override
470
          public void streamClosed(Status status) {
471
            stats.recordCallFinished(status);
1✔
472
            inFlights.decrementAndGet();
1✔
473
          }
1✔
474
        };
475
      }
476
      final ClientStreamTracer delegatedTracer = delegate.newClientStreamTracer(info, headers);
×
477
      return new ForwardingClientStreamTracer() {
×
478
        @Override
479
        protected ClientStreamTracer delegate() {
480
          return delegatedTracer;
×
481
        }
482

483
        @Override
484
        public void streamClosed(Status status) {
485
          stats.recordCallFinished(status);
×
486
          inFlights.decrementAndGet();
×
487
          delegate().streamClosed(status);
×
488
        }
×
489
      };
490
    }
491
  }
492

493
  private static final class OrcaPerRpcListener implements OrcaPerRequestReportListener {
494

495
    private final ClusterLocalityStats stats;
496

497
    private OrcaPerRpcListener(ClusterLocalityStats stats) {
1✔
498
      this.stats = checkNotNull(stats, "stats");
1✔
499
    }
1✔
500

501
    /**
502
     * Copies {@link MetricReport#getNamedMetrics()} to {@link ClusterLocalityStats} such that it is
503
     * included in the snapshot for the LRS report sent to the LRS server.
504
     */
505
    @Override
506
    public void onLoadReport(MetricReport report) {
507
      stats.recordBackendLoadMetricStats(report.getNamedMetrics());
1✔
508
    }
1✔
509
  }
510

511
  /**
512
   * Represents the {@link ClusterLocalityStats} and network locality name of a cluster.
513
   */
514
  static final class ClusterLocality {
515
    private final ClusterLocalityStats clusterLocalityStats;
516
    private final String clusterLocalityName;
517

518
    @VisibleForTesting
519
    ClusterLocality(ClusterLocalityStats localityStats, String localityName) {
1✔
520
      this.clusterLocalityStats = localityStats;
1✔
521
      this.clusterLocalityName = localityName;
1✔
522
    }
1✔
523

524
    ClusterLocalityStats getClusterLocalityStats() {
525
      return clusterLocalityStats;
1✔
526
    }
527

528
    String getClusterLocalityName() {
529
      return clusterLocalityName;
1✔
530
    }
531

532
    @VisibleForTesting
533
    void release() {
534
      if (clusterLocalityStats != null) {
1✔
535
        clusterLocalityStats.release();
1✔
536
      }
537
    }
1✔
538
  }
539
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc