• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19908

16 Jul 2025 07:54PM UTC coverage: 88.593% (+0.07%) from 88.528%
#19908

push

github

ejona86
Revert "xds: Convert CdsLb to XdsDepManager"

This reverts commit 297ab05ef.

b/430347751 shows multiple concerning behaviors in the xDS stack with
the new A74 config update model. XdsDepManager and CdsLB2 still seem to
be working correctly, but the change has exacerbated issues in other
parts of the stack, like RingHashConfig not having equals fixed in
a8de9f07ab.

Revert only for the v1.74.x release, leaving it on master.

34647 of 39108 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.63
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import com.google.errorprone.annotations.CheckReturnValue;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ConnectivityState;
36
import io.grpc.LoadBalancer.Helper;
37
import io.grpc.LoadBalancer.PickResult;
38
import io.grpc.LoadBalancer.PickSubchannelArgs;
39
import io.grpc.LoadBalancer.ResolvedAddresses;
40
import io.grpc.LoadBalancer.SubchannelPicker;
41
import io.grpc.LongCounterMetricInstrument;
42
import io.grpc.LongGaugeMetricInstrument;
43
import io.grpc.ManagedChannel;
44
import io.grpc.ManagedChannelBuilder;
45
import io.grpc.Metadata;
46
import io.grpc.MetricInstrumentRegistry;
47
import io.grpc.MetricRecorder.BatchCallback;
48
import io.grpc.MetricRecorder.BatchRecorder;
49
import io.grpc.MetricRecorder.Registration;
50
import io.grpc.Status;
51
import io.grpc.internal.BackoffPolicy;
52
import io.grpc.internal.ExponentialBackoffPolicy;
53
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
54
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
55
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
56
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
57
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
58
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
59
import io.grpc.rls.LruCache.EvictionListener;
60
import io.grpc.rls.LruCache.EvictionType;
61
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
62
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
63
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
64
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
65
import io.grpc.stub.StreamObserver;
66
import io.grpc.util.ForwardingLoadBalancerHelper;
67
import java.net.URI;
68
import java.net.URISyntaxException;
69
import java.util.Arrays;
70
import java.util.Collections;
71
import java.util.HashMap;
72
import java.util.List;
73
import java.util.Map;
74
import java.util.UUID;
75
import java.util.concurrent.Future;
76
import java.util.concurrent.ScheduledExecutorService;
77
import java.util.concurrent.TimeUnit;
78
import javax.annotation.Nullable;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
/**
82
 * A CachingRlsLbClient is the core of the RLS load balancer; it supports dynamic request
83
 * routing by fetching the decision from route lookup server. Every single request is routed by
84
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
85
 */
86
@ThreadSafe
87
final class CachingRlsLbClient {
88

89
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
90
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
91
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
92
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
93
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
94
  public static final int BYTES_PER_CHAR = 2;
95
  public static final int STRING_OVERHEAD_BYTES = 38;
96
  /** Minimum bytes for a Java Object. */
97
  public static final int OBJ_OVERHEAD_B = 16;
98

99
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
100
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104
  private final Registration gaugeRegistration;
105
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
106

107
  // All cache status changes (pending, backoff, success) must be under this lock
108
  private final Object lock = new Object();
1✔
109
  // LRU cache based on access order (BACKOFF and actual data will be here)
110
  @GuardedBy("lock")
111
  private final RlsAsyncLruCache linkedHashLruCache;
112
  private final Future<?> periodicCleaner;
113
  // any in-flight RPC is cached in this map
114
  @GuardedBy("lock")
1✔
115
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
116

117
  private final ScheduledExecutorService scheduledExecutorService;
118
  private final Ticker ticker;
119
  private final Throttler throttler;
120

121
  private final LbPolicyConfiguration lbPolicyConfig;
122
  private final BackoffPolicy.Provider backoffProvider;
123
  private final long maxAgeNanos;
124
  private final long staleAgeNanos;
125
  private final long callTimeoutNanos;
126

127
  private final RlsLbHelper helper;
128
  private final ManagedChannel rlsChannel;
129
  private final RouteLookupServiceStub rlsStub;
130
  private final RlsPicker rlsPicker;
131
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
132
  @GuardedBy("lock")
133
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
134
  private final ChannelLogger logger;
135

136
  static {
137
    MetricInstrumentRegistry metricInstrumentRegistry
138
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
139
    DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
140
        "grpc.lb.rls.default_target_picks",
141
        "EXPERIMENTAL. Number of LB picks sent to the default target", "{pick}",
142
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
143
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(),
1✔
144
        false);
145
    TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
1✔
146
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
147
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
148
            + "will be counted in this metric, not in grpc.rls.default_target_picks.", "{pick}",
149
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target",
1✔
150
            "grpc.lb.pick_result"), Collections.emptyList(),
1✔
151
        false);
152
    FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
1✔
153
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
154
            + "RLS channel being throttled", "{pick}",
155
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
1✔
156
        Collections.emptyList(), false);
1✔
157
    CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
1✔
158
        "EXPERIMENTAL. Number of entries in the RLS cache", "{entry}",
159
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
160
        Collections.emptyList(), false);
1✔
161
    CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
1✔
162
        "EXPERIMENTAL. The current size of the RLS cache", "By",
163
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
1✔
164
        Collections.emptyList(), false);
1✔
165
  }
166

167
  /**
   * Creates the client from the values collected in {@code builder}.
   *
   * <p>Construction schedules the periodic cache cleaner, builds the out-of-band channel and stub
   * used to reach the route lookup service, prepares the child policy wrapper factory and
   * registers the cache gauges. Every {@code checkNotNull} precondition on the builder is
   * evaluated before the cleaner is scheduled or the channel is built, so a missing builder value
   * cannot leave a scheduled task or an open channel behind.
   *
   * @param builder source of the helper, configuration, ticker, throttler, eviction listener,
   *     resolved address factory and backoff provider
   * @throws NullPointerException if a required builder value is missing
   */
  private CachingRlsLbClient(Builder builder) {
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
    scheduledExecutorService = helper.getScheduledExecutorService();
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
    maxAgeNanos = rlsConfig.maxAgeInNanos();
    staleAgeNanos = rlsConfig.staleAgeInNanos();
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
    ticker = checkNotNull(builder.ticker, "ticker");
    throttler = checkNotNull(builder.throttler, "throttler");
    // Validate the remaining builder input up front: failing after the cleaner is scheduled or
    // the RLS channel is built would leak both, because nobody could ever call close().
    childLbResolvedAddressFactory =
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
    backoffProvider = builder.backoffProvider;
    linkedHashLruCache =
        new RlsAsyncLruCache(
            rlsConfig.cacheSizeBytes(),
            new AutoCleaningEvictionListener(builder.evictionListener),
            ticker,
            helper);
    // First run after one minute, then once per minute.
    periodicCleaner =
        scheduledExecutorService.scheduleAtFixedRate(this::periodicClean, 1, 1, TimeUnit.MINUTES);
    logger = helper.getChannelLogger();
    String serverHost = null;
    try {
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
    } catch (URISyntaxException ignored) {
      // handled by the following null check
    }
    if (serverHost == null) {
      logger.log(
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
      // Fall back to the raw authority when no host could be extracted from it.
      serverHost = helper.getAuthority();
    }
    RlsRequestFactory requestFactory = new RlsRequestFactory(
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
    // RLS server using the same authority as the backends, even though the RLS server's addresses
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
    // called to impose the authority security restrictions.
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
    Map<String, ?> routeLookupChannelServiceConfig =
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
    if (routeLookupChannelServiceConfig != null) {
      logger.log(
          ChannelLogLevel.DEBUG,
          "RLS channel service config: {0}",
          routeLookupChannelServiceConfig);
      // Use the configured service config as the default and turn off service config lookup.
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
      rlsChannelBuilder.disableServiceConfigLookUp();
    }
    rlsChannel = rlsChannelBuilder.build();
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
    ChildLoadBalancerHelperProvider childLbHelperProvider =
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
    refCountedChildPolicyWrapperFactory =
        new RefCountedChildPolicyWrapperFactory(
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
            childLbHelperProvider,
            new BackoffRefreshListener());

    gaugeRegistration = helper.getMetricRecorder()
        .registerBatchCallback(new BatchCallback() {
          @Override
          public void accept(BatchRecorder recorder) {
            int estimatedSize;
            long estimatedSizeBytes;
            // Snapshot both values under the lock, then record them outside of it.
            synchronized (lock) {
              estimatedSize = linkedHashLruCache.estimatedSize();
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
            }
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
                    metricsInstanceUuid), Collections.emptyList());
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
                    metricsInstanceUuid), Collections.emptyList());
          }
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);

    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
  }
1✔
251

252
  void init() {
253
    synchronized (lock) {
1✔
254
      refCountedChildPolicyWrapperFactory.init();
1✔
255
    }
1✔
256
  }
1✔
257

258
  /**
259
   * Convert the status to UNAVAILABLE and enhance the error message.
260
   * @param status status as provided by server
261
   * @param serverName Used for error description
262
   * @return Transformed status
263
   */
264
  static Status convertRlsServerStatus(Status status, String serverName) {
265
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
266
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
267
                + "RLS server returned: %s: %s",
268
            serverName, status.getCode(), status.getDescription()));
1✔
269
  }
270

271
  private void periodicClean() {
272
    synchronized (lock) {
1✔
273
      linkedHashLruCache.cleanupExpiredEntries();
1✔
274
    }
1✔
275
  }
1✔
276

277
  /** Populates async cache entry for new request. */
278
  @GuardedBy("lock")
279
  private CachedRouteLookupResponse asyncRlsCall(
280
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
281
    if (throttler.shouldThrottle()) {
1✔
282
      logger.log(ChannelLogLevel.DEBUG, "[RLS Entry {0}] Throttled RouteLookup", request);
1✔
283
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
284
      // on this result
285
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
286
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
287
    }
288
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
289
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
290
    logger.log(ChannelLogLevel.DEBUG,
1✔
291
        "[RLS Entry {0}] Starting RouteLookup: {1}", request, routeLookupRequest);
292
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
293
        .routeLookup(
1✔
294
            routeLookupRequest,
295
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
296
              @Override
297
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
298
                logger.log(ChannelLogLevel.DEBUG,
1✔
299
                    "[RLS Entry {0}] RouteLookup succeeded: {1}", request, value);
300
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
301
              }
1✔
302

303
              @Override
304
              public void onError(Throwable t) {
305
                logger.log(ChannelLogLevel.DEBUG,
1✔
306
                    "[RLS Entry {0}] RouteLookup failed: {1}", request, t);
307
                response.setException(t);
1✔
308
                throttler.registerBackendResponse(true);
1✔
309
              }
1✔
310

311
              @Override
312
              public void onCompleted() {
313
                throttler.registerBackendResponse(false);
1✔
314
              }
1✔
315
            });
316
    return CachedRouteLookupResponse.pendingResponse(
1✔
317
        createPendingEntry(request, response, backoffPolicy));
1✔
318
  }
319

320
  /**
321
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
322
   * cached, pending and backed-off due to error. The result remains same even if the status is
323
   * changed after the return.
324
   */
325
  @CheckReturnValue
326
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
327
    synchronized (lock) {
1✔
328
      final CacheEntry cacheEntry;
329
      cacheEntry = linkedHashLruCache.read(request);
1✔
330
      if (cacheEntry == null) {
1✔
331
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
332
        if (pendingEntry != null) {
1✔
333
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
334
        }
335
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
336
      }
337

338
      if (cacheEntry instanceof DataCacheEntry) {
1✔
339
        // cache hit, initiate async-refresh if entry is staled
340
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
341
        if (dataEntry.isStaled(ticker.read())) {
1✔
342
          dataEntry.maybeRefresh();
1✔
343
        }
344
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
345
      }
346
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
347
    }
348
  }
349

350
  /** Performs any pending maintenance operations needed by the cache. */
351
  void close() {
352
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
353
    synchronized (lock) {
1✔
354
      periodicCleaner.cancel(false);
1✔
355
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
356
      linkedHashLruCache.close();
1✔
357
      // TODO(creamsoup) maybe cancel all pending requests
358
      pendingCallCache.clear();
1✔
359
      rlsChannel.shutdownNow();
1✔
360
      rlsPicker.close();
1✔
361
      gaugeRegistration.close();
1✔
362
    }
1✔
363
  }
1✔
364

365
  void requestConnection() {
366
    rlsChannel.getState(true);
×
367
  }
×
368

369
  @GuardedBy("lock")
370
  private PendingCacheEntry createPendingEntry(
371
      RouteLookupRequest request,
372
      ListenableFuture<RouteLookupResponse> pendingCall,
373
      @Nullable BackoffPolicy backoffPolicy) {
374
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
375
    // Add the entry to the map before adding the Listener, because the listener removes the
376
    // entry from the map
377
    pendingCallCache.put(request, entry);
1✔
378
    // Beware that the listener can run immediately on the current thread
379
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
380
    return entry;
1✔
381
  }
382

383
  private void pendingRpcComplete(PendingCacheEntry entry) {
384
    synchronized (lock) {
1✔
385
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
386
      if (clientClosed) {
1✔
387
        return;
1✔
388
      }
389

390
      try {
391
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
392
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
393
        // reattempt picks when the child LB is done connecting
394
      } catch (Exception e) {
1✔
395
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
396
        // Cache updated. updateBalancingState() to reattempt picks
397
        helper.triggerPendingRpcProcessing();
1✔
398
      }
1✔
399
    }
1✔
400
  }
1✔
401

402
  @GuardedBy("lock")
403
  private DataCacheEntry createDataEntry(
404
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
405
    logger.log(
1✔
406
        ChannelLogLevel.DEBUG,
407
        "[RLS Entry {0}] Transition to data cache: routeLookupResponse={1}",
408
        request, routeLookupResponse);
409
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
410
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
411
    // this cache update because the lock is held
412
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
413
    return entry;
1✔
414
  }
415

416
  @GuardedBy("lock")
417
  private BackoffCacheEntry createBackOffEntry(
418
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
419
    if (backoffPolicy == null) {
1✔
420
      backoffPolicy = backoffProvider.get();
1✔
421
    }
422
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
423
    logger.log(
1✔
424
        ChannelLogLevel.DEBUG,
425
        "[RLS Entry {0}] Transition to back off: status={1}, delayNanos={2}",
426
        request, status, delayNanos);
1✔
427
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
428
    // Lock is held, so the task can't execute before the assignment
429
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
430
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
431
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
432
    return entry;
1✔
433
  }
434

435
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
436
    synchronized (lock) {
1✔
437
      // This checks whether the task has been cancelled and prevents a second execution.
438
      if (!entry.scheduledFuture.cancel(false)) {
1✔
439
        // Future was previously cancelled
440
        return;
×
441
      }
442
      logger.log(ChannelLogLevel.DEBUG,
1✔
443
          "[RLS Entry {0}] Calling RLS for transition to pending", entry.request);
444
      linkedHashLruCache.invalidate(entry.request);
1✔
445
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
446
    }
1✔
447
  }
1✔
448

449
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
450

451
    final Helper helper;
452
    private ConnectivityState state;
453
    private SubchannelPicker picker;
454

455
    RlsLbHelper(Helper helper) {
1✔
456
      this.helper = helper;
1✔
457
    }
1✔
458

459
    @Override
460
    protected Helper delegate() {
461
      return helper;
1✔
462
    }
463

464
    @Override
465
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
466
      state = newState;
1✔
467
      picker = newPicker;
1✔
468
      super.updateBalancingState(newState, newPicker);
1✔
469
    }
1✔
470

471
    void triggerPendingRpcProcessing() {
472
      checkState(state != null, "updateBalancingState hasn't yet been called");
1✔
473
      helper.getSynchronizationContext().execute(
1✔
474
          () -> super.updateBalancingState(state, picker));
1✔
475
    }
1✔
476
  }
477

478
  /**
479
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
480
   */
481
  static final class CachedRouteLookupResponse {
482
    // Should only have 1 of following 3 cache entries
483
    @Nullable
484
    private final DataCacheEntry dataCacheEntry;
485
    @Nullable
486
    private final PendingCacheEntry pendingCacheEntry;
487
    @Nullable
488
    private final BackoffCacheEntry backoffCacheEntry;
489

490
    CachedRouteLookupResponse(
491
        DataCacheEntry dataCacheEntry,
492
        PendingCacheEntry pendingCacheEntry,
493
        BackoffCacheEntry backoffCacheEntry) {
1✔
494
      this.dataCacheEntry = dataCacheEntry;
1✔
495
      this.pendingCacheEntry = pendingCacheEntry;
1✔
496
      this.backoffCacheEntry = backoffCacheEntry;
1✔
497
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
498
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
499
          "Expected only 1 cache entry value provided");
500
    }
1✔
501

502
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
503
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
504
    }
505

506
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
507
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
508
    }
509

510
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
511
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
512
    }
513

514
    boolean hasData() {
515
      return dataCacheEntry != null;
1✔
516
    }
517

518
    @Nullable
519
    ChildPolicyWrapper getChildPolicyWrapper() {
520
      if (!hasData()) {
1✔
521
        return null;
×
522
      }
523
      return dataCacheEntry.getChildPolicyWrapper();
1✔
524
    }
525

526
    @VisibleForTesting
527
    @Nullable
528
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
529
      if (!hasData()) {
1✔
530
        return null;
×
531
      }
532
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
533
    }
534

535
    @Nullable
536
    String getHeaderData() {
537
      if (!hasData()) {
1✔
538
        return null;
1✔
539
      }
540
      return dataCacheEntry.getHeaderData();
1✔
541
    }
542

543
    boolean hasError() {
544
      return backoffCacheEntry != null;
1✔
545
    }
546

547
    boolean isPending() {
548
      return pendingCacheEntry != null;
1✔
549
    }
550

551
    @Nullable
552
    Status getStatus() {
553
      if (!hasError()) {
1✔
554
        return null;
×
555
      }
556
      return backoffCacheEntry.getStatus();
1✔
557
    }
558

559
    @Override
560
    public String toString() {
561
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
562
      if (dataCacheEntry != null) {
×
563
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
564
      }
565
      if (pendingCacheEntry != null) {
×
566
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
567
      }
568
      if (backoffCacheEntry != null) {
×
569
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
570
      }
571
      return toStringHelper.toString();
×
572
    }
573
  }
574

575
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
576
  static final class PendingCacheEntry {
577
    private final ListenableFuture<RouteLookupResponse> pendingCall;
578
    private final RouteLookupRequest request;
579
    @Nullable
580
    private final BackoffPolicy backoffPolicy;
581

582
    PendingCacheEntry(
583
        RouteLookupRequest request,
584
        ListenableFuture<RouteLookupResponse> pendingCall,
585
        @Nullable BackoffPolicy backoffPolicy) {
1✔
586
      this.request = checkNotNull(request, "request");
1✔
587
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
588
      this.backoffPolicy = backoffPolicy;
1✔
589
    }
1✔
590

591
    @Override
592
    public String toString() {
593
      return MoreObjects.toStringHelper(this)
×
594
          .add("request", request)
×
595
          .toString();
×
596
    }
597
  }
598

599
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
600
  abstract static class CacheEntry {
601

602
    protected final RouteLookupRequest request;
603

604
    CacheEntry(RouteLookupRequest request) {
1✔
605
      this.request = checkNotNull(request, "request");
1✔
606
    }
1✔
607

608
    abstract int getSizeBytes();
609

610
    abstract boolean isExpired(long now);
611

612
    abstract void cleanup();
613

614
    protected boolean isOldEnoughToBeEvicted(long now) {
615
      return true;
×
616
    }
617
  }
618

619
  /** Implementation of {@link CacheEntry} contains valid data. */
620
  final class DataCacheEntry extends CacheEntry {
621
    private final RouteLookupResponse response;
622
    private final long minEvictionTime;
623
    private final long expireTime;
624
    private final long staleTime;
625
    private final List<ChildPolicyWrapper> childPolicyWrappers;
626

627
    // GuardedBy CachingRlsLbClient.lock
628
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
629
      super(request);
1✔
630
      this.response = checkNotNull(response, "response");
1✔
631
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
632
      childPolicyWrappers =
1✔
633
          refCountedChildPolicyWrapperFactory
1✔
634
              .createOrGet(response.targets());
1✔
635
      long now = ticker.read();
1✔
636
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
637
      expireTime = now + maxAgeNanos;
1✔
638
      staleTime = now + staleAgeNanos;
1✔
639
    }
1✔
640

641
    /**
642
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
643
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
644
     * data still exists. Flow looks like following.
645
     *
646
     * <pre>
647
     * Timeline                       | async refresh
648
     *                                V put new cache (entry2)
649
     * entry1: Pending | hasValue | staled  |
650
     * entry2:                        | OV* | pending | hasValue | staled |
651
     *
652
     * OV: old value
653
     * </pre>
654
     */
655
    void maybeRefresh() {
656
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
657
        if (pendingCallCache.containsKey(request)) {
1✔
658
          // pending already requested
659
          return;
×
660
        }
661
        logger.log(ChannelLogLevel.DEBUG,
1✔
662
            "[RLS Entry {0}] Cache entry is stale, refreshing", request);
663
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
664
      }
1✔
665
    }
1✔
666

667
    @VisibleForTesting
668
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
669
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
670
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
671
          return childPolicyWrapper;
1✔
672
        }
673
      }
1✔
674

675
      throw new RuntimeException("Target not found:" + target);
×
676
    }
677

678
    @Nullable
679
    ChildPolicyWrapper getChildPolicyWrapper() {
680
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
681
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
682
          return childPolicyWrapper;
1✔
683
        }
684
      }
1✔
685
      return childPolicyWrappers.get(0);
1✔
686
    }
687

688
    String getHeaderData() {
689
      return response.getHeaderData();
1✔
690
    }
691

692
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
693
    int calcStringSize(String target) {
694
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
695
    }
696

697
    @Override
698
    int getSizeBytes() {
699
      int targetSize = 0;
1✔
700
      for (String target : response.targets()) {
1✔
701
        targetSize += calcStringSize(target);
1✔
702
      }
1✔
703
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
704
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
705
    }
706

707
    @Override
708
    boolean isExpired(long now) {
709
      return expireTime - now <= 0;
1✔
710
    }
711

712
    boolean isStaled(long now) {
713
      return staleTime - now <= 0;
1✔
714
    }
715

716
    @Override
717
    protected boolean isOldEnoughToBeEvicted(long now) {
718
      return minEvictionTime - now <= 0;
×
719
    }
720

721
    @Override
722
    void cleanup() {
723
      synchronized (lock) {
1✔
724
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
725
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
726
        }
1✔
727
      }
1✔
728
    }
1✔
729

730
    @Override
731
    public String toString() {
732
      return MoreObjects.toStringHelper(this)
×
733
          .add("request", request)
×
734
          .add("response", response)
×
735
          .add("expireTime", expireTime)
×
736
          .add("staleTime", staleTime)
×
737
          .add("childPolicyWrappers", childPolicyWrappers)
×
738
          .toString();
×
739
    }
740
  }
741

742
  /**
743
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
744
   * status when the backoff time is expired.
745
   */
746
  private static final class BackoffCacheEntry extends CacheEntry {
747

748
    private final Status status;
749
    private final BackoffPolicy backoffPolicy;
750
    private Future<?> scheduledFuture;
751

752
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
753
      super(request);
1✔
754
      this.status = checkNotNull(status, "status");
1✔
755
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
756
    }
1✔
757

758
    Status getStatus() {
759
      return status;
1✔
760
    }
761

762
    @Override
763
    int getSizeBytes() {
764
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
765
    }
766

767
    @Override
768
    boolean isExpired(long now) {
769
      return scheduledFuture.isDone();
1✔
770
    }
771

772
    @Override
773
    void cleanup() {
774
      scheduledFuture.cancel(false);
1✔
775
    }
1✔
776

777
    @Override
778
    public String toString() {
779
      return MoreObjects.toStringHelper(this)
×
780
          .add("request", request)
×
781
          .add("status", status)
×
782
          .toString();
×
783
    }
784
  }
785

786
  /** Returns a Builder for {@link CachingRlsLbClient}. */
787
  static Builder newBuilder() {
788
    return new Builder();
1✔
789
  }
790

791
  /** A Builder for {@link CachingRlsLbClient}. */
792
  static final class Builder {
1✔
793

794
    private Helper helper;
795
    private LbPolicyConfiguration lbPolicyConfig;
796
    private Throttler throttler = new HappyThrottler();
1✔
797
    private ResolvedAddressFactory resolvedAddressFactory;
798
    private Ticker ticker = Ticker.systemTicker();
1✔
799
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
800
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
801

802
    Builder setHelper(Helper helper) {
803
      this.helper = checkNotNull(helper, "helper");
1✔
804
      return this;
1✔
805
    }
806

807
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
808
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
809
      return this;
1✔
810
    }
811

812
    Builder setThrottler(Throttler throttler) {
813
      this.throttler = checkNotNull(throttler, "throttler");
1✔
814
      return this;
1✔
815
    }
816

817
    /**
818
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
819
     */
820
    Builder setResolvedAddressesFactory(
821
        ResolvedAddressFactory resolvedAddressFactory) {
822
      this.resolvedAddressFactory =
1✔
823
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
824
      return this;
1✔
825
    }
826

827
    Builder setTicker(Ticker ticker) {
828
      this.ticker = checkNotNull(ticker, "ticker");
1✔
829
      return this;
1✔
830
    }
831

832
    Builder setEvictionListener(
833
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
834
      this.evictionListener = evictionListener;
1✔
835
      return this;
1✔
836
    }
837

838
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
839
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
840
      return this;
1✔
841
    }
842

843
    CachingRlsLbClient build() {
844
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
845
      client.init();
1✔
846
      return client;
1✔
847
    }
848
  }
849

850
  /**
851
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
852
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
853
   */
854
  private static final class AutoCleaningEvictionListener
855
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
856

857
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
858

859
    AutoCleaningEvictionListener(
860
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
861
      this.delegate = delegate;
1✔
862
    }
1✔
863

864
    @Override
865
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
866
      if (delegate != null) {
1✔
867
        delegate.onEviction(key, value, cause);
1✔
868
      }
869
      // performs cleanup after delegation
870
      value.cleanup();
1✔
871
    }
1✔
872
  }
873

874
  /** A Throttler never throttles. */
875
  private static final class HappyThrottler implements Throttler {
876

877
    @Override
878
    public boolean shouldThrottle() {
879
      return false;
×
880
    }
881

882
    @Override
883
    public void registerBackendResponse(boolean throttled) {
884
      // no-op
885
    }
×
886
  }
887

888
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
889
  private static final class RlsAsyncLruCache
890
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
891
    private final RlsLbHelper helper;
892

893
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
894
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
895
        Ticker ticker, RlsLbHelper helper) {
896
      super(maxEstimatedSizeBytes, evictionListener, ticker);
1✔
897
      this.helper = checkNotNull(helper, "helper");
1✔
898
    }
1✔
899

900
    @Override
901
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
902
      return value.isExpired(nowNanos);
1✔
903
    }
904

905
    @Override
906
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
907
      return value.getSizeBytes();
1✔
908
    }
909

910
    @Override
911
    protected boolean shouldInvalidateEldestEntry(
912
        RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) {
913
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
914
        return false;
×
915
      }
916

917
      // eldest entry should be evicted if size limit exceeded
918
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
919
    }
920

921
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
922
      CacheEntry newEntry = cache(key, value);
1✔
923

924
      // force cleanup if new entry pushed cache over max size (in bytes)
925
      if (fitToLimit()) {
1✔
926
        helper.triggerPendingRpcProcessing();
×
927
      }
928
      return newEntry;
1✔
929
    }
930
  }
931

932
  /**
933
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
934
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
935
   */
936
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
937

938
    @Nullable
1✔
939
    private ConnectivityState prevState = null;
940

941
    @Override
942
    public void onStatusChanged(ConnectivityState newState) {
943
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
944
          && newState == ConnectivityState.READY) {
945
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
946
        synchronized (lock) {
1✔
947
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
948
            if (value instanceof BackoffCacheEntry) {
1✔
949
              refreshBackoffEntry((BackoffCacheEntry) value);
×
950
            }
951
          }
1✔
952
        }
1✔
953
      }
954
      prevState = newState;
1✔
955
    }
1✔
956
  }
957

958
  /** A header will be added when the RLS server responds with additional header data. */
959
  @VisibleForTesting
960
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
961
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
962

963
  final class RlsPicker extends SubchannelPicker {
964

965
    private final RlsRequestFactory requestFactory;
966
    private final String lookupService;
967

968
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
969
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
970
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
971
    }
1✔
972

973
    @Override
974
    public PickResult pickSubchannel(PickSubchannelArgs args) {
975
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
976
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
977
      RouteLookupRequest request =
1✔
978
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
979
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
980

981
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
982
        Metadata headers = args.getHeaders();
1✔
983
        headers.discardAll(RLS_DATA_KEY);
1✔
984
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
985
      }
986
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
987
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
988
      if (response.hasData()) {
1✔
989
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
990
        SubchannelPicker picker =
991
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
992
        if (picker == null) {
1✔
993
          return PickResult.withNoResult();
×
994
        }
995
        // Happy path
996
        PickResult pickResult = picker.pickSubchannel(args);
1✔
997
        if (pickResult.hasResult()) {
1✔
998
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
999
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1000
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1001
              Collections.emptyList());
1✔
1002
        }
1003
        return pickResult;
1✔
1004
      } else if (response.hasError()) {
1✔
1005
        if (hasFallback) {
1✔
1006
          return useFallback(args);
1✔
1007
        }
1008
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1009
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1010
        return PickResult.withError(
1✔
1011
            convertRlsServerStatus(response.getStatus(),
1✔
1012
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1013
      } else {
1014
        return PickResult.withNoResult();
1✔
1015
      }
1016
    }
1017

1018
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
1019

1020
    /** Uses Subchannel connected to default target. */
1021
    private PickResult useFallback(PickSubchannelArgs args) {
1022
      // TODO(creamsoup) wait until lb is ready
1023
      startFallbackChildPolicy();
1✔
1024
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1025
      if (picker == null) {
1✔
1026
        return PickResult.withNoResult();
1✔
1027
      }
1028
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1029
      if (pickResult.hasResult()) {
1✔
1030
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1031
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1032
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1033
            Collections.emptyList());
1✔
1034
      }
1035
      return pickResult;
1✔
1036
    }
1037

1038
    private String determineMetricsPickResult(PickResult pickResult) {
1039
      if (pickResult.getStatus().isOk()) {
1✔
1040
        return "complete";
1✔
1041
      } else if (pickResult.isDrop()) {
1✔
1042
        return "drop";
×
1043
      } else {
1044
        return "fail";
1✔
1045
      }
1046
    }
1047

1048
    private void startFallbackChildPolicy() {
1049
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1050
      synchronized (lock) {
1✔
1051
        if (fallbackChildPolicyWrapper != null) {
1✔
1052
          return;
1✔
1053
        }
1054
        logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1055
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1056
      }
1✔
1057
    }
1✔
1058

1059
    // GuardedBy CachingRlsLbClient.lock
1060
    void close() {
1061
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1062
        if (fallbackChildPolicyWrapper != null) {
1✔
1063
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1064
        }
1065
      }
1✔
1066
    }
1✔
1067

1068
    @Override
1069
    public String toString() {
1070
      return MoreObjects.toStringHelper(this)
×
1071
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1072
          .toString();
×
1073
    }
1074
  }
1075

1076
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc