• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20227

27 Mar 2026 03:13PM UTC coverage: 88.702% (-0.005%) from 88.707%
#20227

push

github

web-flow
Add custom label for per-RPC metrics

Implements gRFC A108.

https://github.com/grpc/proposal/blob/master/A108-otel-custom-per-call-label.md

35518 of 40042 relevant lines covered (88.7%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.68
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import com.google.errorprone.annotations.CheckReturnValue;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ConnectivityState;
36
import io.grpc.Grpc;
37
import io.grpc.LoadBalancer.Helper;
38
import io.grpc.LoadBalancer.PickResult;
39
import io.grpc.LoadBalancer.PickSubchannelArgs;
40
import io.grpc.LoadBalancer.ResolvedAddresses;
41
import io.grpc.LoadBalancer.SubchannelPicker;
42
import io.grpc.LongCounterMetricInstrument;
43
import io.grpc.LongGaugeMetricInstrument;
44
import io.grpc.ManagedChannel;
45
import io.grpc.ManagedChannelBuilder;
46
import io.grpc.Metadata;
47
import io.grpc.MetricInstrumentRegistry;
48
import io.grpc.MetricRecorder.BatchCallback;
49
import io.grpc.MetricRecorder.BatchRecorder;
50
import io.grpc.MetricRecorder.Registration;
51
import io.grpc.Status;
52
import io.grpc.internal.BackoffPolicy;
53
import io.grpc.internal.ExponentialBackoffPolicy;
54
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
55
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
56
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
57
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
58
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
59
import io.grpc.rls.LruCache.EvictionListener;
60
import io.grpc.rls.LruCache.EvictionType;
61
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
62
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
63
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
64
import io.grpc.rls.RlsProtoData.RouteLookupRequestKey;
65
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
66
import io.grpc.stub.StreamObserver;
67
import io.grpc.util.ForwardingLoadBalancerHelper;
68
import java.net.URI;
69
import java.net.URISyntaxException;
70
import java.util.Arrays;
71
import java.util.Collections;
72
import java.util.HashMap;
73
import java.util.List;
74
import java.util.Map;
75
import java.util.UUID;
76
import java.util.concurrent.Future;
77
import java.util.concurrent.ScheduledExecutorService;
78
import java.util.concurrent.TimeUnit;
79
import javax.annotation.Nullable;
80
import javax.annotation.concurrent.ThreadSafe;
81

82
/**
83
 * A CachingRlsLbClient is the core implementation of the RLS load balancer. It supports dynamic
 * request routing by fetching the routing decision from the route lookup server. Every single
 * request is routed by the server's decision. To reduce the performance penalty, {@link LruCache}
 * is used.
86
 */
87
@ThreadSafe
88
final class CachingRlsLbClient {
89

90
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
91
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
92
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
93
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
94
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
95
  public static final int BYTES_PER_CHAR = 2;
96
  public static final int STRING_OVERHEAD_BYTES = 38;
97
  /** Minimum bytes for a Java Object. */
98
  public static final int OBJ_OVERHEAD_B = 16;
99

100
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
102
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
103
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
104
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
105
  private final Registration gaugeRegistration;
106
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
107

108
  // All cache status changes (pending, backoff, success) must be under this lock
109
  private final Object lock = new Object();
1✔
110
  // LRU cache based on access order (BACKOFF and actual data will be here)
111
  @GuardedBy("lock")
112
  private final RlsAsyncLruCache linkedHashLruCache;
113
  private final Future<?> periodicCleaner;
114
  // any RPC in flight will be cached in this map
115
  @GuardedBy("lock")
1✔
116
  private final Map<RouteLookupRequestKey, PendingCacheEntry> pendingCallCache = new HashMap<>();
117

118
  private final ScheduledExecutorService scheduledExecutorService;
119
  private final Ticker ticker;
120
  private final Throttler throttler;
121

122
  private final LbPolicyConfiguration lbPolicyConfig;
123
  private final BackoffPolicy.Provider backoffProvider;
124
  private final long maxAgeNanos;
125
  private final long staleAgeNanos;
126
  private final long callTimeoutNanos;
127

128
  private final RlsLbHelper helper;
129
  private final ManagedChannel rlsChannel;
130
  private final RouteLookupServiceStub rlsStub;
131
  private final RlsPicker rlsPicker;
132
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
133
  @GuardedBy("lock")
134
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
135
  private final ChannelLogger logger;
136
  private final ChildPolicyWrapper fallbackChildPolicyWrapper;
137

138
  static {
139
    MetricInstrumentRegistry metricInstrumentRegistry
140
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
141
    DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
142
        "grpc.lb.rls.default_target_picks",
143
        "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
144
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
145
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"),
146
        Arrays.asList("grpc.client.call.custom"),
1✔
147
        false);
148
    TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
1✔
149
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
150
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
151
            + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
152
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
1✔
153
            "grpc.lb.pick_result"),
154
        Arrays.asList("grpc.client.call.custom"),
1✔
155
        false);
156
    FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
1✔
157
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
158
            + "RLS channel being throttled", "{pick}",
159
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
1✔
160
        Arrays.asList("grpc.client.call.custom"), false);
1✔
161
    CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
1✔
162
        "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
163
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
164
        Collections.emptyList(), false);
1✔
165
    CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
1✔
166
        "EXPERIMENTAL. The current size of the RLS cache", "By",
167
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
168
        Collections.emptyList(), false);
1✔
169
  }
170

171
  private CachingRlsLbClient(Builder builder) {
1✔
172
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
173
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
174
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
175
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
176
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
177
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
178
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
179
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
180
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
181
    linkedHashLruCache =
1✔
182
        new RlsAsyncLruCache(
183
            rlsConfig.cacheSizeBytes(),
1✔
184
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
185
            ticker,
186
            helper);
187
    periodicCleaner =
1✔
188
        scheduledExecutorService.scheduleAtFixedRate(this::periodicClean, 1, 1, TimeUnit.MINUTES);
1✔
189
    logger = helper.getChannelLogger();
1✔
190
    String serverHost = null;
1✔
191
    try {
192
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
193
    } catch (URISyntaxException ignore) {
×
194
      // handled by the following null check
195
    }
1✔
196
    if (serverHost == null) {
1✔
197
      logger.log(
×
198
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
199
      serverHost = helper.getAuthority();
×
200
    }
201
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
202
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
203
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
1✔
204
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
205
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
206
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
207
    // called to impose the authority security restrictions.
208
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
209
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
210
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
211
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
212
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
213
    if (routeLookupChannelServiceConfig != null) {
1✔
214
      logger.log(
1✔
215
          ChannelLogLevel.DEBUG,
216
          "RLS channel service config: {0}",
217
          routeLookupChannelServiceConfig);
218
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
219
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
220
    }
221
    rlsChannel = rlsChannelBuilder.build();
1✔
222
    Runnable rlsServerConnectivityStateChangeHandler = new Runnable() {
1✔
223
      private boolean wasInTransientFailure;
224
      @Override
225
      public void run() {
226
        ConnectivityState currentState = rlsChannel.getState(false);
1✔
227
        if (currentState == ConnectivityState.TRANSIENT_FAILURE) {
1✔
228
          wasInTransientFailure = true;
1✔
229
        } else if (wasInTransientFailure && currentState == ConnectivityState.READY) {
1✔
230
          wasInTransientFailure = false;
1✔
231
          synchronized (lock) {
1✔
232
            boolean anyBackoffsCanceled = false;
1✔
233
            for (CacheEntry value : linkedHashLruCache.values()) {
1✔
234
              if (value instanceof BackoffCacheEntry) {
1✔
235
                if (((BackoffCacheEntry) value).scheduledFuture.cancel(false)) {
1✔
236
                  anyBackoffsCanceled = true;
1✔
237
                }
238
              }
239
            }
1✔
240
            if (anyBackoffsCanceled) {
1✔
241
              // Cache updated. updateBalancingState() to reattempt picks
242
              helper.triggerPendingRpcProcessing();
1✔
243
            }
244
          }
1✔
245
        }
246
        rlsChannel.notifyWhenStateChanged(currentState, this);
1✔
247
      }
1✔
248
    };
249
    rlsChannel.notifyWhenStateChanged(
1✔
250
        ConnectivityState.IDLE, rlsServerConnectivityStateChangeHandler);
251
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
252
    childLbResolvedAddressFactory =
1✔
253
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
254
    backoffProvider = builder.backoffProvider;
1✔
255
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
256
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
257
    refCountedChildPolicyWrapperFactory =
1✔
258
        new RefCountedChildPolicyWrapperFactory(
259
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
260
            childLbHelperProvider);
261
    // TODO(creamsoup) wait until lb is ready
262
    String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
263
    if (defaultTarget != null && !defaultTarget.isEmpty()) {
1✔
264
      fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
265
    } else {
266
      fallbackChildPolicyWrapper = null;
1✔
267
    }
268

269
    gaugeRegistration = helper.getMetricRecorder()
1✔
270
        .registerBatchCallback(new BatchCallback() {
1✔
271
          @Override
272
          public void accept(BatchRecorder recorder) {
273
            int estimatedSize;
274
            long estimatedSizeBytes;
275
            synchronized (lock) {
1✔
276
              estimatedSize = linkedHashLruCache.estimatedSize();
1✔
277
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
1✔
278
            }
1✔
279
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
1✔
280
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
281
                    metricsInstanceUuid), Collections.emptyList());
1✔
282
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
1✔
283
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
284
                    metricsInstanceUuid), Collections.emptyList());
1✔
285
          }
1✔
286
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
287

288
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
289
  }
1✔
290

291
  void init() {
292
    synchronized (lock) {
1✔
293
      refCountedChildPolicyWrapperFactory.init();
1✔
294
    }
1✔
295
  }
1✔
296

297
  Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
298
    synchronized (lock) {
1✔
299
      return refCountedChildPolicyWrapperFactory.acceptResolvedAddressFactory(
1✔
300
          childLbResolvedAddressFactory);
301
    }
302
  }
303

304
  /**
305
   * Convert the status to UNAVAILABLE and enhance the error message.
306
   * @param status status as provided by server
307
   * @param serverName Used for error description
308
   * @return Transformed status
309
   */
310
  static Status convertRlsServerStatus(Status status, String serverName) {
311
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
312
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
313
                + "RLS server returned: %s: %s",
314
            serverName, status.getCode(), status.getDescription()));
1✔
315
  }
316

317
  private void periodicClean() {
318
    synchronized (lock) {
1✔
319
      linkedHashLruCache.cleanupExpiredEntries();
1✔
320
    }
1✔
321
  }
1✔
322

323
  /** Populates async cache entry for new request. */
324
  @GuardedBy("lock")
325
  private CachedRouteLookupResponse asyncRlsCall(
326
      RouteLookupRequestKey routeLookupRequestKey, @Nullable BackoffPolicy backoffPolicy,
327
      RouteLookupRequest.Reason routeLookupReason) {
328
    if (throttler.shouldThrottle()) {
1✔
329
      logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup",
1✔
330
          routeLookupRequestKey);
331
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
332
      // on this result
333
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
334
          routeLookupRequestKey, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"),
1✔
335
          backoffPolicy));
336
    }
337
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
338
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(
1✔
339
        RouteLookupRequest.create(routeLookupRequestKey.keyMap(), routeLookupReason));
1✔
340
    logger.log(ChannelLogLevel.DEBUG,
1✔
341
        "[RLS Entry {0}] Starting RouteLookup: {1}", routeLookupRequestKey, routeLookupRequest);
342
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
343
        .routeLookup(
1✔
344
            routeLookupRequest,
345
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
346
              @Override
347
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
348
                logger.log(ChannelLogLevel.DEBUG,
1✔
349
                    "[RLS Entry {0}] RouteLookup succeeded: {1}", routeLookupRequestKey, value);
350
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
351
              }
1✔
352

353
              @Override
354
              public void onError(Throwable t) {
355
                logger.log(ChannelLogLevel.DEBUG,
1✔
356
                    "[RLS Entry {0}] RouteLookup failed: {1}", routeLookupRequestKey, t);
357
                response.setException(t);
1✔
358
                throttler.registerBackendResponse(true);
1✔
359
              }
1✔
360

361
              @Override
362
              public void onCompleted() {
363
                throttler.registerBackendResponse(false);
1✔
364
              }
1✔
365
            });
366
    return CachedRouteLookupResponse.pendingResponse(
1✔
367
        createPendingEntry(routeLookupRequestKey, response, backoffPolicy));
1✔
368
  }
369

370
  /**
371
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
372
   * cached, pending and backed-off due to error. The result remains same even if the status is
373
   * changed after the return.
374
   */
375
  @CheckReturnValue
376
  final CachedRouteLookupResponse get(final RouteLookupRequestKey routeLookupRequestKey) {
377
    synchronized (lock) {
1✔
378
      final CacheEntry cacheEntry;
379
      cacheEntry = linkedHashLruCache.read(routeLookupRequestKey);
1✔
380
      if (cacheEntry == null
1✔
381
          || (cacheEntry instanceof BackoffCacheEntry
382
          && !((BackoffCacheEntry) cacheEntry).isInBackoffPeriod())) {
1✔
383
        PendingCacheEntry pendingEntry = pendingCallCache.get(routeLookupRequestKey);
1✔
384
        if (pendingEntry != null) {
1✔
385
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
×
386
        }
387
        return asyncRlsCall(routeLookupRequestKey, cacheEntry instanceof BackoffCacheEntry
1✔
388
            ? ((BackoffCacheEntry) cacheEntry).backoffPolicy : null,
1✔
389
            RouteLookupRequest.Reason.REASON_MISS);
390
      }
391

392
      if (cacheEntry instanceof DataCacheEntry) {
1✔
393
        // cache hit, initiate async-refresh if entry is staled
394
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
395
        if (dataEntry.isStaled(ticker.read())) {
1✔
396
          dataEntry.maybeRefresh();
1✔
397
        }
398
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
399
      }
400
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
401
    }
402
  }
403

404
  /** Performs any pending maintenance operations needed by the cache. */
405
  void close() {
406
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
407
    synchronized (lock) {
1✔
408
      periodicCleaner.cancel(false);
1✔
409
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
410
      linkedHashLruCache.close();
1✔
411
      // TODO(creamsoup) maybe cancel all pending requests
412
      pendingCallCache.clear();
1✔
413
      rlsChannel.shutdownNow();
1✔
414
      rlsPicker.close();
1✔
415
      gaugeRegistration.close();
1✔
416
    }
1✔
417
  }
1✔
418

419
  void requestConnection() {
420
    rlsChannel.getState(true);
×
421
  }
×
422

423
  @GuardedBy("lock")
424
  private PendingCacheEntry createPendingEntry(
425
      RouteLookupRequestKey routeLookupRequestKey,
426
      ListenableFuture<RouteLookupResponse> pendingCall,
427
      @Nullable BackoffPolicy backoffPolicy) {
428
    PendingCacheEntry entry = new PendingCacheEntry(routeLookupRequestKey, pendingCall,
1✔
429
        backoffPolicy);
430
    // Add the entry to the map before adding the Listener, because the listener removes the
431
    // entry from the map
432
    pendingCallCache.put(routeLookupRequestKey, entry);
1✔
433
    // Beware that the listener can run immediately on the current thread
434
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
435
    return entry;
1✔
436
  }
437

438
  private void pendingRpcComplete(PendingCacheEntry entry) {
439
    synchronized (lock) {
1✔
440
      boolean clientClosed = pendingCallCache.remove(entry.routeLookupRequestKey) == null;
1✔
441
      if (clientClosed) {
1✔
442
        return;
1✔
443
      }
444

445
      try {
446
        createDataEntry(entry.routeLookupRequestKey, Futures.getDone(entry.pendingCall));
1✔
447
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
448
        // reattempt picks when the child LB is done connecting
449
      } catch (Exception e) {
1✔
450
        createBackOffEntry(entry.routeLookupRequestKey, Status.fromThrowable(e),
1✔
451
            entry.backoffPolicy);
1✔
452
        // Cache updated. updateBalancingState() to reattempt picks
453
        helper.triggerPendingRpcProcessing();
1✔
454
      }
1✔
455
    }
1✔
456
  }
1✔
457

458
  @GuardedBy("lock")
459
  private DataCacheEntry createDataEntry(
460
      RouteLookupRequestKey routeLookupRequestKey, RouteLookupResponse routeLookupResponse) {
461
    logger.log(
1✔
462
        ChannelLogLevel.DEBUG,
463
        "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}",
464
        routeLookupRequestKey, routeLookupResponse);
465
    DataCacheEntry entry = new DataCacheEntry(routeLookupRequestKey, routeLookupResponse);
1✔
466
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
467
    // this cache update because the lock is held
468
    linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry);
1✔
469
    return entry;
1✔
470
  }
471

472
  @GuardedBy("lock")
473
  private BackoffCacheEntry createBackOffEntry(RouteLookupRequestKey routeLookupRequestKey,
474
      Status status, @Nullable BackoffPolicy backoffPolicy) {
475
    if (backoffPolicy == null) {
1✔
476
      backoffPolicy = backoffProvider.get();
1✔
477
    }
478
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
479
    logger.log(
1✔
480
        ChannelLogLevel.DEBUG,
481
        "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
482
        routeLookupRequestKey, status, delayNanos);
1✔
483
    BackoffCacheEntry entry = new BackoffCacheEntry(routeLookupRequestKey, status, backoffPolicy,
1✔
484
        ticker.read() + delayNanos * 2);
1✔
485
    // Lock is held, so the task can't execute before the assignment
486
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
487
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
488
    linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry);
1✔
489
    return entry;
1✔
490
  }
491

492
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
493
    synchronized (lock) {
1✔
494
      // This checks whether the task has been cancelled and prevents a second execution.
495
      if (!entry.scheduledFuture.cancel(false)) {
1✔
496
        // Future was previously cancelled
497
        return;
×
498
      }
499
      // Cache updated. updateBalancingState() to reattempt picks
500
      helper.triggerPendingRpcProcessing();
1✔
501
    }
1✔
502
  }
1✔
503

504
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
505

506
    final Helper helper;
507
    private ConnectivityState state;
508
    private SubchannelPicker picker;
509

510
    RlsLbHelper(Helper helper) {
1✔
511
      this.helper = helper;
1✔
512
    }
1✔
513

514
    @Override
515
    protected Helper delegate() {
516
      return helper;
1✔
517
    }
518

519
    @Override
520
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
521
      state = newState;
1✔
522
      picker = newPicker;
1✔
523
      super.updateBalancingState(newState, newPicker);
1✔
524
    }
1✔
525

526
    void triggerPendingRpcProcessing() {
527
      checkState(state != null, "updateBalancingState hasn't yet been called");
1✔
528
      helper.getSynchronizationContext().execute(
1✔
529
          () -> super.updateBalancingState(state, picker));
1✔
530
    }
1✔
531
  }
532

533
  /**
534
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
535
   */
536
  static final class CachedRouteLookupResponse {
537
    // Should only have 1 of following 3 cache entries
538
    @Nullable
539
    private final DataCacheEntry dataCacheEntry;
540
    @Nullable
541
    private final PendingCacheEntry pendingCacheEntry;
542
    @Nullable
543
    private final BackoffCacheEntry backoffCacheEntry;
544

545
    CachedRouteLookupResponse(
546
        DataCacheEntry dataCacheEntry,
547
        PendingCacheEntry pendingCacheEntry,
548
        BackoffCacheEntry backoffCacheEntry) {
1✔
549
      this.dataCacheEntry = dataCacheEntry;
1✔
550
      this.pendingCacheEntry = pendingCacheEntry;
1✔
551
      this.backoffCacheEntry = backoffCacheEntry;
1✔
552
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
553
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
554
          "Expected only 1 cache entry value provided");
555
    }
1✔
556

557
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
558
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
559
    }
560

561
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
562
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
563
    }
564

565
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
566
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
567
    }
568

569
    boolean hasData() {
570
      return dataCacheEntry != null;
1✔
571
    }
572

573
    @Nullable
574
    ChildPolicyWrapper getChildPolicyWrapper() {
575
      if (!hasData()) {
1✔
576
        return null;
×
577
      }
578
      return dataCacheEntry.getChildPolicyWrapper();
1✔
579
    }
580

581
    @VisibleForTesting
582
    @Nullable
583
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
584
      if (!hasData()) {
1✔
585
        return null;
×
586
      }
587
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
588
    }
589

590
    @Nullable
591
    String getHeaderData() {
592
      if (!hasData()) {
1✔
593
        return null;
1✔
594
      }
595
      return dataCacheEntry.getHeaderData();
1✔
596
    }
597

598
    boolean hasError() {
599
      return backoffCacheEntry != null;
1✔
600
    }
601

602
    boolean isPending() {
603
      return pendingCacheEntry != null;
1✔
604
    }
605

606
    @Nullable
607
    Status getStatus() {
608
      if (!hasError()) {
1✔
609
        return null;
×
610
      }
611
      return backoffCacheEntry.getStatus();
1✔
612
    }
613

614
    @Override
615
    public String toString() {
616
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
617
      if (dataCacheEntry != null) {
×
618
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
619
      }
620
      if (pendingCacheEntry != null) {
×
621
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
622
      }
623
      if (backoffCacheEntry != null) {
×
624
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
625
      }
626
      return toStringHelper.toString();
×
627
    }
628
  }
629

630
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
631
  static final class PendingCacheEntry {
632
    private final ListenableFuture<RouteLookupResponse> pendingCall;
633
    private final RouteLookupRequestKey routeLookupRequestKey;
634
    @Nullable
635
    private final BackoffPolicy backoffPolicy;
636

637
    PendingCacheEntry(
638
        RouteLookupRequestKey routeLookupRequestKey,
639
        ListenableFuture<RouteLookupResponse> pendingCall,
640
        @Nullable BackoffPolicy backoffPolicy) {
1✔
641
      this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request");
1✔
642
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
643
      this.backoffPolicy = backoffPolicy;
1✔
644
    }
1✔
645

646
    @Override
647
    public String toString() {
648
      return MoreObjects.toStringHelper(this)
×
649
          .add("routeLookupRequestKey", routeLookupRequestKey)
×
650
          .toString();
×
651
    }
652
  }
653

654
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
655
  abstract static class CacheEntry {
656

657
    protected final RouteLookupRequestKey routeLookupRequestKey;
658

659
    CacheEntry(RouteLookupRequestKey routeLookupRequestKey) {
1✔
660
      this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request");
1✔
661
    }
1✔
662

663
    abstract int getSizeBytes();
664

665
    abstract boolean isExpired(long now);
666

667
    abstract void cleanup();
668

669
    protected boolean isOldEnoughToBeEvicted(long now) {
670
      return true;
×
671
    }
672
  }
673

674
  /** Implementation of {@link CacheEntry} contains valid data. */
675
  final class DataCacheEntry extends CacheEntry {
676
    private final RouteLookupResponse response;
677
    private final long minEvictionTime;
678
    private final long expireTime;
679
    private final long staleTime;
680
    private final List<ChildPolicyWrapper> childPolicyWrappers;
681

682
    // GuardedBy CachingRlsLbClient.lock
683
    DataCacheEntry(RouteLookupRequestKey routeLookupRequestKey,
684
        final RouteLookupResponse response) {
1✔
685
      super(routeLookupRequestKey);
1✔
686
      this.response = checkNotNull(response, "response");
1✔
687
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
688
      childPolicyWrappers =
1✔
689
          refCountedChildPolicyWrapperFactory
1✔
690
              .createOrGet(response.targets());
1✔
691
      long now = ticker.read();
1✔
692
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
693
      expireTime = now + maxAgeNanos;
1✔
694
      staleTime = now + staleAgeNanos;
1✔
695
    }
1✔
696

697
    /**
698
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
699
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
700
     * data still exists. Flow looks like following.
701
     *
702
     * <pre>
703
     * Timeline                       | async refresh
704
     *                                V put new cache (entry2)
705
     * entry1: Pending | hasValue | staled  |
706
     * entry2:                        | OV* | pending | hasValue | staled |
707
     *
708
     * OV: old value
709
     * </pre>
710
     */
711
    void maybeRefresh() {
712
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
713
        if (pendingCallCache.containsKey(routeLookupRequestKey)) {
1✔
714
          // pending already requested
715
          return;
×
716
        }
717
        logger.log(ChannelLogLevel.DEBUG,
1✔
718
            "[RLS Entry {0}] Cache entry is stale, refreshing", routeLookupRequestKey);
719
        asyncRlsCall(routeLookupRequestKey, /* backoffPolicy= */ null,
1✔
720
            RouteLookupRequest.Reason.REASON_STALE);
721
      }
1✔
722
    }
1✔
723

724
    @VisibleForTesting
725
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
726
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
727
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
728
          return childPolicyWrapper;
1✔
729
        }
730
      }
1✔
731

732
      throw new RuntimeException("Target not found:" + target);
×
733
    }
734

735
    @Nullable
736
    ChildPolicyWrapper getChildPolicyWrapper() {
737
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
738
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
739
          return childPolicyWrapper;
1✔
740
        }
741
      }
1✔
742
      return childPolicyWrappers.get(0);
1✔
743
    }
744

745
    String getHeaderData() {
746
      return response.getHeaderData();
1✔
747
    }
748

749
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
750
    int calcStringSize(String target) {
751
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
752
    }
753

754
    @Override
755
    int getSizeBytes() {
756
      int targetSize = 0;
1✔
757
      for (String target : response.targets()) {
1✔
758
        targetSize += calcStringSize(target);
1✔
759
      }
1✔
760
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
761
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
762
    }
763

764
    @Override
765
    boolean isExpired(long now) {
766
      return expireTime - now <= 0;
1✔
767
    }
768

769
    boolean isStaled(long now) {
770
      return staleTime - now <= 0;
1✔
771
    }
772

773
    @Override
774
    protected boolean isOldEnoughToBeEvicted(long now) {
775
      return minEvictionTime - now <= 0;
×
776
    }
777

778
    @Override
779
    void cleanup() {
780
      synchronized (lock) {
1✔
781
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
782
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
783
        }
1✔
784
      }
1✔
785
    }
1✔
786

787
    @Override
788
    public String toString() {
789
      return MoreObjects.toStringHelper(this)
×
790
          .add("request", routeLookupRequestKey)
×
791
          .add("response", response)
×
792
          .add("expireTime", expireTime)
×
793
          .add("staleTime", staleTime)
×
794
          .add("childPolicyWrappers", childPolicyWrappers)
×
795
          .toString();
×
796
    }
797
  }
798

799
  /**
800
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
801
   * status when the backoff time is expired.
802
   */
803
  private static final class BackoffCacheEntry extends CacheEntry {
804

805
    private final Status status;
806
    private final BackoffPolicy backoffPolicy;
807
    private final long expiryTimeNanos;
808
    private Future<?> scheduledFuture;
809

810
    BackoffCacheEntry(RouteLookupRequestKey routeLookupRequestKey, Status status,
811
        BackoffPolicy backoffPolicy, long expiryTimeNanos) {
812
      super(routeLookupRequestKey);
1✔
813
      this.status = checkNotNull(status, "status");
1✔
814
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
815
      this.expiryTimeNanos = expiryTimeNanos;
1✔
816
    }
1✔
817

818
    Status getStatus() {
819
      return status;
1✔
820
    }
821

822
    @Override
823
    int getSizeBytes() {
824
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
825
    }
826

827
    boolean isInBackoffPeriod() {
828
      return !scheduledFuture.isDone();
1✔
829
    }
830

831
    @Override
832
    boolean isExpired(long nowNanos) {
833
      return nowNanos > expiryTimeNanos;
1✔
834
    }
835

836
    @Override
837
    void cleanup() {
838
      scheduledFuture.cancel(false);
1✔
839
    }
1✔
840

841
    @Override
842
    public String toString() {
843
      return MoreObjects.toStringHelper(this)
×
844
          .add("request", routeLookupRequestKey)
×
845
          .add("status", status)
×
846
          .toString();
×
847
    }
848
  }
849

850
  /** Returns a Builder for {@link CachingRlsLbClient}. */
851
  static Builder newBuilder() {
852
    return new Builder();
1✔
853
  }
854

855
  /** A Builder for {@link CachingRlsLbClient}. */
856
  static final class Builder {
1✔
857

858
    private Helper helper;
859
    private LbPolicyConfiguration lbPolicyConfig;
860
    private Throttler throttler = new HappyThrottler();
1✔
861
    private ResolvedAddressFactory resolvedAddressFactory;
862
    private Ticker ticker = Ticker.systemTicker();
1✔
863
    private EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener;
864
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
865

866
    Builder setHelper(Helper helper) {
867
      this.helper = checkNotNull(helper, "helper");
1✔
868
      return this;
1✔
869
    }
870

871
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
872
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
873
      return this;
1✔
874
    }
875

876
    Builder setThrottler(Throttler throttler) {
877
      this.throttler = checkNotNull(throttler, "throttler");
1✔
878
      return this;
1✔
879
    }
880

881
    /**
882
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
883
     */
884
    Builder setResolvedAddressesFactory(
885
        ResolvedAddressFactory resolvedAddressFactory) {
886
      this.resolvedAddressFactory =
1✔
887
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
888
      return this;
1✔
889
    }
890

891
    Builder setTicker(Ticker ticker) {
892
      this.ticker = checkNotNull(ticker, "ticker");
1✔
893
      return this;
1✔
894
    }
895

896
    Builder setEvictionListener(
897
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener) {
898
      this.evictionListener = evictionListener;
1✔
899
      return this;
1✔
900
    }
901

902
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
903
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
904
      return this;
1✔
905
    }
906

907
    CachingRlsLbClient build() {
908
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
909
      client.init();
1✔
910
      return client;
1✔
911
    }
912
  }
913

914
  /**
915
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
916
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
917
   */
918
  private static final class AutoCleaningEvictionListener
919
      implements EvictionListener<RouteLookupRequestKey, CacheEntry> {
920

921
    private final EvictionListener<RouteLookupRequestKey, CacheEntry> delegate;
922

923
    AutoCleaningEvictionListener(
924
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> delegate) {
1✔
925
      this.delegate = delegate;
1✔
926
    }
1✔
927

928
    @Override
929
    public void onEviction(RouteLookupRequestKey key, CacheEntry value, EvictionType cause) {
930
      if (delegate != null) {
1✔
931
        delegate.onEviction(key, value, cause);
1✔
932
      }
933
      // performs cleanup after delegation
934
      value.cleanup();
1✔
935
    }
1✔
936
  }
937

938
  /** A Throttler never throttles. */
939
  private static final class HappyThrottler implements Throttler {
940

941
    @Override
942
    public boolean shouldThrottle() {
943
      return false;
×
944
    }
945

946
    @Override
947
    public void registerBackendResponse(boolean throttled) {
948
      // no-op
949
    }
×
950
  }
951

952
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
953
  private static final class RlsAsyncLruCache
954
      extends LinkedHashLruCache<RouteLookupRequestKey, CacheEntry> {
955
    private final RlsLbHelper helper;
956

957
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
958
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener,
959
        Ticker ticker, RlsLbHelper helper) {
960
      super(maxEstimatedSizeBytes, evictionListener, ticker);
1✔
961
      this.helper = checkNotNull(helper, "helper");
1✔
962
    }
1✔
963

964
    @Override
965
    protected boolean isExpired(RouteLookupRequestKey key, CacheEntry value, long nowNanos) {
966
      return value.isExpired(nowNanos);
1✔
967
    }
968

969
    @Override
970
    protected int estimateSizeOf(RouteLookupRequestKey key, CacheEntry value) {
971
      return value.getSizeBytes();
1✔
972
    }
973

974
    @Override
975
    protected boolean shouldInvalidateEldestEntry(
976
        RouteLookupRequestKey eldestKey, CacheEntry eldestValue, long now) {
977
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
978
        return false;
×
979
      }
980

981
      // eldest entry should be evicted if size limit exceeded
982
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
983
    }
984

985
    public CacheEntry cacheAndClean(RouteLookupRequestKey key, CacheEntry value) {
986
      CacheEntry newEntry = cache(key, value);
1✔
987

988
      // force cleanup if new entry pushed cache over max size (in bytes)
989
      if (fitToLimit()) {
1✔
990
        helper.triggerPendingRpcProcessing();
×
991
      }
992
      return newEntry;
1✔
993
    }
994
  }
995

996
  /** A header will be added when RLS server respond with additional header data. */
997
  @VisibleForTesting
998
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
999
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
1000

1001
  final class RlsPicker extends SubchannelPicker {
1002

1003
    private final RlsRequestFactory requestFactory;
1004
    private final String lookupService;
1005

1006
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
1007
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
1008
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
1009
    }
1✔
1010

1011
    @Override
1012
    public PickResult pickSubchannel(PickSubchannelArgs args) {
1013
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
1014
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
1015
      RlsProtoData.RouteLookupRequestKey lookupRequestKey =
1✔
1016
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
1017
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(lookupRequestKey);
1✔
1018

1019
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
1020
        Metadata headers = args.getHeaders();
1✔
1021
        headers.discardAll(RLS_DATA_KEY);
1✔
1022
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
1023
      }
1024
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1025
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
1026
      if (response.hasData()) {
1✔
1027
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1028
        SubchannelPicker picker =
1029
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1030
        if (picker == null) {
1✔
1031
          return PickResult.withNoResult();
×
1032
        }
1033
        // Happy path
1034
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1035
        if (pickResult.hasResult()) {
1✔
1036
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1037
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1038
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1039
              Arrays.asList(determineCustomLabel(args)));
1✔
1040
        }
1041
        return pickResult;
1✔
1042
      } else if (response.hasError()) {
1✔
1043
        if (hasFallback) {
1✔
1044
          return useFallback(args);
1✔
1045
        }
1046
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1047
            Arrays.asList(helper.getChannelTarget(), lookupService),
1✔
1048
            Arrays.asList(determineCustomLabel(args)));
1✔
1049
        return PickResult.withError(
1✔
1050
            convertRlsServerStatus(response.getStatus(),
1✔
1051
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1052
      } else {
1053
        return PickResult.withNoResult();
1✔
1054
      }
1055
    }
1056

1057
    /** Uses Subchannel connected to default target. */
1058
    private PickResult useFallback(PickSubchannelArgs args) {
1059
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1060
      if (picker == null) {
1✔
1061
        return PickResult.withNoResult();
×
1062
      }
1063
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1064
      if (pickResult.hasResult()) {
1✔
1065
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1066
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1067
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1068
            Arrays.asList(determineCustomLabel(args)));
1✔
1069
      }
1070
      return pickResult;
1✔
1071
    }
1072

1073
    private String determineMetricsPickResult(PickResult pickResult) {
1074
      if (pickResult.getStatus().isOk()) {
1✔
1075
        return "complete";
1✔
1076
      } else if (pickResult.isDrop()) {
1✔
1077
        return "drop";
×
1078
      } else {
1079
        return "fail";
1✔
1080
      }
1081
    }
1082

1083
    private String determineCustomLabel(PickSubchannelArgs args) {
1084
      return args.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL);
1✔
1085
    }
1086

1087
    // GuardedBy CachingRlsLbClient.lock
1088
    void close() {
1089
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1090
        if (fallbackChildPolicyWrapper != null) {
1✔
1091
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1092
        }
1093
      }
1✔
1094
    }
1✔
1095

1096
    @Override
1097
    public String toString() {
1098
      return MoreObjects.toStringHelper(this)
×
1099
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1100
          .toString();
×
1101
    }
1102
  }
1103

1104
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc