• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19095

11 Mar 2024 08:40PM CUT coverage: 88.305%. Remained the same
#19095

push

github

ejona86
rls: Fix a local and remote race

The local race passes `rlsPicker` to the channel before
CachingRlsLbClient is finished constructing. `RlsPicker` can use
multiple of the fields not yet initialized. This seems not to be
happening in practice, because it appears like it would break things
very loudly (e.g., NPE).

The remote race seems incredibly hard to hit, because it requires an RPC
to complete before the pending data tracking the RPC is added to a map.
But if a system is at 100% CPU utilization, maybe it can be hit. If
it is hit, all RPCs needing the impacted cache entry will forever be
buffered.

31064 of 35178 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.9
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.util.concurrent.ListenableFuture;
28
import com.google.common.util.concurrent.SettableFuture;
29
import io.grpc.ChannelLogger;
30
import io.grpc.ChannelLogger.ChannelLogLevel;
31
import io.grpc.ConnectivityState;
32
import io.grpc.LoadBalancer.Helper;
33
import io.grpc.LoadBalancer.PickResult;
34
import io.grpc.LoadBalancer.PickSubchannelArgs;
35
import io.grpc.LoadBalancer.ResolvedAddresses;
36
import io.grpc.LoadBalancer.SubchannelPicker;
37
import io.grpc.ManagedChannel;
38
import io.grpc.ManagedChannelBuilder;
39
import io.grpc.Metadata;
40
import io.grpc.Status;
41
import io.grpc.SynchronizationContext;
42
import io.grpc.SynchronizationContext.ScheduledHandle;
43
import io.grpc.internal.BackoffPolicy;
44
import io.grpc.internal.ExponentialBackoffPolicy;
45
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
46
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
47
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
48
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
49
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
50
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
51
import io.grpc.rls.LruCache.EvictionListener;
52
import io.grpc.rls.LruCache.EvictionType;
53
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
54
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
55
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
56
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
57
import io.grpc.rls.Throttler.ThrottledException;
58
import io.grpc.stub.StreamObserver;
59
import io.grpc.util.ForwardingLoadBalancerHelper;
60
import java.net.URI;
61
import java.net.URISyntaxException;
62
import java.util.HashMap;
63
import java.util.List;
64
import java.util.Map;
65
import java.util.concurrent.ScheduledExecutorService;
66
import java.util.concurrent.TimeUnit;
67
import javax.annotation.CheckReturnValue;
68
import javax.annotation.Nullable;
69
import javax.annotation.concurrent.GuardedBy;
70
import javax.annotation.concurrent.ThreadSafe;
71

72
/**
73
 * A CachingRlsLbClient is a core implementation of RLS loadbalancer that supports dynamic request
74
 * routing by fetching the decision from route lookup server. Every single request is routed by
75
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
76
 */
77
@ThreadSafe
78
final class CachingRlsLbClient {
79

80
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
81
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
82
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
83
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
84
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
85
  public static final int BYTES_PER_CHAR = 2;
86
  public static final int STRING_OVERHEAD_BYTES = 38;
87
  /** Minimum bytes for a Java Object. */
88
  public static final int OBJ_OVERHEAD_B = 16;
89

90
  // All cache status changes (pending, backoff, success) must be under this lock
91
  private final Object lock = new Object();
1✔
92
  // LRU cache based on access order (BACKOFF and actual data will be here)
93
  @GuardedBy("lock")
94
  private final RlsAsyncLruCache linkedHashLruCache;
95
  // any RPC in flight will be cached in this map
96
  @GuardedBy("lock")
1✔
97
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
98

99
  private final SynchronizationContext synchronizationContext;
100
  private final ScheduledExecutorService scheduledExecutorService;
101
  private final Ticker ticker;
102
  private final Throttler throttler;
103

104
  private final LbPolicyConfiguration lbPolicyConfig;
105
  private final BackoffPolicy.Provider backoffProvider;
106
  private final long maxAgeNanos;
107
  private final long staleAgeNanos;
108
  private final long callTimeoutNanos;
109

110
  private final RlsLbHelper helper;
111
  private final ManagedChannel rlsChannel;
112
  private final RouteLookupServiceStub rlsStub;
113
  private final RlsPicker rlsPicker;
114
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
115
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
116
  private final ChannelLogger logger;
117

118
  private CachingRlsLbClient(Builder builder) {
1✔
119
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
120
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
121
    synchronizationContext = helper.getSynchronizationContext();
1✔
122
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
123
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
124
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
125
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
126
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
127
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
128
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
129
    linkedHashLruCache =
1✔
130
        new RlsAsyncLruCache(
131
            rlsConfig.cacheSizeBytes(),
1✔
132
            builder.evictionListener,
1✔
133
            scheduledExecutorService,
134
            ticker,
135
            lock);
136
    logger = helper.getChannelLogger();
1✔
137
    String serverHost = null;
1✔
138
    try {
139
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
140
    } catch (URISyntaxException ignore) {
×
141
      // handled by the following null check
142
    }
1✔
143
    if (serverHost == null) {
1✔
144
      logger.log(
×
145
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
146
      serverHost = helper.getAuthority();
×
147
    }
148
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
149
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
150
    rlsPicker = new RlsPicker(requestFactory);
1✔
151
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
152
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
153
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
154
    // called to impose the authority security restrictions.
155
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
156
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
157
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
158
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
159
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
160
    if (routeLookupChannelServiceConfig != null) {
1✔
161
      logger.log(
1✔
162
          ChannelLogLevel.DEBUG,
163
          "RLS channel service config: {0}",
164
          routeLookupChannelServiceConfig);
165
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
166
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
167
    }
168
    rlsChannel = rlsChannelBuilder.build();
1✔
169
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
170
    childLbResolvedAddressFactory =
1✔
171
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
172
    backoffProvider = builder.backoffProvider;
1✔
173
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
174
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
175
    refCountedChildPolicyWrapperFactory =
1✔
176
        new RefCountedChildPolicyWrapperFactory(
177
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
178
            childLbHelperProvider,
179
            new BackoffRefreshListener());
180
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
181
  }
1✔
182

183
  /**
184
   * Convert the status to UNAVAILABLE and enhance the error message.
185
   * @param status status as provided by server
186
   * @param serverName Used for error description
187
   * @return Transformed status
188
   */
189
  static Status convertRlsServerStatus(Status status, String serverName) {
190
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
191
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
192
                + "RLS server returned: %s: %s",
193
            serverName, status.getCode(), status.getDescription()));
1✔
194
  }
195

196
  @CheckReturnValue
197
  private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
198
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
199
    if (throttler.shouldThrottle()) {
1✔
200
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
201
      response.setException(new ThrottledException());
1✔
202
      return response;
1✔
203
    }
204
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
205
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
206
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
207
        .routeLookup(
1✔
208
            routeLookupRequest,
209
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
210
              @Override
211
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
212
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
213
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
214
              }
1✔
215

216
              @Override
217
              public void onError(Throwable t) {
218
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
219
                response.setException(t);
1✔
220
                throttler.registerBackendResponse(true);
1✔
221
                helper.propagateRlsError();
1✔
222
              }
1✔
223

224
              @Override
225
              public void onCompleted() {
226
                throttler.registerBackendResponse(false);
1✔
227
              }
1✔
228
            });
229
    return response;
1✔
230
  }
231

232
  /**
233
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
234
   * cached, pending and backed-off due to error. The result remains same even if the status is
235
   * changed after the return.
236
   */
237
  @CheckReturnValue
238
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
239
    synchronized (lock) {
1✔
240
      final CacheEntry cacheEntry;
241
      cacheEntry = linkedHashLruCache.read(request);
1✔
242
      if (cacheEntry == null) {
1✔
243
        return handleNewRequest(request);
1✔
244
      }
245

246
      if (cacheEntry instanceof DataCacheEntry) {
1✔
247
        // cache hit, initiate async-refresh if entry is staled
248
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
249
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
250
        if (dataEntry.isStaled(ticker.read())) {
1✔
251
          dataEntry.maybeRefresh();
1✔
252
        }
253
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
254
      }
255
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
256
    }
257
  }
258

259
  /** Closes the LRU cache, clears pending calls, and shuts down the RLS channel and picker. */
260
  void close() {
261
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
262
    synchronized (lock) {
1✔
263
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
264
      linkedHashLruCache.close();
1✔
265
      // TODO(creamsoup) maybe cancel all pending requests
266
      pendingCallCache.clear();
1✔
267
      rlsChannel.shutdownNow();
1✔
268
      rlsPicker.close();
1✔
269
    }
1✔
270
  }
1✔
271

272
  /**
273
   * Populates async cache entry for new request. This is only methods directly modifies the cache,
274
   * any status change is happening via event (async request finished, timed out, etc) in {@link
275
   * PendingCacheEntry}, {@link DataCacheEntry} and {@link BackoffCacheEntry}.
276
   */
277
  private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) {
278
    synchronized (lock) {
1✔
279
      PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
280
      if (pendingEntry != null) {
1✔
281
        return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
282
      }
283

284
      ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
1✔
285
      if (!asyncCall.isDone()) {
1✔
286
        pendingEntry = new PendingCacheEntry(request, asyncCall);
1✔
287
        // Add the entry to the map before adding the Listener, because the listener removes the
288
        // entry from the map
289
        pendingCallCache.put(request, pendingEntry);
1✔
290
        // Beware that the listener can run immediately on the current thread
291
        asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
1✔
292
        return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
293
      } else {
294
        // async call returned finished future is most likely throttled
295
        try {
296
          RouteLookupResponse response = asyncCall.get();
1✔
297
          DataCacheEntry dataEntry = new DataCacheEntry(request, response);
1✔
298
          linkedHashLruCache.cacheAndClean(request, dataEntry);
1✔
299
          return CachedRouteLookupResponse.dataEntry(dataEntry);
1✔
300
        } catch (Exception e) {
1✔
301
          BackoffCacheEntry backoffEntry =
1✔
302
              new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
1✔
303
          linkedHashLruCache.cacheAndClean(request, backoffEntry);
1✔
304
          return CachedRouteLookupResponse.backoffEntry(backoffEntry);
1✔
305
        }
306
      }
307
    }
308
  }
309

310
  void requestConnection() {
311
    rlsChannel.getState(true);
×
312
  }
×
313

314
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
315

316
    final Helper helper;
317
    private ConnectivityState state;
318
    private SubchannelPicker picker;
319

320
    RlsLbHelper(Helper helper) {
1✔
321
      this.helper = helper;
1✔
322
    }
1✔
323

324
    @Override
325
    protected Helper delegate() {
326
      return helper;
1✔
327
    }
328

329
    @Override
330
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
331
      state = newState;
1✔
332
      picker = newPicker;
1✔
333
      super.updateBalancingState(newState, newPicker);
1✔
334
    }
1✔
335

336
    void propagateRlsError() {
337
      getSynchronizationContext().execute(new Runnable() {
1✔
338
        @Override
339
        public void run() {
340
          if (picker != null) {
1✔
341
            // Refresh the channel state and let pending RPCs reprocess the picker.
342
            updateBalancingState(state, picker);
1✔
343
          }
344
        }
1✔
345
      });
346
    }
1✔
347

348
    void triggerPendingRpcProcessing() {
349
      super.updateBalancingState(state, picker);
×
350
    }
×
351
  }
352

353
  /**
354
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
355
   */
356
  static final class CachedRouteLookupResponse {
357
    // Should only have 1 of following 3 cache entries
358
    @Nullable
359
    private final DataCacheEntry dataCacheEntry;
360
    @Nullable
361
    private final PendingCacheEntry pendingCacheEntry;
362
    @Nullable
363
    private final BackoffCacheEntry backoffCacheEntry;
364

365
    CachedRouteLookupResponse(
366
        DataCacheEntry dataCacheEntry,
367
        PendingCacheEntry pendingCacheEntry,
368
        BackoffCacheEntry backoffCacheEntry) {
1✔
369
      this.dataCacheEntry = dataCacheEntry;
1✔
370
      this.pendingCacheEntry = pendingCacheEntry;
1✔
371
      this.backoffCacheEntry = backoffCacheEntry;
1✔
372
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
373
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
374
          "Expected only 1 cache entry value provided");
375
    }
1✔
376

377
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
378
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
379
    }
380

381
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
382
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
383
    }
384

385
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
386
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
387
    }
388

389
    boolean hasData() {
390
      return dataCacheEntry != null;
1✔
391
    }
392

393
    @Nullable
394
    ChildPolicyWrapper getChildPolicyWrapper() {
395
      if (!hasData()) {
1✔
396
        return null;
×
397
      }
398
      return dataCacheEntry.getChildPolicyWrapper();
1✔
399
    }
400

401
    @VisibleForTesting
402
    @Nullable
403
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
404
      if (!hasData()) {
1✔
405
        return null;
×
406
      }
407
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
408
    }
409

410
    @Nullable
411
    String getHeaderData() {
412
      if (!hasData()) {
1✔
413
        return null;
1✔
414
      }
415
      return dataCacheEntry.getHeaderData();
1✔
416
    }
417

418
    boolean hasError() {
419
      return backoffCacheEntry != null;
1✔
420
    }
421

422
    boolean isPending() {
423
      return pendingCacheEntry != null;
1✔
424
    }
425

426
    @Nullable
427
    Status getStatus() {
428
      if (!hasError()) {
×
429
        return null;
×
430
      }
431
      return backoffCacheEntry.getStatus();
×
432
    }
433

434
    @Override
435
    public String toString() {
436
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
437
      if (dataCacheEntry != null) {
×
438
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
439
      }
440
      if (pendingCacheEntry != null) {
×
441
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
442
      }
443
      if (backoffCacheEntry != null) {
×
444
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
445
      }
446
      return toStringHelper.toString();
×
447
    }
448
  }
449

450
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
451
  final class PendingCacheEntry {
452
    private final ListenableFuture<RouteLookupResponse> pendingCall;
453
    private final RouteLookupRequest request;
454
    private final BackoffPolicy backoffPolicy;
455

456
    PendingCacheEntry(
457
        RouteLookupRequest request, ListenableFuture<RouteLookupResponse> pendingCall) {
458
      this(request, pendingCall, null);
1✔
459
    }
1✔
460

461
    PendingCacheEntry(
462
        RouteLookupRequest request,
463
        ListenableFuture<RouteLookupResponse> pendingCall,
464
        @Nullable BackoffPolicy backoffPolicy) {
1✔
465
      this.request = checkNotNull(request, "request");
1✔
466
      this.pendingCall = pendingCall;
1✔
467
      this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy;
1✔
468
    }
1✔
469

470
    void handleDoneFuture() {
471
      synchronized (lock) {
1✔
472
        pendingCallCache.remove(request);
1✔
473
        if (pendingCall.isCancelled()) {
1✔
474
          return;
×
475
        }
476

477
        try {
478
          transitionToDataEntry(pendingCall.get());
1✔
479
        } catch (Exception e) {
1✔
480
          if (e instanceof ThrottledException) {
1✔
481
            transitionToBackOff(Status.RESOURCE_EXHAUSTED.withCause(e));
×
482
          } else {
483
            transitionToBackOff(Status.fromThrowable(e));
1✔
484
          }
485
        }
1✔
486
      }
1✔
487
    }
1✔
488

489
    private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) {
490
      synchronized (lock) {
1✔
491
        logger.log(
1✔
492
            ChannelLogLevel.DEBUG,
493
            "Transition to data cache: routeLookupResponse={0}",
494
            routeLookupResponse);
495
        linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, routeLookupResponse));
1✔
496
      }
1✔
497
    }
1✔
498

499
    private void transitionToBackOff(Status status) {
500
      synchronized (lock) {
1✔
501
        logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
502
        linkedHashLruCache.cacheAndClean(request,
1✔
503
            new BackoffCacheEntry(request, status, backoffPolicy));
504
      }
1✔
505
    }
1✔
506

507
    @Override
508
    public String toString() {
509
      return MoreObjects.toStringHelper(this)
×
510
          .add("request", request)
×
511
          .toString();
×
512
    }
513
  }
514

515
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
516
  abstract class CacheEntry {
517

518
    protected final RouteLookupRequest request;
519

520
    CacheEntry(RouteLookupRequest request) {
1✔
521
      this.request = checkNotNull(request, "request");
1✔
522
    }
1✔
523

524
    abstract int getSizeBytes();
525

526
    final boolean isExpired() {
527
      return isExpired(ticker.read());
1✔
528
    }
529

530
    abstract boolean isExpired(long now);
531

532
    abstract void cleanup();
533

534
    protected long getMinEvictionTime() {
535
      return 0L;
×
536
    }
537

538
    protected void triggerPendingRpcProcessing() {
539
      helper.triggerPendingRpcProcessing();
×
540
    }
×
541
  }
542

543
  /** Implementation of {@link CacheEntry} that contains valid data. */
544
  final class DataCacheEntry extends CacheEntry {
545
    private final RouteLookupResponse response;
546
    private final long minEvictionTime;
547
    private final long expireTime;
548
    private final long staleTime;
549
    private final List<ChildPolicyWrapper> childPolicyWrappers;
550

551
    // GuardedBy CachingRlsLbClient.lock
552
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
553
      super(request);
1✔
554
      this.response = checkNotNull(response, "response");
1✔
555
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
556
      childPolicyWrappers =
1✔
557
          refCountedChildPolicyWrapperFactory
1✔
558
              .createOrGet(response.targets());
1✔
559
      long now = ticker.read();
1✔
560
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
561
      expireTime = now + maxAgeNanos;
1✔
562
      staleTime = now + staleAgeNanos;
1✔
563
    }
1✔
564

565
    /**
566
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
567
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
568
     * data still exists. Flow looks like following.
569
     *
570
     * <pre>
571
     * Timeline                       | async refresh
572
     *                                V put new cache (entry2)
573
     * entry1: Pending | hasValue | staled  |
574
     * entry2:                        | OV* | pending | hasValue | staled |
575
     *
576
     * OV: old value
577
     * </pre>
578
     */
579
    void maybeRefresh() {
580
      synchronized (lock) {
1✔
581
        if (pendingCallCache.containsKey(request)) {
1✔
582
          // pending already requested
583
          return;
×
584
        }
585
        final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
1✔
586
        if (!asyncCall.isDone()) {
1✔
587
          PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall);
1✔
588
          pendingCallCache.put(request, pendingEntry);
1✔
589
          asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
1✔
590
        } else {
1✔
591
          // async call returned finished future is most likely throttled
592
          try {
593
            RouteLookupResponse response = asyncCall.get();
×
594
            linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
×
595
          } catch (InterruptedException e) {
×
596
            Thread.currentThread().interrupt();
×
597
          } catch (Exception e) {
×
598
            BackoffCacheEntry backoffEntry =
×
599
                new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
×
600
            linkedHashLruCache.cacheAndClean(request, backoffEntry);
×
601
          }
×
602
        }
603
      }
1✔
604
    }
1✔
605

606
    @VisibleForTesting
607
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
608
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
609
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
610
          return childPolicyWrapper;
1✔
611
        }
612
      }
1✔
613

614
      throw new RuntimeException("Target not found:" + target);
×
615
    }
616

617
    @Nullable
618
    ChildPolicyWrapper getChildPolicyWrapper() {
619
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
620
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
621
          return childPolicyWrapper;
1✔
622
        }
623
      }
1✔
624
      return childPolicyWrappers.get(0);
1✔
625
    }
626

627
    String getHeaderData() {
628
      return response.getHeaderData();
1✔
629
    }
630

631
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
632
    int calcStringSize(String target) {
633
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
634
    }
635

636
    @Override
637
    int getSizeBytes() {
638
      int targetSize = 0;
1✔
639
      for (String target : response.targets()) {
1✔
640
        targetSize += calcStringSize(target);
1✔
641
      }
1✔
642
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
643
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
644
    }
645

646
    @Override
647
    boolean isExpired(long now) {
648
      return expireTime - now <= 0;
1✔
649
    }
650

651
    boolean isStaled(long now) {
652
      return staleTime - now <= 0;
1✔
653
    }
654

655
    @Override
656
    protected long getMinEvictionTime() {
657
      return minEvictionTime;
×
658
    }
659

660
    @Override
661
    void cleanup() {
662
      synchronized (lock) {
1✔
663
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
664
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
665
        }
1✔
666
      }
1✔
667
    }
1✔
668

669
    @Override
670
    public String toString() {
671
      return MoreObjects.toStringHelper(this)
×
672
          .add("request", request)
×
673
          .add("response", response)
×
674
          .add("expireTime", expireTime)
×
675
          .add("staleTime", staleTime)
×
676
          .add("childPolicyWrappers", childPolicyWrappers)
×
677
          .toString();
×
678
    }
679
  }
680

681
  /**
682
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
683
   * status when the backoff time is expired.
684
   */
685
  private final class BackoffCacheEntry extends CacheEntry {
686

687
    private final Status status;
688
    private final ScheduledHandle scheduledHandle;
689
    private final BackoffPolicy backoffPolicy;
690
    private final long expireNanos;
691
    private boolean shutdown = false;
1✔
692

693
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
1✔
694
      super(request);
1✔
695
      this.status = checkNotNull(status, "status");
1✔
696
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
697
      long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
698
      this.expireNanos = ticker.read() + delayNanos;
1✔
699
      this.scheduledHandle =
1✔
700
          synchronizationContext.schedule(
1✔
701
              new Runnable() {
1✔
702
                @Override
703
                public void run() {
704
                  transitionToPending();
1✔
705
                }
1✔
706
              },
707
              delayNanos,
708
              TimeUnit.NANOSECONDS,
709
              scheduledExecutorService);
1✔
710
    }
1✔
711

712
    /** Forcefully refreshes cache entry by ignoring the backoff timer. */
713
    void forceRefresh() {
714
      if (scheduledHandle.isPending()) {
×
715
        scheduledHandle.cancel();
×
716
        transitionToPending();
×
717
      }
718
    }
×
719

720
    private void transitionToPending() {
721
      synchronized (lock) {
1✔
722
        if (shutdown) {
1✔
723
          return;
×
724
        }
725
        ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
1✔
726
        if (!call.isDone()) {
1✔
727
          linkedHashLruCache.invalidate(request);
1✔
728
          PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
1✔
729
          pendingCallCache.put(request, pendingEntry);
1✔
730
          call.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
1✔
731
        } else {
1✔
732
          try {
733
            RouteLookupResponse response = call.get();
1✔
734
            linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
1✔
735
          } catch (InterruptedException e) {
×
736
            Thread.currentThread().interrupt();
×
737
          } catch (Exception e) {
1✔
738
            linkedHashLruCache.cacheAndClean(
1✔
739
                request,
740
                new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
1✔
741
          }
1✔
742
        }
743
      }
1✔
744
    }
1✔
745

746
    Status getStatus() {
747
      return status;
×
748
    }
749

750
    @Override
751
    int getSizeBytes() {
752
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
753
    }
754

755
    @Override
756
    boolean isExpired(long now) {
757
      return expireNanos - now <= 0;
1✔
758
    }
759

760
    @Override
761
    void cleanup() {
762
      if (shutdown) {
1✔
763
        return;
×
764
      }
765
      shutdown = true;
1✔
766
      if (!scheduledHandle.isPending()) {
1✔
767
        scheduledHandle.cancel();
1✔
768
      }
769
    }
1✔
770

771
    @Override
772
    public String toString() {
773
      return MoreObjects.toStringHelper(this)
×
774
          .add("request", request)
×
775
          .add("status", status)
×
776
          .toString();
×
777
    }
778
  }
779

780
  /** Returns a Builder for {@link CachingRlsLbClient}. */
781
  static Builder newBuilder() {
782
    return new Builder();
1✔
783
  }
784

785
  /** A Builder for {@link CachingRlsLbClient}. */
786
  static final class Builder {
1✔
787

788
    private Helper helper;
789
    private LbPolicyConfiguration lbPolicyConfig;
790
    private Throttler throttler = new HappyThrottler();
1✔
791
    private ResolvedAddressFactory resolvedAddressFactory;
792
    private Ticker ticker = Ticker.systemTicker();
1✔
793
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
794
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
795

796
    Builder setHelper(Helper helper) {
797
      this.helper = checkNotNull(helper, "helper");
1✔
798
      return this;
1✔
799
    }
800

801
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
802
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
803
      return this;
1✔
804
    }
805

806
    Builder setThrottler(Throttler throttler) {
807
      this.throttler = checkNotNull(throttler, "throttler");
1✔
808
      return this;
1✔
809
    }
810

811
    /**
812
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
813
     */
814
    Builder setResolvedAddressesFactory(
815
        ResolvedAddressFactory resolvedAddressFactory) {
816
      this.resolvedAddressFactory =
1✔
817
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
818
      return this;
1✔
819
    }
820

821
    Builder setTicker(Ticker ticker) {
822
      this.ticker = checkNotNull(ticker, "ticker");
1✔
823
      return this;
1✔
824
    }
825

826
    Builder setEvictionListener(
827
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
828
      this.evictionListener = evictionListener;
1✔
829
      return this;
1✔
830
    }
831

832
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
833
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
834
      return this;
1✔
835
    }
836

837
    CachingRlsLbClient build() {
838
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
839
      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
1✔
840
      return client;
1✔
841
    }
842
  }
843

844
  /**
845
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
846
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
847
   */
848
  private static final class AutoCleaningEvictionListener
849
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
850

851
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
852

853
    AutoCleaningEvictionListener(
854
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
855
      this.delegate = delegate;
1✔
856
    }
1✔
857

858
    @Override
859
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
860
      if (delegate != null) {
1✔
861
        delegate.onEviction(key, value, cause);
1✔
862
      }
863
      // performs cleanup after delegation
864
      value.cleanup();
1✔
865
    }
1✔
866
  }
867

868
  /** A Throttler never throttles. */
869
  private static final class HappyThrottler implements Throttler {
870

871
    @Override
872
    public boolean shouldThrottle() {
873
      return false;
×
874
    }
875

876
    @Override
877
    public void registerBackendResponse(boolean throttled) {
878
      // no-op
879
    }
×
880
  }
881

882
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
883
  private static final class RlsAsyncLruCache
884
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
885

886
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
887
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
888
        ScheduledExecutorService ses, Ticker ticker, Object lock) {
889
      super(
1✔
890
          maxEstimatedSizeBytes,
891
          new AutoCleaningEvictionListener(evictionListener),
892
          1,
893
          TimeUnit.MINUTES,
894
          ses,
895
          ticker,
896
          lock);
897
    }
1✔
898

899
    @Override
900
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
901
      return value.isExpired();
1✔
902
    }
903

904
    @Override
905
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
906
      return value.getSizeBytes();
1✔
907
    }
908

909
    @Override
910
    protected boolean shouldInvalidateEldestEntry(
911
        RouteLookupRequest eldestKey, CacheEntry eldestValue) {
912
      if (eldestValue.getMinEvictionTime() > now()) {
×
913
        return false;
×
914
      }
915

916
      // eldest entry should be evicted if size limit exceeded
917
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
918
    }
919

920
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
921
      CacheEntry newEntry = cache(key, value);
1✔
922

923
      // force cleanup if new entry pushed cache over max size (in bytes)
924
      if (fitToLimit()) {
1✔
925
        value.triggerPendingRpcProcessing();
×
926
      }
927
      return newEntry;
1✔
928
    }
929
  }
930

931
  /**
932
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
933
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
934
   */
935
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
936

937
    @Nullable
1✔
938
    private ConnectivityState prevState = null;
939

940
    @Override
941
    public void onStatusChanged(ConnectivityState newState) {
942
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
943
          && newState == ConnectivityState.READY) {
944
        synchronized (lock) {
1✔
945
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
946
            if (value instanceof BackoffCacheEntry) {
1✔
947
              ((BackoffCacheEntry) value).forceRefresh();
×
948
            }
949
          }
1✔
950
        }
1✔
951
      }
952
      prevState = newState;
1✔
953
    }
1✔
954
  }
955

956
  /** A header will be added when RLS server respond with additional header data. */
957
  @VisibleForTesting
958
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
959
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
960

961
  final class RlsPicker extends SubchannelPicker {
962

963
    private final RlsRequestFactory requestFactory;
964

965
    RlsPicker(RlsRequestFactory requestFactory) {
1✔
966
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
967
    }
1✔
968

969
    @Override
970
    public PickResult pickSubchannel(PickSubchannelArgs args) {
971
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
972
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
973
      RouteLookupRequest request =
1✔
974
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
975
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
976
      logger.log(ChannelLogLevel.DEBUG,
1✔
977
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
978
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
979

980
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
981
        Metadata headers = args.getHeaders();
1✔
982
        headers.discardAll(RLS_DATA_KEY);
1✔
983
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
984
      }
985
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
986
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
987
      if (response.hasData()) {
1✔
988
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
989
        SubchannelPicker picker =
990
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
991
        if (picker == null) {
1✔
992
          return PickResult.withNoResult();
×
993
        }
994
        // Happy path
995
        return picker.pickSubchannel(args);
1✔
996
      } else if (response.hasError()) {
1✔
997
        if (hasFallback) {
1✔
998
          return useFallback(args);
1✔
999
        }
1000
        return PickResult.withError(
×
1001
            convertRlsServerStatus(response.getStatus(),
×
1002
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
×
1003
      } else {
1004
        return PickResult.withNoResult();
1✔
1005
      }
1006
    }
1007

1008
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
1009

1010
    /** Uses Subchannel connected to default target. */
1011
    private PickResult useFallback(PickSubchannelArgs args) {
1012
      // TODO(creamsoup) wait until lb is ready
1013
      startFallbackChildPolicy();
1✔
1014
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1015
      if (picker == null) {
1✔
1016
        return PickResult.withNoResult();
×
1017
      }
1018
      return picker.pickSubchannel(args);
1✔
1019
    }
1020

1021
    private void startFallbackChildPolicy() {
1022
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1023
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1024
      synchronized (lock) {
1✔
1025
        if (fallbackChildPolicyWrapper != null) {
1✔
1026
          return;
1✔
1027
        }
1028
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1029
      }
1✔
1030
    }
1✔
1031

1032
    // GuardedBy CachingRlsLbClient.lock
1033
    void close() {
1034
      if (fallbackChildPolicyWrapper != null) {
1✔
1035
        refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1036
      }
1037
    }
1✔
1038

1039
    @Override
1040
    public String toString() {
1041
      return MoreObjects.toStringHelper(this)
×
1042
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1043
          .toString();
×
1044
    }
1045
  }
1046

1047
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc