• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19093

08 Mar 2024 05:47PM UTC coverage: 88.296% (-0.002%) from 88.298%
#19093

push

github

web-flow
rls: Fix a local and remote race

The local race passes `rlsPicker` to the channel before
CachingRlsLbClient is finished constructing. `RlsPicker` can use
multiple of the fields not yet initialized. This seems not to be
happening in practice, because it appears like it would break things
very loudly (e.g., NPE).

The remote race seems incredibly hard to hit, because it requires an RPC
to complete before the pending data tracking the RPC is added to a map.
But with if a system is at 100% CPU utilization, maybe it can be hit. If
it is hit, all RPCs needing the impacted cache entry will forever be
buffered.

31149 of 35278 relevant lines covered (88.3%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.74
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import com.google.common.util.concurrent.SettableFuture;
29
import io.grpc.ChannelLogger;
30
import io.grpc.ChannelLogger.ChannelLogLevel;
31
import io.grpc.ConnectivityState;
32
import io.grpc.LoadBalancer.Helper;
33
import io.grpc.LoadBalancer.PickResult;
34
import io.grpc.LoadBalancer.PickSubchannelArgs;
35
import io.grpc.LoadBalancer.ResolvedAddresses;
36
import io.grpc.LoadBalancer.SubchannelPicker;
37
import io.grpc.ManagedChannel;
38
import io.grpc.ManagedChannelBuilder;
39
import io.grpc.Metadata;
40
import io.grpc.Status;
41
import io.grpc.SynchronizationContext;
42
import io.grpc.SynchronizationContext.ScheduledHandle;
43
import io.grpc.internal.BackoffPolicy;
44
import io.grpc.internal.ExponentialBackoffPolicy;
45
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
46
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
47
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
48
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
49
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
50
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
51
import io.grpc.rls.LruCache.EvictionListener;
52
import io.grpc.rls.LruCache.EvictionType;
53
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
54
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
55
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
56
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
57
import io.grpc.rls.Throttler.ThrottledException;
58
import io.grpc.stub.StreamObserver;
59
import io.grpc.util.ForwardingLoadBalancerHelper;
60
import java.net.URI;
61
import java.net.URISyntaxException;
62
import java.util.HashMap;
63
import java.util.List;
64
import java.util.Map;
65
import java.util.concurrent.ScheduledExecutorService;
66
import java.util.concurrent.TimeUnit;
67
import javax.annotation.CheckReturnValue;
68
import javax.annotation.Nullable;
69
import javax.annotation.concurrent.GuardedBy;
70
import javax.annotation.concurrent.ThreadSafe;
71

72
/**
73
 * A CachingRlsLbClient is a core implementation of RLS loadbalancer supports dynamic request
74
 * routing by fetching the decision from route lookup server. Every single request is routed by
75
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
76
 */
77
@ThreadSafe
78
final class CachingRlsLbClient {
79

80
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
81
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
82
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
83
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
84
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
85
  public static final int BYTES_PER_CHAR = 2;
86
  public static final int STRING_OVERHEAD_BYTES = 38;
87
  /** Minimum bytes for a Java Object. */
88
  public static final int OBJ_OVERHEAD_B = 16;
89

90
  // All cache status changes (pending, backoff, success) must be under this lock
91
  private final Object lock = new Object();
1✔
92
  // LRU cache based on access order (BACKOFF and actual data will be here)
93
  @GuardedBy("lock")
94
  private final RlsAsyncLruCache linkedHashLruCache;
95
  // any RPC on the fly will cached in this map
96
  @GuardedBy("lock")
1✔
97
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
98

99
  private final SynchronizationContext synchronizationContext;
100
  private final ScheduledExecutorService scheduledExecutorService;
101
  private final Ticker ticker;
102
  private final Throttler throttler;
103

104
  private final LbPolicyConfiguration lbPolicyConfig;
105
  private final BackoffPolicy.Provider backoffProvider;
106
  private final long maxAgeNanos;
107
  private final long staleAgeNanos;
108
  private final long callTimeoutNanos;
109

110
  private final RlsLbHelper helper;
111
  private final ManagedChannel rlsChannel;
112
  private final RouteLookupServiceStub rlsStub;
113
  private final RlsPicker rlsPicker;
114
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
115
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
116
  private final ChannelLogger logger;
117

118
  private CachingRlsLbClient(Builder builder) {
1✔
119
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
120
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
121
    synchronizationContext = helper.getSynchronizationContext();
1✔
122
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
123
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
124
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
125
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
126
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
127
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
128
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
129
    linkedHashLruCache =
1✔
130
        new RlsAsyncLruCache(
131
            rlsConfig.cacheSizeBytes(),
1✔
132
            builder.evictionListener,
1✔
133
            scheduledExecutorService,
134
            ticker,
135
            lock);
136
    logger = helper.getChannelLogger();
1✔
137
    String serverHost = null;
1✔
138
    try {
139
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
140
    } catch (URISyntaxException ignore) {
×
141
      // handled by the following null check
142
    }
1✔
143
    if (serverHost == null) {
1✔
144
      logger.log(
×
145
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
146
      serverHost = helper.getAuthority();
×
147
    }
148
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
149
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
150
    rlsPicker = new RlsPicker(requestFactory);
1✔
151
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
152
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
153
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
154
    // called to impose the authority security restrictions.
155
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
156
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
157
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
158
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
159
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
160
    if (routeLookupChannelServiceConfig != null) {
1✔
161
      logger.log(
1✔
162
          ChannelLogLevel.DEBUG,
163
          "RLS channel service config: {0}",
164
          routeLookupChannelServiceConfig);
165
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
166
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
167
    }
168
    rlsChannel = rlsChannelBuilder.build();
1✔
169
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
170
    childLbResolvedAddressFactory =
1✔
171
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
172
    backoffProvider = builder.backoffProvider;
1✔
173
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
174
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
175
    refCountedChildPolicyWrapperFactory =
1✔
176
        new RefCountedChildPolicyWrapperFactory(
177
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
178
            childLbHelperProvider,
179
            new BackoffRefreshListener());
180
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
181
  }
1✔
182

183
  /**
184
   * Convert the status to UNAVAILBLE and enhance the error message.
185
   * @param status status as provided by server
186
   * @param serverName Used for error description
187
   * @return Transformed status
188
   */
189
  static Status convertRlsServerStatus(Status status, String serverName) {
190
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
191
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
192
                + "RLS server returned: %s: %s",
193
            serverName, status.getCode(), status.getDescription()));
1✔
194
  }
195

196
  @CheckReturnValue
197
  private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
198
    logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
1✔
199
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
200
    if (throttler.shouldThrottle()) {
1✔
201
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
202
      response.setException(new ThrottledException());
1✔
203
      return response;
1✔
204
    }
205
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
206
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
207
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
208
        .routeLookup(
1✔
209
            routeLookupRequest,
210
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
211
              @Override
212
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
213
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
214
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
215
              }
1✔
216

217
              @Override
218
              public void onError(Throwable t) {
219
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
220
                response.setException(t);
1✔
221
                throttler.registerBackendResponse(true);
1✔
222
                helper.propagateRlsError();
1✔
223
              }
1✔
224

225
              @Override
226
              public void onCompleted() {
227
                logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
1✔
228
                throttler.registerBackendResponse(false);
1✔
229
              }
1✔
230
            });
231
    return response;
1✔
232
  }
233

234
  /**
235
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
236
   * cached, pending and backed-off due to error. The result remains same even if the status is
237
   * changed after the return.
238
   */
239
  @CheckReturnValue
240
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
241
    logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
1✔
242
    synchronized (lock) {
1✔
243
      logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
1✔
244
      final CacheEntry cacheEntry;
245
      cacheEntry = linkedHashLruCache.read(request);
1✔
246
      if (cacheEntry == null) {
1✔
247
        logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
1✔
248
        return handleNewRequest(request);
1✔
249
      }
250

251
      if (cacheEntry instanceof DataCacheEntry) {
1✔
252
        // cache hit, initiate async-refresh if entry is staled
253
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
254
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
255
        if (dataEntry.isStaled(ticker.read())) {
1✔
256
          logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
1✔
257
          dataEntry.maybeRefresh();
1✔
258
        }
259
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
260
      }
261
      logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
1✔
262
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
263
    }
264
  }
265

266
  /** Performs any pending maintenance operations needed by the cache. */
267
  void close() {
268
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
269
    synchronized (lock) {
1✔
270
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
271
      linkedHashLruCache.close();
1✔
272
      // TODO(creamsoup) maybe cancel all pending requests
273
      pendingCallCache.clear();
1✔
274
      rlsChannel.shutdownNow();
1✔
275
      rlsPicker.close();
1✔
276
    }
1✔
277
  }
1✔
278

279
  /**
280
   * Populates async cache entry for new request. This is only methods directly modifies the cache,
281
   * any status change is happening via event (async request finished, timed out, etc) in {@link
282
   * PendingCacheEntry}, {@link DataCacheEntry} and {@link BackoffCacheEntry}.
283
   */
284
  private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) {
285
    synchronized (lock) {
1✔
286
      PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
287
      if (pendingEntry != null) {
1✔
288
        return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
289
      }
290

291
      ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
1✔
292
      if (!asyncCall.isDone()) {
1✔
293
        pendingEntry = new PendingCacheEntry(request, asyncCall);
1✔
294
        // Add the entry to the map before adding the Listener, because the listener removes the
295
        // entry from the map
296
        pendingCallCache.put(request, pendingEntry);
1✔
297
        // Beware that the listener can run immediately on the current thread
298
        asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
1✔
299
        return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
300
      } else {
301
        // async call returned finished future is most likely throttled
302
        try {
303
          RouteLookupResponse response = asyncCall.get();
1✔
304
          DataCacheEntry dataEntry = new DataCacheEntry(request, response);
1✔
305
          linkedHashLruCache.cacheAndClean(request, dataEntry);
1✔
306
          return CachedRouteLookupResponse.dataEntry(dataEntry);
1✔
307
        } catch (Exception e) {
1✔
308
          BackoffCacheEntry backoffEntry =
1✔
309
              new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
1✔
310
          linkedHashLruCache.cacheAndClean(request, backoffEntry);
1✔
311
          return CachedRouteLookupResponse.backoffEntry(backoffEntry);
1✔
312
        }
313
      }
314
    }
315
  }
316

317
  void requestConnection() {
318
    rlsChannel.getState(true);
×
319
  }
×
320

321
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
322

323
    final Helper helper;
324
    private ConnectivityState state;
325
    private SubchannelPicker picker;
326

327
    RlsLbHelper(Helper helper) {
1✔
328
      this.helper = helper;
1✔
329
    }
1✔
330

331
    @Override
332
    protected Helper delegate() {
333
      return helper;
1✔
334
    }
335

336
    @Override
337
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
338
      state = newState;
1✔
339
      picker = newPicker;
1✔
340
      super.updateBalancingState(newState, newPicker);
1✔
341
    }
1✔
342

343
    void propagateRlsError() {
344
      getSynchronizationContext().execute(new Runnable() {
1✔
345
        @Override
346
        public void run() {
347
          if (picker != null) {
1✔
348
            // Refresh the channel state and let pending RPCs reprocess the picker.
349
            updateBalancingState(state, picker);
1✔
350
          }
351
        }
1✔
352
      });
353
    }
1✔
354

355
    void triggerPendingRpcProcessing() {
356
      super.updateBalancingState(state, picker);
×
357
    }
×
358
  }
359

360
  /**
361
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
362
   */
363
  static final class CachedRouteLookupResponse {
364
    // Should only have 1 of following 3 cache entries
365
    @Nullable
366
    private final DataCacheEntry dataCacheEntry;
367
    @Nullable
368
    private final PendingCacheEntry pendingCacheEntry;
369
    @Nullable
370
    private final BackoffCacheEntry backoffCacheEntry;
371

372
    CachedRouteLookupResponse(
373
        DataCacheEntry dataCacheEntry,
374
        PendingCacheEntry pendingCacheEntry,
375
        BackoffCacheEntry backoffCacheEntry) {
1✔
376
      this.dataCacheEntry = dataCacheEntry;
1✔
377
      this.pendingCacheEntry = pendingCacheEntry;
1✔
378
      this.backoffCacheEntry = backoffCacheEntry;
1✔
379
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
380
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
381
          "Expected only 1 cache entry value provided");
382
    }
1✔
383

384
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
385
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
386
    }
387

388
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
389
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
390
    }
391

392
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
393
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
394
    }
395

396
    boolean hasData() {
397
      return dataCacheEntry != null;
1✔
398
    }
399

400
    @Nullable
401
    ChildPolicyWrapper getChildPolicyWrapper() {
402
      if (!hasData()) {
1✔
403
        return null;
×
404
      }
405
      return dataCacheEntry.getChildPolicyWrapper();
1✔
406
    }
407

408
    @VisibleForTesting
409
    @Nullable
410
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
411
      if (!hasData()) {
1✔
412
        return null;
×
413
      }
414
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
415
    }
416

417
    @Nullable
418
    String getHeaderData() {
419
      if (!hasData()) {
1✔
420
        return null;
1✔
421
      }
422
      return dataCacheEntry.getHeaderData();
1✔
423
    }
424

425
    boolean hasError() {
426
      return backoffCacheEntry != null;
1✔
427
    }
428

429
    boolean isPending() {
430
      return pendingCacheEntry != null;
1✔
431
    }
432

433
    @Nullable
434
    Status getStatus() {
435
      if (!hasError()) {
×
436
        return null;
×
437
      }
438
      return backoffCacheEntry.getStatus();
×
439
    }
440

441
    @Override
442
    public String toString() {
443
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
444
      if (dataCacheEntry != null) {
×
445
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
446
      }
447
      if (pendingCacheEntry != null) {
×
448
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
449
      }
450
      if (backoffCacheEntry != null) {
×
451
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
452
      }
453
      return toStringHelper.toString();
×
454
    }
455
  }
456

457
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
458
  final class PendingCacheEntry {
459
    private final ListenableFuture<RouteLookupResponse> pendingCall;
460
    private final RouteLookupRequest request;
461
    private final BackoffPolicy backoffPolicy;
462

463
    PendingCacheEntry(
464
        RouteLookupRequest request, ListenableFuture<RouteLookupResponse> pendingCall) {
465
      this(request, pendingCall, null);
1✔
466
    }
1✔
467

468
    PendingCacheEntry(
469
        RouteLookupRequest request,
470
        ListenableFuture<RouteLookupResponse> pendingCall,
471
        @Nullable BackoffPolicy backoffPolicy) {
1✔
472
      this.request = checkNotNull(request, "request");
1✔
473
      this.pendingCall = pendingCall;
1✔
474
      this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy;
1✔
475
    }
1✔
476

477
    void handleDoneFuture() {
478
      synchronized (lock) {
1✔
479
        pendingCallCache.remove(request);
1✔
480
        if (pendingCall.isCancelled()) {
1✔
481
          return;
×
482
        }
483

484
        try {
485
          transitionToDataEntry(pendingCall.get());
1✔
486
        } catch (Exception e) {
1✔
487
          if (e instanceof ThrottledException) {
1✔
488
            transitionToBackOff(Status.RESOURCE_EXHAUSTED.withCause(e));
×
489
          } else {
490
            transitionToBackOff(Status.fromThrowable(e));
1✔
491
          }
492
        }
1✔
493
      }
1✔
494
    }
1✔
495

496
    private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) {
497
      synchronized (lock) {
1✔
498
        logger.log(
1✔
499
            ChannelLogLevel.DEBUG,
500
            "Transition to data cache: routeLookupResponse={0}",
501
            routeLookupResponse);
502
        linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, routeLookupResponse));
1✔
503
      }
1✔
504
    }
1✔
505

506
    private void transitionToBackOff(Status status) {
507
      synchronized (lock) {
1✔
508
        logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
509
        linkedHashLruCache.cacheAndClean(request,
1✔
510
            new BackoffCacheEntry(request, status, backoffPolicy));
511
      }
1✔
512
    }
1✔
513

514
    @Override
515
    public String toString() {
516
      return MoreObjects.toStringHelper(this)
×
517
          .add("request", request)
×
518
          .toString();
×
519
    }
520
  }
521

522
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
523
  abstract class CacheEntry {
524

525
    protected final RouteLookupRequest request;
526

527
    CacheEntry(RouteLookupRequest request) {
1✔
528
      this.request = checkNotNull(request, "request");
1✔
529
    }
1✔
530

531
    abstract int getSizeBytes();
532

533
    final boolean isExpired() {
534
      return isExpired(ticker.read());
1✔
535
    }
536

537
    abstract boolean isExpired(long now);
538

539
    abstract void cleanup();
540

541
    protected long getMinEvictionTime() {
542
      return 0L;
×
543
    }
544

545
    protected void triggerPendingRpcProcessing() {
546
      helper.triggerPendingRpcProcessing();
×
547
    }
×
548
  }
549

550
  /** Implementation of {@link CacheEntry} contains valid data. */
551
  final class DataCacheEntry extends CacheEntry {
552
    private final RouteLookupResponse response;
553
    private final long minEvictionTime;
554
    private final long expireTime;
555
    private final long staleTime;
556
    private final List<ChildPolicyWrapper> childPolicyWrappers;
557

558
    // GuardedBy CachingRlsLbClient.lock
559
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
560
      super(request);
1✔
561
      this.response = checkNotNull(response, "response");
1✔
562
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
563
      childPolicyWrappers =
1✔
564
          refCountedChildPolicyWrapperFactory
1✔
565
              .createOrGet(response.targets());
1✔
566
      long now = ticker.read();
1✔
567
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
568
      expireTime = now + maxAgeNanos;
1✔
569
      staleTime = now + staleAgeNanos;
1✔
570
    }
1✔
571

572
    /**
573
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
574
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
575
     * data still exists. Flow looks like following.
576
     *
577
     * <pre>
578
     * Timeline                       | async refresh
579
     *                                V put new cache (entry2)
580
     * entry1: Pending | hasValue | staled  |
581
     * entry2:                        | OV* | pending | hasValue | staled |
582
     *
583
     * OV: old value
584
     * </pre>
585
     */
586
    void maybeRefresh() {
587
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to maybe refresh cache entry");
1✔
588
      synchronized (lock) {
1✔
589
        logger.log(ChannelLogLevel.DEBUG, "Lock to maybe refresh cache entry acquired");
1✔
590
        if (pendingCallCache.containsKey(request)) {
1✔
591
          // pending already requested
592
          logger.log(ChannelLogLevel.DEBUG,
×
593
              "A pending refresh request already created, no need to proceed with refresh");
594
          return;
×
595
        }
596
        final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
1✔
597
        if (!asyncCall.isDone()) {
1✔
598
          logger.log(ChannelLogLevel.DEBUG,
1✔
599
              "Async call to rls not yet complete, adding a pending cache entry");
600
          PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall);
1✔
601
          pendingCallCache.put(request, pendingEntry);
1✔
602
          asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
1✔
603
        } else {
1✔
604
          // async call returned finished future is most likely throttled
605
          try {
606
            logger.log(ChannelLogLevel.DEBUG, "Waiting for RLS call to return");
×
607
            RouteLookupResponse response = asyncCall.get();
×
608
            logger.log(ChannelLogLevel.DEBUG, "RLS call to returned");
×
609
            linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
×
610
          } catch (InterruptedException e) {
×
611
            Thread.currentThread().interrupt();
×
612
          } catch (Exception e) {
×
613
            logger.log(ChannelLogLevel.DEBUG, "RLS call failed, adding a backoff entry", e);
×
614
            BackoffCacheEntry backoffEntry =
×
615
                new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
×
616
            linkedHashLruCache.cacheAndClean(request, backoffEntry);
×
617
          }
×
618
        }
619
      }
1✔
620
    }
1✔
621

622
    @VisibleForTesting
623
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
624
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
625
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
626
          return childPolicyWrapper;
1✔
627
        }
628
      }
1✔
629

630
      throw new RuntimeException("Target not found:" + target);
×
631
    }
632

633
    @Nullable
634
    ChildPolicyWrapper getChildPolicyWrapper() {
635
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
636
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
637
          return childPolicyWrapper;
1✔
638
        }
639
      }
1✔
640
      return childPolicyWrappers.get(0);
1✔
641
    }
642

643
    String getHeaderData() {
644
      return response.getHeaderData();
1✔
645
    }
646

647
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
648
    int calcStringSize(String target) {
649
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
650
    }
651

652
    @Override
653
    int getSizeBytes() {
654
      int targetSize = 0;
1✔
655
      for (String target : response.targets()) {
1✔
656
        targetSize += calcStringSize(target);
1✔
657
      }
1✔
658
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
659
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
660
    }
661

662
    @Override
663
    boolean isExpired(long now) {
664
      return expireTime - now <= 0;
1✔
665
    }
666

667
    boolean isStaled(long now) {
668
      return staleTime - now <= 0;
1✔
669
    }
670

671
    @Override
672
    protected long getMinEvictionTime() {
673
      return minEvictionTime;
×
674
    }
675

676
    @Override
677
    void cleanup() {
678
      synchronized (lock) {
1✔
679
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
680
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
681
        }
1✔
682
      }
1✔
683
    }
1✔
684

685
    @Override
686
    public String toString() {
687
      return MoreObjects.toStringHelper(this)
×
688
          .add("request", request)
×
689
          .add("response", response)
×
690
          .add("expireTime", expireTime)
×
691
          .add("staleTime", staleTime)
×
692
          .add("childPolicyWrappers", childPolicyWrappers)
×
693
          .toString();
×
694
    }
695
  }
696

697
  /**
698
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
699
   * status when the backoff time is expired.
700
   */
701
  private final class BackoffCacheEntry extends CacheEntry {
702

703
    private final Status status;
704
    private final ScheduledHandle scheduledHandle;
705
    private final BackoffPolicy backoffPolicy;
706
    private final long expireNanos;
707
    private boolean shutdown = false;
1✔
708

709
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
1✔
710
      super(request);
1✔
711
      this.status = checkNotNull(status, "status");
1✔
712
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
713
      long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
714
      this.expireNanos = ticker.read() + delayNanos;
1✔
715
      this.scheduledHandle =
1✔
716
          synchronizationContext.schedule(
1✔
717
              new Runnable() {
1✔
718
                @Override
719
                public void run() {
720
                  transitionToPending();
1✔
721
                }
1✔
722
              },
723
              delayNanos,
724
              TimeUnit.NANOSECONDS,
725
              scheduledExecutorService);
1✔
726
      logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
1✔
727
          delayNanos);
1✔
728
    }
1✔
729

730
    /** Forcefully refreshes cache entry by ignoring the backoff timer. */
731
    void forceRefresh() {
732
      logger.log(ChannelLogLevel.DEBUG, "Forcefully refreshing cache entry");
×
733
      if (scheduledHandle.isPending()) {
×
734
        scheduledHandle.cancel();
×
735
        transitionToPending();
×
736
      }
737
    }
×
738

739
    private void transitionToPending() {
740
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to transition to pending");
1✔
741
      synchronized (lock) {
1✔
742
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock to transition to pending");
1✔
743
        if (shutdown) {
1✔
744
          logger.log(ChannelLogLevel.DEBUG, "Already shut down, not transitioning to pending");
×
745
          return;
×
746
        }
747
        logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
1✔
748
        ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
1✔
749
        if (!call.isDone()) {
1✔
750
          logger.log(ChannelLogLevel.DEBUG,
1✔
751
              "Transition to pending RLS call not done, adding a pending cache entry");
752
          linkedHashLruCache.invalidate(request);
1✔
753
          PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
1✔
754
          pendingCallCache.put(request, pendingEntry);
1✔
755
          call.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
1✔
756
        } else {
1✔
757
          try {
758
            logger.log(ChannelLogLevel.DEBUG,
1✔
759
                "Waiting for transition to pending RLS call response");
760
            RouteLookupResponse response = call.get();
1✔
761
            linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
1✔
762
          } catch (InterruptedException e) {
×
763
            Thread.currentThread().interrupt();
×
764
          } catch (Exception e) {
1✔
765
            logger.log(ChannelLogLevel.DEBUG,
1✔
766
                "Transition to pending RLS call failed, creating a backoff entry", e);
767
            linkedHashLruCache.cacheAndClean(
1✔
768
                request,
769
                new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
1✔
770
          }
1✔
771
        }
772
      }
1✔
773
    }
1✔
774

775
    Status getStatus() {
776
      return status;
×
777
    }
778

779
    @Override
780
    int getSizeBytes() {
781
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
782
    }
783

784
    @Override
785
    boolean isExpired(long now) {
786
      return expireNanos - now <= 0;
1✔
787
    }
788

789
    @Override
790
    void cleanup() {
791
      if (shutdown) {
1✔
792
        return;
×
793
      }
794
      shutdown = true;
1✔
795
      if (!scheduledHandle.isPending()) {
1✔
796
        scheduledHandle.cancel();
1✔
797
      }
798
    }
1✔
799

800
    @Override
801
    public String toString() {
802
      return MoreObjects.toStringHelper(this)
×
803
          .add("request", request)
×
804
          .add("status", status)
×
805
          .toString();
×
806
    }
807
  }
808

809
  /** Returns a Builder for {@link CachingRlsLbClient}. */
810
  static Builder newBuilder() {
811
    return new Builder();
1✔
812
  }
813

814
  /** A Builder for {@link CachingRlsLbClient}. */
815
  static final class Builder {
1✔
816

817
    private Helper helper;
818
    private LbPolicyConfiguration lbPolicyConfig;
819
    private Throttler throttler = new HappyThrottler();
1✔
820
    private ResolvedAddressFactory resolvedAddressFactory;
821
    private Ticker ticker = Ticker.systemTicker();
1✔
822
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
823
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
824

825
    Builder setHelper(Helper helper) {
826
      this.helper = checkNotNull(helper, "helper");
1✔
827
      return this;
1✔
828
    }
829

830
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
831
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
832
      return this;
1✔
833
    }
834

835
    Builder setThrottler(Throttler throttler) {
836
      this.throttler = checkNotNull(throttler, "throttler");
1✔
837
      return this;
1✔
838
    }
839

840
    /**
841
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
842
     */
843
    Builder setResolvedAddressesFactory(
844
        ResolvedAddressFactory resolvedAddressFactory) {
845
      this.resolvedAddressFactory =
1✔
846
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
847
      return this;
1✔
848
    }
849

850
    Builder setTicker(Ticker ticker) {
851
      this.ticker = checkNotNull(ticker, "ticker");
1✔
852
      return this;
1✔
853
    }
854

855
    Builder setEvictionListener(
856
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
857
      this.evictionListener = evictionListener;
1✔
858
      return this;
1✔
859
    }
860

861
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
862
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
863
      return this;
1✔
864
    }
865

866
    CachingRlsLbClient build() {
867
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
868
      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
1✔
869
      return client;
1✔
870
    }
871
  }
872

873
  /**
874
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
875
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
876
   */
877
  private static final class AutoCleaningEvictionListener
878
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
879

880
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
881

882
    AutoCleaningEvictionListener(
883
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
884
      this.delegate = delegate;
1✔
885
    }
1✔
886

887
    @Override
888
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
889
      if (delegate != null) {
1✔
890
        delegate.onEviction(key, value, cause);
1✔
891
      }
892
      // performs cleanup after delegation
893
      value.cleanup();
1✔
894
    }
1✔
895
  }
896

897
  /** A Throttler never throttles. */
898
  private static final class HappyThrottler implements Throttler {
899

900
    @Override
901
    public boolean shouldThrottle() {
902
      return false;
×
903
    }
904

905
    @Override
906
    public void registerBackendResponse(boolean throttled) {
907
      // no-op
908
    }
×
909
  }
910

911
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
912
  private static final class RlsAsyncLruCache
913
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
914

915
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
916
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
917
        ScheduledExecutorService ses, Ticker ticker, Object lock) {
918
      super(
1✔
919
          maxEstimatedSizeBytes,
920
          new AutoCleaningEvictionListener(evictionListener),
921
          1,
922
          TimeUnit.MINUTES,
923
          ses,
924
          ticker,
925
          lock);
926
    }
1✔
927

928
    @Override
929
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
930
      return value.isExpired();
1✔
931
    }
932

933
    @Override
934
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
935
      return value.getSizeBytes();
1✔
936
    }
937

938
    @Override
939
    protected boolean shouldInvalidateEldestEntry(
940
        RouteLookupRequest eldestKey, CacheEntry eldestValue) {
941
      if (eldestValue.getMinEvictionTime() > now()) {
×
942
        return false;
×
943
      }
944

945
      // eldest entry should be evicted if size limit exceeded
946
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
947
    }
948

949
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
950
      CacheEntry newEntry = cache(key, value);
1✔
951

952
      // force cleanup if new entry pushed cache over max size (in bytes)
953
      if (fitToLimit()) {
1✔
954
        value.triggerPendingRpcProcessing();
×
955
      }
956
      return newEntry;
1✔
957
    }
958
  }
959

960
  /**
961
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
962
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
963
   */
964
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
965

966
    @Nullable
1✔
967
    private ConnectivityState prevState = null;
968

969
    @Override
970
    public void onStatusChanged(ConnectivityState newState) {
971
      logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
1✔
972
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
973
          && newState == ConnectivityState.READY) {
974
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
975
        logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
1✔
976
        synchronized (lock) {
1✔
977
          logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
1✔
978
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
979
            if (value instanceof BackoffCacheEntry) {
1✔
980
              ((BackoffCacheEntry) value).forceRefresh();
×
981
            }
982
          }
1✔
983
        }
1✔
984
      }
985
      prevState = newState;
1✔
986
    }
1✔
987
  }
988

989
  /** A header will be added when RLS server respond with additional header data. */
990
  @VisibleForTesting
991
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
992
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
993

994
  final class RlsPicker extends SubchannelPicker {
995

996
    private final RlsRequestFactory requestFactory;
997

998
    RlsPicker(RlsRequestFactory requestFactory) {
1✔
999
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
1000
    }
1✔
1001

1002
    @Override
1003
    public PickResult pickSubchannel(PickSubchannelArgs args) {
1004
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
1005
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
1006
      RouteLookupRequest request =
1✔
1007
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
1008
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
1009
      logger.log(ChannelLogLevel.DEBUG,
1✔
1010
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
1011
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
1012

1013
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
1014
        logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
1✔
1015
        Metadata headers = args.getHeaders();
1✔
1016
        headers.discardAll(RLS_DATA_KEY);
1✔
1017
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
1018
      }
1019
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1020
      logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
1✔
1021
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
1022
      if (response.hasData()) {
1✔
1023
        logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
1✔
1024
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1025
        SubchannelPicker picker =
1026
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1027
        if (picker == null) {
1✔
1028
          logger.log(ChannelLogLevel.DEBUG,
×
1029
              "Child policy wrapper didn't return a picker, returning PickResult with no results");
1030
          return PickResult.withNoResult();
×
1031
        }
1032
        // Happy path
1033
        logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
1✔
1034
        return picker.pickSubchannel(args);
1✔
1035
      } else if (response.hasError()) {
1✔
1036
        logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
1✔
1037
        if (hasFallback) {
1✔
1038
          logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
1✔
1039
          return useFallback(args);
1✔
1040
        }
1041
        logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
×
1042
        return PickResult.withError(
×
1043
            convertRlsServerStatus(response.getStatus(),
×
1044
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
×
1045
      } else {
1046
        logger.log(ChannelLogLevel.DEBUG,
1✔
1047
            "RLS response had no data, return a PickResult with no data");
1048
        return PickResult.withNoResult();
1✔
1049
      }
1050
    }
1051

1052
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
1053

1054
    /** Uses Subchannel connected to default target. */
1055
    private PickResult useFallback(PickSubchannelArgs args) {
1056
      // TODO(creamsoup) wait until lb is ready
1057
      startFallbackChildPolicy();
1✔
1058
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1059
      if (picker == null) {
1✔
1060
        return PickResult.withNoResult();
×
1061
      }
1062
      return picker.pickSubchannel(args);
1✔
1063
    }
1064

1065
    private void startFallbackChildPolicy() {
1066
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1067
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1068
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
1✔
1069
      synchronized (lock) {
1✔
1070
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
1✔
1071
        if (fallbackChildPolicyWrapper != null) {
1✔
1072
          return;
1✔
1073
        }
1074
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1075
      }
1✔
1076
    }
1✔
1077

1078
    // GuardedBy CachingRlsLbClient.lock
1079
    void close() {
1080
      logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
1✔
1081
      if (fallbackChildPolicyWrapper != null) {
1✔
1082
        refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1083
      }
1084
    }
1✔
1085

1086
    @Override
1087
    public String toString() {
1088
      return MoreObjects.toStringHelper(this)
×
1089
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1090
          .toString();
×
1091
    }
1092
  }
1093

1094
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc