• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19900

09 Jul 2025 02:53PM UTC coverage: 88.528% (-0.001%) from 88.529%
#19900

push

github

ejona86
Fix RLS regressions from XdsDepMan conversion

297ab05ef converted CDS to XdsDependencyManager. This caused three
regressions:

 * CdsLB2 as a RLS child would always fail with "Unable to find
   non-dynamic root cluster" because is_dynamic=true was missing in
   its service config
 * XdsNameResolver only propagated resolution updates when the clusters
   changed, so a CdsUpdate change would be ignored. This caused a hang
   for RLS even with is_dynamic=true. For non-RLS the lack config update
   broke the circuit breaking psm interop test. This would have been
   more severe if ClusterResolverLb had been converted to
   XdsDependenceManager, as it would have ignored EDS updates
 * RLS did not propagate resolution updates, so CdsLB2 even with
   is_dynamic=true the CdsUpdate for the new cluster would never arrive,
   causing a hang

b/428120265
b/427912384

34649 of 39139 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.68
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import com.google.errorprone.annotations.CheckReturnValue;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ConnectivityState;
36
import io.grpc.LoadBalancer.Helper;
37
import io.grpc.LoadBalancer.PickResult;
38
import io.grpc.LoadBalancer.PickSubchannelArgs;
39
import io.grpc.LoadBalancer.ResolvedAddresses;
40
import io.grpc.LoadBalancer.SubchannelPicker;
41
import io.grpc.LongCounterMetricInstrument;
42
import io.grpc.LongGaugeMetricInstrument;
43
import io.grpc.ManagedChannel;
44
import io.grpc.ManagedChannelBuilder;
45
import io.grpc.Metadata;
46
import io.grpc.MetricInstrumentRegistry;
47
import io.grpc.MetricRecorder.BatchCallback;
48
import io.grpc.MetricRecorder.BatchRecorder;
49
import io.grpc.MetricRecorder.Registration;
50
import io.grpc.Status;
51
import io.grpc.internal.BackoffPolicy;
52
import io.grpc.internal.ExponentialBackoffPolicy;
53
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
54
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
55
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
57
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
58
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
59
import io.grpc.rls.LruCache.EvictionListener;
60
import io.grpc.rls.LruCache.EvictionType;
61
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
62
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
63
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
64
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
65
import io.grpc.stub.StreamObserver;
66
import io.grpc.util.ForwardingLoadBalancerHelper;
67
import java.net.URI;
68
import java.net.URISyntaxException;
69
import java.util.Arrays;
70
import java.util.Collections;
71
import java.util.HashMap;
72
import java.util.List;
73
import java.util.Map;
74
import java.util.UUID;
75
import java.util.concurrent.Future;
76
import java.util.concurrent.ScheduledExecutorService;
77
import java.util.concurrent.TimeUnit;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
/**
82
 * A CachingRlsLbClient is a core implementation of RLS loadbalancer supports dynamic request
83
 * routing by fetching the decision from route lookup server. Every single request is routed by
84
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
85
 */
86
@ThreadSafe
87
final class CachingRlsLbClient {
88

89
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
90
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
91
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
92
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
93
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
94
  public static final int BYTES_PER_CHAR = 2;
95
  public static final int STRING_OVERHEAD_BYTES = 38;
96
  /** Minimum bytes for a Java Object. */
97
  public static final int OBJ_OVERHEAD_B = 16;
98

99
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
100
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104
  private final Registration gaugeRegistration;
105
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
106

107
  // All cache status changes (pending, backoff, success) must be under this lock
108
  private final Object lock = new Object();
1✔
109
  // LRU cache based on access order (BACKOFF and actual data will be here)
110
  @GuardedBy("lock")
111
  private final RlsAsyncLruCache linkedHashLruCache;
112
  private final Future<?> periodicCleaner;
113
  // any RPC on the fly will cached in this map
114
  @GuardedBy("lock")
1✔
115
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
116

117
  private final ScheduledExecutorService scheduledExecutorService;
118
  private final Ticker ticker;
119
  private final Throttler throttler;
120

121
  private final LbPolicyConfiguration lbPolicyConfig;
122
  private final BackoffPolicy.Provider backoffProvider;
123
  private final long maxAgeNanos;
124
  private final long staleAgeNanos;
125
  private final long callTimeoutNanos;
126

127
  private final RlsLbHelper helper;
128
  private final ManagedChannel rlsChannel;
129
  private final RouteLookupServiceStub rlsStub;
130
  private final RlsPicker rlsPicker;
131
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
132
  @GuardedBy("lock")
133
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
134
  private final ChannelLogger logger;
135

136
  static {
137
    MetricInstrumentRegistry metricInstrumentRegistry
138
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
139
    DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
140
        "grpc.lb.rls.default_target_picks",
141
        "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
142
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
143
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(),
1✔
144
        false);
145
    TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
1✔
146
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
147
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
148
            + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
149
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
1✔
150
            "grpc.lb.pick_result"), Collections.emptyList(),
1✔
151
        false);
152
    FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
1✔
153
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
154
            + "RLS channel being throttled", "{pick}",
155
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
1✔
156
        Collections.emptyList(), false);
1✔
157
    CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
1✔
158
        "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
159
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
160
        Collections.emptyList(), false);
1✔
161
    CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
1✔
162
        "EXPERIMENTAL. The current size of the RLS cache", "By",
163
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
164
        Collections.emptyList(), false);
1✔
165
  }
166

167
  private CachingRlsLbClient(Builder builder) {
1✔
168
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
169
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
170
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
171
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
172
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
173
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
174
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
175
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
176
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
177
    linkedHashLruCache =
1✔
178
        new RlsAsyncLruCache(
179
            rlsConfig.cacheSizeBytes(),
1✔
180
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
181
            ticker,
182
            helper);
183
    periodicCleaner =
1✔
184
        scheduledExecutorService.scheduleAtFixedRate(this::periodicClean, 1, 1, TimeUnit.MINUTES);
1✔
185
    logger = helper.getChannelLogger();
1✔
186
    String serverHost = null;
1✔
187
    try {
188
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
189
    } catch (URISyntaxException ignore) {
×
190
      // handled by the following null check
191
    }
1✔
192
    if (serverHost == null) {
1✔
193
      logger.log(
×
194
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
195
      serverHost = helper.getAuthority();
×
196
    }
197
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
198
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
199
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
1✔
200
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
201
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
202
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
203
    // called to impose the authority security restrictions.
204
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
205
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
206
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
207
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
208
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
209
    if (routeLookupChannelServiceConfig != null) {
1✔
210
      logger.log(
1✔
211
          ChannelLogLevel.DEBUG,
212
          "RLS channel service config: {0}",
213
          routeLookupChannelServiceConfig);
214
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
215
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
216
    }
217
    rlsChannel = rlsChannelBuilder.build();
1✔
218
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
219
    childLbResolvedAddressFactory =
1✔
220
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
221
    backoffProvider = builder.backoffProvider;
1✔
222
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
223
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
224
    refCountedChildPolicyWrapperFactory =
1✔
225
        new RefCountedChildPolicyWrapperFactory(
226
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
227
            childLbHelperProvider,
228
            new BackoffRefreshListener());
229

230
    gaugeRegistration = helper.getMetricRecorder()
1✔
231
        .registerBatchCallback(new BatchCallback() {
1✔
232
          @Override
233
          public void accept(BatchRecorder recorder) {
234
            int estimatedSize;
235
            long estimatedSizeBytes;
236
            synchronized (lock) {
1✔
237
              estimatedSize = linkedHashLruCache.estimatedSize();
1✔
238
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
1✔
239
            }
1✔
240
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
1✔
241
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
242
                    metricsInstanceUuid), Collections.emptyList());
1✔
243
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
1✔
244
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
245
                    metricsInstanceUuid), Collections.emptyList());
1✔
246
          }
1✔
247
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
248

249
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
250
  }
1✔
251

252
  void init() {
253
    synchronized (lock) {
1✔
254
      refCountedChildPolicyWrapperFactory.init();
1✔
255
    }
1✔
256
  }
1✔
257

258
  Status acceptResolvedAddressFactory(ResolvedAddressFactory childLbResolvedAddressFactory) {
259
    synchronized (lock) {
1✔
260
      return refCountedChildPolicyWrapperFactory.acceptResolvedAddressFactory(
1✔
261
          childLbResolvedAddressFactory);
262
    }
263
  }
264

265
  /**
266
   * Convert the status to UNAVAILABLE and enhance the error message.
267
   * @param status status as provided by server
268
   * @param serverName Used for error description
269
   * @return Transformed status
270
   */
271
  static Status convertRlsServerStatus(Status status, String serverName) {
272
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
273
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
274
                + "RLS server returned: %s: %s",
275
            serverName, status.getCode(), status.getDescription()));
1✔
276
  }
277

278
  private void periodicClean() {
279
    synchronized (lock) {
1✔
280
      linkedHashLruCache.cleanupExpiredEntries();
1✔
281
    }
1✔
282
  }
1✔
283

284
  /** Populates async cache entry for new request. */
285
  @GuardedBy("lock")
286
  private CachedRouteLookupResponse asyncRlsCall(
287
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
288
    if (throttler.shouldThrottle()) {
1✔
289
      logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup", request);
1✔
290
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
291
      // on this result
292
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
293
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
294
    }
295
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
296
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
297
    logger.log(ChannelLogLevel.DEBUG,
1✔
298
        "[RLS Entry {0}] Starting RouteLookup: {1}", request, routeLookupRequest);
299
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
300
        .routeLookup(
1✔
301
            routeLookupRequest,
302
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
303
              @Override
304
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
305
                logger.log(ChannelLogLevel.DEBUG,
1✔
306
                    "[RLS Entry {0}] RouteLookup succeeded: {1}", request, value);
307
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
308
              }
1✔
309

310
              @Override
311
              public void onError(Throwable t) {
312
                logger.log(ChannelLogLevel.DEBUG,
1✔
313
                    "[RLS Entry {0}] RouteLookup failed: {1}", request, t);
314
                response.setException(t);
1✔
315
                throttler.registerBackendResponse(true);
1✔
316
              }
1✔
317

318
              @Override
319
              public void onCompleted() {
320
                throttler.registerBackendResponse(false);
1✔
321
              }
1✔
322
            });
323
    return CachedRouteLookupResponse.pendingResponse(
1✔
324
        createPendingEntry(request, response, backoffPolicy));
1✔
325
  }
326

327
  /**
328
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
329
   * cached, pending and backed-off due to error. The result remains same even if the status is
330
   * changed after the return.
331
   */
332
  @CheckReturnValue
333
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
334
    synchronized (lock) {
1✔
335
      final CacheEntry cacheEntry;
336
      cacheEntry = linkedHashLruCache.read(request);
1✔
337
      if (cacheEntry == null) {
1✔
338
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
339
        if (pendingEntry != null) {
1✔
340
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
341
        }
342
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
343
      }
344

345
      if (cacheEntry instanceof DataCacheEntry) {
1✔
346
        // cache hit, initiate async-refresh if entry is staled
347
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
348
        if (dataEntry.isStaled(ticker.read())) {
1✔
349
          dataEntry.maybeRefresh();
1✔
350
        }
351
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
352
      }
353
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
354
    }
355
  }
356

357
  /** Performs any pending maintenance operations needed by the cache. */
358
  void close() {
359
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
360
    synchronized (lock) {
1✔
361
      periodicCleaner.cancel(false);
1✔
362
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
363
      linkedHashLruCache.close();
1✔
364
      // TODO(creamsoup) maybe cancel all pending requests
365
      pendingCallCache.clear();
1✔
366
      rlsChannel.shutdownNow();
1✔
367
      rlsPicker.close();
1✔
368
      gaugeRegistration.close();
1✔
369
    }
1✔
370
  }
1✔
371

372
  void requestConnection() {
373
    rlsChannel.getState(true);
×
374
  }
×
375

376
  @GuardedBy("lock")
377
  private PendingCacheEntry createPendingEntry(
378
      RouteLookupRequest request,
379
      ListenableFuture<RouteLookupResponse> pendingCall,
380
      @Nullable BackoffPolicy backoffPolicy) {
381
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
382
    // Add the entry to the map before adding the Listener, because the listener removes the
383
    // entry from the map
384
    pendingCallCache.put(request, entry);
1✔
385
    // Beware that the listener can run immediately on the current thread
386
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
387
    return entry;
1✔
388
  }
389

390
  private void pendingRpcComplete(PendingCacheEntry entry) {
391
    synchronized (lock) {
1✔
392
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
393
      if (clientClosed) {
1✔
394
        return;
1✔
395
      }
396

397
      try {
398
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
399
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
400
        // reattempt picks when the child LB is done connecting
401
      } catch (Exception e) {
1✔
402
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
403
        // Cache updated. updateBalancingState() to reattempt picks
404
        helper.triggerPendingRpcProcessing();
1✔
405
      }
1✔
406
    }
1✔
407
  }
1✔
408

409
  @GuardedBy("lock")
410
  private DataCacheEntry createDataEntry(
411
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
412
    logger.log(
1✔
413
        ChannelLogLevel.DEBUG,
414
        "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}",
415
        request, routeLookupResponse);
416
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
417
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
418
    // this cache update because the lock is held
419
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
420
    return entry;
1✔
421
  }
422

423
  @GuardedBy("lock")
424
  private BackoffCacheEntry createBackOffEntry(
425
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
426
    if (backoffPolicy == null) {
1✔
427
      backoffPolicy = backoffProvider.get();
1✔
428
    }
429
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
430
    logger.log(
1✔
431
        ChannelLogLevel.DEBUG,
432
        "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
433
        request, status, delayNanos);
1✔
434
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
435
    // Lock is held, so the task can't execute before the assignment
436
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
437
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
438
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
439
    return entry;
1✔
440
  }
441

442
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
443
    synchronized (lock) {
1✔
444
      // This checks whether the task has been cancelled and prevents a second execution.
445
      if (!entry.scheduledFuture.cancel(false)) {
1✔
446
        // Future was previously cancelled
447
        return;
×
448
      }
449
      logger.log(ChannelLogLevel.DEBUG,
1✔
450
          "[RLS Entry {0}] Calling RLS for transition to pending", entry.request);
451
      linkedHashLruCache.invalidate(entry.request);
1✔
452
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
453
    }
1✔
454
  }
1✔
455

456
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
457

458
    final Helper helper;
459
    private ConnectivityState state;
460
    private SubchannelPicker picker;
461

462
    RlsLbHelper(Helper helper) {
1✔
463
      this.helper = helper;
1✔
464
    }
1✔
465

466
    @Override
467
    protected Helper delegate() {
468
      return helper;
1✔
469
    }
470

471
    @Override
472
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
473
      state = newState;
1✔
474
      picker = newPicker;
1✔
475
      super.updateBalancingState(newState, newPicker);
1✔
476
    }
1✔
477

478
    void triggerPendingRpcProcessing() {
479
      checkState(state != null, "updateBalancingState hasn't yet been called");
1✔
480
      helper.getSynchronizationContext().execute(
1✔
481
          () -> super.updateBalancingState(state, picker));
1✔
482
    }
1✔
483
  }
484

485
  /**
486
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
487
   */
488
  static final class CachedRouteLookupResponse {
489
    // Should only have 1 of following 3 cache entries
490
    @Nullable
491
    private final DataCacheEntry dataCacheEntry;
492
    @Nullable
493
    private final PendingCacheEntry pendingCacheEntry;
494
    @Nullable
495
    private final BackoffCacheEntry backoffCacheEntry;
496

497
    CachedRouteLookupResponse(
498
        DataCacheEntry dataCacheEntry,
499
        PendingCacheEntry pendingCacheEntry,
500
        BackoffCacheEntry backoffCacheEntry) {
1✔
501
      this.dataCacheEntry = dataCacheEntry;
1✔
502
      this.pendingCacheEntry = pendingCacheEntry;
1✔
503
      this.backoffCacheEntry = backoffCacheEntry;
1✔
504
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
505
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
506
          "Expected only 1 cache entry value provided");
507
    }
1✔
508

509
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
510
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
511
    }
512

513
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
514
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
515
    }
516

517
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
518
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
519
    }
520

521
    boolean hasData() {
522
      return dataCacheEntry != null;
1✔
523
    }
524

525
    @Nullable
526
    ChildPolicyWrapper getChildPolicyWrapper() {
527
      if (!hasData()) {
1✔
528
        return null;
×
529
      }
530
      return dataCacheEntry.getChildPolicyWrapper();
1✔
531
    }
532

533
    @VisibleForTesting
534
    @Nullable
535
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
536
      if (!hasData()) {
1✔
537
        return null;
×
538
      }
539
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
540
    }
541

542
    @Nullable
543
    String getHeaderData() {
544
      if (!hasData()) {
1✔
545
        return null;
1✔
546
      }
547
      return dataCacheEntry.getHeaderData();
1✔
548
    }
549

550
    boolean hasError() {
551
      return backoffCacheEntry != null;
1✔
552
    }
553

554
    boolean isPending() {
555
      return pendingCacheEntry != null;
1✔
556
    }
557

558
    @Nullable
559
    Status getStatus() {
560
      if (!hasError()) {
1✔
561
        return null;
×
562
      }
563
      return backoffCacheEntry.getStatus();
1✔
564
    }
565

566
    @Override
567
    public String toString() {
568
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
569
      if (dataCacheEntry != null) {
×
570
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
571
      }
572
      if (pendingCacheEntry != null) {
×
573
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
574
      }
575
      if (backoffCacheEntry != null) {
×
576
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
577
      }
578
      return toStringHelper.toString();
×
579
    }
580
  }
581

582
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
583
  static final class PendingCacheEntry {
584
    private final ListenableFuture<RouteLookupResponse> pendingCall;
585
    private final RouteLookupRequest request;
586
    @Nullable
587
    private final BackoffPolicy backoffPolicy;
588

589
    PendingCacheEntry(
590
        RouteLookupRequest request,
591
        ListenableFuture<RouteLookupResponse> pendingCall,
592
        @Nullable BackoffPolicy backoffPolicy) {
1✔
593
      this.request = checkNotNull(request, "request");
1✔
594
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
595
      this.backoffPolicy = backoffPolicy;
1✔
596
    }
1✔
597

598
    @Override
599
    public String toString() {
600
      return MoreObjects.toStringHelper(this)
×
601
          .add("request", request)
×
602
          .toString();
×
603
    }
604
  }
605

606
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
607
  abstract static class CacheEntry {
608

609
    protected final RouteLookupRequest request;
610

611
    CacheEntry(RouteLookupRequest request) {
1✔
612
      this.request = checkNotNull(request, "request");
1✔
613
    }
1✔
614

615
    abstract int getSizeBytes();
616

617
    abstract boolean isExpired(long now);
618

619
    abstract void cleanup();
620

621
    protected boolean isOldEnoughToBeEvicted(long now) {
622
      return true;
×
623
    }
624
  }
625

626
  /** Implementation of {@link CacheEntry} contains valid data. */
627
  final class DataCacheEntry extends CacheEntry {
628
    private final RouteLookupResponse response;
629
    private final long minEvictionTime;
630
    private final long expireTime;
631
    private final long staleTime;
632
    private final List<ChildPolicyWrapper> childPolicyWrappers;
633

634
    // GuardedBy CachingRlsLbClient.lock
635
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
636
      super(request);
1✔
637
      this.response = checkNotNull(response, "response");
1✔
638
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
639
      childPolicyWrappers =
1✔
640
          refCountedChildPolicyWrapperFactory
1✔
641
              .createOrGet(response.targets());
1✔
642
      long now = ticker.read();
1✔
643
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
644
      expireTime = now + maxAgeNanos;
1✔
645
      staleTime = now + staleAgeNanos;
1✔
646
    }
1✔
647

648
    /**
649
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
650
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
651
     * data still exists. Flow looks like following.
652
     *
653
     * <pre>
654
     * Timeline                       | async refresh
655
     *                                V put new cache (entry2)
656
     * entry1: Pending | hasValue | staled  |
657
     * entry2:                        | OV* | pending | hasValue | staled |
658
     *
659
     * OV: old value
660
     * </pre>
661
     */
662
    void maybeRefresh() {
663
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
664
        if (pendingCallCache.containsKey(request)) {
1✔
665
          // pending already requested
666
          return;
×
667
        }
668
        logger.log(ChannelLogLevel.DEBUG,
1✔
669
            "[RLS Entry {0}] Cache entry is stale, refreshing", request);
670
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
671
      }
1✔
672
    }
1✔
673

674
    @VisibleForTesting
675
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
676
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
677
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
678
          return childPolicyWrapper;
1✔
679
        }
680
      }
1✔
681

682
      throw new RuntimeException("Target not found:" + target);
×
683
    }
684

685
    @Nullable
686
    ChildPolicyWrapper getChildPolicyWrapper() {
687
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
688
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
689
          return childPolicyWrapper;
1✔
690
        }
691
      }
1✔
692
      return childPolicyWrappers.get(0);
1✔
693
    }
694

695
    String getHeaderData() {
696
      return response.getHeaderData();
1✔
697
    }
698

699
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
700
    int calcStringSize(String target) {
701
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
702
    }
703

704
    @Override
705
    int getSizeBytes() {
706
      int targetSize = 0;
1✔
707
      for (String target : response.targets()) {
1✔
708
        targetSize += calcStringSize(target);
1✔
709
      }
1✔
710
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
711
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
712
    }
713

714
    @Override
715
    boolean isExpired(long now) {
716
      return expireTime - now <= 0;
1✔
717
    }
718

719
    boolean isStaled(long now) {
720
      return staleTime - now <= 0;
1✔
721
    }
722

723
    @Override
724
    protected boolean isOldEnoughToBeEvicted(long now) {
725
      return minEvictionTime - now <= 0;
×
726
    }
727

728
    @Override
729
    void cleanup() {
730
      synchronized (lock) {
1✔
731
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
732
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
733
        }
1✔
734
      }
1✔
735
    }
1✔
736

737
    @Override
738
    public String toString() {
739
      return MoreObjects.toStringHelper(this)
×
740
          .add("request", request)
×
741
          .add("response", response)
×
742
          .add("expireTime", expireTime)
×
743
          .add("staleTime", staleTime)
×
744
          .add("childPolicyWrappers", childPolicyWrappers)
×
745
          .toString();
×
746
    }
747
  }
748

749
  /**
750
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
751
   * status when the backoff time is expired.
752
   */
753
  private static final class BackoffCacheEntry extends CacheEntry {
754

755
    private final Status status;
756
    private final BackoffPolicy backoffPolicy;
757
    private Future<?> scheduledFuture;
758

759
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
760
      super(request);
1✔
761
      this.status = checkNotNull(status, "status");
1✔
762
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
763
    }
1✔
764

765
    Status getStatus() {
766
      return status;
1✔
767
    }
768

769
    @Override
770
    int getSizeBytes() {
771
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
772
    }
773

774
    @Override
775
    boolean isExpired(long now) {
776
      return scheduledFuture.isDone();
1✔
777
    }
778

779
    @Override
780
    void cleanup() {
781
      scheduledFuture.cancel(false);
1✔
782
    }
1✔
783

784
    @Override
785
    public String toString() {
786
      return MoreObjects.toStringHelper(this)
×
787
          .add("request", request)
×
788
          .add("status", status)
×
789
          .toString();
×
790
    }
791
  }
792

793
  /** Returns a Builder for {@link CachingRlsLbClient}. */
794
  static Builder newBuilder() {
795
    return new Builder();
1✔
796
  }
797

798
  /** A Builder for {@link CachingRlsLbClient}. */
799
  static final class Builder {
1✔
800

801
    private Helper helper;
802
    private LbPolicyConfiguration lbPolicyConfig;
803
    private Throttler throttler = new HappyThrottler();
1✔
804
    private ResolvedAddressFactory resolvedAddressFactory;
805
    private Ticker ticker = Ticker.systemTicker();
1✔
806
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
807
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
808

809
    Builder setHelper(Helper helper) {
810
      this.helper = checkNotNull(helper, "helper");
1✔
811
      return this;
1✔
812
    }
813

814
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
815
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
816
      return this;
1✔
817
    }
818

819
    Builder setThrottler(Throttler throttler) {
820
      this.throttler = checkNotNull(throttler, "throttler");
1✔
821
      return this;
1✔
822
    }
823

824
    /**
825
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
826
     */
827
    Builder setResolvedAddressesFactory(
828
        ResolvedAddressFactory resolvedAddressFactory) {
829
      this.resolvedAddressFactory =
1✔
830
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
831
      return this;
1✔
832
    }
833

834
    Builder setTicker(Ticker ticker) {
835
      this.ticker = checkNotNull(ticker, "ticker");
1✔
836
      return this;
1✔
837
    }
838

839
    Builder setEvictionListener(
840
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
841
      this.evictionListener = evictionListener;
1✔
842
      return this;
1✔
843
    }
844

845
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
846
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
847
      return this;
1✔
848
    }
849

850
    CachingRlsLbClient build() {
851
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
852
      client.init();
1✔
853
      return client;
1✔
854
    }
855
  }
856

857
  /**
858
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
859
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
860
   */
861
  private static final class AutoCleaningEvictionListener
862
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
863

864
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
865

866
    AutoCleaningEvictionListener(
867
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
868
      this.delegate = delegate;
1✔
869
    }
1✔
870

871
    @Override
872
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
873
      if (delegate != null) {
1✔
874
        delegate.onEviction(key, value, cause);
1✔
875
      }
876
      // performs cleanup after delegation
877
      value.cleanup();
1✔
878
    }
1✔
879
  }
880

881
  /** A Throttler never throttles. */
882
  private static final class HappyThrottler implements Throttler {
883

884
    @Override
885
    public boolean shouldThrottle() {
886
      return false;
×
887
    }
888

889
    @Override
890
    public void registerBackendResponse(boolean throttled) {
891
      // no-op
892
    }
×
893
  }
894

895
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
896
  private static final class RlsAsyncLruCache
897
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
898
    private final RlsLbHelper helper;
899

900
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
901
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
902
        Ticker ticker, RlsLbHelper helper) {
903
      super(maxEstimatedSizeBytes, evictionListener, ticker);
1✔
904
      this.helper = checkNotNull(helper, "helper");
1✔
905
    }
1✔
906

907
    @Override
908
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
909
      return value.isExpired(nowNanos);
1✔
910
    }
911

912
    @Override
913
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
914
      return value.getSizeBytes();
1✔
915
    }
916

917
    @Override
918
    protected boolean shouldInvalidateEldestEntry(
919
        RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) {
920
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
921
        return false;
×
922
      }
923

924
      // eldest entry should be evicted if size limit exceeded
925
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
926
    }
927

928
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
929
      CacheEntry newEntry = cache(key, value);
1✔
930

931
      // force cleanup if new entry pushed cache over max size (in bytes)
932
      if (fitToLimit()) {
1✔
933
        helper.triggerPendingRpcProcessing();
×
934
      }
935
      return newEntry;
1✔
936
    }
937
  }
938

939
  /**
940
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
941
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
942
   */
943
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
944

945
    @Nullable
1✔
946
    private ConnectivityState prevState = null;
947

948
    @Override
949
    public void onStatusChanged(ConnectivityState newState) {
950
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
951
          && newState == ConnectivityState.READY) {
952
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
953
        synchronized (lock) {
1✔
954
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
955
            if (value instanceof BackoffCacheEntry) {
1✔
956
              refreshBackoffEntry((BackoffCacheEntry) value);
×
957
            }
958
          }
1✔
959
        }
1✔
960
      }
961
      prevState = newState;
1✔
962
    }
1✔
963
  }
964

965
  /** A header will be added when RLS server respond with additional header data. */
966
  @VisibleForTesting
967
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
968
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
969

970
  final class RlsPicker extends SubchannelPicker {
971

972
    private final RlsRequestFactory requestFactory;
973
    private final String lookupService;
974

975
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
976
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
977
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
978
    }
1✔
979

980
    @Override
981
    public PickResult pickSubchannel(PickSubchannelArgs args) {
982
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
983
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
984
      RouteLookupRequest request =
1✔
985
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
986
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
987

988
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
989
        Metadata headers = args.getHeaders();
1✔
990
        headers.discardAll(RLS_DATA_KEY);
1✔
991
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
992
      }
993
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
994
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
995
      if (response.hasData()) {
1✔
996
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
997
        SubchannelPicker picker =
998
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
999
        if (picker == null) {
1✔
1000
          return PickResult.withNoResult();
×
1001
        }
1002
        // Happy path
1003
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1004
        if (pickResult.hasResult()) {
1✔
1005
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1006
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1007
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1008
              Collections.emptyList());
1✔
1009
        }
1010
        return pickResult;
1✔
1011
      } else if (response.hasError()) {
1✔
1012
        if (hasFallback) {
1✔
1013
          return useFallback(args);
1✔
1014
        }
1015
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1016
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1017
        return PickResult.withError(
1✔
1018
            convertRlsServerStatus(response.getStatus(),
1✔
1019
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1020
      } else {
1021
        return PickResult.withNoResult();
1✔
1022
      }
1023
    }
1024

1025
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
1026

1027
    /** Uses Subchannel connected to default target. */
1028
    private PickResult useFallback(PickSubchannelArgs args) {
1029
      // TODO(creamsoup) wait until lb is ready
1030
      startFallbackChildPolicy();
1✔
1031
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1032
      if (picker == null) {
1✔
1033
        return PickResult.withNoResult();
1✔
1034
      }
1035
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1036
      if (pickResult.hasResult()) {
1✔
1037
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1038
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1039
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1040
            Collections.emptyList());
1✔
1041
      }
1042
      return pickResult;
1✔
1043
    }
1044

1045
    private String determineMetricsPickResult(PickResult pickResult) {
1046
      if (pickResult.getStatus().isOk()) {
1✔
1047
        return "complete";
1✔
1048
      } else if (pickResult.isDrop()) {
1✔
1049
        return "drop";
×
1050
      } else {
1051
        return "fail";
1✔
1052
      }
1053
    }
1054

1055
    private void startFallbackChildPolicy() {
1056
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1057
      synchronized (lock) {
1✔
1058
        if (fallbackChildPolicyWrapper != null) {
1✔
1059
          return;
1✔
1060
        }
1061
        logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1062
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1063
      }
1✔
1064
    }
1✔
1065

1066
    // GuardedBy CachingRlsLbClient.lock
1067
    void close() {
1068
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1069
        if (fallbackChildPolicyWrapper != null) {
1✔
1070
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1071
        }
1072
      }
1✔
1073
    }
1✔
1074

1075
    @Override
1076
    public String toString() {
1077
      return MoreObjects.toStringHelper(this)
×
1078
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1079
          .toString();
×
1080
    }
1081
  }
1082

1083
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc