• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19230

13 May 2024 04:04PM UTC coverage: 88.403% (-0.004%) from 88.407%
#19230

push

github

web-flow
xds, rls: Experimental metrics are disabled by default (#11196) (#11197)

Experimental metrics (i.e., the WRR and RLS metrics) are disabled by default. Users are expected to enable them explicitly when configuring metrics.

31606 of 35752 relevant lines covered (88.4%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.76
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ChannelLogger.ChannelLogLevel;
33
import io.grpc.ConnectivityState;
34
import io.grpc.LoadBalancer.Helper;
35
import io.grpc.LoadBalancer.PickResult;
36
import io.grpc.LoadBalancer.PickSubchannelArgs;
37
import io.grpc.LoadBalancer.ResolvedAddresses;
38
import io.grpc.LoadBalancer.SubchannelPicker;
39
import io.grpc.LongCounterMetricInstrument;
40
import io.grpc.LongGaugeMetricInstrument;
41
import io.grpc.ManagedChannel;
42
import io.grpc.ManagedChannelBuilder;
43
import io.grpc.Metadata;
44
import io.grpc.MetricInstrumentRegistry;
45
import io.grpc.MetricRecorder.BatchCallback;
46
import io.grpc.MetricRecorder.BatchRecorder;
47
import io.grpc.MetricRecorder.Registration;
48
import io.grpc.Status;
49
import io.grpc.internal.BackoffPolicy;
50
import io.grpc.internal.ExponentialBackoffPolicy;
51
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
52
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
53
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
54
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
55
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
56
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
57
import io.grpc.rls.LruCache.EvictionListener;
58
import io.grpc.rls.LruCache.EvictionType;
59
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
60
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
61
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
62
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
63
import io.grpc.stub.StreamObserver;
64
import io.grpc.util.ForwardingLoadBalancerHelper;
65
import java.net.URI;
66
import java.net.URISyntaxException;
67
import java.util.Arrays;
68
import java.util.Collections;
69
import java.util.HashMap;
70
import java.util.List;
71
import java.util.Map;
72
import java.util.UUID;
73
import java.util.concurrent.Future;
74
import java.util.concurrent.ScheduledExecutorService;
75
import java.util.concurrent.TimeUnit;
76
import javax.annotation.CheckReturnValue;
77
import javax.annotation.Nullable;
78
import javax.annotation.concurrent.GuardedBy;
79
import javax.annotation.concurrent.ThreadSafe;
80

81
/**
82
 * A CachingRlsLbClient is the core implementation of the RLS load balancer. It supports dynamic
83
 * request routing by fetching the routing decision from the route lookup server. Every single
84
 * request is routed by the server's decision. To reduce the performance penalty, {@link LruCache}
 * is used.
85
 */
86
@ThreadSafe
87
final class CachingRlsLbClient {
88

89
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
90
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
91
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
92
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
93
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
94
  public static final int BYTES_PER_CHAR = 2;
95
  public static final int STRING_OVERHEAD_BYTES = 38;
96
  /** Minimum bytes for a Java Object. */
97
  public static final int OBJ_OVERHEAD_B = 16;
98

99
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
100
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
101
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
102
  private static final LongGaugeMetricInstrument CACHE_ENTRIES_GAUGE;
103
  private static final LongGaugeMetricInstrument CACHE_SIZE_GAUGE;
104
  private final Registration gaugeRegistration;
105
  private final String metricsInstanceUuid = UUID.randomUUID().toString();
1✔
106

107
  // All cache status changes (pending, backoff, success) must be under this lock
108
  private final Object lock = new Object();
1✔
109
  // LRU cache based on access order (BACKOFF and actual data will be here)
110
  @GuardedBy("lock")
111
  private final RlsAsyncLruCache linkedHashLruCache;
112
  // any RPC in flight will be cached in this map
113
  @GuardedBy("lock")
1✔
114
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
115

116
  private final ScheduledExecutorService scheduledExecutorService;
117
  private final Ticker ticker;
118
  private final Throttler throttler;
119

120
  private final LbPolicyConfiguration lbPolicyConfig;
121
  private final BackoffPolicy.Provider backoffProvider;
122
  private final long maxAgeNanos;
123
  private final long staleAgeNanos;
124
  private final long callTimeoutNanos;
125

126
  private final RlsLbHelper helper;
127
  private final ManagedChannel rlsChannel;
128
  private final RouteLookupServiceStub rlsStub;
129
  private final RlsPicker rlsPicker;
130
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
131
  @GuardedBy("lock")
132
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
133
  private final ChannelLogger logger;
134

135
  // Registers the RLS metric instruments with the default registry. Every instrument is
  // experimental, so the trailing enable-by-default argument is false for each of them and users
  // have to opt in explicitly when configuring metrics.
  static {
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    DEFAULT_TARGET_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.default_target_picks",
        "EXPERIMENTAL. Number of LB picks sent to the default target",
        "{pick}",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"),
        Collections.emptyList(),
        false);

    // The description previously pointed at "grpc.rls.default_target_picks", which is not the
    // name the default-target counter is registered under above.
    TARGET_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.target_picks",
        "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default "
            + "target is also returned by the RLS server, RPCs sent to that target from the cache "
            + "will be counted in this metric, not in grpc.lb.rls.default_target_picks.",
        "{pick}",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target",
            "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"),
        Collections.emptyList(),
        false);

    FAILED_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.failed_picks",
        "EXPERIMENTAL. Number of LB picks failed due to either a failed RLS request or the "
            + "RLS channel being throttled",
        "{pick}",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target"),
        Collections.emptyList(),
        false);

    CACHE_ENTRIES_GAUGE = registry.registerLongGauge(
        "grpc.lb.rls.cache_entries",
        "EXPERIMENTAL. Number of entries in the RLS cache",
        "{entry}",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
        Collections.emptyList(),
        false);

    CACHE_SIZE_GAUGE = registry.registerLongGauge(
        "grpc.lb.rls.cache_size",
        "EXPERIMENTAL. The current size of the RLS cache",
        "By",
        Arrays.asList("grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"),
        Collections.emptyList(),
        false);
  }
165

166
  /**
   * Assembles the client from the given builder: the LRU cache, the request factory and picker,
   * the out-of-band channel and stub used to talk to the RLS server, the ref-counted child policy
   * factory, and the batch callback that reports the cache gauges.
   */
  private CachingRlsLbClient(Builder builder) {
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
    scheduledExecutorService = helper.getScheduledExecutorService();
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");

    // Timing parameters all come from the route lookup section of the LB policy config.
    RouteLookupConfig routeLookupConfig = lbPolicyConfig.getRouteLookupConfig();
    maxAgeNanos = routeLookupConfig.maxAgeInNanos();
    staleAgeNanos = routeLookupConfig.staleAgeInNanos();
    callTimeoutNanos = routeLookupConfig.lookupServiceTimeoutInNanos();

    ticker = checkNotNull(builder.ticker, "ticker");
    throttler = checkNotNull(builder.throttler, "throttler");
    linkedHashLruCache =
        new RlsAsyncLruCache(
            routeLookupConfig.cacheSizeBytes(),
            new AutoCleaningEvictionListener(builder.evictionListener),
            scheduledExecutorService,
            ticker,
            lock,
            helper);
    logger = helper.getChannelLogger();

    // Derive the host from the channel authority; fall back to the raw authority when it cannot
    // be parsed or carries no host component.
    String lookupHost;
    try {
      lookupHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
    } catch (URISyntaxException e) {
      // Deliberately not rethrown: the null check right below takes care of it.
      lookupHost = null;
    }
    if (lookupHost == null) {
      logger.log(
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
      lookupHost = helper.getAuthority();
    }
    RlsRequestFactory requestFactory =
        new RlsRequestFactory(lbPolicyConfig.getRouteLookupConfig(), lookupHost);
    rlsPicker = new RlsPicker(requestFactory, routeLookupConfig.lookupService());

    // Using helper.getUnsafeChannelCredentials() is safe here: the client authenticates the RLS
    // server with the same authority as the backends, even though the RLS server's addresses are
    // resolved differently. overrideAuthority(helper.getAuthority()) imposes the authority
    // security restrictions.
    ManagedChannelBuilder<?> oobChannelBuilder = helper.createResolvingOobChannelBuilder(
        routeLookupConfig.lookupService(), helper.getUnsafeChannelCredentials());
    oobChannelBuilder.overrideAuthority(helper.getAuthority());
    Map<String, ?> rlsServiceConfig = lbPolicyConfig.getRouteLookupChannelServiceConfig();
    if (rlsServiceConfig != null) {
      logger.log(
          ChannelLogLevel.DEBUG,
          "RLS channel service config: {0}",
          rlsServiceConfig);
      oobChannelBuilder.defaultServiceConfig(rlsServiceConfig);
      oobChannelBuilder.disableServiceConfigLookUp();
    }
    rlsChannel = oobChannelBuilder.build();
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);

    childLbResolvedAddressFactory =
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
    backoffProvider = builder.backoffProvider;
    ChildLoadBalancerHelperProvider childHelpers =
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
    refCountedChildPolicyWrapperFactory =
        new RefCountedChildPolicyWrapperFactory(
            lbPolicyConfig.getLoadBalancingPolicy(),
            childLbResolvedAddressFactory,
            childHelpers,
            new BackoffRefreshListener());

    // Gauge callback: snapshot both cache figures under the lock, then record them outside it.
    BatchCallback cacheGauges = new BatchCallback() {
      @Override
      public void accept(BatchRecorder recorder) {
        int entryCount;
        long sizeBytes;
        synchronized (lock) {
          entryCount = linkedHashLruCache.estimatedSize();
          sizeBytes = linkedHashLruCache.estimatedSizeBytes();
        }
        recorder.recordLongGauge(
            CACHE_ENTRIES_GAUGE,
            entryCount,
            Arrays.asList(
                helper.getChannelTarget(), routeLookupConfig.lookupService(), metricsInstanceUuid),
            Collections.emptyList());
        recorder.recordLongGauge(
            CACHE_SIZE_GAUGE,
            sizeBytes,
            Arrays.asList(
                helper.getChannelTarget(), routeLookupConfig.lookupService(), metricsInstanceUuid),
            Collections.emptyList());
      }
    };
    gaugeRegistration = helper.getMetricRecorder()
        .registerBatchCallback(cacheGauges, CACHE_ENTRIES_GAUGE, CACHE_SIZE_GAUGE);

    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
  }
1✔
250

251
  /**
252
   * Convert the status to UNAVAILBLE and enhance the error message.
253
   * @param status status as provided by server
254
   * @param serverName Used for error description
255
   * @return Transformed status
256
   */
257
  static Status convertRlsServerStatus(Status status, String serverName) {
258
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
259
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
260
                + "RLS server returned: %s: %s",
261
            serverName, status.getCode(), status.getDescription()));
1✔
262
  }
263

264
  /** Populates async cache entry for new request. */
265
  @GuardedBy("lock")
266
  private CachedRouteLookupResponse asyncRlsCall(
267
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
268
    logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
1✔
269
    if (throttler.shouldThrottle()) {
1✔
270
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
271
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
272
      // on this result
273
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
274
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
275
    }
276
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
277
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
278
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
279
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
280
        .routeLookup(
1✔
281
            routeLookupRequest,
282
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
283
              @Override
284
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
285
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
286
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
287
              }
1✔
288

289
              @Override
290
              public void onError(Throwable t) {
291
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
292
                response.setException(t);
1✔
293
                throttler.registerBackendResponse(true);
1✔
294
              }
1✔
295

296
              @Override
297
              public void onCompleted() {
298
                logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
1✔
299
                throttler.registerBackendResponse(false);
1✔
300
              }
1✔
301
            });
302
    return CachedRouteLookupResponse.pendingResponse(
1✔
303
        createPendingEntry(request, response, backoffPolicy));
1✔
304
  }
305

306
  /**
307
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
308
   * cached, pending and backed-off due to error. The result remains same even if the status is
309
   * changed after the return.
310
   */
311
  @CheckReturnValue
312
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
313
    logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
1✔
314
    synchronized (lock) {
1✔
315
      logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
1✔
316
      final CacheEntry cacheEntry;
317
      cacheEntry = linkedHashLruCache.read(request);
1✔
318
      if (cacheEntry == null) {
1✔
319
        logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
1✔
320
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
321
        if (pendingEntry != null) {
1✔
322
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
323
        }
324
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
325
      }
326

327
      if (cacheEntry instanceof DataCacheEntry) {
1✔
328
        // cache hit, initiate async-refresh if entry is staled
329
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
330
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
331
        if (dataEntry.isStaled(ticker.read())) {
1✔
332
          logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
1✔
333
          dataEntry.maybeRefresh();
1✔
334
        }
335
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
336
      }
337
      logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
1✔
338
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
339
    }
340
  }
341

342
  /** Performs any pending maintenance operations needed by the cache. */
343
  void close() {
344
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
345
    synchronized (lock) {
1✔
346
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
347
      linkedHashLruCache.close();
1✔
348
      // TODO(creamsoup) maybe cancel all pending requests
349
      pendingCallCache.clear();
1✔
350
      rlsChannel.shutdownNow();
1✔
351
      rlsPicker.close();
1✔
352
      gaugeRegistration.close();
1✔
353
    }
1✔
354
  }
1✔
355

356
  void requestConnection() {
357
    rlsChannel.getState(true);
×
358
  }
×
359

360
  @GuardedBy("lock")
361
  private PendingCacheEntry createPendingEntry(
362
      RouteLookupRequest request,
363
      ListenableFuture<RouteLookupResponse> pendingCall,
364
      @Nullable BackoffPolicy backoffPolicy) {
365
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
366
    // Add the entry to the map before adding the Listener, because the listener removes the
367
    // entry from the map
368
    pendingCallCache.put(request, entry);
1✔
369
    // Beware that the listener can run immediately on the current thread
370
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
371
    return entry;
1✔
372
  }
373

374
  private void pendingRpcComplete(PendingCacheEntry entry) {
375
    synchronized (lock) {
1✔
376
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
377
      if (clientClosed) {
1✔
378
        return;
1✔
379
      }
380

381
      try {
382
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
383
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
384
        // reattempt picks when the child LB is done connecting
385
      } catch (Exception e) {
1✔
386
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
387
        // Cache updated. updateBalancingState() to reattempt picks
388
        helper.propagateRlsError();
1✔
389
      }
1✔
390
    }
1✔
391
  }
1✔
392

393
  @GuardedBy("lock")
394
  private DataCacheEntry createDataEntry(
395
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
396
    logger.log(
1✔
397
        ChannelLogLevel.DEBUG,
398
        "Transition to data cache: routeLookupResponse={0}",
399
        routeLookupResponse);
400
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
401
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
402
    // this cache update because the lock is held
403
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
404
    return entry;
1✔
405
  }
406

407
  @GuardedBy("lock")
408
  private BackoffCacheEntry createBackOffEntry(
409
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
410
    logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
411
    if (backoffPolicy == null) {
1✔
412
      backoffPolicy = backoffProvider.get();
1✔
413
    }
414
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
415
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
416
    // Lock is held, so the task can't execute before the assignment
417
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
418
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
419
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
420
    logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
1✔
421
        delayNanos);
1✔
422
    return entry;
1✔
423
  }
424

425
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
426
    synchronized (lock) {
1✔
427
      // This checks whether the task has been cancelled and prevents a second execution.
428
      if (!entry.scheduledFuture.cancel(false)) {
1✔
429
        // Future was previously cancelled
430
        return;
×
431
      }
432
      logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
1✔
433
      linkedHashLruCache.invalidate(entry.request);
1✔
434
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
435
    }
1✔
436
  }
1✔
437

438
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
439

440
    final Helper helper;
441
    private ConnectivityState state;
442
    private SubchannelPicker picker;
443

444
    RlsLbHelper(Helper helper) {
1✔
445
      this.helper = helper;
1✔
446
    }
1✔
447

448
    @Override
449
    protected Helper delegate() {
450
      return helper;
1✔
451
    }
452

453
    @Override
454
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
455
      state = newState;
1✔
456
      picker = newPicker;
1✔
457
      super.updateBalancingState(newState, newPicker);
1✔
458
    }
1✔
459

460
    void propagateRlsError() {
461
      getSynchronizationContext().execute(new Runnable() {
1✔
462
        @Override
463
        public void run() {
464
          if (picker != null) {
1✔
465
            // Refresh the channel state and let pending RPCs reprocess the picker.
466
            updateBalancingState(state, picker);
1✔
467
          }
468
        }
1✔
469
      });
470
    }
1✔
471

472
    void triggerPendingRpcProcessing() {
473
      helper.getSynchronizationContext().execute(
×
474
          () -> super.updateBalancingState(state, picker));
×
475
    }
×
476
  }
477

478
  /**
479
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
480
   */
481
  static final class CachedRouteLookupResponse {
482
    // Should only have 1 of following 3 cache entries
483
    @Nullable
484
    private final DataCacheEntry dataCacheEntry;
485
    @Nullable
486
    private final PendingCacheEntry pendingCacheEntry;
487
    @Nullable
488
    private final BackoffCacheEntry backoffCacheEntry;
489

490
    CachedRouteLookupResponse(
491
        DataCacheEntry dataCacheEntry,
492
        PendingCacheEntry pendingCacheEntry,
493
        BackoffCacheEntry backoffCacheEntry) {
1✔
494
      this.dataCacheEntry = dataCacheEntry;
1✔
495
      this.pendingCacheEntry = pendingCacheEntry;
1✔
496
      this.backoffCacheEntry = backoffCacheEntry;
1✔
497
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
498
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
499
          "Expected only 1 cache entry value provided");
500
    }
1✔
501

502
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
503
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
504
    }
505

506
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
507
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
508
    }
509

510
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
511
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
512
    }
513

514
    boolean hasData() {
515
      return dataCacheEntry != null;
1✔
516
    }
517

518
    @Nullable
519
    ChildPolicyWrapper getChildPolicyWrapper() {
520
      if (!hasData()) {
1✔
521
        return null;
×
522
      }
523
      return dataCacheEntry.getChildPolicyWrapper();
1✔
524
    }
525

526
    @VisibleForTesting
527
    @Nullable
528
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
529
      if (!hasData()) {
1✔
530
        return null;
×
531
      }
532
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
533
    }
534

535
    @Nullable
536
    String getHeaderData() {
537
      if (!hasData()) {
1✔
538
        return null;
1✔
539
      }
540
      return dataCacheEntry.getHeaderData();
1✔
541
    }
542

543
    boolean hasError() {
544
      return backoffCacheEntry != null;
1✔
545
    }
546

547
    boolean isPending() {
548
      return pendingCacheEntry != null;
1✔
549
    }
550

551
    @Nullable
552
    Status getStatus() {
553
      if (!hasError()) {
1✔
554
        return null;
×
555
      }
556
      return backoffCacheEntry.getStatus();
1✔
557
    }
558

559
    @Override
560
    public String toString() {
561
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
562
      if (dataCacheEntry != null) {
×
563
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
564
      }
565
      if (pendingCacheEntry != null) {
×
566
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
567
      }
568
      if (backoffCacheEntry != null) {
×
569
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
570
      }
571
      return toStringHelper.toString();
×
572
    }
573
  }
574

575
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
576
  static final class PendingCacheEntry {
577
    private final ListenableFuture<RouteLookupResponse> pendingCall;
578
    private final RouteLookupRequest request;
579
    @Nullable
580
    private final BackoffPolicy backoffPolicy;
581

582
    PendingCacheEntry(
583
        RouteLookupRequest request,
584
        ListenableFuture<RouteLookupResponse> pendingCall,
585
        @Nullable BackoffPolicy backoffPolicy) {
1✔
586
      this.request = checkNotNull(request, "request");
1✔
587
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
588
      this.backoffPolicy = backoffPolicy;
1✔
589
    }
1✔
590

591
    @Override
592
    public String toString() {
593
      return MoreObjects.toStringHelper(this)
×
594
          .add("request", request)
×
595
          .toString();
×
596
    }
597
  }
598

599
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
600
  abstract static class CacheEntry {
601

602
    protected final RouteLookupRequest request;
603

604
    CacheEntry(RouteLookupRequest request) {
1✔
605
      this.request = checkNotNull(request, "request");
1✔
606
    }
1✔
607

608
    abstract int getSizeBytes();
609

610
    abstract boolean isExpired(long now);
611

612
    abstract void cleanup();
613

614
    protected boolean isOldEnoughToBeEvicted(long now) {
615
      return true;
×
616
    }
617
  }
618

619
  /** Implementation of {@link CacheEntry} contains valid data. */
620
  final class DataCacheEntry extends CacheEntry {
621
    private final RouteLookupResponse response;
622
    private final long minEvictionTime;
623
    private final long expireTime;
624
    private final long staleTime;
625
    private final List<ChildPolicyWrapper> childPolicyWrappers;
626

627
    // GuardedBy CachingRlsLbClient.lock
628
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
629
      super(request);
1✔
630
      this.response = checkNotNull(response, "response");
1✔
631
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
632
      childPolicyWrappers =
1✔
633
          refCountedChildPolicyWrapperFactory
1✔
634
              .createOrGet(response.targets());
1✔
635
      long now = ticker.read();
1✔
636
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
637
      expireTime = now + maxAgeNanos;
1✔
638
      staleTime = now + staleAgeNanos;
1✔
639
    }
1✔
640

641
    /**
642
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
643
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
644
     * data still exists. Flow looks like following.
645
     *
646
     * <pre>
647
     * Timeline                       | async refresh
648
     *                                V put new cache (entry2)
649
     * entry1: Pending | hasValue | staled  |
650
     * entry2:                        | OV* | pending | hasValue | staled |
651
     *
652
     * OV: old value
653
     * </pre>
654
     */
655
    void maybeRefresh() {
656
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
657
        if (pendingCallCache.containsKey(request)) {
1✔
658
          // pending already requested
659
          logger.log(ChannelLogLevel.DEBUG,
×
660
              "A pending refresh request already created, no need to proceed with refresh");
661
          return;
×
662
        }
663
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
664
      }
1✔
665
    }
1✔
666

667
    @VisibleForTesting
668
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
669
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
670
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
671
          return childPolicyWrapper;
1✔
672
        }
673
      }
1✔
674

675
      throw new RuntimeException("Target not found:" + target);
×
676
    }
677

678
    @Nullable
679
    ChildPolicyWrapper getChildPolicyWrapper() {
680
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
681
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
682
          return childPolicyWrapper;
1✔
683
        }
684
      }
1✔
685
      return childPolicyWrappers.get(0);
1✔
686
    }
687

688
    String getHeaderData() {
689
      return response.getHeaderData();
1✔
690
    }
691

692
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
693
    int calcStringSize(String target) {
694
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
695
    }
696

697
    @Override
698
    int getSizeBytes() {
699
      int targetSize = 0;
1✔
700
      for (String target : response.targets()) {
1✔
701
        targetSize += calcStringSize(target);
1✔
702
      }
1✔
703
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
704
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
705
    }
706

707
    @Override
708
    boolean isExpired(long now) {
709
      return expireTime - now <= 0;
1✔
710
    }
711

712
    boolean isStaled(long now) {
713
      return staleTime - now <= 0;
1✔
714
    }
715

716
    @Override
717
    protected boolean isOldEnoughToBeEvicted(long now) {
718
      return minEvictionTime - now <= 0;
×
719
    }
720

721
    @Override
722
    void cleanup() {
723
      synchronized (lock) {
1✔
724
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
725
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
726
        }
1✔
727
      }
1✔
728
    }
1✔
729

730
    @Override
731
    public String toString() {
732
      return MoreObjects.toStringHelper(this)
×
733
          .add("request", request)
×
734
          .add("response", response)
×
735
          .add("expireTime", expireTime)
×
736
          .add("staleTime", staleTime)
×
737
          .add("childPolicyWrappers", childPolicyWrappers)
×
738
          .toString();
×
739
    }
740
  }
741

742
  /**
743
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
744
   * status when the backoff time is expired.
745
   */
746
  private static final class BackoffCacheEntry extends CacheEntry {
747

748
    private final Status status;
749
    private final BackoffPolicy backoffPolicy;
750
    private Future<?> scheduledFuture;
751

752
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
753
      super(request);
1✔
754
      this.status = checkNotNull(status, "status");
1✔
755
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
756
    }
1✔
757

758
    Status getStatus() {
759
      return status;
1✔
760
    }
761

762
    @Override
763
    int getSizeBytes() {
764
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
765
    }
766

767
    @Override
768
    boolean isExpired(long now) {
769
      return scheduledFuture.isDone();
1✔
770
    }
771

772
    @Override
773
    void cleanup() {
774
      scheduledFuture.cancel(false);
1✔
775
    }
1✔
776

777
    @Override
778
    public String toString() {
779
      return MoreObjects.toStringHelper(this)
×
780
          .add("request", request)
×
781
          .add("status", status)
×
782
          .toString();
×
783
    }
784
  }
785

786
  /** Returns a Builder for {@link CachingRlsLbClient}. */
787
  static Builder newBuilder() {
788
    return new Builder();
1✔
789
  }
790

791
  /** A Builder for {@link CachingRlsLbClient}. */
792
  static final class Builder {
1✔
793

794
    private Helper helper;
795
    private LbPolicyConfiguration lbPolicyConfig;
796
    private Throttler throttler = new HappyThrottler();
1✔
797
    private ResolvedAddressFactory resolvedAddressFactory;
798
    private Ticker ticker = Ticker.systemTicker();
1✔
799
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
800
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
801

802
    Builder setHelper(Helper helper) {
803
      this.helper = checkNotNull(helper, "helper");
1✔
804
      return this;
1✔
805
    }
806

807
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
808
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
809
      return this;
1✔
810
    }
811

812
    Builder setThrottler(Throttler throttler) {
813
      this.throttler = checkNotNull(throttler, "throttler");
1✔
814
      return this;
1✔
815
    }
816

817
    /**
818
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
819
     */
820
    Builder setResolvedAddressesFactory(
821
        ResolvedAddressFactory resolvedAddressFactory) {
822
      this.resolvedAddressFactory =
1✔
823
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
824
      return this;
1✔
825
    }
826

827
    Builder setTicker(Ticker ticker) {
828
      this.ticker = checkNotNull(ticker, "ticker");
1✔
829
      return this;
1✔
830
    }
831

832
    Builder setEvictionListener(
833
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
834
      this.evictionListener = evictionListener;
1✔
835
      return this;
1✔
836
    }
837

838
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
839
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
840
      return this;
1✔
841
    }
842

843
    CachingRlsLbClient build() {
844
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
845
      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
1✔
846
      return client;
1✔
847
    }
848
  }
849

850
  /**
851
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
852
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
853
   */
854
  private static final class AutoCleaningEvictionListener
855
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
856

857
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
858

859
    AutoCleaningEvictionListener(
860
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
861
      this.delegate = delegate;
1✔
862
    }
1✔
863

864
    @Override
865
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
866
      if (delegate != null) {
1✔
867
        delegate.onEviction(key, value, cause);
1✔
868
      }
869
      // performs cleanup after delegation
870
      value.cleanup();
1✔
871
    }
1✔
872
  }
873

874
  /** A Throttler never throttles. */
875
  private static final class HappyThrottler implements Throttler {
876

877
    @Override
878
    public boolean shouldThrottle() {
879
      return false;
×
880
    }
881

882
    @Override
883
    public void registerBackendResponse(boolean throttled) {
884
      // no-op
885
    }
×
886
  }
887

888
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
889
  private static final class RlsAsyncLruCache
890
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
891
    private final RlsLbHelper helper;
892

893
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
894
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
895
        ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) {
896
      super(
1✔
897
          maxEstimatedSizeBytes,
898
          evictionListener,
899
          1,
900
          TimeUnit.MINUTES,
901
          ses,
902
          ticker,
903
          lock);
904
      this.helper = checkNotNull(helper, "helper");
1✔
905
    }
1✔
906

907
    @Override
908
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
909
      return value.isExpired(nowNanos);
1✔
910
    }
911

912
    @Override
913
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
914
      return value.getSizeBytes();
1✔
915
    }
916

917
    @Override
918
    protected boolean shouldInvalidateEldestEntry(
919
        RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) {
920
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
921
        return false;
×
922
      }
923

924
      // eldest entry should be evicted if size limit exceeded
925
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
926
    }
927

928
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
929
      CacheEntry newEntry = cache(key, value);
1✔
930

931
      // force cleanup if new entry pushed cache over max size (in bytes)
932
      if (fitToLimit()) {
1✔
933
        helper.triggerPendingRpcProcessing();
×
934
      }
935
      return newEntry;
1✔
936
    }
937
  }
938

939
  /**
940
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
941
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
942
   */
943
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
944

945
    @Nullable
1✔
946
    private ConnectivityState prevState = null;
947

948
    @Override
949
    public void onStatusChanged(ConnectivityState newState) {
950
      logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
1✔
951
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
952
          && newState == ConnectivityState.READY) {
953
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
954
        logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
1✔
955
        synchronized (lock) {
1✔
956
          logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
1✔
957
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
958
            if (value instanceof BackoffCacheEntry) {
1✔
959
              refreshBackoffEntry((BackoffCacheEntry) value);
×
960
            }
961
          }
1✔
962
        }
1✔
963
      }
964
      prevState = newState;
1✔
965
    }
1✔
966
  }
967

968
  /** A header will be added when RLS server respond with additional header data. */
969
  @VisibleForTesting
970
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
971
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
972

973
  final class RlsPicker extends SubchannelPicker {
974

975
    private final RlsRequestFactory requestFactory;
976
    private final String lookupService;
977

978
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
979
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
980
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
981
    }
1✔
982

983
    @Override
984
    public PickResult pickSubchannel(PickSubchannelArgs args) {
985
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
986
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
987
      RouteLookupRequest request =
1✔
988
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
989
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
990
      logger.log(ChannelLogLevel.DEBUG,
1✔
991
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
992
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
993

994
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
995
        logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
1✔
996
        Metadata headers = args.getHeaders();
1✔
997
        headers.discardAll(RLS_DATA_KEY);
1✔
998
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
999
      }
1000
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1001
      logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
1✔
1002
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
1003
      if (response.hasData()) {
1✔
1004
        logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
1✔
1005
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
1006
        SubchannelPicker picker =
1007
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
1008
        if (picker == null) {
1✔
1009
          logger.log(ChannelLogLevel.DEBUG,
×
1010
              "Child policy wrapper didn't return a picker, returning PickResult with no results");
1011
          return PickResult.withNoResult();
×
1012
        }
1013
        // Happy path
1014
        logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
1✔
1015
        PickResult pickResult = picker.pickSubchannel(args);
1✔
1016
        if (pickResult.hasResult()) {
1✔
1017
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
1018
              Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1019
                  childPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1020
              Collections.emptyList());
1✔
1021
        }
1022
        return pickResult;
1✔
1023
      } else if (response.hasError()) {
1✔
1024
        logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
1✔
1025
        if (hasFallback) {
1✔
1026
          logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
1✔
1027
          return useFallback(args);
1✔
1028
        }
1029
        logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
1✔
1030
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
1031
            Arrays.asList(helper.getChannelTarget(), lookupService), Collections.emptyList());
1✔
1032
        return PickResult.withError(
1✔
1033
            convertRlsServerStatus(response.getStatus(),
1✔
1034
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
1035
      } else {
1036
        logger.log(ChannelLogLevel.DEBUG,
1✔
1037
            "RLS response had no data, return a PickResult with no data");
1038
        return PickResult.withNoResult();
1✔
1039
      }
1040
    }
1041

1042
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
1043

1044
    /** Uses Subchannel connected to default target. */
1045
    private PickResult useFallback(PickSubchannelArgs args) {
1046
      // TODO(creamsoup) wait until lb is ready
1047
      startFallbackChildPolicy();
1✔
1048
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1049
      if (picker == null) {
1✔
1050
        return PickResult.withNoResult();
×
1051
      }
1052
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1053
      if (pickResult.hasResult()) {
1✔
1054
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1055
            Arrays.asList(helper.getChannelTarget(), lookupService,
1✔
1056
                fallbackChildPolicyWrapper.getTarget(), determineMetricsPickResult(pickResult)),
1✔
1057
            Collections.emptyList());
1✔
1058
      }
1059
      return pickResult;
1✔
1060
    }
1061

1062
    private String determineMetricsPickResult(PickResult pickResult) {
1063
      if (pickResult.getStatus().isOk()) {
1✔
1064
        return "complete";
1✔
1065
      } else if (pickResult.isDrop()) {
1✔
1066
        return "drop";
×
1067
      } else {
1068
        return "fail";
1✔
1069
      }
1070
    }
1071

1072
    private void startFallbackChildPolicy() {
1073
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1074
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1075
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
1✔
1076
      synchronized (lock) {
1✔
1077
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
1✔
1078
        if (fallbackChildPolicyWrapper != null) {
1✔
1079
          return;
1✔
1080
        }
1081
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1082
      }
1✔
1083
    }
1✔
1084

1085
    // GuardedBy CachingRlsLbClient.lock
1086
    void close() {
1087
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1088
        logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
1✔
1089
        if (fallbackChildPolicyWrapper != null) {
1✔
1090
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1091
        }
1092
      }
1✔
1093
    }
1✔
1094

1095
    @Override
1096
    public String toString() {
1097
      return MoreObjects.toStringHelper(this)
×
1098
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1099
          .toString();
×
1100
    }
1101
  }
1102

1103
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc