• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19232

14 May 2024 01:34PM UTC coverage: 88.419% (+0.02%) from 88.403%
#19232

push

github

ejona86
rls: Guarantee backoff will update RLS picker

Previously, picker was likely null if entering backoff soon after
start-up. This prevented the picker from being updated and directing
queued RPCs to the fallback. It would work for new RPCs if RLS returned
extremely rapidly; both ManagedChannelImpl and DelayedClientTransport do
a pick before enqueuing so the ManagedChannelImpl pick could request
from RLS and DelayedClientTransport could use the response. So the test
uses a delay to purposefully avoid that unlikely-in-real-life case.

Creating a resolving OOB channel for InProcess doesn't actually change
the destination from the parent, because InProcess uses directaddress.
Thus the fakeRlsServiceImpl is now being added to the fake backend
server, because the same server is used for RLS within the test.

b/333185213

31615 of 35756 relevant lines covered (88.42%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.66
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ChannelLogger.ChannelLogLevel;
33
import io.grpc.ConnectivityState;
34
import io.grpc.LoadBalancer.Helper;
35
import io.grpc.LoadBalancer.PickResult;
36
import io.grpc.LoadBalancer.PickSubchannelArgs;
37
import io.grpc.LoadBalancer.ResolvedAddresses;
38
import io.grpc.LoadBalancer.SubchannelPicker;
39
import io.grpc.LongCounterMetricInstrument;
40
import io.grpc.LongGaugeMetricInstrument;
41
import io.grpc.ManagedChannel;
42
import io.grpc.ManagedChannelBuilder;
43
import io.grpc.Metadata;
44
import io.grpc.MetricInstrumentRegistry;
45
import io.grpc.MetricRecorder.BatchCallback;
46
import io.grpc.MetricRecorder.BatchRecorder;
47
import io.grpc.MetricRecorder.Registration;
48
import io.grpc.Status;
49
import io.grpc.internal.BackoffPolicy;
50
import io.grpc.internal.ExponentialBackoffPolicy;
51
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
52
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
53
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
54
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
55
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
56
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
57
import io.grpc.rls.LruCache.EvictionListener;
58
import io.grpc.rls.LruCache.EvictionType;
59
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
60
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
61
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
62
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
63
import io.grpc.stub.StreamObserver;
64
import io.grpc.util.ForwardingLoadBalancerHelper;
65
import java.net.URI;
66
import java.net.URISyntaxException;
67
import java.util.Arrays;
68
import java.util.Collections;
69
import java.util.HashMap;
70
import java.util.List;
71
import java.util.Map;
72
import java.util.UUID;
73
import java.util.concurrent.Future;
74
import java.util.concurrent.ScheduledExecutorService;
75
import java.util.concurrent.TimeUnit;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
/**
82
 * A CachingRlsLbClient is the core of the RLS load balancer, which supports dynamic request
83
 * routing by fetching the decision from route lookup server. Every single request is routed by
84
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
85
 */
86
@ThreadSafe
87
final class CachingRlsLbClient {
88

89
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
90
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
91
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
92
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
93
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
94
  public static final int BYTES_PER_CHAR = 2;
95
  public static final int STRING_OVERHEAD_BYTES = 38;
96
  /** Minimum bytes for a Java Object. */
97
  public static final int OBJ_OVERHEAD_B = 16;
98

99
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
100
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104
  private final Registration gaugeRegistration;
105
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
106

107
  // All cache status changes (pending, backoff, success) must be under this lock
108
  private final Object lock = new Object();
1✔
109
  // LRU cache based on access order (BACKOFF and actual data will be here)
110
  @GuardedBy("lock")
111
  private final RlsAsyncLruCache linkedHashLruCache;
112
  // any RPC still in flight is cached in this map
113
  @GuardedBy("lock")
1✔
114
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
115

116
  private final ScheduledExecutorService scheduledExecutorService;
117
  private final Ticker ticker;
118
  private final Throttler throttler;
119

120
  private final LbPolicyConfiguration lbPolicyConfig;
121
  private final BackoffPolicy.Provider backoffProvider;
122
  private final long maxAgeNanos;
123
  private final long staleAgeNanos;
124
  private final long callTimeoutNanos;
125

126
  private final RlsLbHelper helper;
127
  private final ManagedChannel rlsChannel;
128
  private final RouteLookupServiceStub rlsStub;
129
  private final RlsPicker rlsPicker;
130
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
131
  @GuardedBy("lock")
132
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
133
  private final ChannelLogger logger;
134

135
  // Registers the RLS counters and gauges with the default metric instrument registry. The
  // registrations are kept in their original order.
  static {
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    DEFAULT_TARGET_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.default_target_picks",
        "EXPERIMENTAL. Number of LB picks sent to the default target",
        "{pick}",
        Arrays.asList(
            "grpc.target",
            "grpc.lb.rls.server_target",
            "grpc.lb.rls.data_plane_target",
            "grpc.lb.pick_result"),
        Collections.emptyList(),
        false);

    TARGET_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.target_picks",
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
            + "will be counted in this metric, not in grpc.rls.default_target_picks.",
        "{pick}",
        Arrays.asList(
            "grpc.target",
            "grpc.lb.rls.server_target",
            "grpc.lb.rls.data_plane_target",
            "grpc.lb.pick_result"),
        Collections.emptyList(),
        false);

    FAILED_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.failed_picks",
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
            + "RLS channel being throttled",
        "{pick}",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
        Collections.emptyList(),
        false);

    CACHE_ENTRIES_GAUGE = registry.registerLongGauge(
        "grpc.lb.rls.cache_entries",
        "EXPERIMENTAL. Number of entries in the RLS cache",
        "{entry}",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
        Collections.emptyList(),
        false);

    CACHE_SIZE_GAUGE = registry.registerLongGauge(
        "grpc.lb.rls.cache_size",
        "EXPERIMENTAL. The current size of the RLS cache",
        "By",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
        Collections.emptyList(),
        false);
  }
165

166
  /**
   * Builds the client from {@code builder}: creates the LRU cache, the out-of-band channel and stub
   * used to reach the RLS server, the picker, the child policy wrapper factory, and registers the
   * cache gauges callback.
   *
   * <p>Every required builder field is null-checked before the cache and the RLS channel are
   * created, so a rejected builder fails before the channel is built and does not leave one
   * running.
   *
   * @param builder source of the helper, LB policy configuration, ticker, throttler, resolved
   *     address factory, backoff provider and eviction listener
   * @throws NullPointerException if a required builder field is {@code null}
   */
  private CachingRlsLbClient(Builder builder) {
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
    scheduledExecutorService = helper.getScheduledExecutorService();
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
    maxAgeNanos = rlsConfig.maxAgeInNanos();
    staleAgeNanos = rlsConfig.staleAgeInNanos();
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
    ticker = checkNotNull(builder.ticker, "ticker");
    throttler = checkNotNull(builder.throttler, "throttler");
    // Checked here, before the cache and the channel exist, so that a missing factory cannot
    // abort construction after the RLS channel has already been built.
    childLbResolvedAddressFactory =
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
    linkedHashLruCache =
        new RlsAsyncLruCache(
            rlsConfig.cacheSizeBytes(),
            new AutoCleaningEvictionListener(builder.evictionListener),
            scheduledExecutorService,
            ticker,
            lock,
            helper);
    logger = helper.getChannelLogger();
    String serverHost = null;
    try {
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
    } catch (URISyntaxException ignore) {
      // handled by the following null check
    }
    if (serverHost == null) {
      // Fall back to the raw authority when no host could be extracted from it.
      logger.log(
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
      serverHost = helper.getAuthority();
    }
    RlsRequestFactory requestFactory = new RlsRequestFactory(
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
    // RLS server using the same authority as the backends, even though the RLS server's addresses
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
    // called to impose the authority security restrictions.
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
    Map<String, ?> routeLookupChannelServiceConfig =
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
    if (routeLookupChannelServiceConfig != null) {
      logger.log(
          ChannelLogLevel.DEBUG,
          "RLS channel service config: {0}",
          routeLookupChannelServiceConfig);
      // Use the supplied service config as the default and turn off service config lookup.
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
      rlsChannelBuilder.disableServiceConfigLookUp();
    }
    rlsChannel = rlsChannelBuilder.build();
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
    backoffProvider = builder.backoffProvider;
    ChildLoadBalancerHelperProvider childLbHelperProvider =
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
    refCountedChildPolicyWrapperFactory =
        new RefCountedChildPolicyWrapperFactory(
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
            childLbHelperProvider,
            new BackoffRefreshListener());

    gaugeRegistration = helper.getMetricRecorder()
        .registerBatchCallback(new BatchCallback() {
          @Override
          public void accept(BatchRecorder recorder) {
            int estimatedSize;
            long estimatedSizeBytes;
            // Read both values under the cache lock, then record outside of it.
            synchronized (lock) {
              estimatedSize = linkedHashLruCache.estimatedSize();
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
            }
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
                    metricsInstanceUuid), Collections.emptyList());
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
                    metricsInstanceUuid), Collections.emptyList());
          }
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);

    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
  }
1✔
250

251
  void init() {
252
    synchronized (lock) {
1✔
253
      refCountedChildPolicyWrapperFactory.init();
1✔
254
    }
1✔
255
  }
1✔
256

257
  /**
258
   * Convert the status to UNAVAILBLE and enhance the error message.
259
   * @param status status as provided by server
260
   * @param serverName Used for error description
261
   * @return Transformed status
262
   */
263
  static Status convertRlsServerStatus(Status status, String serverName) {
264
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
265
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
266
                + "RLS server returned: %s: %s",
267
            serverName, status.getCode(), status.getDescription()));
1✔
268
  }
269

270
  /** Populates async cache entry for new request. */
271
  @GuardedBy("lock")
272
  private CachedRouteLookupResponse asyncRlsCall(
273
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
274
    logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
1✔
275
    if (throttler.shouldThrottle()) {
1✔
276
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
277
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
278
      // on this result
279
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
280
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
281
    }
282
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
283
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
284
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
285
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
286
        .routeLookup(
1✔
287
            routeLookupRequest,
288
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
289
              @Override
290
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
291
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
292
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
293
              }
1✔
294

295
              @Override
296
              public void onError(Throwable t) {
297
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
298
                response.setException(t);
1✔
299
                throttler.registerBackendResponse(true);
1✔
300
              }
1✔
301

302
              @Override
303
              public void onCompleted() {
304
                logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
1✔
305
                throttler.registerBackendResponse(false);
1✔
306
              }
1✔
307
            });
308
    return CachedRouteLookupResponse.pendingResponse(
1✔
309
        createPendingEntry(request, response, backoffPolicy));
1✔
310
  }
311

312
  /**
313
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
314
   * cached, pending and backed-off due to error. The result remains same even if the status is
315
   * changed after the return.
316
   */
317
  @CheckReturnValue
318
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
319
    logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
1✔
320
    synchronized (lock) {
1✔
321
      logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
1✔
322
      final CacheEntry cacheEntry;
323
      cacheEntry = linkedHashLruCache.read(request);
1✔
324
      if (cacheEntry == null) {
1✔
325
        logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
1✔
326
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
327
        if (pendingEntry != null) {
1✔
328
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
329
        }
330
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
331
      }
332

333
      if (cacheEntry instanceof DataCacheEntry) {
1✔
334
        // cache hit, initiate async-refresh if entry is staled
335
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
336
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
337
        if (dataEntry.isStaled(ticker.read())) {
1✔
338
          logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
1✔
339
          dataEntry.maybeRefresh();
1✔
340
        }
341
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
342
      }
343
      logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
1✔
344
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
345
    }
346
  }
347

348
  /** Performs any pending maintenance operations needed by the cache. */
349
  void close() {
350
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
351
    synchronized (lock) {
1✔
352
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
353
      linkedHashLruCache.close();
1✔
354
      // TODO(creamsoup) maybe cancel all pending requests
355
      pendingCallCache.clear();
1✔
356
      rlsChannel.shutdownNow();
1✔
357
      rlsPicker.close();
1✔
358
      gaugeRegistration.close();
1✔
359
    }
1✔
360
  }
1✔
361

362
  void requestConnection() {
363
    rlsChannel.getState(true);
×
364
  }
×
365

366
  @GuardedBy("lock")
367
  private PendingCacheEntry createPendingEntry(
368
      RouteLookupRequest request,
369
      ListenableFuture<RouteLookupResponse> pendingCall,
370
      @Nullable BackoffPolicy backoffPolicy) {
371
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
372
    // Add the entry to the map before adding the Listener, because the listener removes the
373
    // entry from the map
374
    pendingCallCache.put(request, entry);
1✔
375
    // Beware that the listener can run immediately on the current thread
376
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
377
    return entry;
1✔
378
  }
379

380
  private void pendingRpcComplete(PendingCacheEntry entry) {
381
    synchronized (lock) {
1✔
382
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
383
      if (clientClosed) {
1✔
384
        return;
1✔
385
      }
386

387
      try {
388
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
389
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
390
        // reattempt picks when the child LB is done connecting
391
      } catch (Exception e) {
1✔
392
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
393
        // Cache updated. updateBalancingState() to reattempt picks
394
        helper.triggerPendingRpcProcessing();
1✔
395
      }
1✔
396
    }
1✔
397
  }
1✔
398

399
  @GuardedBy("lock")
400
  private DataCacheEntry createDataEntry(
401
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
402
    logger.log(
1✔
403
        ChannelLogLevel.DEBUG,
404
        "Transition to data cache: routeLookupResponse={0}",
405
        routeLookupResponse);
406
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
407
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
408
    // this cache update because the lock is held
409
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
410
    return entry;
1✔
411
  }
412

413
  @GuardedBy("lock")
414
  private BackoffCacheEntry createBackOffEntry(
415
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
416
    logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
417
    if (backoffPolicy == null) {
1✔
418
      backoffPolicy = backoffProvider.get();
1✔
419
    }
420
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
421
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
422
    // Lock is held, so the task can't execute before the assignment
423
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
424
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
425
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
426
    logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
1✔
427
        delayNanos);
1✔
428
    return entry;
1✔
429
  }
430

431
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
432
    synchronized (lock) {
1✔
433
      // This checks whether the task has been cancelled and prevents a second execution.
434
      if (!entry.scheduledFuture.cancel(false)) {
1✔
435
        // Future was previously cancelled
436
        return;
×
437
      }
438
      logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
1✔
439
      linkedHashLruCache.invalidate(entry.request);
1✔
440
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
441
    }
1✔
442
  }
1✔
443

444
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
445

446
    final Helper helper;
447
    private ConnectivityState state;
448
    private SubchannelPicker picker;
449

450
    RlsLbHelper(Helper helper) {
1✔
451
      this.helper = helper;
1✔
452
    }
1✔
453

454
    @Override
455
    protected Helper delegate() {
456
      return helper;
1✔
457
    }
458

459
    @Override
460
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
461
      state = newState;
1✔
462
      picker = newPicker;
1✔
463
      super.updateBalancingState(newState, newPicker);
1✔
464
    }
1✔
465

466
    void triggerPendingRpcProcessing() {
467
      checkState(state != null, "updateBalancingState hasn't yet been called");
1✔
468
      helper.getSynchronizationContext().execute(
1✔
469
          () -> super.updateBalancingState(state, picker));
1✔
470
    }
1✔
471
  }
472

473
  /**
474
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
475
   */
476
  static final class CachedRouteLookupResponse {
477
    // Should only have 1 of following 3 cache entries
478
    @Nullable
479
    private final DataCacheEntry dataCacheEntry;
480
    @Nullable
481
    private final PendingCacheEntry pendingCacheEntry;
482
    @Nullable
483
    private final BackoffCacheEntry backoffCacheEntry;
484

485
    CachedRouteLookupResponse(
486
        DataCacheEntry dataCacheEntry,
487
        PendingCacheEntry pendingCacheEntry,
488
        BackoffCacheEntry backoffCacheEntry) {
1✔
489
      this.dataCacheEntry = dataCacheEntry;
1✔
490
      this.pendingCacheEntry = pendingCacheEntry;
1✔
491
      this.backoffCacheEntry = backoffCacheEntry;
1✔
492
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
493
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
494
          "Expected only 1 cache entry value provided");
495
    }
1✔
496

497
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
498
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
499
    }
500

501
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
502
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
503
    }
504

505
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
506
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
507
    }
508

509
    boolean hasData() {
510
      return dataCacheEntry != null;
1✔
511
    }
512

513
    @Nullable
514
    ChildPolicyWrapper getChildPolicyWrapper() {
515
      if (!hasData()) {
1✔
516
        return null;
×
517
      }
518
      return dataCacheEntry.getChildPolicyWrapper();
1✔
519
    }
520

521
    @VisibleForTesting
522
    @Nullable
523
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
524
      if (!hasData()) {
1✔
525
        return null;
×
526
      }
527
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
528
    }
529

530
    @Nullable
531
    String getHeaderData() {
532
      if (!hasData()) {
1✔
533
        return null;
1✔
534
      }
535
      return dataCacheEntry.getHeaderData();
1✔
536
    }
537

538
    boolean hasError() {
539
      return backoffCacheEntry != null;
1✔
540
    }
541

542
    boolean isPending() {
543
      return pendingCacheEntry != null;
1✔
544
    }
545

546
    @Nullable
547
    Status getStatus() {
548
      if (!hasError()) {
1✔
549
        return null;
×
550
      }
551
      return backoffCacheEntry.getStatus();
1✔
552
    }
553

554
    @Override
555
    public String toString() {
556
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
557
      if (dataCacheEntry != null) {
×
558
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
559
      }
560
      if (pendingCacheEntry != null) {
×
561
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
562
      }
563
      if (backoffCacheEntry != null) {
×
564
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
565
      }
566
      return toStringHelper.toString();
×
567
    }
568
  }
569

570
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
571
  static final class PendingCacheEntry {
572
    private final ListenableFuture<RouteLookupResponse> pendingCall;
573
    private final RouteLookupRequest request;
574
    @Nullable
575
    private final BackoffPolicy backoffPolicy;
576

577
    PendingCacheEntry(
578
        RouteLookupRequest request,
579
        ListenableFuture<RouteLookupResponse> pendingCall,
580
        @Nullable BackoffPolicy backoffPolicy) {
1✔
581
      this.request = checkNotNull(request, "request");
1✔
582
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
583
      this.backoffPolicy = backoffPolicy;
1✔
584
    }
1✔
585

586
    @Override
587
    public String toString() {
588
      return MoreObjects.toStringHelper(this)
×
589
          .add("request", request)
×
590
          .toString();
×
591
    }
592
  }
593

594
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
595
  abstract static class CacheEntry {
596

597
    protected final RouteLookupRequest request;
598

599
    CacheEntry(RouteLookupRequest request) {
1✔
600
      this.request = checkNotNull(request, "request");
1✔
601
    }
1✔
602

603
    abstract int getSizeBytes();
604

605
    abstract boolean isExpired(long now);
606

607
    abstract void cleanup();
608

609
    protected boolean isOldEnoughToBeEvicted(long now) {
610
      return true;
×
611
    }
612
  }
613

614
  /** Implementation of {@link CacheEntry} contains valid data. */
615
  final class DataCacheEntry extends CacheEntry {
616
    private final RouteLookupResponse response;
617
    private final long minEvictionTime;
618
    private final long expireTime;
619
    private final long staleTime;
620
    private final List<ChildPolicyWrapper> childPolicyWrappers;
621

622
    // GuardedBy CachingRlsLbClient.lock
623
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
624
      super(request);
1✔
625
      this.response = checkNotNull(response, "response");
1✔
626
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
627
      childPolicyWrappers =
1✔
628
          refCountedChildPolicyWrapperFactory
1✔
629
              .createOrGet(response.targets());
1✔
630
      long now = ticker.read();
1✔
631
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
632
      expireTime = now + maxAgeNanos;
1✔
633
      staleTime = now + staleAgeNanos;
1✔
634
    }
1✔
635

636
    /**
637
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
638
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
639
     * data still exists. Flow looks like following.
640
     *
641
     * <pre>
642
     * Timeline                       | async refresh
643
     *                                V put new cache (entry2)
644
     * entry1: Pending | hasValue | staled  |
645
     * entry2:                        | OV* | pending | hasValue | staled |
646
     *
647
     * OV: old value
648
     * </pre>
649
     */
650
    void maybeRefresh() {
651
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
652
        if (pendingCallCache.containsKey(request)) {
1✔
653
          // pending already requested
654
          logger.log(ChannelLogLevel.DEBUG,
×
655
              "A pending refresh request already created, no need to proceed with refresh");
656
          return;
×
657
        }
658
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
659
      }
1✔
660
    }
1✔
661

662
    @VisibleForTesting
663
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
664
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
665
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
666
          return childPolicyWrapper;
1✔
667
        }
668
      }
1✔
669

670
      throw new RuntimeException("Target not found:" + target);
×
671
    }
672

673
    @Nullable
674
    ChildPolicyWrapper getChildPolicyWrapper() {
675
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
676
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
677
          return childPolicyWrapper;
1✔
678
        }
679
      }
1✔
680
      return childPolicyWrappers.get(0);
1✔
681
    }
682

683
    String getHeaderData() {
684
      return response.getHeaderData();
1✔
685
    }
686

687
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
688
    int calcStringSize(String target) {
689
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
690
    }
691

692
    @Override
693
    int getSizeBytes() {
694
      int targetSize = 0;
1✔
695
      for (String target : response.targets()) {
1✔
696
        targetSize += calcStringSize(target);
1✔
697
      }
1✔
698
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
699
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
700
    }
701

702
    @Override
703
    boolean isExpired(long now) {
704
      return expireTime - now <= 0;
1✔
705
    }
706

707
    boolean isStaled(long now) {
708
      return staleTime - now <= 0;
1✔
709
    }
710

711
    @Override
712
    protected boolean isOldEnoughToBeEvicted(long now) {
713
      return minEvictionTime - now <= 0;
×
714
    }
715

716
    @Override
717
    void cleanup() {
718
      synchronized (lock) {
1✔
719
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
720
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
721
        }
1✔
722
      }
1✔
723
    }
1✔
724

725
    @Override
726
    public String toString() {
727
      return MoreObjects.toStringHelper(this)
×
728
          .add("request", request)
×
729
          .add("response", response)
×
730
          .add("expireTime", expireTime)
×
731
          .add("staleTime", staleTime)
×
732
          .add("childPolicyWrappers", childPolicyWrappers)
×
733
          .toString();
×
734
    }
735
  }
736

737
  /**
738
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
739
   * status when the backoff time is expired.
740
   */
741
  private static final class BackoffCacheEntry extends CacheEntry {
742

743
    private final Status status;
744
    private final BackoffPolicy backoffPolicy;
745
    private Future<?> scheduledFuture;
746

747
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
748
      super(request);
1✔
749
      this.status = checkNotNull(status, "status");
1✔
750
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
751
    }
1✔
752

753
    Status getStatus() {
754
      return status;
1✔
755
    }
756

757
    @Override
758
    int getSizeBytes() {
759
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
760
    }
761

762
    @Override
763
    boolean isExpired(long now) {
764
      return scheduledFuture.isDone();
1✔
765
    }
766

767
    @Override
768
    void cleanup() {
769
      scheduledFuture.cancel(false);
1✔
770
    }
1✔
771

772
    @Override
773
    public String toString() {
774
      return MoreObjects.toStringHelper(this)
×
775
          .add("request", request)
×
776
          .add("status", status)
×
777
          .toString();
×
778
    }
779
  }
780

781
  /** Returns a Builder for {@link CachingRlsLbClient}. */
782
  static Builder newBuilder() {
783
    return new Builder();
1✔
784
  }
785

786
  /** A Builder for {@link CachingRlsLbClient}. */
787
  static final class Builder {
1✔
788

789
    private Helper helper;
790
    private LbPolicyConfiguration lbPolicyConfig;
791
    private Throttler throttler = new HappyThrottler();
1✔
792
    private ResolvedAddressFactory resolvedAddressFactory;
793
    private Ticker ticker = Ticker.systemTicker();
1✔
794
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
795
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
796

797
    Builder setHelper(Helper helper) {
798
      this.helper = checkNotNull(helper, "helper");
1✔
799
      return this;
1✔
800
    }
801

802
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
803
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
804
      return this;
1✔
805
    }
806

807
    Builder setThrottler(Throttler throttler) {
808
      this.throttler = checkNotNull(throttler, "throttler");
1✔
809
      return this;
1✔
810
    }
811

812
    /**
813
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
814
     */
815
    Builder setResolvedAddressesFactory(
816
        ResolvedAddressFactory resolvedAddressFactory) {
817
      this.resolvedAddressFactory =
1✔
818
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
819
      return this;
1✔
820
    }
821

822
    Builder setTicker(Ticker ticker) {
823
      this.ticker = checkNotNull(ticker, "ticker");
1✔
824
      return this;
1✔
825
    }
826

827
    Builder setEvictionListener(
828
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
829
      this.evictionListener = evictionListener;
1✔
830
      return this;
1✔
831
    }
832

833
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
834
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
835
      return this;
1✔
836
    }
837

838
    CachingRlsLbClient build() {
839
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
840
      client.init();
1✔
841
      return client;
1✔
842
    }
843
  }
844

845
  /**
846
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
847
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
848
   */
849
  private static final class AutoCleaningEvictionListener
850
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
851

852
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
853

854
    AutoCleaningEvictionListener(
855
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
856
      this.delegate = delegate;
1✔
857
    }
1✔
858

859
    @Override
860
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
861
      if (delegate != null) {
1✔
862
        delegate.onEviction(key, value, cause);
1✔
863
      }
864
      // performs cleanup after delegation
865
      value.cleanup();
1✔
866
    }
1✔
867
  }
868

869
  /** A Throttler never throttles. */
870
  private static final class HappyThrottler implements Throttler {
871

872
    @Override
873
    public boolean shouldThrottle() {
874
      return false;
×
875
    }
876

877
    @Override
878
    public void registerBackendResponse(boolean throttled) {
879
      // no-op
880
    }
×
881
  }
882

883
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
884
  private static final class RlsAsyncLruCache
885
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
886
    private final RlsLbHelper helper;
887

888
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
889
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
890
        ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) {
891
      super(
1✔
892
          maxEstimatedSizeBytes,
893
          evictionListener,
894
          1,
895
          TimeUnit.MINUTES,
896
          ses,
897
          ticker,
898
          lock);
899
      this.helper = checkNotNull(helper, "helper");
1✔
900
    }
1✔
901

902
    @Override
903
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
904
      return value.isExpired(nowNanos);
1✔
905
    }
906

907
    @Override
908
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
909
      return value.getSizeBytes();
1✔
910
    }
911

912
    @Override
913
    protected boolean shouldInvalidateEldestEntry(
914
        RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) {
915
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
916
        return false;
×
917
      }
918

919
      // eldest entry should be evicted if size limit exceeded
920
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
921
    }
922

923
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
924
      CacheEntry newEntry = cache(key, value);
1✔
925

926
      // force cleanup if new entry pushed cache over max size (in bytes)
927
      if (fitToLimit()) {
1✔
928
        helper.triggerPendingRpcProcessing();
×
929
      }
930
      return newEntry;
1✔
931
    }
932
  }
933

934
  /**
935
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
936
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
937
   */
938
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
939

940
    @Nullable
1✔
941
    private ConnectivityState prevState = null;
942

943
    @Override
944
    public void onStatusChanged(ConnectivityState newState) {
945
      logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
1✔
946
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
947
          && newState == ConnectivityState.READY) {
948
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
949
        logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
1✔
950
        synchronized (lock) {
1✔
951
          logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
1✔
952
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
953
            if (value instanceof BackoffCacheEntry) {
1✔
954
              refreshBackoffEntry((BackoffCacheEntry) value);
×
955
            }
956
          }
1✔
957
        }
1✔
958
      }
959
      prevState = newState;
1✔
960
    }
1✔
961
  }
962

963
  /** A header that is added when the RLS server responds with additional header data. */
964
  @VisibleForTesting
965
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
966
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
967

968
  final class RlsPicker extends SubchannelPicker {
969

970
    private final RlsRequestFactory requestFactory;
971
    private final String lookupService;
972

973
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
974
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
975
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
976
    }
1✔
977

978
    @Override
979
    public PickResult pickSubchannel(PickSubchannelArgs args) {
980
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
981
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
982
      RouteLookupRequest request =
1✔
983
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
984
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
985
      logger.log(ChannelLogLevel.DEBUG,
1✔
986
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
987
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
988

989
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
990
        logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
1✔
991
        Metadata headers = args.getHeaders();
1✔
992
        headers.discardAll(RLS_DATA_KEY);
1✔
993
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
994
      }
995
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
996
      logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
1✔
997
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
998
      if (response.hasData()) {
1✔
999
        logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
1✔
1000
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1001
        SubchannelPicker picker =
1002
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1003
        if (picker == null) {
1✔
1004
          logger.log(ChannelLogLevel.DEBUG,
×
1005
              "Child policy wrapper didn't return a picker, returning PickResult with no results");
1006
          return PickResult.withNoResult();
×
1007
        }
1008
        // Happy path
1009
        logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
1✔
1010
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1011
        if (pickResult.hasResult()) {
1✔
1012
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1013
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1014
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1015
              Collections.emptyList());
1✔
1016
        }
1017
        return pickResult;
1✔
1018
      } else if (response.hasError()) {
1✔
1019
        logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
1✔
1020
        if (hasFallback) {
1✔
1021
          logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
1✔
1022
          return useFallback(args);
1✔
1023
        }
1024
        logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
1✔
1025
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1026
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1027
        return PickResult.withError(
1✔
1028
            convertRlsServerStatus(response.getStatus(),
1✔
1029
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1030
      } else {
1031
        logger.log(ChannelLogLevel.DEBUG,
1✔
1032
            "RLS response had no data, return a PickResult with no data");
1033
        return PickResult.withNoResult();
1✔
1034
      }
1035
    }
1036

1037
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
1038

1039
    /** Uses Subchannel connected to default target. */
1040
    private PickResult useFallback(PickSubchannelArgs args) {
1041
      // TODO(creamsoup) wait until lb is ready
1042
      startFallbackChildPolicy();
1✔
1043
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1044
      if (picker == null) {
1✔
1045
        return PickResult.withNoResult();
1✔
1046
      }
1047
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1048
      if (pickResult.hasResult()) {
1✔
1049
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1050
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1051
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1052
            Collections.emptyList());
1✔
1053
      }
1054
      return pickResult;
1✔
1055
    }
1056

1057
    private String determineMetricsPickResult(PickResult pickResult) {
1058
      if (pickResult.getStatus().isOk()) {
1✔
1059
        return "complete";
1✔
1060
      } else if (pickResult.isDrop()) {
1✔
1061
        return "drop";
×
1062
      } else {
1063
        return "fail";
1✔
1064
      }
1065
    }
1066

1067
    private void startFallbackChildPolicy() {
1068
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1069
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1070
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
1✔
1071
      synchronized (lock) {
1✔
1072
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
1✔
1073
        if (fallbackChildPolicyWrapper != null) {
1✔
1074
          return;
1✔
1075
        }
1076
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1077
      }
1✔
1078
    }
1✔
1079

1080
    // GuardedBy CachingRlsLbClient.lock
1081
    void close() {
1082
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1083
        logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
1✔
1084
        if (fallbackChildPolicyWrapper != null) {
1✔
1085
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1086
        }
1087
      }
1✔
1088
    }
1✔
1089

1090
    @Override
1091
    public String toString() {
1092
      return MoreObjects.toStringHelper(this)
×
1093
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1094
          .toString();
×
1095
    }
1096
  }
1097

1098
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc