• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19183

01 May 2024 06:24PM UTC coverage: 88.319% (+0.004%) from 88.315%
#19183

push

github

web-flow
rls: add counter metrics (#11138)

Adds the following metrics to the RlsLoadBalancer:
- grpc.lb.rls.default_target_picks
- grpc.lb.rls.target_picks
- grpc.lb.rls.failed_picks

31475 of 35638 relevant lines covered (88.32%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.08
/../rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.rls;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.Converter;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.MoreObjects.ToStringHelper;
26
import com.google.common.base.Ticker;
27
import com.google.common.collect.Lists;
28
import com.google.common.util.concurrent.Futures;
29
import com.google.common.util.concurrent.ListenableFuture;
30
import com.google.common.util.concurrent.MoreExecutors;
31
import com.google.common.util.concurrent.SettableFuture;
32
import io.grpc.ChannelLogger;
33
import io.grpc.ChannelLogger.ChannelLogLevel;
34
import io.grpc.ConnectivityState;
35
import io.grpc.LoadBalancer.Helper;
36
import io.grpc.LoadBalancer.PickResult;
37
import io.grpc.LoadBalancer.PickSubchannelArgs;
38
import io.grpc.LoadBalancer.ResolvedAddresses;
39
import io.grpc.LoadBalancer.SubchannelPicker;
40
import io.grpc.LongCounterMetricInstrument;
41
import io.grpc.ManagedChannel;
42
import io.grpc.ManagedChannelBuilder;
43
import io.grpc.Metadata;
44
import io.grpc.MetricInstrumentRegistry;
45
import io.grpc.Status;
46
import io.grpc.internal.BackoffPolicy;
47
import io.grpc.internal.ExponentialBackoffPolicy;
48
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
49
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
50
import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
51
import io.grpc.rls.LbPolicyConfiguration.ChildLbStatusListener;
52
import io.grpc.rls.LbPolicyConfiguration.ChildPolicyWrapper;
53
import io.grpc.rls.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
54
import io.grpc.rls.LruCache.EvictionListener;
55
import io.grpc.rls.LruCache.EvictionType;
56
import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
57
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
58
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
59
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
60
import io.grpc.stub.StreamObserver;
61
import io.grpc.util.ForwardingLoadBalancerHelper;
62
import java.net.URI;
63
import java.net.URISyntaxException;
64
import java.util.HashMap;
65
import java.util.List;
66
import java.util.Map;
67
import java.util.concurrent.Future;
68
import java.util.concurrent.ScheduledExecutorService;
69
import java.util.concurrent.TimeUnit;
70
import javax.annotation.CheckReturnValue;
71
import javax.annotation.Nullable;
72
import javax.annotation.concurrent.GuardedBy;
73
import javax.annotation.concurrent.ThreadSafe;
74

75
/**
76
 * A CachingRlsLbClient is the core of the RLS load balancer; it supports dynamic request
77
 * routing by fetching the decision from route lookup server. Every single request is routed by
78
 * the server's decision. To reduce the performance penalty, {@link LruCache} is used.
79
 */
80
@ThreadSafe
81
final class CachingRlsLbClient {
82

83
  private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
84
      REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
1✔
85
  private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
86
      RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
1✔
87
  public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
1✔
88
  public static final int BYTES_PER_CHAR = 2;
89
  public static final int STRING_OVERHEAD_BYTES = 38;
90
  /** Minimum bytes for a Java Object. */
91
  public static final int OBJ_OVERHEAD_B = 16;
92

93
  private static final LongCounterMetricInstrument DEFAULT_TARGET_PICKS_COUNTER;
94
  private static final LongCounterMetricInstrument TARGET_PICKS_COUNTER;
95
  private static final LongCounterMetricInstrument FAILED_PICKS_COUNTER;
96

97
  // All cache status changes (pending, backoff, success) must be under this lock
98
  private final Object lock = new Object();
1✔
99
  // LRU cache based on access order (BACKOFF and actual data will be here)
100
  @GuardedBy("lock")
101
  private final RlsAsyncLruCache linkedHashLruCache;
102
  // any RPC in flight is cached in this map
103
  @GuardedBy("lock")
1✔
104
  private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
105

106
  private final ScheduledExecutorService scheduledExecutorService;
107
  private final Ticker ticker;
108
  private final Throttler throttler;
109

110
  private final LbPolicyConfiguration lbPolicyConfig;
111
  private final BackoffPolicy.Provider backoffProvider;
112
  private final long maxAgeNanos;
113
  private final long staleAgeNanos;
114
  private final long callTimeoutNanos;
115

116
  private final RlsLbHelper helper;
117
  private final ManagedChannel rlsChannel;
118
  private final RouteLookupServiceStub rlsStub;
119
  private final RlsPicker rlsPicker;
120
  private final ResolvedAddressFactory childLbResolvedAddressFactory;
121
  @GuardedBy("lock")
122
  private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
123
  private final ChannelLogger logger;
124

125
  // Registers the three RLS pick counters with the default MetricInstrumentRegistry at class
  // initialization. Every registration receives its own newly created label lists.
  static {
    MetricInstrumentRegistry registry = MetricInstrumentRegistry.getDefaultRegistry();

    DEFAULT_TARGET_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.default_target_picks",
        "Number of LB picks sent to the default target",
        "pick",
        Lists.newArrayList(
            "grpc.target",
            "grpc.lb.rls.server_target",
            "grpc.lb.rls.data_plane_target",
            "grpc.lb.pick_result"),
        Lists.newArrayList(),
        true);

    TARGET_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.target_picks",
        "Number of LB picks sent to each RLS target",
        "pick",
        Lists.newArrayList(
            "grpc.target",
            "grpc.lb.rls.server_target",
            "grpc.lb.rls.data_plane_target",
            "grpc.lb.pick_result"),
        Lists.newArrayList(),
        true);

    FAILED_PICKS_COUNTER = registry.registerLongCounter(
        "grpc.lb.rls.failed_picks",
        "Number of LB picks failed due to either a failed RLS request or the RLS channel "
            + "being throttled",
        "pick",
        Lists.newArrayList("grpc.target", "grpc.lb.rls.server_target"),
        Lists.newArrayList(),
        true);
  }
141

142
  private CachingRlsLbClient(Builder builder) {
1✔
143
    helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
1✔
144
    scheduledExecutorService = helper.getScheduledExecutorService();
1✔
145
    lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
1✔
146
    RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
1✔
147
    maxAgeNanos = rlsConfig.maxAgeInNanos();
1✔
148
    staleAgeNanos = rlsConfig.staleAgeInNanos();
1✔
149
    callTimeoutNanos = rlsConfig.lookupServiceTimeoutInNanos();
1✔
150
    ticker = checkNotNull(builder.ticker, "ticker");
1✔
151
    throttler = checkNotNull(builder.throttler, "throttler");
1✔
152
    linkedHashLruCache =
1✔
153
        new RlsAsyncLruCache(
154
            rlsConfig.cacheSizeBytes(),
1✔
155
            new AutoCleaningEvictionListener(builder.evictionListener),
1✔
156
            scheduledExecutorService,
157
            ticker,
158
            lock,
159
            helper);
160
    logger = helper.getChannelLogger();
1✔
161
    String serverHost = null;
1✔
162
    try {
163
      serverHost = new URI(null, helper.getAuthority(), null, null, null).getHost();
1✔
164
    } catch (URISyntaxException ignore) {
×
165
      // handled by the following null check
166
    }
1✔
167
    if (serverHost == null) {
1✔
168
      logger.log(
×
169
          ChannelLogLevel.DEBUG, "Can not get hostname from authority: {0}", helper.getAuthority());
×
170
      serverHost = helper.getAuthority();
×
171
    }
172
    RlsRequestFactory requestFactory = new RlsRequestFactory(
1✔
173
        lbPolicyConfig.getRouteLookupConfig(), serverHost);
1✔
174
    rlsPicker = new RlsPicker(requestFactory, rlsConfig.lookupService());
1✔
175
    // It is safe to use helper.getUnsafeChannelCredentials() because the client authenticates the
176
    // RLS server using the same authority as the backends, even though the RLS server’s addresses
177
    // will be looked up differently than the backends; overrideAuthority(helper.getAuthority()) is
178
    // called to impose the authority security restrictions.
179
    ManagedChannelBuilder<?> rlsChannelBuilder = helper.createResolvingOobChannelBuilder(
1✔
180
        rlsConfig.lookupService(), helper.getUnsafeChannelCredentials());
1✔
181
    rlsChannelBuilder.overrideAuthority(helper.getAuthority());
1✔
182
    Map<String, ?> routeLookupChannelServiceConfig =
1✔
183
        lbPolicyConfig.getRouteLookupChannelServiceConfig();
1✔
184
    if (routeLookupChannelServiceConfig != null) {
1✔
185
      logger.log(
1✔
186
          ChannelLogLevel.DEBUG,
187
          "RLS channel service config: {0}",
188
          routeLookupChannelServiceConfig);
189
      rlsChannelBuilder.defaultServiceConfig(routeLookupChannelServiceConfig);
1✔
190
      rlsChannelBuilder.disableServiceConfigLookUp();
1✔
191
    }
192
    rlsChannel = rlsChannelBuilder.build();
1✔
193
    rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
1✔
194
    childLbResolvedAddressFactory =
1✔
195
        checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
1✔
196
    backoffProvider = builder.backoffProvider;
1✔
197
    ChildLoadBalancerHelperProvider childLbHelperProvider =
1✔
198
        new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
199
    refCountedChildPolicyWrapperFactory =
1✔
200
        new RefCountedChildPolicyWrapperFactory(
201
            lbPolicyConfig.getLoadBalancingPolicy(), childLbResolvedAddressFactory,
1✔
202
            childLbHelperProvider,
203
            new BackoffRefreshListener());
204
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
1✔
205
  }
1✔
206

207
  /**
208
   * Convert the status to UNAVAILBLE and enhance the error message.
209
   * @param status status as provided by server
210
   * @param serverName Used for error description
211
   * @return Transformed status
212
   */
213
  static Status convertRlsServerStatus(Status status, String serverName) {
214
    return Status.UNAVAILABLE.withCause(status.getCause()).withDescription(
1✔
215
        String.format("Unable to retrieve RLS targets from RLS server %s.  "
1✔
216
                + "RLS server returned: %s: %s",
217
            serverName, status.getCode(), status.getDescription()));
1✔
218
  }
219

220
  /** Populates async cache entry for new request. */
221
  @GuardedBy("lock")
222
  private CachedRouteLookupResponse asyncRlsCall(
223
      RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
224
    logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
1✔
225
    if (throttler.shouldThrottle()) {
1✔
226
      logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
1✔
227
      // Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
228
      // on this result
229
      return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
1✔
230
          request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
1✔
231
    }
232
    final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
1✔
233
    io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
1✔
234
    logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
1✔
235
    rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
1✔
236
        .routeLookup(
1✔
237
            routeLookupRequest,
238
            new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
1✔
239
              @Override
240
              public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
241
                logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
1✔
242
                response.set(RESPONSE_CONVERTER.reverse().convert(value));
1✔
243
              }
1✔
244

245
              @Override
246
              public void onError(Throwable t) {
247
                logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
1✔
248
                response.setException(t);
1✔
249
                throttler.registerBackendResponse(true);
1✔
250
              }
1✔
251

252
              @Override
253
              public void onCompleted() {
254
                logger.log(ChannelLogLevel.DEBUG, "routeLookup call completed");
1✔
255
                throttler.registerBackendResponse(false);
1✔
256
              }
1✔
257
            });
258
    return CachedRouteLookupResponse.pendingResponse(
1✔
259
        createPendingEntry(request, response, backoffPolicy));
1✔
260
  }
261

262
  /**
263
   * Returns async response of the {@code request}. The returned value can be in 3 different states;
264
   * cached, pending and backed-off due to error. The result remains same even if the status is
265
   * changed after the return.
266
   */
267
  @CheckReturnValue
268
  final CachedRouteLookupResponse get(final RouteLookupRequest request) {
269
    logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to get cached entry");
1✔
270
    synchronized (lock) {
1✔
271
      logger.log(ChannelLogLevel.DEBUG, "Acquired lock to get cached entry");
1✔
272
      final CacheEntry cacheEntry;
273
      cacheEntry = linkedHashLruCache.read(request);
1✔
274
      if (cacheEntry == null) {
1✔
275
        logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
1✔
276
        PendingCacheEntry pendingEntry = pendingCallCache.get(request);
1✔
277
        if (pendingEntry != null) {
1✔
278
          return CachedRouteLookupResponse.pendingResponse(pendingEntry);
1✔
279
        }
280
        return asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
281
      }
282

283
      if (cacheEntry instanceof DataCacheEntry) {
1✔
284
        // cache hit, initiate async-refresh if entry is staled
285
        logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
1✔
286
        DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
1✔
287
        if (dataEntry.isStaled(ticker.read())) {
1✔
288
          logger.log(ChannelLogLevel.DEBUG, "Cache entry is stale");
1✔
289
          dataEntry.maybeRefresh();
1✔
290
        }
291
        return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
1✔
292
      }
293
      logger.log(ChannelLogLevel.DEBUG, "Cache hit for a backup entry");
1✔
294
      return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
1✔
295
    }
296
  }
297

298
  /** Performs any pending maintenance operations needed by the cache. */
299
  void close() {
300
    logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
1✔
301
    synchronized (lock) {
1✔
302
      // all childPolicyWrapper will be returned via AutoCleaningEvictionListener
303
      linkedHashLruCache.close();
1✔
304
      // TODO(creamsoup) maybe cancel all pending requests
305
      pendingCallCache.clear();
1✔
306
      rlsChannel.shutdownNow();
1✔
307
      rlsPicker.close();
1✔
308
    }
1✔
309
  }
1✔
310

311
  void requestConnection() {
312
    rlsChannel.getState(true);
×
313
  }
×
314

315
  @GuardedBy("lock")
316
  private PendingCacheEntry createPendingEntry(
317
      RouteLookupRequest request,
318
      ListenableFuture<RouteLookupResponse> pendingCall,
319
      @Nullable BackoffPolicy backoffPolicy) {
320
    PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
1✔
321
    // Add the entry to the map before adding the Listener, because the listener removes the
322
    // entry from the map
323
    pendingCallCache.put(request, entry);
1✔
324
    // Beware that the listener can run immediately on the current thread
325
    pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
1✔
326
    return entry;
1✔
327
  }
328

329
  private void pendingRpcComplete(PendingCacheEntry entry) {
330
    synchronized (lock) {
1✔
331
      boolean clientClosed = pendingCallCache.remove(entry.request) == null;
1✔
332
      if (clientClosed) {
1✔
333
        return;
1✔
334
      }
335

336
      try {
337
        createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
1✔
338
        // Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
339
        // reattempt picks when the child LB is done connecting
340
      } catch (Exception e) {
1✔
341
        createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
1✔
342
        // Cache updated. updateBalancingState() to reattempt picks
343
        helper.propagateRlsError();
1✔
344
      }
1✔
345
    }
1✔
346
  }
1✔
347

348
  @GuardedBy("lock")
349
  private DataCacheEntry createDataEntry(
350
      RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
351
    logger.log(
1✔
352
        ChannelLogLevel.DEBUG,
353
        "Transition to data cache: routeLookupResponse={0}",
354
        routeLookupResponse);
355
    DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
1✔
356
    // Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
357
    // this cache update because the lock is held
358
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
359
    return entry;
1✔
360
  }
361

362
  @GuardedBy("lock")
363
  private BackoffCacheEntry createBackOffEntry(
364
      RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
365
    logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
1✔
366
    if (backoffPolicy == null) {
1✔
367
      backoffPolicy = backoffProvider.get();
1✔
368
    }
369
    long delayNanos = backoffPolicy.nextBackoffNanos();
1✔
370
    BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
1✔
371
    // Lock is held, so the task can't execute before the assignment
372
    entry.scheduledFuture = scheduledExecutorService.schedule(
1✔
373
        () -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
1✔
374
    linkedHashLruCache.cacheAndClean(request, entry);
1✔
375
    logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
1✔
376
        delayNanos);
1✔
377
    return entry;
1✔
378
  }
379

380
  private void refreshBackoffEntry(BackoffCacheEntry entry) {
381
    synchronized (lock) {
1✔
382
      // This checks whether the task has been cancelled and prevents a second execution.
383
      if (!entry.scheduledFuture.cancel(false)) {
1✔
384
        // Future was previously cancelled
385
        return;
×
386
      }
387
      logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
1✔
388
      linkedHashLruCache.invalidate(entry.request);
1✔
389
      asyncRlsCall(entry.request, entry.backoffPolicy);
1✔
390
    }
1✔
391
  }
1✔
392

393
  private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
394

395
    final Helper helper;
396
    private ConnectivityState state;
397
    private SubchannelPicker picker;
398

399
    RlsLbHelper(Helper helper) {
1✔
400
      this.helper = helper;
1✔
401
    }
1✔
402

403
    @Override
404
    protected Helper delegate() {
405
      return helper;
1✔
406
    }
407

408
    @Override
409
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
410
      state = newState;
1✔
411
      picker = newPicker;
1✔
412
      super.updateBalancingState(newState, newPicker);
1✔
413
    }
1✔
414

415
    void propagateRlsError() {
416
      getSynchronizationContext().execute(new Runnable() {
1✔
417
        @Override
418
        public void run() {
419
          if (picker != null) {
1✔
420
            // Refresh the channel state and let pending RPCs reprocess the picker.
421
            updateBalancingState(state, picker);
1✔
422
          }
423
        }
1✔
424
      });
425
    }
1✔
426

427
    void triggerPendingRpcProcessing() {
428
      helper.getSynchronizationContext().execute(
×
429
          () -> super.updateBalancingState(state, picker));
×
430
    }
×
431
  }
432

433
  /**
434
   * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
435
   */
436
  static final class CachedRouteLookupResponse {
437
    // Should only have 1 of following 3 cache entries
438
    @Nullable
439
    private final DataCacheEntry dataCacheEntry;
440
    @Nullable
441
    private final PendingCacheEntry pendingCacheEntry;
442
    @Nullable
443
    private final BackoffCacheEntry backoffCacheEntry;
444

445
    CachedRouteLookupResponse(
446
        DataCacheEntry dataCacheEntry,
447
        PendingCacheEntry pendingCacheEntry,
448
        BackoffCacheEntry backoffCacheEntry) {
1✔
449
      this.dataCacheEntry = dataCacheEntry;
1✔
450
      this.pendingCacheEntry = pendingCacheEntry;
1✔
451
      this.backoffCacheEntry = backoffCacheEntry;
1✔
452
      checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
1✔
453
          && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
454
          "Expected only 1 cache entry value provided");
455
    }
1✔
456

457
    static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
458
      return new CachedRouteLookupResponse(null, pendingEntry, null);
1✔
459
    }
460

461
    static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
462
      return new CachedRouteLookupResponse(null, null, backoffEntry);
1✔
463
    }
464

465
    static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
466
      return new CachedRouteLookupResponse(dataEntry, null, null);
1✔
467
    }
468

469
    boolean hasData() {
470
      return dataCacheEntry != null;
1✔
471
    }
472

473
    @Nullable
474
    ChildPolicyWrapper getChildPolicyWrapper() {
475
      if (!hasData()) {
1✔
476
        return null;
×
477
      }
478
      return dataCacheEntry.getChildPolicyWrapper();
1✔
479
    }
480

481
    @VisibleForTesting
482
    @Nullable
483
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
484
      if (!hasData()) {
1✔
485
        return null;
×
486
      }
487
      return dataCacheEntry.getChildPolicyWrapper(target);
1✔
488
    }
489

490
    @Nullable
491
    String getHeaderData() {
492
      if (!hasData()) {
1✔
493
        return null;
1✔
494
      }
495
      return dataCacheEntry.getHeaderData();
1✔
496
    }
497

498
    boolean hasError() {
499
      return backoffCacheEntry != null;
1✔
500
    }
501

502
    boolean isPending() {
503
      return pendingCacheEntry != null;
1✔
504
    }
505

506
    @Nullable
507
    Status getStatus() {
508
      if (!hasError()) {
1✔
509
        return null;
×
510
      }
511
      return backoffCacheEntry.getStatus();
1✔
512
    }
513

514
    @Override
515
    public String toString() {
516
      ToStringHelper toStringHelper = MoreObjects.toStringHelper(this);
×
517
      if (dataCacheEntry != null) {
×
518
        toStringHelper.add("dataCacheEntry", dataCacheEntry);
×
519
      }
520
      if (pendingCacheEntry != null) {
×
521
        toStringHelper.add("pendingCacheEntry", pendingCacheEntry);
×
522
      }
523
      if (backoffCacheEntry != null) {
×
524
        toStringHelper.add("backoffCacheEntry", backoffCacheEntry);
×
525
      }
526
      return toStringHelper.toString();
×
527
    }
528
  }
529

530
  /** A pending cache entry when the async RouteLookup RPC is still on the fly. */
531
  static final class PendingCacheEntry {
532
    private final ListenableFuture<RouteLookupResponse> pendingCall;
533
    private final RouteLookupRequest request;
534
    @Nullable
535
    private final BackoffPolicy backoffPolicy;
536

537
    PendingCacheEntry(
538
        RouteLookupRequest request,
539
        ListenableFuture<RouteLookupResponse> pendingCall,
540
        @Nullable BackoffPolicy backoffPolicy) {
1✔
541
      this.request = checkNotNull(request, "request");
1✔
542
      this.pendingCall = checkNotNull(pendingCall, "pendingCall");
1✔
543
      this.backoffPolicy = backoffPolicy;
1✔
544
    }
1✔
545

546
    @Override
547
    public String toString() {
548
      return MoreObjects.toStringHelper(this)
×
549
          .add("request", request)
×
550
          .toString();
×
551
    }
552
  }
553

554
  /** Common cache entry data for {@link RlsAsyncLruCache}. */
555
  abstract static class CacheEntry {
556

557
    protected final RouteLookupRequest request;
558

559
    CacheEntry(RouteLookupRequest request) {
1✔
560
      this.request = checkNotNull(request, "request");
1✔
561
    }
1✔
562

563
    abstract int getSizeBytes();
564

565
    abstract boolean isExpired(long now);
566

567
    abstract void cleanup();
568

569
    protected boolean isOldEnoughToBeEvicted(long now) {
570
      return true;
×
571
    }
572
  }
573

574
  /** Implementation of {@link CacheEntry} contains valid data. */
575
  final class DataCacheEntry extends CacheEntry {
576
    private final RouteLookupResponse response;
577
    private final long minEvictionTime;
578
    private final long expireTime;
579
    private final long staleTime;
580
    private final List<ChildPolicyWrapper> childPolicyWrappers;
581

582
    // GuardedBy CachingRlsLbClient.lock
583
    DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
1✔
584
      super(request);
1✔
585
      this.response = checkNotNull(response, "response");
1✔
586
      checkState(!response.targets().isEmpty(), "No targets returned by RLS");
1✔
587
      childPolicyWrappers =
1✔
588
          refCountedChildPolicyWrapperFactory
1✔
589
              .createOrGet(response.targets());
1✔
590
      long now = ticker.read();
1✔
591
      minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
1✔
592
      expireTime = now + maxAgeNanos;
1✔
593
      staleTime = now + staleAgeNanos;
1✔
594
    }
1✔
595

596
    /**
597
     * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
598
     * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
599
     * data still exists. Flow looks like following.
600
     *
601
     * <pre>
602
     * Timeline                       | async refresh
603
     *                                V put new cache (entry2)
604
     * entry1: Pending | hasValue | staled  |
605
     * entry2:                        | OV* | pending | hasValue | staled |
606
     *
607
     * OV: old value
608
     * </pre>
609
     */
610
    void maybeRefresh() {
611
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
612
        if (pendingCallCache.containsKey(request)) {
1✔
613
          // pending already requested
614
          logger.log(ChannelLogLevel.DEBUG,
×
615
              "A pending refresh request already created, no need to proceed with refresh");
616
          return;
×
617
        }
618
        asyncRlsCall(request, /* backoffPolicy= */ null);
1✔
619
      }
1✔
620
    }
1✔
621

622
    @VisibleForTesting
623
    ChildPolicyWrapper getChildPolicyWrapper(String target) {
624
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
625
        if (childPolicyWrapper.getTarget().equals(target)) {
1✔
626
          return childPolicyWrapper;
1✔
627
        }
628
      }
1✔
629

630
      throw new RuntimeException("Target not found:" + target);
×
631
    }
632

633
    @Nullable
634
    ChildPolicyWrapper getChildPolicyWrapper() {
635
      for (ChildPolicyWrapper childPolicyWrapper : childPolicyWrappers) {
1✔
636
        if (childPolicyWrapper.getState() != ConnectivityState.TRANSIENT_FAILURE) {
1✔
637
          return childPolicyWrapper;
1✔
638
        }
639
      }
1✔
640
      return childPolicyWrappers.get(0);
1✔
641
    }
642

643
    String getHeaderData() {
644
      return response.getHeaderData();
1✔
645
    }
646

647
    // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
648
    int calcStringSize(String target) {
649
      return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
1✔
650
    }
651

652
    @Override
653
    int getSizeBytes() {
654
      int targetSize = 0;
1✔
655
      for (String target : response.targets()) {
1✔
656
        targetSize += calcStringSize(target);
1✔
657
      }
1✔
658
      return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
1✔
659
          + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
660
    }
661

662
    @Override
663
    boolean isExpired(long now) {
664
      return expireTime - now <= 0;
1✔
665
    }
666

667
    boolean isStaled(long now) {
668
      return staleTime - now <= 0;
1✔
669
    }
670

671
    @Override
672
    protected boolean isOldEnoughToBeEvicted(long now) {
673
      return minEvictionTime - now <= 0;
×
674
    }
675

676
    @Override
677
    void cleanup() {
678
      synchronized (lock) {
1✔
679
        for (ChildPolicyWrapper policyWrapper : childPolicyWrappers) {
1✔
680
          refCountedChildPolicyWrapperFactory.release(policyWrapper);
1✔
681
        }
1✔
682
      }
1✔
683
    }
1✔
684

685
    @Override
686
    public String toString() {
687
      return MoreObjects.toStringHelper(this)
×
688
          .add("request", request)
×
689
          .add("response", response)
×
690
          .add("expireTime", expireTime)
×
691
          .add("staleTime", staleTime)
×
692
          .add("childPolicyWrappers", childPolicyWrappers)
×
693
          .toString();
×
694
    }
695
  }
696

697
  /**
698
   * Implementation of {@link CacheEntry} contains error. This entry will transition to pending
699
   * status when the backoff time is expired.
700
   */
701
  private static final class BackoffCacheEntry extends CacheEntry {
702

703
    private final Status status;
704
    private final BackoffPolicy backoffPolicy;
705
    private Future<?> scheduledFuture;
706

707
    BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
708
      super(request);
1✔
709
      this.status = checkNotNull(status, "status");
1✔
710
      this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
1✔
711
    }
1✔
712

713
    Status getStatus() {
714
      return status;
1✔
715
    }
716

717
    @Override
718
    int getSizeBytes() {
719
      return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
1✔
720
    }
721

722
    @Override
723
    boolean isExpired(long now) {
724
      return scheduledFuture.isDone();
1✔
725
    }
726

727
    @Override
728
    void cleanup() {
729
      scheduledFuture.cancel(false);
1✔
730
    }
1✔
731

732
    @Override
733
    public String toString() {
734
      return MoreObjects.toStringHelper(this)
×
735
          .add("request", request)
×
736
          .add("status", status)
×
737
          .toString();
×
738
    }
739
  }
740

741
  /** Returns a Builder for {@link CachingRlsLbClient}. */
742
  static Builder newBuilder() {
743
    return new Builder();
1✔
744
  }
745

746
  /** A Builder for {@link CachingRlsLbClient}. */
747
  static final class Builder {
1✔
748

749
    private Helper helper;
750
    private LbPolicyConfiguration lbPolicyConfig;
751
    private Throttler throttler = new HappyThrottler();
1✔
752
    private ResolvedAddressFactory resolvedAddressFactory;
753
    private Ticker ticker = Ticker.systemTicker();
1✔
754
    private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
755
    private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
1✔
756

757
    Builder setHelper(Helper helper) {
758
      this.helper = checkNotNull(helper, "helper");
1✔
759
      return this;
1✔
760
    }
761

762
    Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
763
      this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
1✔
764
      return this;
1✔
765
    }
766

767
    Builder setThrottler(Throttler throttler) {
768
      this.throttler = checkNotNull(throttler, "throttler");
1✔
769
      return this;
1✔
770
    }
771

772
    /**
773
     * Sets a factory to create {@link ResolvedAddresses} for child load balancer.
774
     */
775
    Builder setResolvedAddressesFactory(
776
        ResolvedAddressFactory resolvedAddressFactory) {
777
      this.resolvedAddressFactory =
1✔
778
          checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
1✔
779
      return this;
1✔
780
    }
781

782
    Builder setTicker(Ticker ticker) {
783
      this.ticker = checkNotNull(ticker, "ticker");
1✔
784
      return this;
1✔
785
    }
786

787
    Builder setEvictionListener(
788
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
789
      this.evictionListener = evictionListener;
1✔
790
      return this;
1✔
791
    }
792

793
    Builder setBackoffProvider(BackoffPolicy.Provider provider) {
794
      this.backoffProvider = checkNotNull(provider, "provider");
1✔
795
      return this;
1✔
796
    }
797

798
    CachingRlsLbClient build() {
799
      CachingRlsLbClient client = new CachingRlsLbClient(this);
1✔
800
      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
1✔
801
      return client;
1✔
802
    }
803
  }
804

805
  /**
806
   * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
807
   * CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
808
   */
809
  private static final class AutoCleaningEvictionListener
810
      implements EvictionListener<RouteLookupRequest, CacheEntry> {
811

812
    private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
813

814
    AutoCleaningEvictionListener(
815
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
1✔
816
      this.delegate = delegate;
1✔
817
    }
1✔
818

819
    @Override
820
    public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
821
      if (delegate != null) {
1✔
822
        delegate.onEviction(key, value, cause);
1✔
823
      }
824
      // performs cleanup after delegation
825
      value.cleanup();
1✔
826
    }
1✔
827
  }
828

829
  /** A Throttler never throttles. */
830
  private static final class HappyThrottler implements Throttler {
831

832
    @Override
833
    public boolean shouldThrottle() {
834
      return false;
×
835
    }
836

837
    @Override
838
    public void registerBackendResponse(boolean throttled) {
839
      // no-op
840
    }
×
841
  }
842

843
  /** Implementation of {@link LinkedHashLruCache} for RLS. */
844
  private static final class RlsAsyncLruCache
845
      extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
846
    private final RlsLbHelper helper;
847

848
    RlsAsyncLruCache(long maxEstimatedSizeBytes,
849
        @Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
850
        ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) {
851
      super(
1✔
852
          maxEstimatedSizeBytes,
853
          evictionListener,
854
          1,
855
          TimeUnit.MINUTES,
856
          ses,
857
          ticker,
858
          lock);
859
      this.helper = checkNotNull(helper, "helper");
1✔
860
    }
1✔
861

862
    @Override
863
    protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
864
      return value.isExpired(nowNanos);
1✔
865
    }
866

867
    @Override
868
    protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
869
      return value.getSizeBytes();
1✔
870
    }
871

872
    @Override
873
    protected boolean shouldInvalidateEldestEntry(
874
        RouteLookupRequest eldestKey, CacheEntry eldestValue, long now) {
875
      if (!eldestValue.isOldEnoughToBeEvicted(now)) {
×
876
        return false;
×
877
      }
878

879
      // eldest entry should be evicted if size limit exceeded
880
      return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
×
881
    }
882

883
    public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
884
      CacheEntry newEntry = cache(key, value);
1✔
885

886
      // force cleanup if new entry pushed cache over max size (in bytes)
887
      if (fitToLimit()) {
1✔
888
        helper.triggerPendingRpcProcessing();
×
889
      }
890
      return newEntry;
1✔
891
    }
892
  }
893

894
  /**
895
   * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
896
   * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
897
   */
898
  private final class BackoffRefreshListener implements ChildLbStatusListener {
1✔
899

900
    @Nullable
1✔
901
    private ConnectivityState prevState = null;
902

903
    @Override
904
    public void onStatusChanged(ConnectivityState newState) {
905
      logger.log(ChannelLogLevel.DEBUG, "LB status changed to: {0}", newState);
1✔
906
      if (prevState == ConnectivityState.TRANSIENT_FAILURE
1✔
907
          && newState == ConnectivityState.READY) {
908
        logger.log(ChannelLogLevel.DEBUG, "Transitioning from TRANSIENT_FAILURE to READY");
1✔
909
        logger.log(ChannelLogLevel.DEBUG, "Acquiring lock force refresh backoff cache entries");
1✔
910
        synchronized (lock) {
1✔
911
          logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
1✔
912
          for (CacheEntry value : linkedHashLruCache.values()) {
1✔
913
            if (value instanceof BackoffCacheEntry) {
1✔
914
              refreshBackoffEntry((BackoffCacheEntry) value);
×
915
            }
916
          }
1✔
917
        }
1✔
918
      }
919
      prevState = newState;
1✔
920
    }
1✔
921
  }
922

923
  /** A header that is added when the RLS server responds with additional header data. */
924
  @VisibleForTesting
925
  static final Metadata.Key<String> RLS_DATA_KEY =
1✔
926
      Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
1✔
927

928
  final class RlsPicker extends SubchannelPicker {
929

930
    private final RlsRequestFactory requestFactory;
931
    private final String lookupService;
932

933
    RlsPicker(RlsRequestFactory requestFactory, String lookupService) {
1✔
934
      this.requestFactory = checkNotNull(requestFactory, "requestFactory");
1✔
935
      this.lookupService = checkNotNull(lookupService, "rlsConfig");
1✔
936
    }
1✔
937

938
    @Override
939
    public PickResult pickSubchannel(PickSubchannelArgs args) {
940
      String serviceName = args.getMethodDescriptor().getServiceName();
1✔
941
      String methodName = args.getMethodDescriptor().getBareMethodName();
1✔
942
      RouteLookupRequest request =
1✔
943
          requestFactory.create(serviceName, methodName, args.getHeaders());
1✔
944
      final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
1✔
945
      logger.log(ChannelLogLevel.DEBUG,
1✔
946
          "Got route lookup cache entry for service={0}, method={1}, headers={2}:\n {3}",
947
          new Object[]{serviceName, methodName, args.getHeaders(), response});
1✔
948

949
      if (response.getHeaderData() != null && !response.getHeaderData().isEmpty()) {
1✔
950
        logger.log(ChannelLogLevel.DEBUG, "Updating LRS metadata from the LRS response headers");
1✔
951
        Metadata headers = args.getHeaders();
1✔
952
        headers.discardAll(RLS_DATA_KEY);
1✔
953
        headers.put(RLS_DATA_KEY, response.getHeaderData());
1✔
954
      }
955
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
956
      logger.log(ChannelLogLevel.DEBUG, "defaultTarget = {0}", defaultTarget);
1✔
957
      boolean hasFallback = defaultTarget != null && !defaultTarget.isEmpty();
1✔
958
      if (response.hasData()) {
1✔
959
        logger.log(ChannelLogLevel.DEBUG, "LRS response has data, proceed with selecting a picker");
1✔
960
        ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
1✔
961
        SubchannelPicker picker =
962
            (childPolicyWrapper != null) ? childPolicyWrapper.getPicker() : null;
1✔
963
        if (picker == null) {
1✔
964
          logger.log(ChannelLogLevel.DEBUG,
×
965
              "Child policy wrapper didn't return a picker, returning PickResult with no results");
966
          return PickResult.withNoResult();
×
967
        }
968
        // Happy path
969
        logger.log(ChannelLogLevel.DEBUG, "Returning PickResult");
1✔
970
        PickResult pickResult = picker.pickSubchannel(args);
1✔
971
        // TODO: include the "grpc.target" label once target is available here.
972
        if (pickResult.hasResult()) {
1✔
973
          helper.getMetricRecorder().addLongCounter(TARGET_PICKS_COUNTER, 1,
1✔
974
              Lists.newArrayList("", lookupService, childPolicyWrapper.getTarget(),
1✔
975
                  determineMetricsPickResult(pickResult)), Lists.newArrayList());
1✔
976
        }
977
        return pickResult;
1✔
978
      } else if (response.hasError()) {
1✔
979
        logger.log(ChannelLogLevel.DEBUG, "RLS response has errors");
1✔
980
        if (hasFallback) {
1✔
981
          logger.log(ChannelLogLevel.DEBUG, "Using RLS fallback");
1✔
982
          return useFallback(args);
1✔
983
        }
984
        logger.log(ChannelLogLevel.DEBUG, "No RLS fallback, returning PickResult with an error");
1✔
985
        // TODO: include the "grpc.target" label once target is available here.
986
        helper.getMetricRecorder().addLongCounter(FAILED_PICKS_COUNTER, 1,
1✔
987
            Lists.newArrayList("", lookupService), Lists.newArrayList());
1✔
988
        return PickResult.withError(
1✔
989
            convertRlsServerStatus(response.getStatus(),
1✔
990
                lbPolicyConfig.getRouteLookupConfig().lookupService()));
1✔
991
      } else {
992
        logger.log(ChannelLogLevel.DEBUG,
1✔
993
            "RLS response had no data, return a PickResult with no data");
994
        return PickResult.withNoResult();
1✔
995
      }
996
    }
997

998
    private ChildPolicyWrapper fallbackChildPolicyWrapper;
999

1000
    /** Uses Subchannel connected to default target. */
1001
    private PickResult useFallback(PickSubchannelArgs args) {
1002
      // TODO(creamsoup) wait until lb is ready
1003
      startFallbackChildPolicy();
1✔
1004
      SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
1✔
1005
      if (picker == null) {
1✔
1006
        return PickResult.withNoResult();
×
1007
      }
1008
      PickResult pickResult = picker.pickSubchannel(args);
1✔
1009
      if (pickResult.hasResult()) {
1✔
1010
        // TODO: include the grpc.target label once target is available here.
1011
        helper.getMetricRecorder().addLongCounter(DEFAULT_TARGET_PICKS_COUNTER, 1,
1✔
1012
            Lists.newArrayList("", lookupService, fallbackChildPolicyWrapper.getTarget(),
1✔
1013
                determineMetricsPickResult(pickResult)), Lists.newArrayList());
1✔
1014
      }
1015
      return pickResult;
1✔
1016
    }
1017

1018
    private String determineMetricsPickResult(PickResult pickResult) {
1019
      if (pickResult.getStatus().isOk()) {
1✔
1020
        return "complete";
1✔
1021
      } else if (pickResult.isDrop()) {
1✔
1022
        return "drop";
×
1023
      } else {
1024
        return "fail";
1✔
1025
      }
1026
    }
1027

1028
    private void startFallbackChildPolicy() {
1029
      String defaultTarget = lbPolicyConfig.getRouteLookupConfig().defaultTarget();
1✔
1030
      logger.log(ChannelLogLevel.DEBUG, "starting fallback to {0}", defaultTarget);
1✔
1031
      logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to start fallback child policy");
1✔
1032
      synchronized (lock) {
1✔
1033
        logger.log(ChannelLogLevel.DEBUG, "Acquired lock for starting fallback child policy");
1✔
1034
        if (fallbackChildPolicyWrapper != null) {
1✔
1035
          return;
1✔
1036
        }
1037
        fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
1✔
1038
      }
1✔
1039
    }
1✔
1040

1041
    // GuardedBy CachingRlsLbClient.lock
1042
    void close() {
1043
      synchronized (lock) { // Lock is already held, but ErrorProne can't tell
1✔
1044
        logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
1✔
1045
        if (fallbackChildPolicyWrapper != null) {
1✔
1046
          refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
1✔
1047
        }
1048
      }
1✔
1049
    }
1✔
1050

1051
    @Override
1052
    public String toString() {
1053
      return MoreObjects.toStringHelper(this)
×
1054
          .add("target", lbPolicyConfig.getRouteLookupConfig().lookupService())
×
1055
          .toString();
×
1056
    }
1057
  }
1058

1059
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc