• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19174

25 Apr 2024 10:35PM UTC coverage: 88.091% (+0.02%) from 88.069%
#19174

push

github

ejona86
rls: Document RefCountedChildPolicyWrapperFactory as non-threadsafe

Instead of having docs in RefCountedChildPolicyWrapperFactory saying
that every method was guarded by a lock, I added `@GuardedBy("lock")`
within CachingRlsLbClient, so now it is clearly not thread-safe and the
lock protects access. The AtomicLong was replaced with a long since
1) there was no multi-threading and 2) the logic was not atomic-safe
which was misleading.

31216 of 35436 relevant lines covered (88.09%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.69
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.Futures;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.MoreExecutors;
30
import com.google.common.util.concurrent.SettableFuture;
31
import io.grpc.ChannelLogger;
32
import io.grpc.ChannelLogger.ChannelLogLevel;
33
import io.grpc.ConnectivityState;
34
import io.grpc.LoadBalancer.Helper;
35
import io.grpc.LoadBalancer.PickResult;
36
import io.grpc.LoadBalancer.PickSubchannelArgs;
37
import io.grpc.LoadBalancer.ResolvedAddresses;
38
import io.grpc.LoadBalancer.SubchannelPicker;
39
import io.grpc.ManagedChannel;
40
import io.grpc.ManagedChannelBuilder;
41
import io.grpc.Metadata;
42
import io.grpc.Status;
43
import io.grpc.internal.BackoffPolicy;
44
import io.grpc.internal.ExponentialBackoffPolicy;
45
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
46
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
47
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
48
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
49
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
50
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
51
import io.grpc.rls.LruCache.EvictionListener;
52
import io.grpc.rls.LruCache.EvictionType;
53
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
54
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
55
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
56
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
57
import io.grpc.stub.StreamObserver;
58
import io.grpc.util.ForwardingLoadBalancerHelper;
59
import java.net.URI;
60
import java.net.URISyntaxException;
61
import java.util.HashMap;
62
import java.util.List;
63
import java.util.Map;
64
import java.util.concurrent.Future;
65
import java.util.concurrent.ScheduledExecutorService;
66
import java.util.concurrent.TimeUnit;
67
import javax.annotation.CheckReturnValue;
68
import javax.annotation.Nullable;
69
import javax.annotation.concurrent.GuardedBy;
70
import javax.annotation.concurrent.ThreadSafe;
71

72
/**
73
 * A CachingRlsLbClient is a core implementation of RLS loadbalancer supports dynamic request
74
 * routing by fetching the decision from route lookup server. Every single request is routed by
75
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
76
 */
77
@ThreadSafe
78
final class CachingRlsLbClient {
79

80
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
81
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
82
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
83
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
84
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
85
  public static final int BYTES_PER_CHAR = 2;
86
  public static final int STRING_OVERHEAD_BYTES = 38;
87
  /** Minimum bytes for a Java Object. */
88
  public static final int OBJ_OVERHEAD_B = 16;
89

90
  // All cache status changes (pending, backoff, success) must be under this lock
91
  private final Object lock = new Object();
1✔
92
  // LRU cache based on access order (BACKOFF and actual data will be here)
93
  @GuardedBy("lock")
94
  private final RlsAsyncLruCache linkedHashLruCache;
95
  // any RPC on the fly will cached in this map
96
  @GuardedBy("lock")
1✔
97
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
98

99
  private final ScheduledExecutorService scheduledExecutorService;
100
  private final Ticker ticker;
101
  private final Throttler throttler;
102

103
  private final LbPolicyConfiguration lbPolicyConfig;
104
  private final BackoffPolicy.Provider backoffProvider;
105
  private final long maxAgeNanos;
106
  private final long staleAgeNanos;
107
  private final long callTimeoutNanos;
108

109
  private final RlsLbHelper helper;
110
  private final ManagedChannel rlsChannel;
111
  private final RouteLookupServiceStub rlsStub;
112
  private final RlsPicker rlsPicker;
113
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
114
  @GuardedBy("lock")
115
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
116
  private final ChannelLogger logger;
117

118
  private CachingRlsLbClient(Builder builder) {
1✔
119
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
120
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
121
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
122
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
123
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
124
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
125
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
126
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
127
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
128
    linkedHashLruCache =
1✔
129
        new RlsAsyncLruCache(
130
            rlsConfig.cacheSizeBytes(),
1✔
131
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
132
            scheduledExecutorService,
133
            ticker,
134
            lock,
135
            helper);
136
    logger = helper.getChannelLogger();
1✔
137
    String serverHost = null;
1✔
138
    try {
139
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
140
    } catch (URISyntaxException ignore) {
×
141
      // handled by the following null check
142
    }
1✔
143
    if (serverHost == null) {
1✔
144
      logger.log(
×
145
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
146
      serverHost = helper.getAuthority();
×
147
    }
148
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
149
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
150
    rlsPicker = new RlsPicker(requestFactory);
1✔
151
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
152
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
153
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
154
    // called to impose the authority security restrictions.
155
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
156
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
157
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
158
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
159
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
160
    if (routeLookupChannelServiceConfig != null) {
1✔
161
      logger.log(
1✔
162
          ChannelLogLevel.DEBUG,
163
          "RLS channel service config: {0}",
164
          routeLookupChannelServiceConfig);
165
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
166
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
167
    }
168
    rlsChannel = rlsChannelBuilder.build();
1✔
169
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
170
    childLbResolvedAddressFactory =
1✔
171
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
172
    backoffProvider = builder.backoffProvider;
1✔
173
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
174
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
175
    refCountedChildPolicyWrapperFactory =
1✔
176
        new RefCountedChildPolicyWrapperFactory(
177
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
178
            childLbHelperProvider,
179
            new BackoffRefreshListener());
180
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
181
  }
1✔
182

183
  /**
184
   * Convert the status to UNAVAILBLE and enhance the error message.
185
   * @param status status as provided by server
186
   * @param serverName Used for error description
187
   * @return Transformed status
188
   */
189
  static Status convertRlsServerStatus(Status status, String serverName) {
190
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
191
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
192
                + "RLS server returned: %s: %s",
193
            serverName, status.getCode(), status.getDescription()));
1✔
194
  }
195

196
  /** Populates async cache entry for new request. */
197
  @GuardedBy("lock")
198
  private CachedRouteLookupResponse asyncRlsCall(
199
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
200
    logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
1✔
201
    if (throttler.shouldThrottle()) {
1✔
202
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
203
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
204
      // on this result
205
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
206
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
207
    }
208
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
209
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
210
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
211
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
212
        .routeLookup(
1✔
213
            routeLookupRequest,
214
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
215
              @Override
216
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
217
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
218
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
219
              }
1✔
220

221
              @Override
222
              public void onError(Throwable t) {
223
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
224
                response.setException(t);
1✔
225
                throttler.registerBackendResponse(true);
1✔
226
              }
1✔
227

228
              @Override
229
              public void onCompleted() {
230
                logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
1✔
231
                throttler.registerBackendResponse(false);
1✔
232
              }
1✔
233
            });
234
    return CachedRouteLookupResponse.pendingResponse(
1✔
235
        createPendingEntry(request, response, backoffPolicy));
1✔
236
  }
237

238
  /**
239
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
240
   * cached, pending and backed-off due to error. The result remains same even if the status is
241
   * changed after the return.
242
   */
243
  @CheckReturnValue
244
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
245
    logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
1✔
246
    synchronized (lock) {
1✔
247
      logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
1✔
248
      final CacheEntry cacheEntry;
249
      cacheEntry = linkedHashLruCache.read(request);
1✔
250
      if (cacheEntry == null) {
1✔
251
        logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
1✔
252
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
253
        if (pendingEntry != null) {
1✔
254
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
255
        }
256
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
257
      }
258

259
      if (cacheEntry instanceof DataCacheEntry) {
1✔
260
        // cache hit, initiate async-refresh if entry is staled
261
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
262
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
263
        if (dataEntry.isStaled(ticker.read())) {
1✔
264
          logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
1✔
265
          dataEntry.maybeRefresh();
1✔
266
        }
267
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
268
      }
269
      logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
1✔
270
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
271
    }
272
  }
273

274
  /** Performs any pending maintenance operations needed by the cache. */
275
  void close() {
276
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
277
    synchronized (lock) {
1✔
278
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
279
      linkedHashLruCache.close();
1✔
280
      // TODO(creamsoup) maybe cancel all pending requests
281
      pendingCallCache.clear();
1✔
282
      rlsChannel.shutdownNow();
1✔
283
      rlsPicker.close();
1✔
284
    }
1✔
285
  }
1✔
286

287
  void requestConnection() {
288
    rlsChannel.getState(true);
×
289
  }
×
290

291
  @GuardedBy("lock")
292
  private PendingCacheEntry createPendingEntry(
293
      RouteLookupRequest request,
294
      ListenableFuture<RouteLookupResponse> pendingCall,
295
      @Nullable BackoffPolicy backoffPolicy) {
296
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
297
    // Add the entry to the map before adding the Listener, because the listener removes the
298
    // entry from the map
299
    pendingCallCache.put(request, entry);
1✔
300
    // Beware that the listener can run immediately on the current thread
301
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
302
    return entry;
1✔
303
  }
304

305
  private void pendingRpcComplete(PendingCacheEntry entry) {
306
    synchronized (lock) {
1✔
307
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
308
      if (clientClosed) {
1✔
309
        return;
1✔
310
      }
311

312
      try {
313
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
314
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
315
        // reattempt picks when the child LB is done connecting
316
      } catch (Exception e) {
1✔
317
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
318
        // Cache updated. updateBalancingState() to reattempt picks
319
        helper.propagateRlsError();
1✔
320
      }
1✔
321
    }
1✔
322
  }
1✔
323

324
  @GuardedBy("lock")
325
  private DataCacheEntry createDataEntry(
326
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
327
    logger.log(
1✔
328
        ChannelLogLevel.DEBUG,
329
        "Transition to data cache: routeLookupResponse={0}",
330
        routeLookupResponse);
331
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
332
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
333
    // this cache update because the lock is held
334
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
335
    return entry;
1✔
336
  }
337

338
  @GuardedBy("lock")
339
  private BackoffCacheEntry createBackOffEntry(
340
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
341
    logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
342
    if (backoffPolicy == null) {
1✔
343
      backoffPolicy = backoffProvider.get();
1✔
344
    }
345
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
346
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
347
    // Lock is held, so the task can't execute before the assignment
348
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
349
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
350
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
351
    logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
1✔
352
        delayNanos);
1✔
353
    return entry;
1✔
354
  }
355

356
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
357
    synchronized (lock) {
1✔
358
      // This checks whether the task has been cancelled and prevents a second execution.
359
      if (!entry.scheduledFuture.cancel(false)) {
1✔
360
        // Future was previously cancelled
361
        return;
×
362
      }
363
      logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
1✔
364
      linkedHashLruCache.invalidate(entry.request);
1✔
365
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
366
    }
1✔
367
  }
1✔
368

369
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
370

371
    final Helper helper;
372
    private ConnectivityState state;
373
    private SubchannelPicker picker;
374

375
    RlsLbHelper(Helper helper) {
1✔
376
      this.helper = helper;
1✔
377
    }
1✔
378

379
    @Override
380
    protected Helper delegate() {
381
      return helper;
1✔
382
    }
383

384
    @Override
385
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
386
      state = newState;
1✔
387
      picker = newPicker;
1✔
388
      super.updateBalancingState(newState, newPicker);
1✔
389
    }
1✔
390

391
    void propagateRlsError() {
392
      getSynchronizationContext().execute(new Runnable() {
1✔
393
        @Override
394
        public void run() {
395
          if (picker != null) {
1✔
396
            // Refresh the channel state and let pending RPCs reprocess the picker.
397
            updateBalancingState(state, picker);
1✔
398
          }
399
        }
1✔
400
      });
401
    }
1✔
402

403
    void triggerPendingRpcProcessing() {
404
      helper.getSynchronizationContext().execute(
×
405
          () -> super.updateBalancingState(state, picker));
×
406
    }
×
407
  }
408

409
  /**
410
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
411
   */
412
  static final class CachedRouteLookupResponse {
413
    // Should only have 1 of following 3 cache entries
414
    @Nullable
415
    private final DataCacheEntry dataCacheEntry;
416
    @Nullable
417
    private final PendingCacheEntry pendingCacheEntry;
418
    @Nullable
419
    private final BackoffCacheEntry backoffCacheEntry;
420

421
    CachedRouteLookupResponse(
422
        DataCacheEntry dataCacheEntry,
423
        PendingCacheEntry pendingCacheEntry,
424
        BackoffCacheEntry backoffCacheEntry) {
1✔
425
      this.dataCacheEntry = dataCacheEntry;
1✔
426
      this.pendingCacheEntry = pendingCacheEntry;
1✔
427
      this.backoffCacheEntry = backoffCacheEntry;
1✔
428
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
429
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
430
          "Expected only 1 cache entry value provided");
431
    }
1✔
432

433
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
434
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
435
    }
436

437
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
438
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
439
    }
440

441
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
442
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
443
    }
444

445
    boolean hasData() {
446
      return dataCacheEntry != null;
1✔
447
    }
448

449
    @Nullable
450
    ChildPolicyWrapper getChildPolicyWrapper() {
451
      if (!hasData()) {
1✔
452
        return null;
×
453
      }
454
      return dataCacheEntry.getChildPolicyWrapper();
1✔
455
    }
456

457
    @VisibleForTesting
458
    @Nullable
459
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
460
      if (!hasData()) {
1✔
461
        return null;
×
462
      }
463
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
464
    }
465

466
    @Nullable
467
    String getHeaderData() {
468
      if (!hasData()) {
1✔
469
        return null;
1✔
470
      }
471
      return dataCacheEntry.getHeaderData();
1✔
472
    }
473

474
    boolean hasError() {
475
      return backoffCacheEntry != null;
1✔
476
    }
477

478
    boolean isPending() {
479
      return pendingCacheEntry != null;
1✔
480
    }
481

482
    @Nullable
483
    Status getStatus() {
484
      if (!hasError()) {
×
485
        return null;
×
486
      }
487
      return backoffCacheEntry.getStatus();
×
488
    }
489

490
    @Override
491
    public String toString() {
492
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
493
      if (dataCacheEntry != null) {
×
494
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
495
      }
496
      if (pendingCacheEntry != null) {
×
497
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
498
      }
499
      if (backoffCacheEntry != null) {
×
500
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
501
      }
502
      return toStringHelper.toString();
×
503
    }
504
  }
505

506
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
507
  static final class PendingCacheEntry {
508
    private final ListenableFuture<RouteLookupResponse> pendingCall;
509
    private final RouteLookupRequest request;
510
    @Nullable
511
    private final BackoffPolicy backoffPolicy;
512

513
    PendingCacheEntry(
514
        RouteLookupRequest request,
515
        ListenableFuture<RouteLookupResponse> pendingCall,
516
        @Nullable BackoffPolicy backoffPolicy) {
1✔
517
      this.request = checkNotNull(request, "request");
1✔
518
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
519
      this.backoffPolicy = backoffPolicy;
1✔
520
    }
1✔
521

522
    @Override
523
    public String toString() {
524
      return MoreObjects.toStringHelper(this)
×
525
          .add("request", request)
×
526
          .toString();
×
527
    }
528
  }
529

530
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
531
  abstract class CacheEntry {
532

533
    protected final RouteLookupRequest request;
534

535
    CacheEntry(RouteLookupRequest request) {
1✔
536
      this.request = checkNotNull(request, "request");
1✔
537
    }
1✔
538

539
    abstract int getSizeBytes();
540

541
    final boolean isExpired() {
542
      return isExpired(ticker.read());
1✔
543
    }
544

545
    abstract boolean isExpired(long now);
546

547
    abstract void cleanup();
548

549
    protected long getMinEvictionTime() {
550
      return 0L;
×
551
    }
552
  }
553

554
  /** Implementation of {@link CacheEntry} contains valid data. */
555
  final class DataCacheEntry extends CacheEntry {
556
    private final RouteLookupResponse response;
557
    private final long minEvictionTime;
558
    private final long expireTime;
559
    private final long staleTime;
560
    private final List<ChildPolicyWrapper> childPolicyWrappers;
561

562
    // GuardedBy CachingRlsLbClient.lock
563
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
564
      super(request);
1✔
565
      this.response = checkNotNull(response, "response");
1✔
566
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
567
      childPolicyWrappers =
1✔
568
          refCountedChildPolicyWrapperFactory
1✔
569
              .createOrGet(response.targets());
1✔
570
      long now = ticker.read();
1✔
571
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
572
      expireTime = now + maxAgeNanos;
1✔
573
      staleTime = now + staleAgeNanos;
1✔
574
    }
1✔
575

576
    /**
577
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
578
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
579
     * data still exists. Flow looks like following.
580
     *
581
     * <pre>
582
     * Timeline                       | async refresh
583
     *                                V put new cache (entry2)
584
     * entry1: Pending | hasValue | staled  |
585
     * entry2:                        | OV* | pending | hasValue | staled |
586
     *
587
     * OV: old value
588
     * </pre>
589
     */
590
    void maybeRefresh() {
591
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
592
        if (pendingCallCache.containsKey(request)) {
1✔
593
          // pending already requested
594
          logger.log(ChannelLogLevel.DEBUG,
×
595
              "A pending refresh request already created, no need to proceed with refresh");
596
          return;
×
597
        }
598
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
599
      }
1✔
600
    }
1✔
601

602
    @VisibleForTesting
603
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
604
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
605
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
606
          return childPolicyWrapper;
1✔
607
        }
608
      }
1✔
609

610
      throw new RuntimeException("Target not found:" + target);
×
611
    }
612

613
    @Nullable
614
    ChildPolicyWrapper getChildPolicyWrapper() {
615
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
616
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
617
          return childPolicyWrapper;
1✔
618
        }
619
      }
1✔
620
      return childPolicyWrappers.get(0);
1✔
621
    }
622

623
    String getHeaderData() {
624
      return response.getHeaderData();
1✔
625
    }
626

627
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
628
    int calcStringSize(String target) {
629
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
630
    }
631

632
    @Override
633
    int getSizeBytes() {
634
      int targetSize = 0;
1✔
635
      for (String target : response.targets()) {
1✔
636
        targetSize += calcStringSize(target);
1✔
637
      }
1✔
638
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
639
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
640
    }
641

642
    @Override
643
    boolean isExpired(long now) {
644
      return expireTime - now <= 0;
1✔
645
    }
646

647
    boolean isStaled(long now) {
648
      return staleTime - now <= 0;
1✔
649
    }
650

651
    @Override
652
    protected long getMinEvictionTime() {
653
      return minEvictionTime;
×
654
    }
655

656
    @Override
657
    void cleanup() {
658
      synchronized (lock) {
1✔
659
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
660
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
661
        }
1✔
662
      }
1✔
663
    }
1✔
664

665
    @Override
666
    public String toString() {
667
      return MoreObjects.toStringHelper(this)
×
668
          .add("request", request)
×
669
          .add("response", response)
×
670
          .add("expireTime", expireTime)
×
671
          .add("staleTime", staleTime)
×
672
          .add("childPolicyWrappers", childPolicyWrappers)
×
673
          .toString();
×
674
    }
675
  }
676

677
  /**
678
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
679
   * status when the backoff time is expired.
680
   */
681
  private final class BackoffCacheEntry extends CacheEntry {
682

683
    private final Status status;
684
    private final BackoffPolicy backoffPolicy;
685
    private Future<?> scheduledFuture;
686

687
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
1✔
688
      super(request);
1✔
689
      this.status = checkNotNull(status, "status");
1✔
690
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
691
    }
1✔
692

693
    Status getStatus() {
694
      return status;
×
695
    }
696

697
    @Override
698
    int getSizeBytes() {
699
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
700
    }
701

702
    @Override
703
    boolean isExpired(long now) {
704
      return scheduledFuture.isDone();
1✔
705
    }
706

707
    @Override
708
    void cleanup() {
709
      scheduledFuture.cancel(false);
1✔
710
    }
1✔
711

712
    @Override
713
    public String toString() {
714
      return MoreObjects.toStringHelper(this)
×
715
          .add("request", request)
×
716
          .add("status", status)
×
717
          .toString();
×
718
    }
719
  }
720

721
  /** Returns a Builder for {@link CachingRlsLbClient}. */
722
  static Builder newBuilder() {
723
    return new Builder();
1✔
724
  }
725

726
  /** A Builder for {@link CachingRlsLbClient}. */
727
  static final class Builder {
1✔
728

729
    private Helper helper;
730
    private LbPolicyConfiguration lbPolicyConfig;
731
    private Throttler throttler = new HappyThrottler();
1✔
732
    private ResolvedAddressFactory resolvedAddressFactory;
733
    private Ticker ticker = Ticker.systemTicker();
1✔
734
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
735
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
736

737
    Builder setHelper(Helper helper) {
738
      this.helper = checkNotNull(helper, "helper");
1✔
739
      return this;
1✔
740
    }
741

742
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
743
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
744
      return this;
1✔
745
    }
746

747
    Builder setThrottler(Throttler throttler) {
748
      this.throttler = checkNotNull(throttler, "throttler");
1✔
749
      return this;
1✔
750
    }
751

752
    /**
753
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
754
     */
755
    Builder setResolvedAddressesFactory(
756
        ResolvedAddressFactory resolvedAddressFactory) {
757
      this.resolvedAddressFactory =
1✔
758
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
759
      return this;
1✔
760
    }
761

762
    Builder setTicker(Ticker ticker) {
763
      this.ticker = checkNotNull(ticker, "ticker");
1✔
764
      return this;
1✔
765
    }
766

767
    Builder setEvictionListener(
768
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
769
      this.evictionListener = evictionListener;
1✔
770
      return this;
1✔
771
    }
772

773
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
774
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
775
      return this;
1✔
776
    }
777

778
    CachingRlsLbClient build() {
779
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
780
      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
1✔
781
      return client;
1✔
782
    }
783
  }
784

785
  /**
786
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
787
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
788
   */
789
  private static final class AutoCleaningEvictionListener
790
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
791

792
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
793

794
    AutoCleaningEvictionListener(
795
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
796
      this.delegate = delegate;
1✔
797
    }
1✔
798

799
    @Override
800
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
801
      if (delegate != null) {
1✔
802
        delegate.onEviction(key, value, cause);
1✔
803
      }
804
      // performs cleanup after delegation
805
      value.cleanup();
1✔
806
    }
1✔
807
  }
808

809
  /** A Throttler never throttles. */
810
  private static final class HappyThrottler implements Throttler {
811

812
    @Override
813
    public boolean shouldThrottle() {
814
      return false;
×
815
    }
816

817
    @Override
818
    public void registerBackendResponse(boolean throttled) {
819
      // no-op
820
    }
×
821
  }
822

823
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
824
  private static final class RlsAsyncLruCache
825
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
826
    private final RlsLbHelper helper;
827

828
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
829
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
830
        ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) {
831
      super(
1✔
832
          maxEstimatedSizeBytes,
833
          evictionListener,
834
          1,
835
          TimeUnit.MINUTES,
836
          ses,
837
          ticker,
838
          lock);
839
      this.helper = checkNotNull(helper, "helper");
1✔
840
    }
1✔
841

842
    @Override
843
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
844
      return value.isExpired();
1✔
845
    }
846

847
    @Override
848
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
849
      return value.getSizeBytes();
1✔
850
    }
851

852
    @Override
853
    protected boolean shouldInvalidateEldestEntry(
854
        RouteLookupRequest eldestKey, CacheEntry eldestValue) {
855
      if (eldestValue.getMinEvictionTime() > now()) {
×
856
        return false;
×
857
      }
858

859
      // eldest entry should be evicted if size limit exceeded
860
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
861
    }
862

863
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
864
      CacheEntry newEntry = cache(key, value);
1✔
865

866
      // force cleanup if new entry pushed cache over max size (in bytes)
867
      if (fitToLimit()) {
1✔
868
        helper.triggerPendingRpcProcessing();
×
869
      }
870
      return newEntry;
1✔
871
    }
872
  }
873

874
  /**
875
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
876
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
877
   */
878
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
879

880
    @Nullable
1✔
881
    private ConnectivityState prevState = null;
882

883
    @Override
884
    public void onStatusChanged(ConnectivityState newState) {
885
      logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
1✔
886
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
887
          && newState == ConnectivityState.READY) {
888
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
889
        logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
1✔
890
        synchronized (lock) {
1✔
891
          logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
1✔
892
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
893
            if (value instanceof BackoffCacheEntry) {
1✔
894
              refreshBackoffEntry((BackoffCacheEntry) value);
×
895
            }
896
          }
1✔
897
        }
1✔
898
      }
899
      prevState = newState;
1✔
900
    }
1✔
901
  }
902

903
  /** A header will be added when RLS server respond with additional header data. */
904
  @VisibleForTesting
905
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
906
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
907

908
  final class RlsPicker extends SubchannelPicker {
909

910
    private final RlsRequestFactory requestFactory;
911

912
    RlsPicker(RlsRequestFactory requestFactory) {
1✔
913
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
914
    }
1✔
915

916
    @Override
917
    public PickResult pickSubchannel(PickSubchannelArgs args) {
918
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
919
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
920
      RouteLookupRequest request =
1✔
921
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
922
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
923
      logger.log(ChannelLogLevel.DEBUG,
1✔
924
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
925
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
926

927
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
928
        logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
1✔
929
        Metadata headers = args.getHeaders();
1✔
930
        headers.discardAll(RLS_DATA_KEY);
1✔
931
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
932
      }
933
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
934
      logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
1✔
935
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
936
      if (response.hasData()) {
1✔
937
        logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
1✔
938
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
939
        SubchannelPicker picker =
940
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
941
        if (picker == null) {
1✔
942
          logger.log(ChannelLogLevel.DEBUG,
×
943
              "Child policy wrapper didn't return a picker, returning PickResult with no results");
944
          return PickResult.withNoResult();
×
945
        }
946
        // Happy path
947
        logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
1✔
948
        return picker.pickSubchannel(args);
1✔
949
      } else if (response.hasError()) {
1✔
950
        logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
1✔
951
        if (hasFallback) {
1✔
952
          logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
1✔
953
          return useFallback(args);
1✔
954
        }
955
        logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
×
956
        return PickResult.withError(
×
957
            convertRlsServerStatus(response.getStatus(),
×
958
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
×
959
      } else {
960
        logger.log(ChannelLogLevel.DEBUG,
1✔
961
            "RLS response had no data, return a PickResult with no data");
962
        return PickResult.withNoResult();
1✔
963
      }
964
    }
965

966
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
967

968
    /** Uses Subchannel connected to default target. */
969
    private PickResult useFallback(PickSubchannelArgs args) {
970
      // TODO(creamsoup) wait until lb is ready
971
      startFallbackChildPolicy();
1✔
972
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
973
      if (picker == null) {
1✔
974
        return PickResult.withNoResult();
×
975
      }
976
      return picker.pickSubchannel(args);
1✔
977
    }
978

979
    private void startFallbackChildPolicy() {
980
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
981
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
982
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
1✔
983
      synchronized (lock) {
1✔
984
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
1✔
985
        if (fallbackChildPolicyWrapper != null) {
1✔
986
          return;
1✔
987
        }
988
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
989
      }
1✔
990
    }
1✔
991

992
    // GuardedBy CachingRlsLbClient.lock
993
    void close() {
994
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
995
        logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
1✔
996
        if (fallbackChildPolicyWrapper != null) {
1✔
997
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
998
        }
999
      }
1✔
1000
    }
1✔
1001

1002
    @Override
1003
    public String toString() {
1004
      return MoreObjects.toStringHelper(this)
×
1005
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1006
          .toString();
×
1007
    }
1008
  }
1009

1010
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc