• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20057

06 Nov 2025 10:27AM UTC coverage: 88.528% (-0.01%) from 88.542%
#20057

push

github

web-flow
rls: Control plane channel monitor state and back off handling (#12460)

At the end of back off time, instead of firing a Rls RPC, just update the RLS picker, and RLS connectivity state change from TRANSIENT_FAILURE to READY deactivate all active backoffs.

35044 of 39585 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.63
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import com.google.errorprone.annotations.CheckReturnValue;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ConnectivityState;
36
import io.grpc.LoadBalancer.Helper;
37
import io.grpc.LoadBalancer.PickResult;
38
import io.grpc.LoadBalancer.PickSubchannelArgs;
39
import io.grpc.LoadBalancer.ResolvedAddresses;
40
import io.grpc.LoadBalancer.SubchannelPicker;
41
import io.grpc.LongCounterMetricInstrument;
42
import io.grpc.LongGaugeMetricInstrument;
43
import io.grpc.ManagedChannel;
44
import io.grpc.ManagedChannelBuilder;
45
import io.grpc.Metadata;
46
import io.grpc.MetricInstrumentRegistry;
47
import io.grpc.MetricRecorder.BatchCallback;
48
import io.grpc.MetricRecorder.BatchRecorder;
49
import io.grpc.MetricRecorder.Registration;
50
import io.grpc.Status;
51
import io.grpc.internal.BackoffPolicy;
52
import io.grpc.internal.ExponentialBackoffPolicy;
53
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
54
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
55
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
57
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
58
import io.grpc.rls.LruCache.EvictionListener;
59
import io.grpc.rls.LruCache.EvictionType;
60
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
61
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
62
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
63
import io.grpc.rls.RlsProtoData.RouteLookupRequestKey;
64
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
65
import io.grpc.stub.StreamObserver;
66
import io.grpc.util.ForwardingLoadBalancerHelper;
67
import java.net.URI;
68
import java.net.URISyntaxException;
69
import java.util.Arrays;
70
import java.util.Collections;
71
import java.util.HashMap;
72
import java.util.List;
73
import java.util.Map;
74
import java.util.UUID;
75
import java.util.concurrent.Future;
76
import java.util.concurrent.ScheduledExecutorService;
77
import java.util.concurrent.TimeUnit;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
/**
82
 * A CachingRlsLbClient is a core implementation of RLS loadbalancer supports dynamic request
83
 * routing by fetching the decision from route lookup server. Every single request is routed by
84
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
85
 */
86
@ThreadSafe
87
final class CachingRlsLbClient {
88

89
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
90
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
91
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
92
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
93
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
94
  public static final int BYTES_PER_CHAR = 2;
95
  public static final int STRING_OVERHEAD_BYTES = 38;
96
  /** Minimum bytes for a Java Object. */
97
  public static final int OBJ_OVERHEAD_B = 16;
98

99
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
100
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104
  private final Registration gaugeRegistration;
105
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
106

107
  // All cache status changes (pending, backoff, success) must be under this lock
108
  private final Object lock = new Object();
1✔
109
  // LRU cache based on access order (BACKOFF and actual data will be here)
110
  @GuardedBy("lock")
111
  private final RlsAsyncLruCache linkedHashLruCache;
112
  private final Future<?> periodicCleaner;
113
  // any RPC on the fly will cached in this map
114
  @GuardedBy("lock")
1✔
115
  private final Map<RouteLookupRequestKey, PendingCacheEntry> pendingCallCache = new HashMap<>();
116

117
  private final ScheduledExecutorService scheduledExecutorService;
118
  private final Ticker ticker;
119
  private final Throttler throttler;
120

121
  private final LbPolicyConfiguration lbPolicyConfig;
122
  private final BackoffPolicy.Provider backoffProvider;
123
  private final long maxAgeNanos;
124
  private final long staleAgeNanos;
125
  private final long callTimeoutNanos;
126

127
  private final RlsLbHelper helper;
128
  private final ManagedChannel rlsChannel;
129
  private final RouteLookupServiceStub rlsStub;
130
  private final RlsPicker rlsPicker;
131
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
132
  @GuardedBy("lock")
133
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
134
  private final ChannelLogger logger;
135
  private final ChildPolicyWrapper fallbackChildPolicyWrapper;
136

137
  static {
138
    MetricInstrumentRegistry metricInstrumentRegistry
139
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
140
    DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
141
        "grpc.lb.rls.default_target_picks",
142
        "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
143
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
144
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(),
1✔
145
        false);
146
    TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
1✔
147
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
148
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
149
            + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
150
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
1✔
151
            "grpc.lb.pick_result"), Collections.emptyList(),
1✔
152
        false);
153
    FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
1✔
154
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
155
            + "RLS channel being throttled", "{pick}",
156
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
1✔
157
        Collections.emptyList(), false);
1✔
158
    CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
1✔
159
        "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
160
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
161
        Collections.emptyList(), false);
1✔
162
    CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
1✔
163
        "EXPERIMENTAL. The current size of the RLS cache", "By",
164
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
165
        Collections.emptyList(), false);
1✔
166
  }
167

168
  private CachingRlsLbClient(Builder builder) {
1✔
169
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
170
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
171
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
172
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
173
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
174
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
175
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
176
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
177
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
178
    linkedHashLruCache =
1✔
179
        new RlsAsyncLruCache(
180
            rlsConfig.cacheSizeBytes(),
1✔
181
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
182
            ticker,
183
            helper);
184
    periodicCleaner =
1✔
185
        scheduledExecutorService.scheduleAtFixedRate(this::periodicClean, 1, 1, TimeUnit.MINUTES);
1✔
186
    logger = helper.getChannelLogger();
1✔
187
    String serverHost = null;
1✔
188
    try {
189
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
190
    } catch (URISyntaxException ignore) {
×
191
      // handled by the following null check
192
    }
1✔
193
    if (serverHost == null) {
1✔
194
      logger.log(
×
195
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
196
      serverHost = helper.getAuthority();
×
197
    }
198
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
199
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
200
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
1✔
201
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
202
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
203
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
204
    // called to impose the authority security restrictions.
205
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
206
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
207
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
208
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
209
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
210
    if (routeLookupChannelServiceConfig != null) {
1✔
211
      logger.log(
1✔
212
          ChannelLogLevel.DEBUG,
213
          "RLS channel service config: {0}",
214
          routeLookupChannelServiceConfig);
215
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
216
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
217
    }
218
    rlsChannel = rlsChannelBuilder.build();
1✔
219
    Runnable rlsServerConnectivityStateChangeHandler = new Runnable() {
1✔
220
      private boolean wasInTransientFailure;
221
      @Override
222
      public void run() {
223
        ConnectivityState currentState = rlsChannel.getState(false);
1✔
224
        if (currentState == ConnectivityState.TRANSIENT_FAILURE) {
1✔
225
          wasInTransientFailure = true;
1✔
226
        } else if (wasInTransientFailure && currentState == ConnectivityState.READY) {
1✔
227
          wasInTransientFailure = false;
1✔
228
          synchronized (lock) {
1✔
229
            boolean anyBackoffsCanceled = false;
1✔
230
            for (CacheEntry value : linkedHashLruCache.values()) {
1✔
231
              if (value instanceof BackoffCacheEntry) {
1✔
232
                if (((BackoffCacheEntry) value).scheduledFuture.cancel(false)) {
1✔
233
                  anyBackoffsCanceled = true;
1✔
234
                }
235
              }
236
            }
1✔
237
            if (anyBackoffsCanceled) {
1✔
238
              // Cache updated. updateBalancingState() to reattempt picks
239
              helper.triggerPendingRpcProcessing();
1✔
240
            }
241
          }
1✔
242
        }
243
        rlsChannel.notifyWhenStateChanged(currentState, this);
1✔
244
      }
1✔
245
    };
246
    rlsChannel.notifyWhenStateChanged(
1✔
247
        ConnectivityState.IDLE, rlsServerConnectivityStateChangeHandler);
248
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
249
    childLbResolvedAddressFactory =
1✔
250
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
251
    backoffProvider = builder.backoffProvider;
1✔
252
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
253
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
254
    refCountedChildPolicyWrapperFactory =
1✔
255
        new RefCountedChildPolicyWrapperFactory(
256
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
257
            childLbHelperProvider);
258
    // TODO(creamsoup) wait until lb is ready
259
    String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
260
    if (defaultTarget != null && !defaultTarget.isEmpty()) {
1✔
261
      fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
262
    } else {
263
      fallbackChildPolicyWrapper = null;
1✔
264
    }
265

266
    gaugeRegistration = helper.getMetricRecorder()
1✔
267
        .registerBatchCallback(new BatchCallback() {
1✔
268
          @Override
269
          public void accept(BatchRecorder recorder) {
270
            int estimatedSize;
271
            long estimatedSizeBytes;
272
            synchronized (lock) {
1✔
273
              estimatedSize = linkedHashLruCache.estimatedSize();
1✔
274
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
1✔
275
            }
1✔
276
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
1✔
277
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
278
                    metricsInstanceUuid), Collections.emptyList());
1✔
279
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
1✔
280
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
281
                    metricsInstanceUuid), Collections.emptyList());
1✔
282
          }
1✔
283
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
284

285
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
286
  }
1✔
287

288
  void init() {
289
    synchronized (lock) {
1✔
290
      refCountedChildPolicyWrapperFactory.init();
1✔
291
    }
1✔
292
  }
1✔
293

294
  Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
295
    synchronized (lock) {
1✔
296
      return refCountedChildPolicyWrapperFactory.acceptResolvedAddressFactory(
1✔
297
          childLbResolvedAddressFactory);
298
    }
299
  }
300

301
  /**
302
   * Convert the status to UNAVAILABLE and enhance the error message.
303
   * @param status status as provided by server
304
   * @param serverName Used for error description
305
   * @return Transformed status
306
   */
307
  static Status convertRlsServerStatus(Status status, String serverName) {
308
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
309
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
310
                + "RLS server returned: %s: %s",
311
            serverName, status.getCode(), status.getDescription()));
1✔
312
  }
313

314
  private void periodicClean() {
315
    synchronized (lock) {
1✔
316
      linkedHashLruCache.cleanupExpiredEntries();
1✔
317
    }
1✔
318
  }
1✔
319

320
  /** Populates async cache entry for new request. */
321
  @GuardedBy("lock")
322
  private CachedRouteLookupResponse asyncRlsCall(
323
      RouteLookupRequestKey routeLookupRequestKey, @Nullable BackoffPolicy backoffPolicy,
324
      RouteLookupRequest.Reason routeLookupReason) {
325
    if (throttler.shouldThrottle()) {
1✔
326
      logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup",
1✔
327
          routeLookupRequestKey);
328
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
329
      // on this result
330
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
331
          routeLookupRequestKey, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"),
1✔
332
          backoffPolicy));
333
    }
334
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
335
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(
1✔
336
        RouteLookupRequest.create(routeLookupRequestKey.keyMap(), routeLookupReason));
1✔
337
    logger.log(ChannelLogLevel.DEBUG,
1✔
338
        "[RLS Entry {0}] Starting RouteLookup: {1}", routeLookupRequestKey, routeLookupRequest);
339
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
340
        .routeLookup(
1✔
341
            routeLookupRequest,
342
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
343
              @Override
344
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
345
                logger.log(ChannelLogLevel.DEBUG,
1✔
346
                    "[RLS Entry {0}] RouteLookup succeeded: {1}", routeLookupRequestKey, value);
347
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
348
              }
1✔
349

350
              @Override
351
              public void onError(Throwable t) {
352
                logger.log(ChannelLogLevel.DEBUG,
1✔
353
                    "[RLS Entry {0}] RouteLookup failed: {1}", routeLookupRequestKey, t);
354
                response.setException(t);
1✔
355
                throttler.registerBackendResponse(true);
1✔
356
              }
1✔
357

358
              @Override
359
              public void onCompleted() {
360
                throttler.registerBackendResponse(false);
1✔
361
              }
1✔
362
            });
363
    return CachedRouteLookupResponse.pendingResponse(
1✔
364
        createPendingEntry(routeLookupRequestKey, response, backoffPolicy));
1✔
365
  }
366

367
  /**
368
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
369
   * cached, pending and backed-off due to error. The result remains same even if the status is
370
   * changed after the return.
371
   */
372
  @CheckReturnValue
373
  final CachedRouteLookupResponse get(final RouteLookupRequestKey routeLookupRequestKey) {
374
    synchronized (lock) {
1✔
375
      final CacheEntry cacheEntry;
376
      cacheEntry = linkedHashLruCache.read(routeLookupRequestKey);
1✔
377
      if (cacheEntry == null
1✔
378
          || (cacheEntry instanceof BackoffCacheEntry
379
          && !((BackoffCacheEntry) cacheEntry).isInBackoffPeriod())) {
1✔
380
        PendingCacheEntry pendingEntry = pendingCallCache.get(routeLookupRequestKey);
1✔
381
        if (pendingEntry != null) {
1✔
382
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
×
383
        }
384
        return asyncRlsCall(routeLookupRequestKey, cacheEntry instanceof BackoffCacheEntry
1✔
385
            ? ((BackoffCacheEntry) cacheEntry).backoffPolicy : null,
1✔
386
            RouteLookupRequest.Reason.REASON_MISS);
387
      }
388

389
      if (cacheEntry instanceof DataCacheEntry) {
1✔
390
        // cache hit, initiate async-refresh if entry is staled
391
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
392
        if (dataEntry.isStaled(ticker.read())) {
1✔
393
          dataEntry.maybeRefresh();
1✔
394
        }
395
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
396
      }
397
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
398
    }
399
  }
400

401
  /** Performs any pending maintenance operations needed by the cache. */
402
  void close() {
403
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
404
    synchronized (lock) {
1✔
405
      periodicCleaner.cancel(false);
1✔
406
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
407
      linkedHashLruCache.close();
1✔
408
      // TODO(creamsoup) maybe cancel all pending requests
409
      pendingCallCache.clear();
1✔
410
      rlsChannel.shutdownNow();
1✔
411
      rlsPicker.close();
1✔
412
      gaugeRegistration.close();
1✔
413
    }
1✔
414
  }
1✔
415

416
  void requestConnection() {
417
    rlsChannel.getState(true);
×
418
  }
×
419

420
  @GuardedBy("lock")
421
  private PendingCacheEntry createPendingEntry(
422
      RouteLookupRequestKey routeLookupRequestKey,
423
      ListenableFuture<RouteLookupResponse> pendingCall,
424
      @Nullable BackoffPolicy backoffPolicy) {
425
    PendingCacheEntry entry = new PendingCacheEntry(routeLookupRequestKey, pendingCall,
1✔
426
        backoffPolicy);
427
    // Add the entry to the map before adding the Listener, because the listener removes the
428
    // entry from the map
429
    pendingCallCache.put(routeLookupRequestKey, entry);
1✔
430
    // Beware that the listener can run immediately on the current thread
431
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
432
    return entry;
1✔
433
  }
434

435
  private void pendingRpcComplete(PendingCacheEntry entry) {
436
    synchronized (lock) {
1✔
437
      boolean clientClosed = pendingCallCache.remove(entry.routeLookupRequestKey) == null;
1✔
438
      if (clientClosed) {
1✔
439
        return;
1✔
440
      }
441

442
      try {
443
        createDataEntry(entry.routeLookupRequestKey, Futures.getDone(entry.pendingCall));
1✔
444
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
445
        // reattempt picks when the child LB is done connecting
446
      } catch (Exception e) {
1✔
447
        createBackOffEntry(entry.routeLookupRequestKey, Status.fromThrowable(e),
1✔
448
            entry.backoffPolicy);
1✔
449
        // Cache updated. updateBalancingState() to reattempt picks
450
        helper.triggerPendingRpcProcessing();
1✔
451
      }
1✔
452
    }
1✔
453
  }
1✔
454

455
  @GuardedBy("lock")
456
  private DataCacheEntry createDataEntry(
457
      RouteLookupRequestKey routeLookupRequestKey, RouteLookupResponse routeLookupResponse) {
458
    logger.log(
1✔
459
        ChannelLogLevel.DEBUG,
460
        "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}",
461
        routeLookupRequestKey, routeLookupResponse);
462
    DataCacheEntry entry = new DataCacheEntry(routeLookupRequestKey, routeLookupResponse);
1✔
463
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
464
    // this cache update because the lock is held
465
    linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry);
1✔
466
    return entry;
1✔
467
  }
468

469
  @GuardedBy("lock")
470
  private BackoffCacheEntry createBackOffEntry(RouteLookupRequestKey routeLookupRequestKey,
471
      Status status, @Nullable BackoffPolicy backoffPolicy) {
472
    if (backoffPolicy == null) {
1✔
473
      backoffPolicy = backoffProvider.get();
1✔
474
    }
475
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
476
    logger.log(
1✔
477
        ChannelLogLevel.DEBUG,
478
        "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
479
        routeLookupRequestKey, status, delayNanos);
1✔
480
    BackoffCacheEntry entry = new BackoffCacheEntry(routeLookupRequestKey, status, backoffPolicy,
1✔
481
        ticker.read() + delayNanos * 2);
1✔
482
    // Lock is held, so the task can't execute before the assignment
483
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
484
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
485
    linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry);
1✔
486
    return entry;
1✔
487
  }
488

489
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
490
    synchronized (lock) {
1✔
491
      // This checks whether the task has been cancelled and prevents a second execution.
492
      if (!entry.scheduledFuture.cancel(false)) {
1✔
493
        // Future was previously cancelled
494
        return;
×
495
      }
496
      // Cache updated. updateBalancingState() to reattempt picks
497
      helper.triggerPendingRpcProcessing();
1✔
498
    }
1✔
499
  }
1✔
500

501
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
502

503
    final Helper helper;
504
    private ConnectivityState state;
505
    private SubchannelPicker picker;
506

507
    RlsLbHelper(Helper helper) {
1✔
508
      this.helper = helper;
1✔
509
    }
1✔
510

511
    @Override
512
    protected Helper delegate() {
513
      return helper;
1✔
514
    }
515

516
    @Override
517
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
518
      state = newState;
1✔
519
      picker = newPicker;
1✔
520
      super.updateBalancingState(newState, newPicker);
1✔
521
    }
1✔
522

523
    void triggerPendingRpcProcessing() {
524
      checkState(state != null, "updateBalancingState hasn't yet been called");
1✔
525
      helper.getSynchronizationContext().execute(
1✔
526
          () -> super.updateBalancingState(state, picker));
1✔
527
    }
1✔
528
  }
529

530
  /**
531
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
532
   */
533
  static final class CachedRouteLookupResponse {
534
    // Should only have 1 of following 3 cache entries
535
    @Nullable
536
    private final DataCacheEntry dataCacheEntry;
537
    @Nullable
538
    private final PendingCacheEntry pendingCacheEntry;
539
    @Nullable
540
    private final BackoffCacheEntry backoffCacheEntry;
541

542
    CachedRouteLookupResponse(
543
        DataCacheEntry dataCacheEntry,
544
        PendingCacheEntry pendingCacheEntry,
545
        BackoffCacheEntry backoffCacheEntry) {
1✔
546
      this.dataCacheEntry = dataCacheEntry;
1✔
547
      this.pendingCacheEntry = pendingCacheEntry;
1✔
548
      this.backoffCacheEntry = backoffCacheEntry;
1✔
549
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
550
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
551
          "Expected only 1 cache entry value provided");
552
    }
1✔
553

554
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
555
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
556
    }
557

558
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
559
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
560
    }
561

562
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
563
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
564
    }
565

566
    boolean hasData() {
567
      return dataCacheEntry != null;
1✔
568
    }
569

570
    @Nullable
571
    ChildPolicyWrapper getChildPolicyWrapper() {
572
      if (!hasData()) {
1✔
573
        return null;
×
574
      }
575
      return dataCacheEntry.getChildPolicyWrapper();
1✔
576
    }
577

578
    @VisibleForTesting
579
    @Nullable
580
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
581
      if (!hasData()) {
1✔
582
        return null;
×
583
      }
584
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
585
    }
586

587
    @Nullable
588
    String getHeaderData() {
589
      if (!hasData()) {
1✔
590
        return null;
1✔
591
      }
592
      return dataCacheEntry.getHeaderData();
1✔
593
    }
594

595
    boolean hasError() {
596
      return backoffCacheEntry != null;
1✔
597
    }
598

599
    boolean isPending() {
600
      return pendingCacheEntry != null;
1✔
601
    }
602

603
    @Nullable
604
    Status getStatus() {
605
      if (!hasError()) {
1✔
606
        return null;
×
607
      }
608
      return backoffCacheEntry.getStatus();
1✔
609
    }
610

611
    @Override
612
    public String toString() {
613
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
614
      if (dataCacheEntry != null) {
×
615
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
616
      }
617
      if (pendingCacheEntry != null) {
×
618
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
619
      }
620
      if (backoffCacheEntry != null) {
×
621
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
622
      }
623
      return toStringHelper.toString();
×
624
    }
625
  }
626

627
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
628
  static final class PendingCacheEntry {
629
    private final ListenableFuture<RouteLookupResponse> pendingCall;
630
    private final RouteLookupRequestKey routeLookupRequestKey;
631
    @Nullable
632
    private final BackoffPolicy backoffPolicy;
633

634
    PendingCacheEntry(
635
        RouteLookupRequestKey routeLookupRequestKey,
636
        ListenableFuture<RouteLookupResponse> pendingCall,
637
        @Nullable BackoffPolicy backoffPolicy) {
1✔
638
      this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request");
1✔
639
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
640
      this.backoffPolicy = backoffPolicy;
1✔
641
    }
1✔
642

643
    @Override
644
    public String toString() {
645
      return MoreObjects.toStringHelper(this)
×
646
          .add("routeLookupRequestKey", routeLookupRequestKey)
×
647
          .toString();
×
648
    }
649
  }
650

651
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
652
  abstract static class CacheEntry {
653

654
    protected final RouteLookupRequestKey routeLookupRequestKey;
655

656
    CacheEntry(RouteLookupRequestKey routeLookupRequestKey) {
1✔
657
      this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request");
1✔
658
    }
1✔
659

660
    abstract int getSizeBytes();
661

662
    abstract boolean isExpired(long now);
663

664
    abstract void cleanup();
665

666
    protected boolean isOldEnoughToBeEvicted(long now) {
667
      return true;
×
668
    }
669
  }
670

671
  /** Implementation of {@link CacheEntry} contains valid data. */
672
  final class DataCacheEntry extends CacheEntry {
673
    private final RouteLookupResponse response;
674
    private final long minEvictionTime;
675
    private final long expireTime;
676
    private final long staleTime;
677
    private final List<ChildPolicyWrapper> childPolicyWrappers;
678

679
    // GuardedBy CachingRlsLbClient.lock
680
    DataCacheEntry(RouteLookupRequestKey routeLookupRequestKey,
681
        final RouteLookupResponse response) {
1✔
682
      super(routeLookupRequestKey);
1✔
683
      this.response = checkNotNull(response, "response");
1✔
684
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
685
      childPolicyWrappers =
1✔
686
          refCountedChildPolicyWrapperFactory
1✔
687
              .createOrGet(response.targets());
1✔
688
      long now = ticker.read();
1✔
689
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
690
      expireTime = now + maxAgeNanos;
1✔
691
      staleTime = now + staleAgeNanos;
1✔
692
    }
1✔
693

694
    /**
695
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
696
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
697
     * data still exists. Flow looks like following.
698
     *
699
     * <pre>
700
     * Timeline                       | async refresh
701
     *                                V put new cache (entry2)
702
     * entry1: Pending | hasValue | staled  |
703
     * entry2:                        | OV* | pending | hasValue | staled |
704
     *
705
     * OV: old value
706
     * </pre>
707
     */
708
    void maybeRefresh() {
709
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
710
        if (pendingCallCache.containsKey(routeLookupRequestKey)) {
1✔
711
          // pending already requested
712
          return;
×
713
        }
714
        logger.log(ChannelLogLevel.DEBUG,
1✔
715
            "[RLS Entry {0}] Cache entry is stale, refreshing", routeLookupRequestKey);
716
        asyncRlsCall(routeLookupRequestKey, /* backoffPolicy= */ null,
1✔
717
            RouteLookupRequest.Reason.REASON_STALE);
718
      }
1✔
719
    }
1✔
720

721
    @VisibleForTesting
722
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
723
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
724
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
725
          return childPolicyWrapper;
1✔
726
        }
727
      }
1✔
728

729
      throw new RuntimeException("Target not found:" + target);
×
730
    }
731

732
    @Nullable
733
    ChildPolicyWrapper getChildPolicyWrapper() {
734
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
735
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
736
          return childPolicyWrapper;
1✔
737
        }
738
      }
1✔
739
      return childPolicyWrappers.get(0);
1✔
740
    }
741

742
    String getHeaderData() {
743
      return response.getHeaderData();
1✔
744
    }
745

746
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
747
    int calcStringSize(String target) {
748
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
749
    }
750

751
    @Override
752
    int getSizeBytes() {
753
      int targetSize = 0;
1✔
754
      for (String target : response.targets()) {
1✔
755
        targetSize += calcStringSize(target);
1✔
756
      }
1✔
757
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
758
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
759
    }
760

761
    @Override
762
    boolean isExpired(long now) {
763
      return expireTime - now <= 0;
1✔
764
    }
765

766
    boolean isStaled(long now) {
767
      return staleTime - now <= 0;
1✔
768
    }
769

770
    @Override
771
    protected boolean isOldEnoughToBeEvicted(long now) {
772
      return minEvictionTime - now <= 0;
×
773
    }
774

775
    @Override
776
    void cleanup() {
777
      synchronized (lock) {
1✔
778
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
779
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
780
        }
1✔
781
      }
1✔
782
    }
1✔
783

784
    @Override
785
    public String toString() {
786
      return MoreObjects.toStringHelper(this)
×
787
          .add("request", routeLookupRequestKey)
×
788
          .add("response", response)
×
789
          .add("expireTime", expireTime)
×
790
          .add("staleTime", staleTime)
×
791
          .add("childPolicyWrappers", childPolicyWrappers)
×
792
          .toString();
×
793
    }
794
  }
795

796
  /**
797
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
798
   * status when the backoff time is expired.
799
   */
800
  private static final class BackoffCacheEntry extends CacheEntry {
801

802
    private final Status status;
803
    private final BackoffPolicy backoffPolicy;
804
    private final long expiryTimeNanos;
805
    private Future<?> scheduledFuture;
806

807
    BackoffCacheEntry(RouteLookupRequestKey routeLookupRequestKey, Status status,
808
        BackoffPolicy backoffPolicy, long expiryTimeNanos) {
809
      super(routeLookupRequestKey);
1✔
810
      this.status = checkNotNull(status, "status");
1✔
811
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
812
      this.expiryTimeNanos = expiryTimeNanos;
1✔
813
    }
1✔
814

815
    Status getStatus() {
816
      return status;
1✔
817
    }
818

819
    @Override
820
    int getSizeBytes() {
821
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
822
    }
823

824
    boolean isInBackoffPeriod() {
825
      return !scheduledFuture.isDone();
1✔
826
    }
827

828
    @Override
829
    boolean isExpired(long nowNanos) {
830
      return nowNanos > expiryTimeNanos;
1✔
831
    }
832

833
    @Override
834
    void cleanup() {
835
      scheduledFuture.cancel(false);
1✔
836
    }
1✔
837

838
    @Override
839
    public String toString() {
840
      return MoreObjects.toStringHelper(this)
×
841
          .add("request", routeLookupRequestKey)
×
842
          .add("status", status)
×
843
          .toString();
×
844
    }
845
  }
846

847
  /** Returns a Builder for {@link CachingRlsLbClient}. */
848
  static Builder newBuilder() {
849
    return new Builder();
1✔
850
  }
851

852
  /** A Builder for {@link CachingRlsLbClient}. */
853
  static final class Builder {
1✔
854

855
    private Helper helper;
856
    private LbPolicyConfiguration lbPolicyConfig;
857
    private Throttler throttler = new HappyThrottler();
1✔
858
    private ResolvedAddressFactory resolvedAddressFactory;
859
    private Ticker ticker = Ticker.systemTicker();
1✔
860
    private EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener;
861
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
862

863
    Builder setHelper(Helper helper) {
864
      this.helper = checkNotNull(helper, "helper");
1✔
865
      return this;
1✔
866
    }
867

868
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
869
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
870
      return this;
1✔
871
    }
872

873
    Builder setThrottler(Throttler throttler) {
874
      this.throttler = checkNotNull(throttler, "throttler");
1✔
875
      return this;
1✔
876
    }
877

878
    /**
879
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
880
     */
881
    Builder setResolvedAddressesFactory(
882
        ResolvedAddressFactory resolvedAddressFactory) {
883
      this.resolvedAddressFactory =
1✔
884
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
885
      return this;
1✔
886
    }
887

888
    Builder setTicker(Ticker ticker) {
889
      this.ticker = checkNotNull(ticker, "ticker");
1✔
890
      return this;
1✔
891
    }
892

893
    Builder setEvictionListener(
894
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener) {
895
      this.evictionListener = evictionListener;
1✔
896
      return this;
1✔
897
    }
898

899
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
900
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
901
      return this;
1✔
902
    }
903

904
    CachingRlsLbClient build() {
905
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
906
      client.init();
1✔
907
      return client;
1✔
908
    }
909
  }
910

911
  /**
912
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
913
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
914
   */
915
  private static final class AutoCleaningEvictionListener
916
      implements EvictionListener<RouteLookupRequestKey, CacheEntry> {
917

918
    private final EvictionListener<RouteLookupRequestKey, CacheEntry> delegate;
919

920
    AutoCleaningEvictionListener(
921
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> delegate) {
1✔
922
      this.delegate = delegate;
1✔
923
    }
1✔
924

925
    @Override
926
    public void onEviction(RouteLookupRequestKey key, CacheEntry value, EvictionType cause) {
927
      if (delegate != null) {
1✔
928
        delegate.onEviction(key, value, cause);
1✔
929
      }
930
      // performs cleanup after delegation
931
      value.cleanup();
1✔
932
    }
1✔
933
  }
934

935
  /** A Throttler never throttles. */
936
  private static final class HappyThrottler implements Throttler {
937

938
    @Override
939
    public boolean shouldThrottle() {
940
      return false;
×
941
    }
942

943
    @Override
944
    public void registerBackendResponse(boolean throttled) {
945
      // no-op
946
    }
×
947
  }
948

949
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
950
  private static final class RlsAsyncLruCache
951
      extends LinkedHashLruCache<RouteLookupRequestKey, CacheEntry> {
952
    private final RlsLbHelper helper;
953

954
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
955
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener,
956
        Ticker ticker, RlsLbHelper helper) {
957
      super(maxEstimatedSizeBytes, evictionListener, ticker);
1✔
958
      this.helper = checkNotNull(helper, "helper");
1✔
959
    }
1✔
960

961
    @Override
962
    protected boolean isExpired(RouteLookupRequestKey key, CacheEntry value, long nowNanos) {
963
      return value.isExpired(nowNanos);
1✔
964
    }
965

966
    @Override
967
    protected int estimateSizeOf(RouteLookupRequestKey key, CacheEntry value) {
968
      return value.getSizeBytes();
1✔
969
    }
970

971
    @Override
972
    protected boolean shouldInvalidateEldestEntry(
973
        RouteLookupRequestKey eldestKey, CacheEntry eldestValue, long now) {
974
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
975
        return false;
×
976
      }
977

978
      // eldest entry should be evicted if size limit exceeded
979
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
980
    }
981

982
    public CacheEntry cacheAndClean(RouteLookupRequestKey key, CacheEntry value) {
983
      CacheEntry newEntry = cache(key, value);
1✔
984

985
      // force cleanup if new entry pushed cache over max size (in bytes)
986
      if (fitToLimit()) {
1✔
987
        helper.triggerPendingRpcProcessing();
×
988
      }
989
      return newEntry;
1✔
990
    }
991
  }
992

993
  /** A header will be added when RLS server respond with additional header data. */
994
  @VisibleForTesting
995
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
996
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
997

998
  final class RlsPicker extends SubchannelPicker {
999

1000
    private final RlsRequestFactory requestFactory;
1001
    private final String lookupService;
1002

1003
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
1004
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
1005
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
1006
    }
1✔
1007

1008
    @Override
1009
    public PickResult pickSubchannel(PickSubchannelArgs args) {
1010
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
1011
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
1012
      RlsProtoData.RouteLookupRequestKey lookupRequestKey =
1✔
1013
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
1014
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(lookupRequestKey);
1✔
1015

1016
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
1017
        Metadata headers = args.getHeaders();
1✔
1018
        headers.discardAll(RLS_DATA_KEY);
1✔
1019
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
1020
      }
1021
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1022
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
1023
      if (response.hasData()) {
1✔
1024
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1025
        SubchannelPicker picker =
1026
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1027
        if (picker == null) {
1✔
1028
          return PickResult.withNoResult();
×
1029
        }
1030
        // Happy path
1031
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1032
        if (pickResult.hasResult()) {
1✔
1033
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1034
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1035
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1036
              Collections.emptyList());
1✔
1037
        }
1038
        return pickResult;
1✔
1039
      } else if (response.hasError()) {
1✔
1040
        if (hasFallback) {
1✔
1041
          return useFallback(args);
1✔
1042
        }
1043
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1044
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1045
        return PickResult.withError(
1✔
1046
            convertRlsServerStatus(response.getStatus(),
1✔
1047
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1048
      } else {
1049
        return PickResult.withNoResult();
1✔
1050
      }
1051
    }
1052

1053
    /** Uses Subchannel connected to default target. */
1054
    private PickResult useFallback(PickSubchannelArgs args) {
1055
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1056
      if (picker == null) {
1✔
1057
        return PickResult.withNoResult();
×
1058
      }
1059
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1060
      if (pickResult.hasResult()) {
1✔
1061
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1062
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1063
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1064
            Collections.emptyList());
1✔
1065
      }
1066
      return pickResult;
1✔
1067
    }
1068

1069
    private String determineMetricsPickResult(PickResult pickResult) {
1070
      if (pickResult.getStatus().isOk()) {
1✔
1071
        return "complete";
1✔
1072
      } else if (pickResult.isDrop()) {
1✔
1073
        return "drop";
×
1074
      } else {
1075
        return "fail";
1✔
1076
      }
1077
    }
1078

1079
    // GuardedBy CachingRlsLbClient.lock
1080
    void close() {
1081
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1082
        if (fallbackChildPolicyWrapper != null) {
1✔
1083
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1084
        }
1085
      }
1✔
1086
    }
1✔
1087

1088
    @Override
1089
    public String toString() {
1090
      return MoreObjects.toStringHelper(this)
×
1091
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1092
          .toString();
×
1093
    }
1094
  }
1095

1096
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc