• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19225

09 May 2024 07:52PM UTC coverage: 88.412% (+0.03%) from 88.38%
#19225

push

github

ejona86
rls: Add gauge metric recording (#11175)

Adds these gauges:
- grpc.lb.rls.cache_entries
- grpc.lb.rls.cache_size

31610 of 35753 relevant lines covered (88.41%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.76
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ChannelLogger.ChannelLogLevel;
33
import io.grpc.ConnectivityState;
34
import io.grpc.LoadBalancer.Helper;
35
import io.grpc.LoadBalancer.PickResult;
36
import io.grpc.LoadBalancer.PickSubchannelArgs;
37
import io.grpc.LoadBalancer.ResolvedAddresses;
38
import io.grpc.LoadBalancer.SubchannelPicker;
39
import io.grpc.LongCounterMetricInstrument;
40
import io.grpc.LongGaugeMetricInstrument;
41
import io.grpc.ManagedChannel;
42
import io.grpc.ManagedChannelBuilder;
43
import io.grpc.Metadata;
44
import io.grpc.MetricInstrumentRegistry;
45
import io.grpc.MetricRecorder.BatchCallback;
46
import io.grpc.MetricRecorder.BatchRecorder;
47
import io.grpc.MetricRecorder.Registration;
48
import io.grpc.Status;
49
import io.grpc.internal.BackoffPolicy;
50
import io.grpc.internal.ExponentialBackoffPolicy;
51
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
52
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
53
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
54
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
55
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
56
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
57
import io.grpc.rls.LruCache.EvictionListener;
58
import io.grpc.rls.LruCache.EvictionType;
59
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
60
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
61
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
62
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
63
import io.grpc.stub.StreamObserver;
64
import io.grpc.util.ForwardingLoadBalancerHelper;
65
import java.net.URI;
66
import java.net.URISyntaxException;
67
import java.util.Arrays;
68
import java.util.Collections;
69
import java.util.HashMap;
70
import java.util.List;
71
import java.util.Map;
72
import java.util.UUID;
73
import java.util.concurrent.Future;
74
import java.util.concurrent.ScheduledExecutorService;
75
import java.util.concurrent.TimeUnit;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
/**
82
 * A CachingRlsLbClient is a core implementation of RLS loadbalancer supports dynamic request
83
 * routing by fetching the decision from route lookup server. Every single request is routed by
84
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
85
 */
86
@ThreadSafe
87
final class CachingRlsLbClient {
88

89
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
90
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
91
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
92
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
93
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
94
  public static final int BYTES_PER_CHAR = 2;
95
  public static final int STRING_OVERHEAD_BYTES = 38;
96
  /** Minimum bytes for a Java Object. */
97
  public static final int OBJ_OVERHEAD_B = 16;
98

99
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
100
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104
  private final Registration gaugeRegistration;
105
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
106

107
  // All cache status changes (pending, backoff, success) must be under this lock
108
  private final Object lock = new Object();
1✔
109
  // LRU cache based on access order (BACKOFF and actual data will be here)
110
  @GuardedBy("lock")
111
  private final RlsAsyncLruCache linkedHashLruCache;
112
  // any RPC on the fly will cached in this map
113
  @GuardedBy("lock")
1✔
114
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
115

116
  private final ScheduledExecutorService scheduledExecutorService;
117
  private final Ticker ticker;
118
  private final Throttler throttler;
119

120
  private final LbPolicyConfiguration lbPolicyConfig;
121
  private final BackoffPolicy.Provider backoffProvider;
122
  private final long maxAgeNanos;
123
  private final long staleAgeNanos;
124
  private final long callTimeoutNanos;
125

126
  private final RlsLbHelper helper;
127
  private final ManagedChannel rlsChannel;
128
  private final RouteLookupServiceStub rlsStub;
129
  private final RlsPicker rlsPicker;
130
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
131
  @GuardedBy("lock")
132
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
133
  private final ChannelLogger logger;
134

135
  static {
136
    MetricInstrumentRegistry metricInstrumentRegistry
137
        = MetricInstrumentRegistry.getDefaultRegistry();
1✔
138
    DEFAULT_TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter(
1✔
139
        "grpc.lb.rls.default_target_picks", "Number of LB picks sent to the default target", "pick",
140
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
141
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(), true);
1✔
142
    TARGET_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.target_picks",
1✔
143
        "Number of LB picks sent to each RLS target", "pick",
144
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
1✔
145
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"), Collections.emptyList(), true);
1✔
146
    FAILED_PICKS_COUNTER = metricInstrumentRegistry.registerLongCounter("grpc.lb.rls.failed_picks",
1✔
147
        "Number of LB picks failed due to either a failed RLS request or the RLS channel being "
148
            + "throttled", "pick", Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
1✔
149
        Collections.emptyList(), true);
1✔
150
    CACHE_ENTRIES_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_entries",
1✔
151
        "Number of entries in the RLS cache", "entry",
152
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
1✔
153
        Collections.emptyList(), true);
1✔
154
    CACHE_SIZE_GAUGE = metricInstrumentRegistry.registerLongGauge("grpc.lb.rls.cache_size",
1✔
155
        "The current size of the RLS cache", "byte",
156
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_id"),
1✔
157
        Collections.emptyList(), true);
1✔
158
  }
159

160
  private CachingRlsLbClient(Builder builder) {
1✔
161
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
162
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
163
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
164
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
165
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
166
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
167
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
168
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
169
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
170
    linkedHashLruCache =
1✔
171
        new RlsAsyncLruCache(
172
            rlsConfig.cacheSizeBytes(),
1✔
173
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
174
            scheduledExecutorService,
175
            ticker,
176
            lock,
177
            helper);
178
    logger = helper.getChannelLogger();
1✔
179
    String serverHost = null;
1✔
180
    try {
181
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
182
    } catch (URISyntaxException ignore) {
×
183
      // handled by the following null check
184
    }
1✔
185
    if (serverHost == null) {
1✔
186
      logger.log(
×
187
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
188
      serverHost = helper.getAuthority();
×
189
    }
190
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
191
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
192
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
1✔
193
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
194
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
195
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
196
    // called to impose the authority security restrictions.
197
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
198
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
199
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
200
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
201
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
202
    if (routeLookupChannelServiceConfig != null) {
1✔
203
      logger.log(
1✔
204
          ChannelLogLevel.DEBUG,
205
          "RLS channel service config: {0}",
206
          routeLookupChannelServiceConfig);
207
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
208
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
209
    }
210
    rlsChannel = rlsChannelBuilder.build();
1✔
211
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
212
    childLbResolvedAddressFactory =
1✔
213
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
214
    backoffProvider = builder.backoffProvider;
1✔
215
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
216
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
217
    refCountedChildPolicyWrapperFactory =
1✔
218
        new RefCountedChildPolicyWrapperFactory(
219
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
220
            childLbHelperProvider,
221
            new BackoffRefreshListener());
222

223
    gaugeRegistration = helper.getMetricRecorder()
1✔
224
        .registerBatchCallback(new BatchCallback() {
1✔
225
          @Override
226
          public void accept(BatchRecorder recorder) {
227
            int estimatedSize;
228
            long estimatedSizeBytes;
229
            synchronized (lock) {
1✔
230
              estimatedSize = linkedHashLruCache.estimatedSize();
1✔
231
              estimatedSizeBytes = linkedHashLruCache.estimatedSizeBytes();
1✔
232
            }
1✔
233
            recorder.recordLongGauge(CACHE_ENTRIES_GAUGE, estimatedSize,
1✔
234
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
235
                    metricsInstanceUuid), Collections.emptyList());
1✔
236
            recorder.recordLongGauge(CACHE_SIZE_GAUGE, estimatedSizeBytes,
1✔
237
                Arrays.asList(helper.getChannelTarget(), rlsConfig.lookupService(),
1✔
238
                    metricsInstanceUuid), Collections.emptyList());
1✔
239
          }
1✔
240
        }, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);
241

242
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
243
  }
1✔
244

245
  /**
246
   * Convert the status to UNAVAILBLE and enhance the error message.
247
   * @param status status as provided by server
248
   * @param serverName Used for error description
249
   * @return Transformed status
250
   */
251
  static Status convertRlsServerStatus(Status status, String serverName) {
252
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
253
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
254
                + "RLS server returned: %s: %s",
255
            serverName, status.getCode(), status.getDescription()));
1✔
256
  }
257

258
  /** Populates async cache entry for new request. */
259
  @GuardedBy("lock")
260
  private CachedRouteLookupResponse asyncRlsCall(
261
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
262
    logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
1✔
263
    if (throttler.shouldThrottle()) {
1✔
264
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
265
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
266
      // on this result
267
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
268
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
269
    }
270
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
271
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
272
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
273
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
274
        .routeLookup(
1✔
275
            routeLookupRequest,
276
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
277
              @Override
278
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
279
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
280
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
281
              }
1✔
282

283
              @Override
284
              public void onError(Throwable t) {
285
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
286
                response.setException(t);
1✔
287
                throttler.registerBackendResponse(true);
1✔
288
              }
1✔
289

290
              @Override
291
              public void onCompleted() {
292
                logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
1✔
293
                throttler.registerBackendResponse(false);
1✔
294
              }
1✔
295
            });
296
    return CachedRouteLookupResponse.pendingResponse(
1✔
297
        createPendingEntry(request, response, backoffPolicy));
1✔
298
  }
299

300
  /**
301
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
302
   * cached, pending and backed-off due to error. The result remains same even if the status is
303
   * changed after the return.
304
   */
305
  @CheckReturnValue
306
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
307
    logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
1✔
308
    synchronized (lock) {
1✔
309
      logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
1✔
310
      final CacheEntry cacheEntry;
311
      cacheEntry = linkedHashLruCache.read(request);
1✔
312
      if (cacheEntry == null) {
1✔
313
        logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
1✔
314
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
315
        if (pendingEntry != null) {
1✔
316
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
317
        }
318
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
319
      }
320

321
      if (cacheEntry instanceof DataCacheEntry) {
1✔
322
        // cache hit, initiate async-refresh if entry is staled
323
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
324
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
325
        if (dataEntry.isStaled(ticker.read())) {
1✔
326
          logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
1✔
327
          dataEntry.maybeRefresh();
1✔
328
        }
329
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
330
      }
331
      logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
1✔
332
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
333
    }
334
  }
335

336
  /** Performs any pending maintenance operations needed by the cache. */
337
  void close() {
338
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
339
    synchronized (lock) {
1✔
340
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
341
      linkedHashLruCache.close();
1✔
342
      // TODO(creamsoup) maybe cancel all pending requests
343
      pendingCallCache.clear();
1✔
344
      rlsChannel.shutdownNow();
1✔
345
      rlsPicker.close();
1✔
346
      gaugeRegistration.close();
1✔
347
    }
1✔
348
  }
1✔
349

350
  void requestConnection() {
351
    rlsChannel.getState(true);
×
352
  }
×
353

354
  @GuardedBy("lock")
355
  private PendingCacheEntry createPendingEntry(
356
      RouteLookupRequest request,
357
      ListenableFuture<RouteLookupResponse> pendingCall,
358
      @Nullable BackoffPolicy backoffPolicy) {
359
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
360
    // Add the entry to the map before adding the Listener, because the listener removes the
361
    // entry from the map
362
    pendingCallCache.put(request, entry);
1✔
363
    // Beware that the listener can run immediately on the current thread
364
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
365
    return entry;
1✔
366
  }
367

368
  private void pendingRpcComplete(PendingCacheEntry entry) {
369
    synchronized (lock) {
1✔
370
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
371
      if (clientClosed) {
1✔
372
        return;
1✔
373
      }
374

375
      try {
376
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
377
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
378
        // reattempt picks when the child LB is done connecting
379
      } catch (Exception e) {
1✔
380
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
381
        // Cache updated. updateBalancingState() to reattempt picks
382
        helper.propagateRlsError();
1✔
383
      }
1✔
384
    }
1✔
385
  }
1✔
386

387
  @GuardedBy("lock")
388
  private DataCacheEntry createDataEntry(
389
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
390
    logger.log(
1✔
391
        ChannelLogLevel.DEBUG,
392
        "Transition to data cache: routeLookupResponse={0}",
393
        routeLookupResponse);
394
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
395
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
396
    // this cache update because the lock is held
397
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
398
    return entry;
1✔
399
  }
400

401
  @GuardedBy("lock")
402
  private BackoffCacheEntry createBackOffEntry(
403
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
404
    logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
405
    if (backoffPolicy == null) {
1✔
406
      backoffPolicy = backoffProvider.get();
1✔
407
    }
408
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
409
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
410
    // Lock is held, so the task can't execute before the assignment
411
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
412
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
413
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
414
    logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
1✔
415
        delayNanos);
1✔
416
    return entry;
1✔
417
  }
418

419
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
420
    synchronized (lock) {
1✔
421
      // This checks whether the task has been cancelled and prevents a second execution.
422
      if (!entry.scheduledFuture.cancel(false)) {
1✔
423
        // Future was previously cancelled
424
        return;
×
425
      }
426
      logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
1✔
427
      linkedHashLruCache.invalidate(entry.request);
1✔
428
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
429
    }
1✔
430
  }
1✔
431

432
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
433

434
    final Helper helper;
435
    private ConnectivityState state;
436
    private SubchannelPicker picker;
437

438
    RlsLbHelper(Helper helper) {
1✔
439
      this.helper = helper;
1✔
440
    }
1✔
441

442
    @Override
443
    protected Helper delegate() {
444
      return helper;
1✔
445
    }
446

447
    @Override
448
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
449
      state = newState;
1✔
450
      picker = newPicker;
1✔
451
      super.updateBalancingState(newState, newPicker);
1✔
452
    }
1✔
453

454
    void propagateRlsError() {
455
      getSynchronizationContext().execute(new Runnable() {
1✔
456
        @Override
457
        public void run() {
458
          if (picker != null) {
1✔
459
            // Refresh the channel state and let pending RPCs reprocess the picker.
460
            updateBalancingState(state, picker);
1✔
461
          }
462
        }
1✔
463
      });
464
    }
1✔
465

466
    void triggerPendingRpcProcessing() {
467
      helper.getSynchronizationContext().execute(
×
468
          () -> super.updateBalancingState(state, picker));
×
469
    }
×
470
  }
471

472
  /**
473
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
474
   */
475
  static final class CachedRouteLookupResponse {
476
    // Should only have 1 of following 3 cache entries
477
    @Nullable
478
    private final DataCacheEntry dataCacheEntry;
479
    @Nullable
480
    private final PendingCacheEntry pendingCacheEntry;
481
    @Nullable
482
    private final BackoffCacheEntry backoffCacheEntry;
483

484
    CachedRouteLookupResponse(
485
        DataCacheEntry dataCacheEntry,
486
        PendingCacheEntry pendingCacheEntry,
487
        BackoffCacheEntry backoffCacheEntry) {
1✔
488
      this.dataCacheEntry = dataCacheEntry;
1✔
489
      this.pendingCacheEntry = pendingCacheEntry;
1✔
490
      this.backoffCacheEntry = backoffCacheEntry;
1✔
491
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
492
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
493
          "Expected only 1 cache entry value provided");
494
    }
1✔
495

496
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
497
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
498
    }
499

500
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
501
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
502
    }
503

504
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
505
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
506
    }
507

508
    boolean hasData() {
509
      return dataCacheEntry != null;
1✔
510
    }
511

512
    @Nullable
513
    ChildPolicyWrapper getChildPolicyWrapper() {
514
      if (!hasData()) {
1✔
515
        return null;
×
516
      }
517
      return dataCacheEntry.getChildPolicyWrapper();
1✔
518
    }
519

520
    @VisibleForTesting
521
    @Nullable
522
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
523
      if (!hasData()) {
1✔
524
        return null;
×
525
      }
526
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
527
    }
528

529
    @Nullable
530
    String getHeaderData() {
531
      if (!hasData()) {
1✔
532
        return null;
1✔
533
      }
534
      return dataCacheEntry.getHeaderData();
1✔
535
    }
536

537
    boolean hasError() {
538
      return backoffCacheEntry != null;
1✔
539
    }
540

541
    boolean isPending() {
542
      return pendingCacheEntry != null;
1✔
543
    }
544

545
    @Nullable
546
    Status getStatus() {
547
      if (!hasError()) {
1✔
548
        return null;
×
549
      }
550
      return backoffCacheEntry.getStatus();
1✔
551
    }
552

553
    @Override
554
    public String toString() {
555
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
556
      if (dataCacheEntry != null) {
×
557
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
558
      }
559
      if (pendingCacheEntry != null) {
×
560
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
561
      }
562
      if (backoffCacheEntry != null) {
×
563
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
564
      }
565
      return toStringHelper.toString();
×
566
    }
567
  }
568

569
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
570
  static final class PendingCacheEntry {
571
    private final ListenableFuture<RouteLookupResponse> pendingCall;
572
    private final RouteLookupRequest request;
573
    @Nullable
574
    private final BackoffPolicy backoffPolicy;
575

576
    PendingCacheEntry(
577
        RouteLookupRequest request,
578
        ListenableFuture<RouteLookupResponse> pendingCall,
579
        @Nullable BackoffPolicy backoffPolicy) {
1✔
580
      this.request = checkNotNull(request, "request");
1✔
581
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
582
      this.backoffPolicy = backoffPolicy;
1✔
583
    }
1✔
584

585
    @Override
586
    public String toString() {
587
      return MoreObjects.toStringHelper(this)
×
588
          .add("request", request)
×
589
          .toString();
×
590
    }
591
  }
592

593
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
594
  abstract static class CacheEntry {
595

596
    protected final RouteLookupRequest request;
597

598
    CacheEntry(RouteLookupRequest request) {
1✔
599
      this.request = checkNotNull(request, "request");
1✔
600
    }
1✔
601

602
    abstract int getSizeBytes();
603

604
    abstract boolean isExpired(long now);
605

606
    abstract void cleanup();
607

608
    protected boolean isOldEnoughToBeEvicted(long now) {
609
      return true;
×
610
    }
611
  }
612

613
  /** Implementation of {@link CacheEntry} contains valid data. */
614
  final class DataCacheEntry extends CacheEntry {
615
    private final RouteLookupResponse response;
616
    private final long minEvictionTime;
617
    private final long expireTime;
618
    private final long staleTime;
619
    private final List<ChildPolicyWrapper> childPolicyWrappers;
620

621
    // GuardedBy CachingRlsLbClient.lock
622
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
623
      super(request);
1✔
624
      this.response = checkNotNull(response, "response");
1✔
625
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
626
      childPolicyWrappers =
1✔
627
          refCountedChildPolicyWrapperFactory
1✔
628
              .createOrGet(response.targets());
1✔
629
      long now = ticker.read();
1✔
630
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
631
      expireTime = now + maxAgeNanos;
1✔
632
      staleTime = now + staleAgeNanos;
1✔
633
    }
1✔
634

635
    /**
636
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
637
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
638
     * data still exists. Flow looks like following.
639
     *
640
     * <pre>
641
     * Timeline                       | async refresh
642
     *                                V put new cache (entry2)
643
     * entry1: Pending | hasValue | staled  |
644
     * entry2:                        | OV* | pending | hasValue | staled |
645
     *
646
     * OV: old value
647
     * </pre>
648
     */
649
    void maybeRefresh() {
650
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
651
        if (pendingCallCache.containsKey(request)) {
1✔
652
          // pending already requested
653
          logger.log(ChannelLogLevel.DEBUG,
×
654
              "A pending refresh request already created, no need to proceed with refresh");
655
          return;
×
656
        }
657
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
658
      }
1✔
659
    }
1✔
660

661
    @VisibleForTesting
662
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
663
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
664
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
665
          return childPolicyWrapper;
1✔
666
        }
667
      }
1✔
668

669
      throw new RuntimeException("Target not found:" + target);
×
670
    }
671

672
    @Nullable
673
    ChildPolicyWrapper getChildPolicyWrapper() {
674
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
675
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
676
          return childPolicyWrapper;
1✔
677
        }
678
      }
1✔
679
      return childPolicyWrappers.get(0);
1✔
680
    }
681

682
    String getHeaderData() {
683
      return response.getHeaderData();
1✔
684
    }
685

686
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
687
    int calcStringSize(String target) {
688
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
689
    }
690

691
    @Override
692
    int getSizeBytes() {
693
      int targetSize = 0;
1✔
694
      for (String target : response.targets()) {
1✔
695
        targetSize += calcStringSize(target);
1✔
696
      }
1✔
697
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
698
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
699
    }
700

701
    @Override
702
    boolean isExpired(long now) {
703
      return expireTime - now <= 0;
1✔
704
    }
705

706
    boolean isStaled(long now) {
707
      return staleTime - now <= 0;
1✔
708
    }
709

710
    @Override
711
    protected boolean isOldEnoughToBeEvicted(long now) {
712
      return minEvictionTime - now <= 0;
×
713
    }
714

715
    @Override
716
    void cleanup() {
717
      synchronized (lock) {
1✔
718
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
719
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
720
        }
1✔
721
      }
1✔
722
    }
1✔
723

724
    @Override
725
    public String toString() {
726
      return MoreObjects.toStringHelper(this)
×
727
          .add("request", request)
×
728
          .add("response", response)
×
729
          .add("expireTime", expireTime)
×
730
          .add("staleTime", staleTime)
×
731
          .add("childPolicyWrappers", childPolicyWrappers)
×
732
          .toString();
×
733
    }
734
  }
735

736
  /**
737
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
738
   * status when the backoff time is expired.
739
   */
740
  private static final class BackoffCacheEntry extends CacheEntry {
741

742
    private final Status status;
743
    private final BackoffPolicy backoffPolicy;
744
    private Future<?> scheduledFuture;
745

746
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
747
      super(request);
1✔
748
      this.status = checkNotNull(status, "status");
1✔
749
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
750
    }
1✔
751

752
    Status getStatus() {
753
      return status;
1✔
754
    }
755

756
    @Override
757
    int getSizeBytes() {
758
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
759
    }
760

761
    @Override
762
    boolean isExpired(long now) {
763
      return scheduledFuture.isDone();
1✔
764
    }
765

766
    @Override
767
    void cleanup() {
768
      scheduledFuture.cancel(false);
1✔
769
    }
1✔
770

771
    @Override
772
    public String toString() {
773
      return MoreObjects.toStringHelper(this)
×
774
          .add("request", request)
×
775
          .add("status", status)
×
776
          .toString();
×
777
    }
778
  }
779

780
  /** Returns a Builder for {@link CachingRlsLbClient}. */
781
  static Builder newBuilder() {
782
    return new Builder();
1✔
783
  }
784

785
  /** A Builder for {@link CachingRlsLbClient}. */
786
  static final class Builder {
1✔
787

788
    private Helper helper;
789
    private LbPolicyConfiguration lbPolicyConfig;
790
    private Throttler throttler = new HappyThrottler();
1✔
791
    private ResolvedAddressFactory resolvedAddressFactory;
792
    private Ticker ticker = Ticker.systemTicker();
1✔
793
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
794
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
795

796
    Builder setHelper(Helper helper) {
797
      this.helper = checkNotNull(helper, "helper");
1✔
798
      return this;
1✔
799
    }
800

801
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
802
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
803
      return this;
1✔
804
    }
805

806
    Builder setThrottler(Throttler throttler) {
807
      this.throttler = checkNotNull(throttler, "throttler");
1✔
808
      return this;
1✔
809
    }
810

811
    /**
812
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
813
     */
814
    Builder setResolvedAddressesFactory(
815
        ResolvedAddressFactory resolvedAddressFactory) {
816
      this.resolvedAddressFactory =
1✔
817
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
818
      return this;
1✔
819
    }
820

821
    Builder setTicker(Ticker ticker) {
822
      this.ticker = checkNotNull(ticker, "ticker");
1✔
823
      return this;
1✔
824
    }
825

826
    Builder setEvictionListener(
827
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
828
      this.evictionListener = evictionListener;
1✔
829
      return this;
1✔
830
    }
831

832
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
833
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
834
      return this;
1✔
835
    }
836

837
    CachingRlsLbClient build() {
838
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
839
      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
1✔
840
      return client;
1✔
841
    }
842
  }
843

844
  /**
845
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
846
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
847
   */
848
  private static final class AutoCleaningEvictionListener
849
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
850

851
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
852

853
    AutoCleaningEvictionListener(
854
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
855
      this.delegate = delegate;
1✔
856
    }
1✔
857

858
    @Override
859
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
860
      if (delegate != null) {
1✔
861
        delegate.onEviction(key, value, cause);
1✔
862
      }
863
      // performs cleanup after delegation
864
      value.cleanup();
1✔
865
    }
1✔
866
  }
867

868
  /** A Throttler never throttles. */
869
  private static final class HappyThrottler implements Throttler {
870

871
    @Override
872
    public boolean shouldThrottle() {
873
      return false;
×
874
    }
875

876
    @Override
877
    public void registerBackendResponse(boolean throttled) {
878
      // no-op
879
    }
×
880
  }
881

882
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
883
  private static final class RlsAsyncLruCache
884
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
885
    private final RlsLbHelper helper;
886

887
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
888
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
889
        ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) {
890
      super(
1✔
891
          maxEstimatedSizeBytes,
892
          evictionListener,
893
          1,
894
          TimeUnit.MINUTES,
895
          ses,
896
          ticker,
897
          lock);
898
      this.helper = checkNotNull(helper, "helper");
1✔
899
    }
1✔
900

901
    @Override
902
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
903
      return value.isExpired(nowNanos);
1✔
904
    }
905

906
    @Override
907
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
908
      return value.getSizeBytes();
1✔
909
    }
910

911
    @Override
912
    protected boolean shouldInvalidateEldestEntry(
913
        RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) {
914
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
915
        return false;
×
916
      }
917

918
      // eldest entry should be evicted if size limit exceeded
919
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
920
    }
921

922
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
923
      CacheEntry newEntry = cache(key, value);
1✔
924

925
      // force cleanup if new entry pushed cache over max size (in bytes)
926
      if (fitToLimit()) {
1✔
927
        helper.triggerPendingRpcProcessing();
×
928
      }
929
      return newEntry;
1✔
930
    }
931
  }
932

933
  /**
934
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
935
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
936
   */
937
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
938

939
    @Nullable
1✔
940
    private ConnectivityState prevState = null;
941

942
    @Override
943
    public void onStatusChanged(ConnectivityState newState) {
944
      logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
1✔
945
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
946
          && newState == ConnectivityState.READY) {
947
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
948
        logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
1✔
949
        synchronized (lock) {
1✔
950
          logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
1✔
951
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
952
            if (value instanceof BackoffCacheEntry) {
1✔
953
              refreshBackoffEntry((BackoffCacheEntry) value);
×
954
            }
955
          }
1✔
956
        }
1✔
957
      }
958
      prevState = newState;
1✔
959
    }
1✔
960
  }
961

962
  /** A header will be added when RLS server respond with additional header data. */
963
  @VisibleForTesting
964
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
965
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
966

967
  final class RlsPicker extends SubchannelPicker {
968

969
    private final RlsRequestFactory requestFactory;
970
    private final String lookupService;
971

972
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
973
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
974
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
975
    }
1✔
976

977
    @Override
978
    public PickResult pickSubchannel(PickSubchannelArgs args) {
979
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
980
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
981
      RouteLookupRequest request =
1✔
982
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
983
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
984
      logger.log(ChannelLogLevel.DEBUG,
1✔
985
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
986
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
987

988
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
989
        logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
1✔
990
        Metadata headers = args.getHeaders();
1✔
991
        headers.discardAll(RLS_DATA_KEY);
1✔
992
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
993
      }
994
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
995
      logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
1✔
996
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
997
      if (response.hasData()) {
1✔
998
        logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
1✔
999
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1000
        SubchannelPicker picker =
1001
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1002
        if (picker == null) {
1✔
1003
          logger.log(ChannelLogLevel.DEBUG,
×
1004
              "Child policy wrapper didn't return a picker, returning PickResult with no results");
1005
          return PickResult.withNoResult();
×
1006
        }
1007
        // Happy path
1008
        logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
1✔
1009
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1010
        if (pickResult.hasResult()) {
1✔
1011
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1012
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1013
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1014
              Collections.emptyList());
1✔
1015
        }
1016
        return pickResult;
1✔
1017
      } else if (response.hasError()) {
1✔
1018
        logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
1✔
1019
        if (hasFallback) {
1✔
1020
          logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
1✔
1021
          return useFallback(args);
1✔
1022
        }
1023
        logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
1✔
1024
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1025
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1026
        return PickResult.withError(
1✔
1027
            convertRlsServerStatus(response.getStatus(),
1✔
1028
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1029
      } else {
1030
        logger.log(ChannelLogLevel.DEBUG,
1✔
1031
            "RLS response had no data, return a PickResult with no data");
1032
        return PickResult.withNoResult();
1✔
1033
      }
1034
    }
1035

1036
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
1037

1038
    /** Uses Subchannel connected to default target. */
1039
    private PickResult useFallback(PickSubchannelArgs args) {
1040
      // TODO(creamsoup) wait until lb is ready
1041
      startFallbackChildPolicy();
1✔
1042
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1043
      if (picker == null) {
1✔
1044
        return PickResult.withNoResult();
×
1045
      }
1046
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1047
      if (pickResult.hasResult()) {
1✔
1048
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1049
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1050
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1051
            Collections.emptyList());
1✔
1052
      }
1053
      return pickResult;
1✔
1054
    }
1055

1056
    private String determineMetricsPickResult(PickResult pickResult) {
1057
      if (pickResult.getStatus().isOk()) {
1✔
1058
        return "complete";
1✔
1059
      } else if (pickResult.isDrop()) {
1✔
1060
        return "drop";
×
1061
      } else {
1062
        return "fail";
1✔
1063
      }
1064
    }
1065

1066
    private void startFallbackChildPolicy() {
1067
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1068
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1069
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
1✔
1070
      synchronized (lock) {
1✔
1071
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
1✔
1072
        if (fallbackChildPolicyWrapper != null) {
1✔
1073
          return;
1✔
1074
        }
1075
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1076
      }
1✔
1077
    }
1✔
1078

1079
    // GuardedBy CachingRlsLbClient.lock
1080
    void close() {
1081
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1082
        logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
1✔
1083
        if (fallbackChildPolicyWrapper != null) {
1✔
1084
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1085
        }
1086
      }
1✔
1087
    }
1✔
1088

1089
    @Override
1090
    public String toString() {
1091
      return MoreObjects.toStringHelper(this)
×
1092
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1093
          .toString();
×
1094
    }
1095
  }
1096

1097
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc