• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19141

03 Apr 2024 07:22PM UTC coverage: 88.301% (+0.06%) from 88.246%
#19141

push

github

web-flow
rls: Synchronization fixes in CachingRlsLbClient

This started with combining handleNewRequest with asyncRlsCall, but that
emphasized pre-existing synchronization issues and trying to fix those
exposed others. It was hard to split this into smaller commits because
they were interconnected.

handleNewRequest was combined with asyncRlsCall to use a single code
flow for handling the completed future while also failing the pick
immediately for throttled requests. That flow was then reused for
refreshing after backoff and data stale. It no longer optimizes the RPC
completing immediately because that would not happen in real life; it
only happens in tests because of inprocess+directExecutor() and we don't
want to test a different code flow in tests. This did require updating
some of the tests.

One small behavior change to share the combined asyncRlsCall with
backoff is we now always invalidate an entry after the backoff.
Previously the code could replace the entry with its new value in one
operation if the asyncRlsCall future completed immediately. That only
mattered to a single test which now sees an EXPLICIT eviction.

SynchronizationContext used to provide atomic scheduling in
BackoffCacheEntry, but it was not guaranteeing the scheduledRunnable was
only accessed from the sync context. The same was true for calling up
the LB tree with `updateBalancingState()`. In particular, adding entries
to the cache during a pick could evict entries without running the
cleanup methods within the context, as well as the RLS channel
transitioning from TRANSIENT_FAILURE to READY. This was replaced with
using a bare Future with a lock to provide atomicity.

BackoffCacheEntry no longer uses the current time and instead waits for
the backoff timer to actually run before considering itself expired.
Previously, it could race with periodic cleanup and get evicted before
the timer ran, which would cancel the timer and forget the
backoffPolicy. Si... (continued)

31165 of 35294 relevant lines covered (88.3%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.69
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ChannelLogger.ChannelLogLevel;
33
import io.grpc.ConnectivityState;
34
import io.grpc.LoadBalancer.Helper;
35
import io.grpc.LoadBalancer.PickResult;
36
import io.grpc.LoadBalancer.PickSubchannelArgs;
37
import io.grpc.LoadBalancer.ResolvedAddresses;
38
import io.grpc.LoadBalancer.SubchannelPicker;
39
import io.grpc.ManagedChannel;
40
import io.grpc.ManagedChannelBuilder;
41
import io.grpc.Metadata;
42
import io.grpc.Status;
43
import io.grpc.internal.BackoffPolicy;
44
import io.grpc.internal.ExponentialBackoffPolicy;
45
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
46
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
47
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
48
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
49
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
50
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
51
import io.grpc.rls.LruCache.EvictionListener;
52
import io.grpc.rls.LruCache.EvictionType;
53
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
54
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
55
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
56
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
57
import io.grpc.stub.StreamObserver;
58
import io.grpc.util.ForwardingLoadBalancerHelper;
59
import java.net.URI;
60
import java.net.URISyntaxException;
61
import java.util.HashMap;
62
import java.util.List;
63
import java.util.Map;
64
import java.util.concurrent.Future;
65
import java.util.concurrent.ScheduledExecutorService;
66
import java.util.concurrent.TimeUnit;
67
import javax.annotation.CheckReturnValue;
68
import javax.annotation.Nullable;
69
import javax.annotation.concurrent.GuardedBy;
70
import javax.annotation.concurrent.ThreadSafe;
71

72
/**
73
 * A CachingRlsLbClient is the core implementation of the RLS load balancer: it supports dynamic
 * request routing by fetching routing decisions from a route lookup server. Every request is
 * routed according to the server's decision. To reduce the performance penalty, lookup results
 * are cached in an {@link LruCache}.
76
 */
77
@ThreadSafe
78
final class CachingRlsLbClient {
79

80
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
81
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
82
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
83
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
84
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
85
  public static final int BYTES_PER_CHAR = 2;
86
  public static final int STRING_OVERHEAD_BYTES = 38;
87
  /** Minimum bytes for a Java Object. */
88
  public static final int OBJ_OVERHEAD_B = 16;
89

90
  // All cache status changes (pending, backoff, success) must be under this lock
91
  private final Object lock = new Object();
1✔
92
  // LRU cache based on access order (BACKOFF and actual data will be here)
93
  @GuardedBy("lock")
94
  private final RlsAsyncLruCache linkedHashLruCache;
95
  // any RPC on the fly will cached in this map
96
  @GuardedBy("lock")
1✔
97
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
98

99
  private final ScheduledExecutorService scheduledExecutorService;
100
  private final Ticker ticker;
101
  private final Throttler throttler;
102

103
  private final LbPolicyConfiguration lbPolicyConfig;
104
  private final BackoffPolicy.Provider backoffProvider;
105
  private final long maxAgeNanos;
106
  private final long staleAgeNanos;
107
  private final long callTimeoutNanos;
108

109
  private final RlsLbHelper helper;
110
  private final ManagedChannel rlsChannel;
111
  private final RouteLookupServiceStub rlsStub;
112
  private final RlsPicker rlsPicker;
113
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
114
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
115
  private final ChannelLogger logger;
116

117
  private CachingRlsLbClient(Builder builder) {
1✔
118
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
119
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
120
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
121
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
122
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
123
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
124
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
125
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
126
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
127
    linkedHashLruCache =
1✔
128
        new RlsAsyncLruCache(
129
            rlsConfig.cacheSizeBytes(),
1✔
130
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
131
            scheduledExecutorService,
132
            ticker,
133
            lock,
134
            helper);
135
    logger = helper.getChannelLogger();
1✔
136
    String serverHost = null;
1✔
137
    try {
138
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
139
    } catch (URISyntaxException ignore) {
×
140
      // handled by the following null check
141
    }
1✔
142
    if (serverHost == null) {
1✔
143
      logger.log(
×
144
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
145
      serverHost = helper.getAuthority();
×
146
    }
147
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
148
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
149
    rlsPicker = new RlsPicker(requestFactory);
1✔
150
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
151
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
152
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
153
    // called to impose the authority security restrictions.
154
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
155
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
156
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
157
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
158
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
159
    if (routeLookupChannelServiceConfig != null) {
1✔
160
      logger.log(
1✔
161
          ChannelLogLevel.DEBUG,
162
          "RLS channel service config: {0}",
163
          routeLookupChannelServiceConfig);
164
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
165
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
166
    }
167
    rlsChannel = rlsChannelBuilder.build();
1✔
168
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
169
    childLbResolvedAddressFactory =
1✔
170
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
171
    backoffProvider = builder.backoffProvider;
1✔
172
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
173
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
174
    refCountedChildPolicyWrapperFactory =
1✔
175
        new RefCountedChildPolicyWrapperFactory(
176
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
177
            childLbHelperProvider,
178
            new BackoffRefreshListener());
179
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
180
  }
1✔
181

182
  /**
183
   * Convert the status to UNAVAILBLE and enhance the error message.
184
   * @param status status as provided by server
185
   * @param serverName Used for error description
186
   * @return Transformed status
187
   */
188
  static Status convertRlsServerStatus(Status status, String serverName) {
189
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
190
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
191
                + "RLS server returned: %s: %s",
192
            serverName, status.getCode(), status.getDescription()));
1✔
193
  }
194

195
  /** Populates async cache entry for new request. */
196
  @GuardedBy("lock")
197
  private CachedRouteLookupResponse asyncRlsCall(
198
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
199
    logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
1✔
200
    if (throttler.shouldThrottle()) {
1✔
201
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
202
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
203
      // on this result
204
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
205
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
206
    }
207
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
208
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
209
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
210
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
211
        .routeLookup(
1✔
212
            routeLookupRequest,
213
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
214
              @Override
215
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
216
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
217
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
218
              }
1✔
219

220
              @Override
221
              public void onError(Throwable t) {
222
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
223
                response.setException(t);
1✔
224
                throttler.registerBackendResponse(true);
1✔
225
              }
1✔
226

227
              @Override
228
              public void onCompleted() {
229
                logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
1✔
230
                throttler.registerBackendResponse(false);
1✔
231
              }
1✔
232
            });
233
    return CachedRouteLookupResponse.pendingResponse(
1✔
234
        createPendingEntry(request, response, backoffPolicy));
1✔
235
  }
236

237
  /**
238
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
239
   * cached, pending and backed-off due to error. The result remains same even if the status is
240
   * changed after the return.
241
   */
242
  @CheckReturnValue
243
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
244
    logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
1✔
245
    synchronized (lock) {
1✔
246
      logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
1✔
247
      final CacheEntry cacheEntry;
248
      cacheEntry = linkedHashLruCache.read(request);
1✔
249
      if (cacheEntry == null) {
1✔
250
        logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
1✔
251
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
252
        if (pendingEntry != null) {
1✔
253
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
254
        }
255
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
256
      }
257

258
      if (cacheEntry instanceof DataCacheEntry) {
1✔
259
        // cache hit, initiate async-refresh if entry is staled
260
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
261
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
262
        if (dataEntry.isStaled(ticker.read())) {
1✔
263
          logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
1✔
264
          dataEntry.maybeRefresh();
1✔
265
        }
266
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
267
      }
268
      logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
1✔
269
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
270
    }
271
  }
272

273
  /** Performs any pending maintenance operations needed by the cache. */
274
  void close() {
275
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
276
    synchronized (lock) {
1✔
277
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
278
      linkedHashLruCache.close();
1✔
279
      // TODO(creamsoup) maybe cancel all pending requests
280
      pendingCallCache.clear();
1✔
281
      rlsChannel.shutdownNow();
1✔
282
      rlsPicker.close();
1✔
283
    }
1✔
284
  }
1✔
285

286
  void requestConnection() {
287
    rlsChannel.getState(true);
×
288
  }
×
289

290
  @GuardedBy("lock")
291
  private PendingCacheEntry createPendingEntry(
292
      RouteLookupRequest request,
293
      ListenableFuture<RouteLookupResponse> pendingCall,
294
      @Nullable BackoffPolicy backoffPolicy) {
295
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
296
    // Add the entry to the map before adding the Listener, because the listener removes the
297
    // entry from the map
298
    pendingCallCache.put(request, entry);
1✔
299
    // Beware that the listener can run immediately on the current thread
300
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
301
    return entry;
1✔
302
  }
303

304
  private void pendingRpcComplete(PendingCacheEntry entry) {
305
    synchronized (lock) {
1✔
306
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
307
      if (clientClosed) {
1✔
308
        return;
1✔
309
      }
310

311
      try {
312
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
313
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
314
        // reattempt picks when the child LB is done connecting
315
      } catch (Exception e) {
1✔
316
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
317
        // Cache updated. updateBalancingState() to reattempt picks
318
        helper.propagateRlsError();
1✔
319
      }
1✔
320
    }
1✔
321
  }
1✔
322

323
  @GuardedBy("lock")
324
  private DataCacheEntry createDataEntry(
325
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
326
    logger.log(
1✔
327
        ChannelLogLevel.DEBUG,
328
        "Transition to data cache: routeLookupResponse={0}",
329
        routeLookupResponse);
330
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
331
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
332
    // this cache update because the lock is held
333
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
334
    return entry;
1✔
335
  }
336

337
  @GuardedBy("lock")
338
  private BackoffCacheEntry createBackOffEntry(
339
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
340
    logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
341
    if (backoffPolicy == null) {
1✔
342
      backoffPolicy = backoffProvider.get();
1✔
343
    }
344
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
345
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
346
    // Lock is held, so the task can't execute before the assignment
347
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
348
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
349
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
350
    logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
1✔
351
        delayNanos);
1✔
352
    return entry;
1✔
353
  }
354

355
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
356
    synchronized (lock) {
1✔
357
      // This checks whether the task has been cancelled and prevents a second execution.
358
      if (!entry.scheduledFuture.cancel(false)) {
1✔
359
        // Future was previously cancelled
360
        return;
×
361
      }
362
      logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
1✔
363
      linkedHashLruCache.invalidate(entry.request);
1✔
364
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
365
    }
1✔
366
  }
1✔
367

368
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
369

370
    final Helper helper;
371
    private ConnectivityState state;
372
    private SubchannelPicker picker;
373

374
    RlsLbHelper(Helper helper) {
1✔
375
      this.helper = helper;
1✔
376
    }
1✔
377

378
    @Override
379
    protected Helper delegate() {
380
      return helper;
1✔
381
    }
382

383
    @Override
384
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
385
      state = newState;
1✔
386
      picker = newPicker;
1✔
387
      super.updateBalancingState(newState, newPicker);
1✔
388
    }
1✔
389

390
    void propagateRlsError() {
391
      getSynchronizationContext().execute(new Runnable() {
1✔
392
        @Override
393
        public void run() {
394
          if (picker != null) {
1✔
395
            // Refresh the channel state and let pending RPCs reprocess the picker.
396
            updateBalancingState(state, picker);
1✔
397
          }
398
        }
1✔
399
      });
400
    }
1✔
401

402
    void triggerPendingRpcProcessing() {
403
      helper.getSynchronizationContext().execute(
×
404
          () -> super.updateBalancingState(state, picker));
×
405
    }
×
406
  }
407

408
  /**
409
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
410
   */
411
  static final class CachedRouteLookupResponse {
412
    // Should only have 1 of following 3 cache entries
413
    @Nullable
414
    private final DataCacheEntry dataCacheEntry;
415
    @Nullable
416
    private final PendingCacheEntry pendingCacheEntry;
417
    @Nullable
418
    private final BackoffCacheEntry backoffCacheEntry;
419

420
    CachedRouteLookupResponse(
421
        DataCacheEntry dataCacheEntry,
422
        PendingCacheEntry pendingCacheEntry,
423
        BackoffCacheEntry backoffCacheEntry) {
1✔
424
      this.dataCacheEntry = dataCacheEntry;
1✔
425
      this.pendingCacheEntry = pendingCacheEntry;
1✔
426
      this.backoffCacheEntry = backoffCacheEntry;
1✔
427
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
428
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
429
          "Expected only 1 cache entry value provided");
430
    }
1✔
431

432
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
433
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
434
    }
435

436
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
437
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
438
    }
439

440
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
441
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
442
    }
443

444
    boolean hasData() {
445
      return dataCacheEntry != null;
1✔
446
    }
447

448
    @Nullable
449
    ChildPolicyWrapper getChildPolicyWrapper() {
450
      if (!hasData()) {
1✔
451
        return null;
×
452
      }
453
      return dataCacheEntry.getChildPolicyWrapper();
1✔
454
    }
455

456
    @VisibleForTesting
457
    @Nullable
458
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
459
      if (!hasData()) {
1✔
460
        return null;
×
461
      }
462
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
463
    }
464

465
    @Nullable
466
    String getHeaderData() {
467
      if (!hasData()) {
1✔
468
        return null;
1✔
469
      }
470
      return dataCacheEntry.getHeaderData();
1✔
471
    }
472

473
    boolean hasError() {
474
      return backoffCacheEntry != null;
1✔
475
    }
476

477
    boolean isPending() {
478
      return pendingCacheEntry != null;
1✔
479
    }
480

481
    @Nullable
482
    Status getStatus() {
483
      if (!hasError()) {
×
484
        return null;
×
485
      }
486
      return backoffCacheEntry.getStatus();
×
487
    }
488

489
    @Override
490
    public String toString() {
491
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
492
      if (dataCacheEntry != null) {
×
493
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
494
      }
495
      if (pendingCacheEntry != null) {
×
496
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
497
      }
498
      if (backoffCacheEntry != null) {
×
499
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
500
      }
501
      return toStringHelper.toString();
×
502
    }
503
  }
504

505
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
506
  static final class PendingCacheEntry {
507
    private final ListenableFuture<RouteLookupResponse> pendingCall;
508
    private final RouteLookupRequest request;
509
    @Nullable
510
    private final BackoffPolicy backoffPolicy;
511

512
    PendingCacheEntry(
513
        RouteLookupRequest request,
514
        ListenableFuture<RouteLookupResponse> pendingCall,
515
        @Nullable BackoffPolicy backoffPolicy) {
1✔
516
      this.request = checkNotNull(request, "request");
1✔
517
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
518
      this.backoffPolicy = backoffPolicy;
1✔
519
    }
1✔
520

521
    @Override
522
    public String toString() {
523
      return MoreObjects.toStringHelper(this)
×
524
          .add("request", request)
×
525
          .toString();
×
526
    }
527
  }
528

529
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
530
  abstract class CacheEntry {
531

532
    protected final RouteLookupRequest request;
533

534
    CacheEntry(RouteLookupRequest request) {
1✔
535
      this.request = checkNotNull(request, "request");
1✔
536
    }
1✔
537

538
    abstract int getSizeBytes();
539

540
    final boolean isExpired() {
541
      return isExpired(ticker.read());
1✔
542
    }
543

544
    abstract boolean isExpired(long now);
545

546
    abstract void cleanup();
547

548
    protected long getMinEvictionTime() {
549
      return 0L;
×
550
    }
551
  }
552

553
  /** Implementation of {@link CacheEntry} contains valid data. */
554
  final class DataCacheEntry extends CacheEntry {
555
    private final RouteLookupResponse response;
556
    private final long minEvictionTime;
557
    private final long expireTime;
558
    private final long staleTime;
559
    private final List<ChildPolicyWrapper> childPolicyWrappers;
560

561
    // GuardedBy CachingRlsLbClient.lock
562
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
563
      super(request);
1✔
564
      this.response = checkNotNull(response, "response");
1✔
565
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
566
      childPolicyWrappers =
1✔
567
          refCountedChildPolicyWrapperFactory
1✔
568
              .createOrGet(response.targets());
1✔
569
      long now = ticker.read();
1✔
570
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
571
      expireTime = now + maxAgeNanos;
1✔
572
      staleTime = now + staleAgeNanos;
1✔
573
    }
1✔
574

575
    /**
576
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
577
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
578
     * data still exists. Flow looks like following.
579
     *
580
     * <pre>
581
     * Timeline                       | async refresh
582
     *                                V put new cache (entry2)
583
     * entry1: Pending | hasValue | staled  |
584
     * entry2:                        | OV* | pending | hasValue | staled |
585
     *
586
     * OV: old value
587
     * </pre>
588
     */
589
    void maybeRefresh() {
590
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
591
        if (pendingCallCache.containsKey(request)) {
1✔
592
          // pending already requested
593
          logger.log(ChannelLogLevel.DEBUG,
×
594
              "A pending refresh request already created, no need to proceed with refresh");
595
          return;
×
596
        }
597
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
598
      }
1✔
599
    }
1✔
600

601
    @VisibleForTesting
602
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
603
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
604
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
605
          return childPolicyWrapper;
1✔
606
        }
607
      }
1✔
608

609
      throw new RuntimeException("Target not found:" + target);
×
610
    }
611

612
    @Nullable
613
    ChildPolicyWrapper getChildPolicyWrapper() {
614
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
615
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
616
          return childPolicyWrapper;
1✔
617
        }
618
      }
1✔
619
      return childPolicyWrappers.get(0);
1✔
620
    }
621

622
    String getHeaderData() {
623
      return response.getHeaderData();
1✔
624
    }
625

626
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
627
    int calcStringSize(String target) {
628
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
629
    }
630

631
    @Override
632
    int getSizeBytes() {
633
      int targetSize = 0;
1✔
634
      for (String target : response.targets()) {
1✔
635
        targetSize += calcStringSize(target);
1✔
636
      }
1✔
637
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
638
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
639
    }
640

641
    @Override
642
    boolean isExpired(long now) {
643
      return expireTime - now <= 0;
1✔
644
    }
645

646
    boolean isStaled(long now) {
647
      return staleTime - now <= 0;
1✔
648
    }
649

650
    @Override
651
    protected long getMinEvictionTime() {
652
      return minEvictionTime;
×
653
    }
654

655
    @Override
656
    void cleanup() {
657
      synchronized (lock) {
1✔
658
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
659
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
660
        }
1✔
661
      }
1✔
662
    }
1✔
663

664
    @Override
665
    public String toString() {
666
      return MoreObjects.toStringHelper(this)
×
667
          .add("request", request)
×
668
          .add("response", response)
×
669
          .add("expireTime", expireTime)
×
670
          .add("staleTime", staleTime)
×
671
          .add("childPolicyWrappers", childPolicyWrappers)
×
672
          .toString();
×
673
    }
674
  }
675

676
  /**
677
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
678
   * status when the backoff time is expired.
679
   */
680
  private final class BackoffCacheEntry extends CacheEntry {
681

682
    private final Status status;
683
    private final BackoffPolicy backoffPolicy;
684
    private Future<?> scheduledFuture;
685

686
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
1✔
687
      super(request);
1✔
688
      this.status = checkNotNull(status, "status");
1✔
689
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
690
    }
1✔
691

692
    Status getStatus() {
693
      return status;
×
694
    }
695

696
    @Override
697
    int getSizeBytes() {
698
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
699
    }
700

701
    @Override
702
    boolean isExpired(long now) {
703
      return scheduledFuture.isDone();
1✔
704
    }
705

706
    @Override
707
    void cleanup() {
708
      scheduledFuture.cancel(false);
1✔
709
    }
1✔
710

711
    @Override
712
    public String toString() {
713
      return MoreObjects.toStringHelper(this)
×
714
          .add("request", request)
×
715
          .add("status", status)
×
716
          .toString();
×
717
    }
718
  }
719

720
  /** Returns a Builder for {@link CachingRlsLbClient}. */
721
  static Builder newBuilder() {
722
    return new Builder();
1✔
723
  }
724

725
  /** A Builder for {@link CachingRlsLbClient}. */
726
  static final class Builder {
1✔
727

728
    private Helper helper;
729
    private LbPolicyConfiguration lbPolicyConfig;
730
    private Throttler throttler = new HappyThrottler();
1✔
731
    private ResolvedAddressFactory resolvedAddressFactory;
732
    private Ticker ticker = Ticker.systemTicker();
1✔
733
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
734
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
735

736
    Builder setHelper(Helper helper) {
737
      this.helper = checkNotNull(helper, "helper");
1✔
738
      return this;
1✔
739
    }
740

741
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
742
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
743
      return this;
1✔
744
    }
745

746
    Builder setThrottler(Throttler throttler) {
747
      this.throttler = checkNotNull(throttler, "throttler");
1✔
748
      return this;
1✔
749
    }
750

751
    /**
752
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
753
     */
754
    Builder setResolvedAddressesFactory(
755
        ResolvedAddressFactory resolvedAddressFactory) {
756
      this.resolvedAddressFactory =
1✔
757
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
758
      return this;
1✔
759
    }
760

761
    Builder setTicker(Ticker ticker) {
762
      this.ticker = checkNotNull(ticker, "ticker");
1✔
763
      return this;
1✔
764
    }
765

766
    Builder setEvictionListener(
767
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
768
      this.evictionListener = evictionListener;
1✔
769
      return this;
1✔
770
    }
771

772
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
773
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
774
      return this;
1✔
775
    }
776

777
    CachingRlsLbClient build() {
778
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
779
      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
1✔
780
      return client;
1✔
781
    }
782
  }
783

784
  /**
785
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
786
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
787
   */
788
  private static final class AutoCleaningEvictionListener
789
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
790

791
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
792

793
    AutoCleaningEvictionListener(
794
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
795
      this.delegate = delegate;
1✔
796
    }
1✔
797

798
    @Override
799
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
800
      if (delegate != null) {
1✔
801
        delegate.onEviction(key, value, cause);
1✔
802
      }
803
      // performs cleanup after delegation
804
      value.cleanup();
1✔
805
    }
1✔
806
  }
807

808
  /** A Throttler never throttles. */
809
  private static final class HappyThrottler implements Throttler {
810

811
    @Override
812
    public boolean shouldThrottle() {
813
      return false;
×
814
    }
815

816
    @Override
817
    public void registerBackendResponse(boolean throttled) {
818
      // no-op
819
    }
×
820
  }
821

822
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
823
  private static final class RlsAsyncLruCache
824
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
825
    private final RlsLbHelper helper;
826

827
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
828
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
829
        ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) {
830
      super(
1✔
831
          maxEstimatedSizeBytes,
832
          evictionListener,
833
          1,
834
          TimeUnit.MINUTES,
835
          ses,
836
          ticker,
837
          lock);
838
      this.helper = checkNotNull(helper, "helper");
1✔
839
    }
1✔
840

841
    @Override
842
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
843
      return value.isExpired();
1✔
844
    }
845

846
    @Override
847
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
848
      return value.getSizeBytes();
1✔
849
    }
850

851
    @Override
852
    protected boolean shouldInvalidateEldestEntry(
853
        RouteLookupRequest eldestKey, CacheEntry eldestValue) {
854
      if (eldestValue.getMinEvictionTime() > now()) {
×
855
        return false;
×
856
      }
857

858
      // eldest entry should be evicted if size limit exceeded
859
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
860
    }
861

862
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
863
      CacheEntry newEntry = cache(key, value);
1✔
864

865
      // force cleanup if new entry pushed cache over max size (in bytes)
866
      if (fitToLimit()) {
1✔
867
        helper.triggerPendingRpcProcessing();
×
868
      }
869
      return newEntry;
1✔
870
    }
871
  }
872

873
  /**
874
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
875
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
876
   */
877
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
878

879
    @Nullable
1✔
880
    private ConnectivityState prevState = null;
881

882
    @Override
883
    public void onStatusChanged(ConnectivityState newState) {
884
      logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
1✔
885
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
886
          && newState == ConnectivityState.READY) {
887
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
888
        logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
1✔
889
        synchronized (lock) {
1✔
890
          logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
1✔
891
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
892
            if (value instanceof BackoffCacheEntry) {
1✔
893
              refreshBackoffEntry((BackoffCacheEntry) value);
×
894
            }
895
          }
1✔
896
        }
1✔
897
      }
898
      prevState = newState;
1✔
899
    }
1✔
900
  }
901

902
  /** A header will be added when RLS server respond with additional header data. */
903
  @VisibleForTesting
904
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
905
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
906

907
  final class RlsPicker extends SubchannelPicker {
908

909
    private final RlsRequestFactory requestFactory;
910

911
    RlsPicker(RlsRequestFactory requestFactory) {
1✔
912
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
913
    }
1✔
914

915
    @Override
916
    public PickResult pickSubchannel(PickSubchannelArgs args) {
917
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
918
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
919
      RouteLookupRequest request =
1✔
920
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
921
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
922
      logger.log(ChannelLogLevel.DEBUG,
1✔
923
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
924
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
925

926
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
927
        logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
1✔
928
        Metadata headers = args.getHeaders();
1✔
929
        headers.discardAll(RLS_DATA_KEY);
1✔
930
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
931
      }
932
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
933
      logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
1✔
934
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
935
      if (response.hasData()) {
1✔
936
        logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
1✔
937
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
938
        SubchannelPicker picker =
939
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
940
        if (picker == null) {
1✔
941
          logger.log(ChannelLogLevel.DEBUG,
×
942
              "Child policy wrapper didn't return a picker, returning PickResult with no results");
943
          return PickResult.withNoResult();
×
944
        }
945
        // Happy path
946
        logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
1✔
947
        return picker.pickSubchannel(args);
1✔
948
      } else if (response.hasError()) {
1✔
949
        logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
1✔
950
        if (hasFallback) {
1✔
951
          logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
1✔
952
          return useFallback(args);
1✔
953
        }
954
        logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
×
955
        return PickResult.withError(
×
956
            convertRlsServerStatus(response.getStatus(),
×
957
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
×
958
      } else {
959
        logger.log(ChannelLogLevel.DEBUG,
1✔
960
            "RLS response had no data, return a PickResult with no data");
961
        return PickResult.withNoResult();
1✔
962
      }
963
    }
964

965
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
966

967
    /** Uses Subchannel connected to default target. */
968
    private PickResult useFallback(PickSubchannelArgs args) {
969
      // TODO(creamsoup) wait until lb is ready
970
      startFallbackChildPolicy();
1✔
971
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
972
      if (picker == null) {
1✔
973
        return PickResult.withNoResult();
×
974
      }
975
      return picker.pickSubchannel(args);
1✔
976
    }
977

978
    private void startFallbackChildPolicy() {
979
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
980
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
981
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
1✔
982
      synchronized (lock) {
1✔
983
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
1✔
984
        if (fallbackChildPolicyWrapper != null) {
1✔
985
          return;
1✔
986
        }
987
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
988
      }
1✔
989
    }
1✔
990

991
    // GuardedBy CachingRlsLbClient.lock
992
    void close() {
993
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
994
        logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
1✔
995
        if (fallbackChildPolicyWrapper != null) {
1✔
996
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
997
        }
998
      }
1✔
999
    }
1✔
1000

1001
    @Override
1002
    public String toString() {
1003
      return MoreObjects.toStringHelper(this)
×
1004
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1005
          .toString();
×
1006
    }
1007
  }
1008

1009
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc