• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19923

24 Jul 2025 11:28AM UTC coverage: 88.569% (+0.004%) from 88.565%
#19923

push

github

web-flow
xds: Do RLS fallback policy eager start (#12211)

The resource subscription to the fallback target was done only at the time of falling back, which can cause RPCs to fail. This change subscribes to and caches the fallback target earlier, similar to the C++ and Go gRPC implementations.

34672 of 39147 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.31
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import com.google.errorprone.annotations.CheckReturnValue;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ConnectivityState;
36
import io.grpc.LoadBalancer.Helper;
37
import io.grpc.LoadBalancer.PickResult;
38
import io.grpc.LoadBalancer.PickSubchannelArgs;
39
import io.grpc.LoadBalancer.ResolvedAddresses;
40
import io.grpc.LoadBalancer.SubchannelPicker;
41
import io.grpc.LongCounterMetricInstrument;
42
import io.grpc.LongGaugeMetricInstrument;
43
import io.grpc.ManagedChannel;
44
import io.grpc.ManagedChannelBuilder;
45
import io.grpc.Metadata;
46
import io.grpc.MetricInstrumentRegistry;
47
import io.grpc.MetricRecorder.BatchCallback;
48
import io.grpc.MetricRecorder.BatchRecorder;
49
import io.grpc.MetricRecorder.Registration;
50
import io.grpc.Status;
51
import io.grpc.internal.BackoffPolicy;
52
import io.grpc.internal.ExponentialBackoffPolicy;
53
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
54
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
55
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
57
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
58
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
59
import io.grpc.rls.LruCache.EvictionListener;
60
import io.grpc.rls.LruCache.EvictionType;
61
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
62
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
63
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
64
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
65
import io.grpc.stub.StreamObserver;
66
import io.grpc.util.ForwardingLoadBalancerHelper;
67
import java.net.URI;
68
import java.net.URISyntaxException;
69
import java.util.Arrays;
70
import java.util.Collections;
71
import java.util.HashMap;
72
import java.util.List;
73
import java.util.Map;
74
import java.util.UUID;
75
import java.util.concurrent.Future;
76
import java.util.concurrent.ScheduledExecutorService;
77
import java.util.concurrent.TimeUnit;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
/**
82
 * A CachingRlsLbClient is a core implementation of RLS loadbalancer that supports dynamic request
83
 * routing by fetching the decision from route lookup server. Every single request is routed by
84
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
85
 */
86
@ThreadSafe
87
final class CachingRlsLbClient {
88

89
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
90
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
91
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
92
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
93
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
94
  public static final int BYTES_PER_CHAR = 2;
95
  public static final int STRING_OVERHEAD_BYTES = 38;
96
  /** Minimum bytes for a Java Object. */
97
  public static final int OBJ_OVERHEAD_B = 16;
98

99
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
100
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104
  private final Registration gaugeRegistration;
105
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
106

107
  // All cache status changes (pending, backoff, success) must be under this lock
108
  private final Object lock = new Object();
1✔
109
  // LRU cache based on access order (BACKOFF and actual data will be here)
110
  @GuardedBy("lock")
111
  private final RlsAsyncLruCache linkedHashLruCache;
112
  private final Future<?> periodicCleaner;
113
  // any RPC in flight will be cached in this map
114
  @GuardedBy("lock")
1✔
115
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
116

117
  private final ScheduledExecutorService scheduledExecutorService;
118
  private final Ticker ticker;
119
  private final Throttler throttler;
120

121
  private final LbPolicyConfiguration lbPolicyConfig;
122
  private final BackoffPolicy.Provider backoffProvider;
123
  private final long maxAgeNanos;
124
  private final long staleAgeNanos;
125
  private final long callTimeoutNanos;
126

127
  private final RlsLbHelper helper;
128
  private final ManagedChannel rlsChannel;
129
  private final RouteLookupServiceStub rlsStub;
130
  private final RlsPicker rlsPicker;
131
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
132
  @GuardedBy("lock")
133
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
134
  private final ChannelLogger logger;
135
  private final ChildPolicyWrapper fallbackChildPolicyWrapper;
136

137
  static {
138
    MetricInstrumentRegistry metricInstrumentRegistry
139
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
140
    DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
141
        "grpc.lb.rls.default_target_picks",
142
        "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
143
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
144
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(),
1✔
145
        false);
146
    TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
1✔
147
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
148
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
149
            + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
150
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
1✔
151
            "grpc.lb.pick_result"), Collections.emptyList(),
1✔
152
        false);
153
    FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
1✔
154
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
155
            + "RLS channel being throttled", "{pick}",
156
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
1✔
157
        Collections.emptyList(), false);
1✔
158
    CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
1✔
159
        "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
160
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
161
        Collections.emptyList(), false);
1✔
162
    CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
1✔
163
        "EXPERIMENTAL. The current size of the RLS cache", "By",
164
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
165
        Collections.emptyList(), false);
1✔
166
  }
167

168
  private CachingRlsLbClient(Builder builder) {
1✔
169
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
170
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
171
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
172
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
173
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
174
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
175
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
176
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
177
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
178
    linkedHashLruCache =
1✔
179
        new RlsAsyncLruCache(
180
            rlsConfig.cacheSizeBytes(),
1✔
181
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
182
            ticker,
183
            helper);
184
    periodicCleaner =
1✔
185
        scheduledExecutorService.scheduleAtFixedRate(this::periodicClean, 1, 1, TimeUnit.MINUTES);
1✔
186
    logger = helper.getChannelLogger();
1✔
187
    String serverHost = null;
1✔
188
    try {
189
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
190
    } catch (URISyntaxException ignore) {
×
191
      // handled by the following null check
192
    }
1✔
193
    if (serverHost == null) {
1✔
194
      logger.log(
×
195
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
196
      serverHost = helper.getAuthority();
×
197
    }
198
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
199
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
200
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
1✔
201
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
202
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
203
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
204
    // called to impose the authority security restrictions.
205
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
206
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
207
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
208
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
209
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
210
    if (routeLookupChannelServiceConfig != null) {
1✔
211
      logger.log(
1✔
212
          ChannelLogLevel.DEBUG,
213
          "RLS channel service config: {0}",
214
          routeLookupChannelServiceConfig);
215
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
216
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
217
    }
218
    rlsChannel = rlsChannelBuilder.build();
1✔
219
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
220
    childLbResolvedAddressFactory =
1✔
221
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
222
    backoffProvider = builder.backoffProvider;
1✔
223
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
224
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
225
    refCountedChildPolicyWrapperFactory =
1✔
226
        new RefCountedChildPolicyWrapperFactory(
227
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
228
            childLbHelperProvider,
229
            new BackoffRefreshListener());
230
    // TODO(creamsoup) wait until lb is ready
231
    String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
232
    if (defaultTarget != null && !defaultTarget.isEmpty()) {
1✔
233
      fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
234
    } else {
235
      fallbackChildPolicyWrapper = null;
1✔
236
    }
237

238
    gaugeRegistration = helper.getMetricRecorder()
1✔
239
        .registerBatchCallback(new BatchCallback() {
1✔
240
          @Override
241
          public void accept(BatchRecorder recorder) {
242
            int estimatedSize;
243
            long estimatedSizeBytes;
244
            synchronized (lock) {
1✔
245
              estimatedSize = linkedHashLruCache.estimatedSize();
1✔
246
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
1✔
247
            }
1✔
248
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
1✔
249
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
250
                    metricsInstanceUuid), Collections.emptyList());
1✔
251
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
1✔
252
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
253
                    metricsInstanceUuid), Collections.emptyList());
1✔
254
          }
1✔
255
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
256

257
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
258
  }
1✔
259

260
  void init() {
261
    synchronized (lock) {
1✔
262
      refCountedChildPolicyWrapperFactory.init();
1✔
263
    }
1✔
264
  }
1✔
265

266
  Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
267
    synchronized (lock) {
1✔
268
      return refCountedChildPolicyWrapperFactory.acceptResolvedAddressFactory(
1✔
269
          childLbResolvedAddressFactory);
270
    }
271
  }
272

273
  /**
274
   * Convert the status to UNAVAILABLE and enhance the error message.
275
   * @param status status as provided by server
276
   * @param serverName Used for error description
277
   * @return Transformed status
278
   */
279
  static Status convertRlsServerStatus(Status status, String serverName) {
280
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
281
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
282
                + "RLS server returned: %s: %s",
283
            serverName, status.getCode(), status.getDescription()));
1✔
284
  }
285

286
  private void periodicClean() {
287
    synchronized (lock) {
1✔
288
      linkedHashLruCache.cleanupExpiredEntries();
1✔
289
    }
1✔
290
  }
1✔
291

292
  /** Populates async cache entry for new request. */
293
  @GuardedBy("lock")
294
  private CachedRouteLookupResponse asyncRlsCall(
295
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
296
    if (throttler.shouldThrottle()) {
1✔
297
      logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup", request);
1✔
298
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
299
      // on this result
300
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
301
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
302
    }
303
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
304
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
305
    logger.log(ChannelLogLevel.DEBUG,
1✔
306
        "[RLS Entry {0}] Starting RouteLookup: {1}", request, routeLookupRequest);
307
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
308
        .routeLookup(
1✔
309
            routeLookupRequest,
310
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
311
              @Override
312
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
313
                logger.log(ChannelLogLevel.DEBUG,
1✔
314
                    "[RLS Entry {0}] RouteLookup succeeded: {1}", request, value);
315
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
316
              }
1✔
317

318
              @Override
319
              public void onError(Throwable t) {
320
                logger.log(ChannelLogLevel.DEBUG,
1✔
321
                    "[RLS Entry {0}] RouteLookup failed: {1}", request, t);
322
                response.setException(t);
1✔
323
                throttler.registerBackendResponse(true);
1✔
324
              }
1✔
325

326
              @Override
327
              public void onCompleted() {
328
                throttler.registerBackendResponse(false);
1✔
329
              }
1✔
330
            });
331
    return CachedRouteLookupResponse.pendingResponse(
1✔
332
        createPendingEntry(request, response, backoffPolicy));
1✔
333
  }
334

335
  /**
336
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
337
   * cached, pending and backed-off due to error. The result remains the same even if the status is
338
   * changed after the return.
339
   */
340
  @CheckReturnValue
341
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
342
    synchronized (lock) {
1✔
343
      final CacheEntry cacheEntry;
344
      cacheEntry = linkedHashLruCache.read(request);
1✔
345
      if (cacheEntry == null) {
1✔
346
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
347
        if (pendingEntry != null) {
1✔
348
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
349
        }
350
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
351
      }
352

353
      if (cacheEntry instanceof DataCacheEntry) {
1✔
354
        // cache hit, initiate async-refresh if entry is stale
355
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
356
        if (dataEntry.isStaled(ticker.read())) {
1✔
357
          dataEntry.maybeRefresh();
1✔
358
        }
359
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
360
      }
361
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
362
    }
363
  }
364

365
  /** Cancels periodic cleanup, closes the cache, and shuts down the RLS channel. */
366
  void close() {
367
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
368
    synchronized (lock) {
1✔
369
      periodicCleaner.cancel(false);
1✔
370
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
371
      linkedHashLruCache.close();
1✔
372
      // TODO(creamsoup) maybe cancel all pending requests
373
      pendingCallCache.clear();
1✔
374
      rlsChannel.shutdownNow();
1✔
375
      rlsPicker.close();
1✔
376
      gaugeRegistration.close();
1✔
377
    }
1✔
378
  }
1✔
379

380
  void requestConnection() {
381
    rlsChannel.getState(true);
×
382
  }
×
383

384
  @GuardedBy("lock")
385
  private PendingCacheEntry createPendingEntry(
386
      RouteLookupRequest request,
387
      ListenableFuture<RouteLookupResponse> pendingCall,
388
      @Nullable BackoffPolicy backoffPolicy) {
389
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
390
    // Add the entry to the map before adding the Listener, because the listener removes the
391
    // entry from the map
392
    pendingCallCache.put(request, entry);
1✔
393
    // Beware that the listener can run immediately on the current thread
394
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
395
    return entry;
1✔
396
  }
397

398
  private void pendingRpcComplete(PendingCacheEntry entry) {
399
    synchronized (lock) {
1✔
400
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
401
      if (clientClosed) {
1✔
402
        return;
1✔
403
      }
404

405
      try {
406
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
407
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
408
        // reattempt picks when the child LB is done connecting
409
      } catch (Exception e) {
1✔
410
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
411
        // Cache updated. updateBalancingState() to reattempt picks
412
        helper.triggerPendingRpcProcessing();
1✔
413
      }
1✔
414
    }
1✔
415
  }
1✔
416

417
  @GuardedBy("lock")
418
  private DataCacheEntry createDataEntry(
419
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
420
    logger.log(
1✔
421
        ChannelLogLevel.DEBUG,
422
        "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}",
423
        request, routeLookupResponse);
424
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
425
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
426
    // this cache update because the lock is held
427
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
428
    return entry;
1✔
429
  }
430

431
  @GuardedBy("lock")
432
  private BackoffCacheEntry createBackOffEntry(
433
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
434
    if (backoffPolicy == null) {
1✔
435
      backoffPolicy = backoffProvider.get();
1✔
436
    }
437
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
438
    logger.log(
1✔
439
        ChannelLogLevel.DEBUG,
440
        "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
441
        request, status, delayNanos);
1✔
442
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
443
    // Lock is held, so the task can't execute before the assignment
444
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
445
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
446
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
447
    return entry;
1✔
448
  }
449

450
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
451
    synchronized (lock) {
1✔
452
      // This checks whether the task has been cancelled and prevents a second execution.
453
      if (!entry.scheduledFuture.cancel(false)) {
1✔
454
        // Future was previously cancelled
455
        return;
×
456
      }
457
      logger.log(ChannelLogLevel.DEBUG,
1✔
458
          "[RLS Entry {0}] Calling RLS for transition to pending", entry.request);
459
      linkedHashLruCache.invalidate(entry.request);
1✔
460
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
461
    }
1✔
462
  }
1✔
463

464
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
465

466
    final Helper helper;
467
    private ConnectivityState state;
468
    private SubchannelPicker picker;
469

470
    RlsLbHelper(Helper helper) {
1✔
471
      this.helper = helper;
1✔
472
    }
1✔
473

474
    @Override
475
    protected Helper delegate() {
476
      return helper;
1✔
477
    }
478

479
    @Override
480
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
481
      state = newState;
1✔
482
      picker = newPicker;
1✔
483
      super.updateBalancingState(newState, newPicker);
1✔
484
    }
1✔
485

486
    void triggerPendingRpcProcessing() {
487
      checkState(state != null, "updateBalancingState hasn't yet been called");
1✔
488
      helper.getSynchronizationContext().execute(
1✔
489
          () -> super.updateBalancingState(state, picker));
1✔
490
    }
1✔
491
  }
492

493
  /**
494
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
495
   */
496
  static final class CachedRouteLookupResponse {
497
    // Should only have 1 of following 3 cache entries
498
    @Nullable
499
    private final DataCacheEntry dataCacheEntry;
500
    @Nullable
501
    private final PendingCacheEntry pendingCacheEntry;
502
    @Nullable
503
    private final BackoffCacheEntry backoffCacheEntry;
504

505
    CachedRouteLookupResponse(
506
        DataCacheEntry dataCacheEntry,
507
        PendingCacheEntry pendingCacheEntry,
508
        BackoffCacheEntry backoffCacheEntry) {
1✔
509
      this.dataCacheEntry = dataCacheEntry;
1✔
510
      this.pendingCacheEntry = pendingCacheEntry;
1✔
511
      this.backoffCacheEntry = backoffCacheEntry;
1✔
512
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
513
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
514
          "Expected only 1 cache entry value provided");
515
    }
1✔
516

517
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
518
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
519
    }
520

521
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
522
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
523
    }
524

525
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
526
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
527
    }
528

529
    boolean hasData() {
530
      return dataCacheEntry != null;
1✔
531
    }
532

533
    @Nullable
534
    ChildPolicyWrapper getChildPolicyWrapper() {
535
      if (!hasData()) {
1✔
536
        return null;
×
537
      }
538
      return dataCacheEntry.getChildPolicyWrapper();
1✔
539
    }
540

541
    @VisibleForTesting
542
    @Nullable
543
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
544
      if (!hasData()) {
1✔
545
        return null;
×
546
      }
547
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
548
    }
549

550
    @Nullable
551
    String getHeaderData() {
552
      if (!hasData()) {
1✔
553
        return null;
1✔
554
      }
555
      return dataCacheEntry.getHeaderData();
1✔
556
    }
557

558
    boolean hasError() {
559
      return backoffCacheEntry != null;
1✔
560
    }
561

562
    boolean isPending() {
563
      return pendingCacheEntry != null;
1✔
564
    }
565

566
    @Nullable
567
    Status getStatus() {
568
      if (!hasError()) {
1✔
569
        return null;
×
570
      }
571
      return backoffCacheEntry.getStatus();
1✔
572
    }
573

574
    @Override
575
    public String toString() {
576
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
577
      if (dataCacheEntry != null) {
×
578
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
579
      }
580
      if (pendingCacheEntry != null) {
×
581
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
582
      }
583
      if (backoffCacheEntry != null) {
×
584
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
585
      }
586
      return toStringHelper.toString();
×
587
    }
588
  }
589

590
  /** A pending cache entry for an async RouteLookup RPC that is still in flight. */
591
  static final class PendingCacheEntry {
592
    private final ListenableFuture<RouteLookupResponse> pendingCall;
593
    private final RouteLookupRequest request;
594
    @Nullable
595
    private final BackoffPolicy backoffPolicy;
596

597
    PendingCacheEntry(
598
        RouteLookupRequest request,
599
        ListenableFuture<RouteLookupResponse> pendingCall,
600
        @Nullable BackoffPolicy backoffPolicy) {
1✔
601
      this.request = checkNotNull(request, "request");
1✔
602
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
603
      this.backoffPolicy = backoffPolicy;
1✔
604
    }
1✔
605

606
    @Override
607
    public String toString() {
608
      return MoreObjects.toStringHelper(this)
×
609
          .add("request", request)
×
610
          .toString();
×
611
    }
612
  }
613

614
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
615
  abstract static class CacheEntry {
616

617
    protected final RouteLookupRequest request;
618

619
    CacheEntry(RouteLookupRequest request) {
1✔
620
      this.request = checkNotNull(request, "request");
1✔
621
    }
1✔
622

623
    abstract int getSizeBytes();
624

625
    abstract boolean isExpired(long now);
626

627
    abstract void cleanup();
628

629
    protected boolean isOldEnoughToBeEvicted(long now) {
630
      return true;
×
631
    }
632
  }
633

634
  /** Implementation of {@link CacheEntry} that contains valid data. */
635
  final class DataCacheEntry extends CacheEntry {
636
    private final RouteLookupResponse response;
637
    private final long minEvictionTime;
638
    private final long expireTime;
639
    private final long staleTime;
640
    private final List<ChildPolicyWrapper> childPolicyWrappers;
641

642
    // GuardedBy CachingRlsLbClient.lock
643
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
644
      super(request);
1✔
645
      this.response = checkNotNull(response, "response");
1✔
646
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
647
      childPolicyWrappers =
1✔
648
          refCountedChildPolicyWrapperFactory
1✔
649
              .createOrGet(response.targets());
1✔
650
      long now = ticker.read();
1✔
651
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
652
      expireTime = now + maxAgeNanos;
1✔
653
      staleTime = now + staleAgeNanos;
1✔
654
    }
1✔
655

656
    /**
657
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
658
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
659
     * data still exists. The flow looks like the following.
660
     *
661
     * <pre>
662
     * Timeline                       | async refresh
663
     *                                V put new cache (entry2)
664
     * entry1: Pending | hasValue | staled  |
665
     * entry2:                        | OV* | pending | hasValue | staled |
666
     *
667
     * OV: old value
668
     * </pre>
669
     */
670
    void maybeRefresh() {
671
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
672
        if (pendingCallCache.containsKey(request)) {
1✔
673
          // pending already requested
674
          return;
×
675
        }
676
        logger.log(ChannelLogLevel.DEBUG,
1✔
677
            "[RLS Entry {0}] Cache entry is stale, refreshing", request);
678
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
679
      }
1✔
680
    }
1✔
681

682
    @VisibleForTesting
683
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
684
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
685
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
686
          return childPolicyWrapper;
1✔
687
        }
688
      }
1✔
689

690
      throw new RuntimeException("Target not found:" + target);
×
691
    }
692

693
    @Nullable
694
    ChildPolicyWrapper getChildPolicyWrapper() {
695
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
696
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
697
          return childPolicyWrapper;
1✔
698
        }
699
      }
1✔
700
      return childPolicyWrappers.get(0);
1✔
701
    }
702

703
    String getHeaderData() {
704
      return response.getHeaderData();
1✔
705
    }
706

707
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
708
    int calcStringSize(String target) {
709
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
710
    }
711

712
    @Override
713
    int getSizeBytes() {
714
      int targetSize = 0;
1✔
715
      for (String target : response.targets()) {
1✔
716
        targetSize += calcStringSize(target);
1✔
717
      }
1✔
718
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
719
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
720
    }
721

722
    @Override
723
    boolean isExpired(long now) {
724
      return expireTime - now <= 0;
1✔
725
    }
726

727
    boolean isStaled(long now) {
728
      return staleTime - now <= 0;
1✔
729
    }
730

731
    @Override
732
    protected boolean isOldEnoughToBeEvicted(long now) {
733
      return minEvictionTime - now <= 0;
×
734
    }
735

736
    @Override
737
    void cleanup() {
738
      synchronized (lock) {
1✔
739
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
740
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
741
        }
1✔
742
      }
1✔
743
    }
1✔
744

745
    @Override
746
    public String toString() {
747
      return MoreObjects.toStringHelper(this)
×
748
          .add("request", request)
×
749
          .add("response", response)
×
750
          .add("expireTime", expireTime)
×
751
          .add("staleTime", staleTime)
×
752
          .add("childPolicyWrappers", childPolicyWrappers)
×
753
          .toString();
×
754
    }
755
  }
756

757
  /**
758
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
759
   * status when the backoff time is expired.
760
   */
761
  private static final class BackoffCacheEntry extends CacheEntry {
762

763
    private final Status status;
764
    private final BackoffPolicy backoffPolicy;
765
    private Future<?> scheduledFuture;
766

767
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
768
      super(request);
1✔
769
      this.status = checkNotNull(status, "status");
1✔
770
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
771
    }
1✔
772

773
    Status getStatus() {
774
      return status;
1✔
775
    }
776

777
    @Override
778
    int getSizeBytes() {
779
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
780
    }
781

782
    @Override
783
    boolean isExpired(long now) {
784
      return scheduledFuture.isDone();
1✔
785
    }
786

787
    @Override
788
    void cleanup() {
789
      scheduledFuture.cancel(false);
1✔
790
    }
1✔
791

792
    @Override
793
    public String toString() {
794
      return MoreObjects.toStringHelper(this)
×
795
          .add("request", request)
×
796
          .add("status", status)
×
797
          .toString();
×
798
    }
799
  }
800

801
  /** Returns a Builder for {@link CachingRlsLbClient}. */
802
  static Builder newBuilder() {
803
    return new Builder();
1✔
804
  }
805

806
  /** A Builder for {@link CachingRlsLbClient}. */
807
  static final class Builder {
1✔
808

809
    private Helper helper;
810
    private LbPolicyConfiguration lbPolicyConfig;
811
    private Throttler throttler = new HappyThrottler();
1✔
812
    private ResolvedAddressFactory resolvedAddressFactory;
813
    private Ticker ticker = Ticker.systemTicker();
1✔
814
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
815
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
816

817
    Builder setHelper(Helper helper) {
818
      this.helper = checkNotNull(helper, "helper");
1✔
819
      return this;
1✔
820
    }
821

822
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
823
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
824
      return this;
1✔
825
    }
826

827
    Builder setThrottler(Throttler throttler) {
828
      this.throttler = checkNotNull(throttler, "throttler");
1✔
829
      return this;
1✔
830
    }
831

832
    /**
833
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
834
     */
835
    Builder setResolvedAddressesFactory(
836
        ResolvedAddressFactory resolvedAddressFactory) {
837
      this.resolvedAddressFactory =
1✔
838
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
839
      return this;
1✔
840
    }
841

842
    Builder setTicker(Ticker ticker) {
843
      this.ticker = checkNotNull(ticker, "ticker");
1✔
844
      return this;
1✔
845
    }
846

847
    Builder setEvictionListener(
848
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
849
      this.evictionListener = evictionListener;
1✔
850
      return this;
1✔
851
    }
852

853
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
854
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
855
      return this;
1✔
856
    }
857

858
    CachingRlsLbClient build() {
859
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
860
      client.init();
1✔
861
      return client;
1✔
862
    }
863
  }
864

865
  /**
866
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
867
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
868
   */
869
  private static final class AutoCleaningEvictionListener
870
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
871

872
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
873

874
    AutoCleaningEvictionListener(
875
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
876
      this.delegate = delegate;
1✔
877
    }
1✔
878

879
    @Override
880
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
881
      if (delegate != null) {
1✔
882
        delegate.onEviction(key, value, cause);
1✔
883
      }
884
      // performs cleanup after delegation
885
      value.cleanup();
1✔
886
    }
1✔
887
  }
888

889
  /** A Throttler never throttles. */
890
  private static final class HappyThrottler implements Throttler {
891

892
    @Override
893
    public boolean shouldThrottle() {
894
      return false;
×
895
    }
896

897
    @Override
898
    public void registerBackendResponse(boolean throttled) {
899
      // no-op
900
    }
×
901
  }
902

903
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
904
  private static final class RlsAsyncLruCache
905
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
906
    private final RlsLbHelper helper;
907

908
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
909
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
910
        Ticker ticker, RlsLbHelper helper) {
911
      super(maxEstimatedSizeBytes, evictionListener, ticker);
1✔
912
      this.helper = checkNotNull(helper, "helper");
1✔
913
    }
1✔
914

915
    @Override
916
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
917
      return value.isExpired(nowNanos);
1✔
918
    }
919

920
    @Override
921
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
922
      return value.getSizeBytes();
1✔
923
    }
924

925
    @Override
926
    protected boolean shouldInvalidateEldestEntry(
927
        RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) {
928
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
929
        return false;
×
930
      }
931

932
      // eldest entry should be evicted if size limit exceeded
933
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
934
    }
935

936
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
937
      CacheEntry newEntry = cache(key, value);
1✔
938

939
      // force cleanup if new entry pushed cache over max size (in bytes)
940
      if (fitToLimit()) {
1✔
941
        helper.triggerPendingRpcProcessing();
×
942
      }
943
      return newEntry;
1✔
944
    }
945
  }
946

947
  /**
948
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
949
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
950
   */
951
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
952

953
    @Nullable
1✔
954
    private ConnectivityState prevState = null;
955

956
    @Override
957
    public void onStatusChanged(ConnectivityState newState) {
958
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
959
          && newState == ConnectivityState.READY) {
960
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
961
        synchronized (lock) {
1✔
962
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
963
            if (value instanceof BackoffCacheEntry) {
1✔
964
              refreshBackoffEntry((BackoffCacheEntry) value);
×
965
            }
966
          }
1✔
967
        }
1✔
968
      }
969
      prevState = newState;
1✔
970
    }
1✔
971
  }
972

973
  /** A header will be added when the RLS server responds with additional header data. */
974
  @VisibleForTesting
975
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
976
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
977

978
  final class RlsPicker extends SubchannelPicker {
979

980
    private final RlsRequestFactory requestFactory;
981
    private final String lookupService;
982

983
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
984
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
985
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
986
    }
1✔
987

988
    @Override
989
    public PickResult pickSubchannel(PickSubchannelArgs args) {
990
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
991
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
992
      RouteLookupRequest request =
1✔
993
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
994
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
995

996
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
997
        Metadata headers = args.getHeaders();
1✔
998
        headers.discardAll(RLS_DATA_KEY);
1✔
999
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
1000
      }
1001
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1002
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
1003
      if (response.hasData()) {
1✔
1004
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1005
        SubchannelPicker picker =
1006
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1007
        if (picker == null) {
1✔
1008
          return PickResult.withNoResult();
×
1009
        }
1010
        // Happy path
1011
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1012
        if (pickResult.hasResult()) {
1✔
1013
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1014
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1015
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1016
              Collections.emptyList());
1✔
1017
        }
1018
        return pickResult;
1✔
1019
      } else if (response.hasError()) {
1✔
1020
        if (hasFallback) {
1✔
1021
          return useFallback(args);
1✔
1022
        }
1023
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1024
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1025
        return PickResult.withError(
1✔
1026
            convertRlsServerStatus(response.getStatus(),
1✔
1027
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1028
      } else {
1029
        return PickResult.withNoResult();
1✔
1030
      }
1031
    }
1032

1033
    /** Uses Subchannel connected to default target. */
1034
    private PickResult useFallback(PickSubchannelArgs args) {
1035
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1036
      if (picker == null) {
1✔
1037
        return PickResult.withNoResult();
×
1038
      }
1039
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1040
      if (pickResult.hasResult()) {
1✔
1041
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1042
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1043
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1044
            Collections.emptyList());
1✔
1045
      }
1046
      return pickResult;
1✔
1047
    }
1048

1049
    private String determineMetricsPickResult(PickResult pickResult) {
1050
      if (pickResult.getStatus().isOk()) {
1✔
1051
        return "complete";
1✔
1052
      } else if (pickResult.isDrop()) {
1✔
1053
        return "drop";
×
1054
      } else {
1055
        return "fail";
1✔
1056
      }
1057
    }
1058

1059
    // GuardedBy CachingRlsLbClient.lock
1060
    void close() {
1061
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1062
        if (fallbackChildPolicyWrapper != null) {
1✔
1063
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1064
        }
1065
      }
1✔
1066
    }
1✔
1067

1068
    @Override
1069
    public String toString() {
1070
      return MoreObjects.toStringHelper(this)
×
1071
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1072
          .toString();
×
1073
    }
1074
  }
1075

1076
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc