• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20052

05 Nov 2025 03:32PM UTC coverage: 88.552% (+0.03%) from 88.521%
#20052

push

github

ejona86
rls: Add route lookup reason to request whether it is due to a cache miss or stale cache entry (#12442)

b/348690462

34985 of 39508 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.36
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import com.google.errorprone.annotations.CheckReturnValue;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ConnectivityState;
36
import io.grpc.LoadBalancer.Helper;
37
import io.grpc.LoadBalancer.PickResult;
38
import io.grpc.LoadBalancer.PickSubchannelArgs;
39
import io.grpc.LoadBalancer.ResolvedAddresses;
40
import io.grpc.LoadBalancer.SubchannelPicker;
41
import io.grpc.LongCounterMetricInstrument;
42
import io.grpc.LongGaugeMetricInstrument;
43
import io.grpc.ManagedChannel;
44
import io.grpc.ManagedChannelBuilder;
45
import io.grpc.Metadata;
46
import io.grpc.MetricInstrumentRegistry;
47
import io.grpc.MetricRecorder.BatchCallback;
48
import io.grpc.MetricRecorder.BatchRecorder;
49
import io.grpc.MetricRecorder.Registration;
50
import io.grpc.Status;
51
import io.grpc.internal.BackoffPolicy;
52
import io.grpc.internal.ExponentialBackoffPolicy;
53
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
54
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
55
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
57
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
58
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
59
import io.grpc.rls.LruCache.EvictionListener;
60
import io.grpc.rls.LruCache.EvictionType;
61
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
62
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
63
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
64
import io.grpc.rls.RlsProtoData.RouteLookupRequestKey;
65
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
66
import io.grpc.stub.StreamObserver;
67
import io.grpc.util.ForwardingLoadBalancerHelper;
68
import java.net.URI;
69
import java.net.URISyntaxException;
70
import java.util.Arrays;
71
import java.util.Collections;
72
import java.util.HashMap;
73
import java.util.List;
74
import java.util.Map;
75
import java.util.UUID;
76
import java.util.concurrent.Future;
77
import java.util.concurrent.ScheduledExecutorService;
78
import java.util.concurrent.TimeUnit;
79
import javax.annotation.Nullable;
80
import javax.annotation.concurrent.ThreadSafe;
81

82
/**
83
 * CachingRlsLbClient is the core of the RLS load balancer, which supports dynamic request
84
 * routing by fetching the decision from route lookup server. Every single request is routed by
85
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
86
 */
87
@ThreadSafe
88
final class CachingRlsLbClient {
89

90
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
91
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
92
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
93
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
94
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
95
  public static final int BYTES_PER_CHAR = 2;
96
  public static final int STRING_OVERHEAD_BYTES = 38;
97
  /** Minimum bytes for a Java Object. */
98
  public static final int OBJ_OVERHEAD_B = 16;
99

100
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
102
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
103
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
104
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
105
  private final Registration gaugeRegistration;
106
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
107

108
  // All cache status changes (pending, backoff, success) must be under this lock
109
  private final Object lock = new Object();
1✔
110
  // LRU cache based on access order (BACKOFF and actual data will be here)
111
  @GuardedBy("lock")
112
  private final RlsAsyncLruCache linkedHashLruCache;
113
  private final Future<?> periodicCleaner;
114
  // Any RPC that is in flight is cached in this map
115
  @GuardedBy("lock")
1✔
116
  private final Map<RouteLookupRequestKey, PendingCacheEntry> pendingCallCache = new HashMap<>();
117

118
  private final ScheduledExecutorService scheduledExecutorService;
119
  private final Ticker ticker;
120
  private final Throttler throttler;
121

122
  private final LbPolicyConfiguration lbPolicyConfig;
123
  private final BackoffPolicy.Provider backoffProvider;
124
  private final long maxAgeNanos;
125
  private final long staleAgeNanos;
126
  private final long callTimeoutNanos;
127

128
  private final RlsLbHelper helper;
129
  private final ManagedChannel rlsChannel;
130
  private final RouteLookupServiceStub rlsStub;
131
  private final RlsPicker rlsPicker;
132
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
133
  @GuardedBy("lock")
134
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
135
  private final ChannelLogger logger;
136
  private final ChildPolicyWrapper fallbackChildPolicyWrapper;
137

138
  static {
139
    MetricInstrumentRegistry metricInstrumentRegistry
140
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
141
    DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
142
        "grpc.lb.rls.default_target_picks",
143
        "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
144
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
145
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(),
1✔
146
        false);
147
    TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
1✔
148
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
149
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
150
            + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
151
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
1✔
152
            "grpc.lb.pick_result"), Collections.emptyList(),
1✔
153
        false);
154
    FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
1✔
155
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
156
            + "RLS channel being throttled", "{pick}",
157
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
1✔
158
        Collections.emptyList(), false);
1✔
159
    CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
1✔
160
        "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
161
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
162
        Collections.emptyList(), false);
1✔
163
    CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
1✔
164
        "EXPERIMENTAL. The current size of the RLS cache", "By",
165
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
166
        Collections.emptyList(), false);
1✔
167
  }
168

169
  private CachingRlsLbClient(Builder builder) {
1✔
170
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
171
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
172
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
173
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
174
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
175
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
176
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
177
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
178
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
179
    linkedHashLruCache =
1✔
180
        new RlsAsyncLruCache(
181
            rlsConfig.cacheSizeBytes(),
1✔
182
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
183
            ticker,
184
            helper);
185
    periodicCleaner =
1✔
186
        scheduledExecutorService.scheduleAtFixedRate(this::periodicClean, 1, 1, TimeUnit.MINUTES);
1✔
187
    logger = helper.getChannelLogger();
1✔
188
    String serverHost = null;
1✔
189
    try {
190
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
191
    } catch (URISyntaxException ignore) {
×
192
      // handled by the following null check
193
    }
1✔
194
    if (serverHost == null) {
1✔
195
      logger.log(
×
196
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
197
      serverHost = helper.getAuthority();
×
198
    }
199
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
200
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
201
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
1✔
202
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
203
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
204
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
205
    // called to impose the authority security restrictions.
206
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
207
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
208
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
209
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
210
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
211
    if (routeLookupChannelServiceConfig != null) {
1✔
212
      logger.log(
1✔
213
          ChannelLogLevel.DEBUG,
214
          "RLS channel service config: {0}",
215
          routeLookupChannelServiceConfig);
216
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
217
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
218
    }
219
    rlsChannel = rlsChannelBuilder.build();
1✔
220
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
221
    childLbResolvedAddressFactory =
1✔
222
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
223
    backoffProvider = builder.backoffProvider;
1✔
224
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
225
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
226
    refCountedChildPolicyWrapperFactory =
1✔
227
        new RefCountedChildPolicyWrapperFactory(
228
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
229
            childLbHelperProvider,
230
            new BackoffRefreshListener());
231
    // TODO(creamsoup) wait until lb is ready
232
    String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
233
    if (defaultTarget != null && !defaultTarget.isEmpty()) {
1✔
234
      fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
235
    } else {
236
      fallbackChildPolicyWrapper = null;
1✔
237
    }
238

239
    gaugeRegistration = helper.getMetricRecorder()
1✔
240
        .registerBatchCallback(new BatchCallback() {
1✔
241
          @Override
242
          public void accept(BatchRecorder recorder) {
243
            int estimatedSize;
244
            long estimatedSizeBytes;
245
            synchronized (lock) {
1✔
246
              estimatedSize = linkedHashLruCache.estimatedSize();
1✔
247
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
1✔
248
            }
1✔
249
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
1✔
250
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
251
                    metricsInstanceUuid), Collections.emptyList());
1✔
252
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
1✔
253
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
254
                    metricsInstanceUuid), Collections.emptyList());
1✔
255
          }
1✔
256
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
257

258
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
259
  }
1✔
260

261
  void init() {
262
    synchronized (lock) {
1✔
263
      refCountedChildPolicyWrapperFactory.init();
1✔
264
    }
1✔
265
  }
1✔
266

267
  Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
268
    synchronized (lock) {
1✔
269
      return refCountedChildPolicyWrapperFactory.acceptResolvedAddressFactory(
1✔
270
          childLbResolvedAddressFactory);
271
    }
272
  }
273

274
  /**
275
   * Convert the status to UNAVAILABLE and enhance the error message.
276
   * @param status status as provided by server
277
   * @param serverName Used for error description
278
   * @return Transformed status
279
   */
280
  static Status convertRlsServerStatus(Status status, String serverName) {
281
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
282
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
283
                + "RLS server returned: %s: %s",
284
            serverName, status.getCode(), status.getDescription()));
1✔
285
  }
286

287
  private void periodicClean() {
288
    synchronized (lock) {
1✔
289
      linkedHashLruCache.cleanupExpiredEntries();
1✔
290
    }
1✔
291
  }
1✔
292

293
  /** Populates async cache entry for new request. */
294
  @GuardedBy("lock")
295
  private CachedRouteLookupResponse asyncRlsCall(
296
      RouteLookupRequestKey routeLookupRequestKey, @Nullable BackoffPolicy backoffPolicy,
297
      RouteLookupRequest.Reason routeLookupReason) {
298
    if (throttler.shouldThrottle()) {
1✔
299
      logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup",
1✔
300
          routeLookupRequestKey);
301
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
302
      // on this result
303
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
304
          routeLookupRequestKey, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"),
1✔
305
          backoffPolicy));
306
    }
307
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
308
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(
1✔
309
        RouteLookupRequest.create(routeLookupRequestKey.keyMap(), routeLookupReason));
1✔
310
    logger.log(ChannelLogLevel.DEBUG,
1✔
311
        "[RLS Entry {0}] Starting RouteLookup: {1}", routeLookupRequestKey, routeLookupRequest);
312
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
313
        .routeLookup(
1✔
314
            routeLookupRequest,
315
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
316
              @Override
317
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
318
                logger.log(ChannelLogLevel.DEBUG,
1✔
319
                    "[RLS Entry {0}] RouteLookup succeeded: {1}", routeLookupRequestKey, value);
320
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
321
              }
1✔
322

323
              @Override
324
              public void onError(Throwable t) {
325
                logger.log(ChannelLogLevel.DEBUG,
1✔
326
                    "[RLS Entry {0}] RouteLookup failed: {1}", routeLookupRequestKey, t);
327
                response.setException(t);
1✔
328
                throttler.registerBackendResponse(true);
1✔
329
              }
1✔
330

331
              @Override
332
              public void onCompleted() {
333
                throttler.registerBackendResponse(false);
1✔
334
              }
1✔
335
            });
336
    return CachedRouteLookupResponse.pendingResponse(
1✔
337
        createPendingEntry(routeLookupRequestKey, response, backoffPolicy));
1✔
338
  }
339

340
  /**
341
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
342
   * cached, pending and backed-off due to error. The result remains same even if the status is
343
   * changed after the return.
344
   */
345
  @CheckReturnValue
346
  final CachedRouteLookupResponse get(final RouteLookupRequestKey routeLookupRequestKey) {
347
    synchronized (lock) {
1✔
348
      final CacheEntry cacheEntry;
349
      cacheEntry = linkedHashLruCache.read(routeLookupRequestKey);
1✔
350
      if (cacheEntry == null) {
1✔
351
        PendingCacheEntry pendingEntry = pendingCallCache.get(routeLookupRequestKey);
1✔
352
        if (pendingEntry != null) {
1✔
353
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
354
        }
355
        return asyncRlsCall(routeLookupRequestKey, /* backoffPolicy= */ null,
1✔
356
            RouteLookupRequest.Reason.REASON_MISS);
357
      }
358

359
      if (cacheEntry instanceof DataCacheEntry) {
1✔
360
        // cache hit, initiate async-refresh if entry is staled
361
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
362
        if (dataEntry.isStaled(ticker.read())) {
1✔
363
          dataEntry.maybeRefresh();
1✔
364
        }
365
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
366
      }
367
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
368
    }
369
  }
370

371
  /** Performs any pending maintenance operations needed by the cache. */
372
  void close() {
373
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
374
    synchronized (lock) {
1✔
375
      periodicCleaner.cancel(false);
1✔
376
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
377
      linkedHashLruCache.close();
1✔
378
      // TODO(creamsoup) maybe cancel all pending requests
379
      pendingCallCache.clear();
1✔
380
      rlsChannel.shutdownNow();
1✔
381
      rlsPicker.close();
1✔
382
      gaugeRegistration.close();
1✔
383
    }
1✔
384
  }
1✔
385

386
  void requestConnection() {
387
    rlsChannel.getState(true);
×
388
  }
×
389

390
  @GuardedBy("lock")
391
  private PendingCacheEntry createPendingEntry(
392
      RouteLookupRequestKey routeLookupRequestKey,
393
      ListenableFuture<RouteLookupResponse> pendingCall,
394
      @Nullable BackoffPolicy backoffPolicy) {
395
    PendingCacheEntry entry = new PendingCacheEntry(routeLookupRequestKey, pendingCall,
1✔
396
        backoffPolicy);
397
    // Add the entry to the map before adding the Listener, because the listener removes the
398
    // entry from the map
399
    pendingCallCache.put(routeLookupRequestKey, entry);
1✔
400
    // Beware that the listener can run immediately on the current thread
401
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
402
    return entry;
1✔
403
  }
404

405
  private void pendingRpcComplete(PendingCacheEntry entry) {
406
    synchronized (lock) {
1✔
407
      boolean clientClosed = pendingCallCache.remove(entry.routeLookupRequestKey) == null;
1✔
408
      if (clientClosed) {
1✔
409
        return;
1✔
410
      }
411

412
      try {
413
        createDataEntry(entry.routeLookupRequestKey, Futures.getDone(entry.pendingCall));
1✔
414
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
415
        // reattempt picks when the child LB is done connecting
416
      } catch (Exception e) {
1✔
417
        createBackOffEntry(entry.routeLookupRequestKey, Status.fromThrowable(e),
1✔
418
            entry.backoffPolicy);
1✔
419
        // Cache updated. updateBalancingState() to reattempt picks
420
        helper.triggerPendingRpcProcessing();
1✔
421
      }
1✔
422
    }
1✔
423
  }
1✔
424

425
  @GuardedBy("lock")
426
  private DataCacheEntry createDataEntry(
427
      RouteLookupRequestKey routeLookupRequestKey, RouteLookupResponse routeLookupResponse) {
428
    logger.log(
1✔
429
        ChannelLogLevel.DEBUG,
430
        "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}",
431
        routeLookupRequestKey, routeLookupResponse);
432
    DataCacheEntry entry = new DataCacheEntry(routeLookupRequestKey, routeLookupResponse);
1✔
433
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
434
    // this cache update because the lock is held
435
    linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry);
1✔
436
    return entry;
1✔
437
  }
438

439
  @GuardedBy("lock")
440
  private BackoffCacheEntry createBackOffEntry(RouteLookupRequestKey routeLookupRequestKey,
441
      Status status, @Nullable BackoffPolicy backoffPolicy) {
442
    if (backoffPolicy == null) {
1✔
443
      backoffPolicy = backoffProvider.get();
1✔
444
    }
445
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
446
    logger.log(
1✔
447
        ChannelLogLevel.DEBUG,
448
        "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
449
        routeLookupRequestKey, status, delayNanos);
1✔
450
    BackoffCacheEntry entry = new BackoffCacheEntry(routeLookupRequestKey, status, backoffPolicy);
1✔
451
    // Lock is held, so the task can't execute before the assignment
452
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
453
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
454
    linkedHashLruCache.cacheAndClean(routeLookupRequestKey, entry);
1✔
455
    return entry;
1✔
456
  }
457

458
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
459
    synchronized (lock) {
1✔
460
      // This checks whether the task has been cancelled and prevents a second execution.
461
      if (!entry.scheduledFuture.cancel(false)) {
1✔
462
        // Future was previously cancelled
463
        return;
×
464
      }
465
      logger.log(ChannelLogLevel.DEBUG,
1✔
466
          "[RLS Entry {0}] Calling RLS for transition to pending", entry.routeLookupRequestKey);
467
      linkedHashLruCache.invalidate(entry.routeLookupRequestKey);
1✔
468
      asyncRlsCall(entry.routeLookupRequestKey, entry.backoffPolicy,
1✔
469
          RouteLookupRequest.Reason.REASON_MISS);
470
    }
1✔
471
  }
1✔
472

473
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
474

475
    final Helper helper;
476
    private ConnectivityState state;
477
    private SubchannelPicker picker;
478

479
    RlsLbHelper(Helper helper) {
1✔
480
      this.helper = helper;
1✔
481
    }
1✔
482

483
    @Override
484
    protected Helper delegate() {
485
      return helper;
1✔
486
    }
487

488
    @Override
489
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
490
      state = newState;
1✔
491
      picker = newPicker;
1✔
492
      super.updateBalancingState(newState, newPicker);
1✔
493
    }
1✔
494

495
    void triggerPendingRpcProcessing() {
496
      checkState(state != null, "updateBalancingState hasn't yet been called");
1✔
497
      helper.getSynchronizationContext().execute(
1✔
498
          () -> super.updateBalancingState(state, picker));
1✔
499
    }
1✔
500
  }
501

502
  /**
503
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
504
   */
505
  static final class CachedRouteLookupResponse {
506
    // Should only have 1 of following 3 cache entries
507
    @Nullable
508
    private final DataCacheEntry dataCacheEntry;
509
    @Nullable
510
    private final PendingCacheEntry pendingCacheEntry;
511
    @Nullable
512
    private final BackoffCacheEntry backoffCacheEntry;
513

514
    CachedRouteLookupResponse(
515
        DataCacheEntry dataCacheEntry,
516
        PendingCacheEntry pendingCacheEntry,
517
        BackoffCacheEntry backoffCacheEntry) {
1✔
518
      this.dataCacheEntry = dataCacheEntry;
1✔
519
      this.pendingCacheEntry = pendingCacheEntry;
1✔
520
      this.backoffCacheEntry = backoffCacheEntry;
1✔
521
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
522
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
523
          "Expected only 1 cache entry value provided");
524
    }
1✔
525

526
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
527
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
528
    }
529

530
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
531
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
532
    }
533

534
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
535
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
536
    }
537

538
    boolean hasData() {
539
      return dataCacheEntry != null;
1✔
540
    }
541

542
    @Nullable
543
    ChildPolicyWrapper getChildPolicyWrapper() {
544
      if (!hasData()) {
1✔
545
        return null;
×
546
      }
547
      return dataCacheEntry.getChildPolicyWrapper();
1✔
548
    }
549

550
    @VisibleForTesting
551
    @Nullable
552
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
553
      if (!hasData()) {
1✔
554
        return null;
×
555
      }
556
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
557
    }
558

559
    @Nullable
560
    String getHeaderData() {
561
      if (!hasData()) {
1✔
562
        return null;
1✔
563
      }
564
      return dataCacheEntry.getHeaderData();
1✔
565
    }
566

567
    boolean hasError() {
568
      return backoffCacheEntry != null;
1✔
569
    }
570

571
    boolean isPending() {
572
      return pendingCacheEntry != null;
1✔
573
    }
574

575
    @Nullable
576
    Status getStatus() {
577
      if (!hasError()) {
1✔
578
        return null;
×
579
      }
580
      return backoffCacheEntry.getStatus();
1✔
581
    }
582

583
    @Override
584
    public String toString() {
585
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
586
      if (dataCacheEntry != null) {
×
587
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
588
      }
589
      if (pendingCacheEntry != null) {
×
590
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
591
      }
592
      if (backoffCacheEntry != null) {
×
593
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
594
      }
595
      return toStringHelper.toString();
×
596
    }
597
  }
598

599
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
600
  static final class PendingCacheEntry {
601
    private final ListenableFuture<RouteLookupResponse> pendingCall;
602
    private final RouteLookupRequestKey routeLookupRequestKey;
603
    @Nullable
604
    private final BackoffPolicy backoffPolicy;
605

606
    PendingCacheEntry(
607
        RouteLookupRequestKey routeLookupRequestKey,
608
        ListenableFuture<RouteLookupResponse> pendingCall,
609
        @Nullable BackoffPolicy backoffPolicy) {
1✔
610
      this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request");
1✔
611
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
612
      this.backoffPolicy = backoffPolicy;
1✔
613
    }
1✔
614

615
    @Override
616
    public String toString() {
617
      return MoreObjects.toStringHelper(this)
×
618
          .add("routeLookupRequestKey", routeLookupRequestKey)
×
619
          .toString();
×
620
    }
621
  }
622

623
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
624
  abstract static class CacheEntry {
625

626
    protected final RouteLookupRequestKey routeLookupRequestKey;
627

628
    CacheEntry(RouteLookupRequestKey routeLookupRequestKey) {
1✔
629
      this.routeLookupRequestKey = checkNotNull(routeLookupRequestKey, "request");
1✔
630
    }
1✔
631

632
    abstract int getSizeBytes();
633

634
    abstract boolean isExpired(long now);
635

636
    abstract void cleanup();
637

638
    protected boolean isOldEnoughToBeEvicted(long now) {
639
      return true;
×
640
    }
641
  }
642

643
  /** Implementation of {@link CacheEntry} contains valid data. */
644
  final class DataCacheEntry extends CacheEntry {
645
    private final RouteLookupResponse response;
646
    private final long minEvictionTime;
647
    private final long expireTime;
648
    private final long staleTime;
649
    private final List<ChildPolicyWrapper> childPolicyWrappers;
650

651
    // GuardedBy CachingRlsLbClient.lock
652
    DataCacheEntry(RouteLookupRequestKey routeLookupRequestKey,
653
        final RouteLookupResponse response) {
1✔
654
      super(routeLookupRequestKey);
1✔
655
      this.response = checkNotNull(response, "response");
1✔
656
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
657
      childPolicyWrappers =
1✔
658
          refCountedChildPolicyWrapperFactory
1✔
659
              .createOrGet(response.targets());
1✔
660
      long now = ticker.read();
1✔
661
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
662
      expireTime = now + maxAgeNanos;
1✔
663
      staleTime = now + staleAgeNanos;
1✔
664
    }
1✔
665

666
    /**
667
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
668
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
669
     * data still exists. Flow looks like following.
670
     *
671
     * <pre>
672
     * Timeline                       | async refresh
673
     *                                V put new cache (entry2)
674
     * entry1: Pending | hasValue | staled  |
675
     * entry2:                        | OV* | pending | hasValue | staled |
676
     *
677
     * OV: old value
678
     * </pre>
679
     */
680
    void maybeRefresh() {
681
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
682
        if (pendingCallCache.containsKey(routeLookupRequestKey)) {
1✔
683
          // pending already requested
684
          return;
×
685
        }
686
        logger.log(ChannelLogLevel.DEBUG,
1✔
687
            "[RLS Entry {0}] Cache entry is stale, refreshing", routeLookupRequestKey);
688
        asyncRlsCall(routeLookupRequestKey, /* backoffPolicy= */ null,
1✔
689
            RouteLookupRequest.Reason.REASON_STALE);
690
      }
1✔
691
    }
1✔
692

693
    @VisibleForTesting
694
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
695
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
696
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
697
          return childPolicyWrapper;
1✔
698
        }
699
      }
1✔
700

701
      throw new RuntimeException("Target not found:" + target);
×
702
    }
703

704
    @Nullable
705
    ChildPolicyWrapper getChildPolicyWrapper() {
706
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
707
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
708
          return childPolicyWrapper;
1✔
709
        }
710
      }
1✔
711
      return childPolicyWrappers.get(0);
1✔
712
    }
713

714
    String getHeaderData() {
715
      return response.getHeaderData();
1✔
716
    }
717

718
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
719
    int calcStringSize(String target) {
720
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
721
    }
722

723
    @Override
724
    int getSizeBytes() {
725
      int targetSize = 0;
1✔
726
      for (String target : response.targets()) {
1✔
727
        targetSize += calcStringSize(target);
1✔
728
      }
1✔
729
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
730
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
731
    }
732

733
    @Override
734
    boolean isExpired(long now) {
735
      return expireTime - now <= 0;
1✔
736
    }
737

738
    boolean isStaled(long now) {
739
      return staleTime - now <= 0;
1✔
740
    }
741

742
    @Override
743
    protected boolean isOldEnoughToBeEvicted(long now) {
744
      return minEvictionTime - now <= 0;
×
745
    }
746

747
    @Override
748
    void cleanup() {
749
      synchronized (lock) {
1✔
750
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
751
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
752
        }
1✔
753
      }
1✔
754
    }
1✔
755

756
    @Override
757
    public String toString() {
758
      return MoreObjects.toStringHelper(this)
×
759
          .add("request", routeLookupRequestKey)
×
760
          .add("response", response)
×
761
          .add("expireTime", expireTime)
×
762
          .add("staleTime", staleTime)
×
763
          .add("childPolicyWrappers", childPolicyWrappers)
×
764
          .toString();
×
765
    }
766
  }
767

768
  /**
769
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
770
   * status when the backoff time is expired.
771
   */
772
  private static final class BackoffCacheEntry extends CacheEntry {
773

774
    private final Status status;
775
    private final BackoffPolicy backoffPolicy;
776
    private Future<?> scheduledFuture;
777

778
    BackoffCacheEntry(RouteLookupRequestKey routeLookupRequestKey, Status status,
779
        BackoffPolicy backoffPolicy) {
780
      super(routeLookupRequestKey);
1✔
781
      this.status = checkNotNull(status, "status");
1✔
782
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
783
    }
1✔
784

785
    Status getStatus() {
786
      return status;
1✔
787
    }
788

789
    @Override
790
    int getSizeBytes() {
791
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
792
    }
793

794
    @Override
795
    boolean isExpired(long now) {
796
      return scheduledFuture.isDone();
1✔
797
    }
798

799
    @Override
800
    void cleanup() {
801
      scheduledFuture.cancel(false);
1✔
802
    }
1✔
803

804
    @Override
805
    public String toString() {
806
      return MoreObjects.toStringHelper(this)
×
807
          .add("request", routeLookupRequestKey)
×
808
          .add("status", status)
×
809
          .toString();
×
810
    }
811
  }
812

813
  /** Returns a Builder for {@link CachingRlsLbClient}. */
814
  static Builder newBuilder() {
815
    return new Builder();
1✔
816
  }
817

818
  /** A Builder for {@link CachingRlsLbClient}. */
819
  static final class Builder {
1✔
820

821
    private Helper helper;
822
    private LbPolicyConfiguration lbPolicyConfig;
823
    private Throttler throttler = new HappyThrottler();
1✔
824
    private ResolvedAddressFactory resolvedAddressFactory;
825
    private Ticker ticker = Ticker.systemTicker();
1✔
826
    private EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener;
827
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
828

829
    Builder setHelper(Helper helper) {
830
      this.helper = checkNotNull(helper, "helper");
1✔
831
      return this;
1✔
832
    }
833

834
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
835
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
836
      return this;
1✔
837
    }
838

839
    Builder setThrottler(Throttler throttler) {
840
      this.throttler = checkNotNull(throttler, "throttler");
1✔
841
      return this;
1✔
842
    }
843

844
    /**
845
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
846
     */
847
    Builder setResolvedAddressesFactory(
848
        ResolvedAddressFactory resolvedAddressFactory) {
849
      this.resolvedAddressFactory =
1✔
850
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
851
      return this;
1✔
852
    }
853

854
    Builder setTicker(Ticker ticker) {
855
      this.ticker = checkNotNull(ticker, "ticker");
1✔
856
      return this;
1✔
857
    }
858

859
    Builder setEvictionListener(
860
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener) {
861
      this.evictionListener = evictionListener;
1✔
862
      return this;
1✔
863
    }
864

865
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
866
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
867
      return this;
1✔
868
    }
869

870
    CachingRlsLbClient build() {
871
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
872
      client.init();
1✔
873
      return client;
1✔
874
    }
875
  }
876

877
  /**
878
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
879
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
880
   */
881
  private static final class AutoCleaningEvictionListener
882
      implements EvictionListener<RouteLookupRequestKey, CacheEntry> {
883

884
    private final EvictionListener<RouteLookupRequestKey, CacheEntry> delegate;
885

886
    AutoCleaningEvictionListener(
887
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> delegate) {
1✔
888
      this.delegate = delegate;
1✔
889
    }
1✔
890

891
    @Override
892
    public void onEviction(RouteLookupRequestKey key, CacheEntry value, EvictionType cause) {
893
      if (delegate != null) {
1✔
894
        delegate.onEviction(key, value, cause);
1✔
895
      }
896
      // performs cleanup after delegation
897
      value.cleanup();
1✔
898
    }
1✔
899
  }
900

901
  /** A Throttler never throttles. */
902
  private static final class HappyThrottler implements Throttler {
903

904
    @Override
905
    public boolean shouldThrottle() {
906
      return false;
×
907
    }
908

909
    @Override
910
    public void registerBackendResponse(boolean throttled) {
911
      // no-op
912
    }
×
913
  }
914

915
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
916
  private static final class RlsAsyncLruCache
917
      extends LinkedHashLruCache<RouteLookupRequestKey, CacheEntry> {
918
    private final RlsLbHelper helper;
919

920
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
921
        @Nullable EvictionListener<RouteLookupRequestKey, CacheEntry> evictionListener,
922
        Ticker ticker, RlsLbHelper helper) {
923
      super(maxEstimatedSizeBytes, evictionListener, ticker);
1✔
924
      this.helper = checkNotNull(helper, "helper");
1✔
925
    }
1✔
926

927
    @Override
928
    protected boolean isExpired(RouteLookupRequestKey key, CacheEntry value, long nowNanos) {
929
      return value.isExpired(nowNanos);
1✔
930
    }
931

932
    @Override
933
    protected int estimateSizeOf(RouteLookupRequestKey key, CacheEntry value) {
934
      return value.getSizeBytes();
1✔
935
    }
936

937
    @Override
938
    protected boolean shouldInvalidateEldestEntry(
939
        RouteLookupRequestKey eldestKey, CacheEntry eldestValue, long now) {
940
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
941
        return false;
×
942
      }
943

944
      // eldest entry should be evicted if size limit exceeded
945
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
946
    }
947

948
    public CacheEntry cacheAndClean(RouteLookupRequestKey key, CacheEntry value) {
949
      CacheEntry newEntry = cache(key, value);
1✔
950

951
      // force cleanup if new entry pushed cache over max size (in bytes)
952
      if (fitToLimit()) {
1✔
953
        helper.triggerPendingRpcProcessing();
×
954
      }
955
      return newEntry;
1✔
956
    }
957
  }
958

959
  /**
960
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
961
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
962
   */
963
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
964

965
    @Nullable
1✔
966
    private ConnectivityState prevState = null;
967

968
    @Override
969
    public void onStatusChanged(ConnectivityState newState) {
970
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
971
          && newState == ConnectivityState.READY) {
972
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
973
        synchronized (lock) {
1✔
974
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
975
            if (value instanceof BackoffCacheEntry) {
1✔
976
              refreshBackoffEntry((BackoffCacheEntry) value);
×
977
            }
978
          }
1✔
979
        }
1✔
980
      }
981
      prevState = newState;
1✔
982
    }
1✔
983
  }
984

985
  /** A header will be added when RLS server respond with additional header data. */
986
  @VisibleForTesting
987
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
988
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
989

990
  final class RlsPicker extends SubchannelPicker {
991

992
    private final RlsRequestFactory requestFactory;
993
    private final String lookupService;
994

995
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
996
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
997
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
998
    }
1✔
999

1000
    @Override
1001
    public PickResult pickSubchannel(PickSubchannelArgs args) {
1002
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
1003
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
1004
      RlsProtoData.RouteLookupRequestKey lookupRequestKey =
1✔
1005
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
1006
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(lookupRequestKey);
1✔
1007

1008
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
1009
        Metadata headers = args.getHeaders();
1✔
1010
        headers.discardAll(RLS_DATA_KEY);
1✔
1011
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
1012
      }
1013
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1014
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
1015
      if (response.hasData()) {
1✔
1016
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1017
        SubchannelPicker picker =
1018
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1019
        if (picker == null) {
1✔
1020
          return PickResult.withNoResult();
×
1021
        }
1022
        // Happy path
1023
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1024
        if (pickResult.hasResult()) {
1✔
1025
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1026
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1027
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1028
              Collections.emptyList());
1✔
1029
        }
1030
        return pickResult;
1✔
1031
      } else if (response.hasError()) {
1✔
1032
        if (hasFallback) {
1✔
1033
          return useFallback(args);
1✔
1034
        }
1035
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1036
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1037
        return PickResult.withError(
1✔
1038
            convertRlsServerStatus(response.getStatus(),
1✔
1039
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1040
      } else {
1041
        return PickResult.withNoResult();
1✔
1042
      }
1043
    }
1044

1045
    /** Uses Subchannel connected to default target. */
1046
    private PickResult useFallback(PickSubchannelArgs args) {
1047
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1048
      if (picker == null) {
1✔
1049
        return PickResult.withNoResult();
×
1050
      }
1051
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1052
      if (pickResult.hasResult()) {
1✔
1053
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1054
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1055
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1056
            Collections.emptyList());
1✔
1057
      }
1058
      return pickResult;
1✔
1059
    }
1060

1061
    private String determineMetricsPickResult(PickResult pickResult) {
1062
      if (pickResult.getStatus().isOk()) {
1✔
1063
        return "complete";
1✔
1064
      } else if (pickResult.isDrop()) {
1✔
1065
        return "drop";
×
1066
      } else {
1067
        return "fail";
1✔
1068
      }
1069
    }
1070

1071
    // GuardedBy CachingRlsLbClient.lock
1072
    void close() {
1073
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1074
        if (fallbackChildPolicyWrapper != null) {
1✔
1075
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1076
        }
1077
      }
1✔
1078
    }
1✔
1079

1080
    @Override
1081
    public String toString() {
1082
      return MoreObjects.toStringHelper(this)
×
1083
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1084
          .toString();
×
1085
    }
1086
  }
1087

1088
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc