• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19671

30 Jan 2025 08:43PM UTC coverage: 88.581% (-0.01%) from 88.595%
#19671

push

github

web-flow
xds: Reuse filter interceptors across RPCs

This moves the interceptor creation from the ConfigSelector to the
resource update handling.

The code structure changes will make adding support for filter
lifecycles (for RLQS) a bit easier. The filter lifecycles will allow
filters to share state across interceptors, and constructing all the
interceptors on a single thread will mean filters wouldn't need to be
thread-safe (but their interceptors would be thread-safe).

33760 of 38112 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.2
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.MetricRecorder;
45
import io.grpc.NameResolver;
46
import io.grpc.Status;
47
import io.grpc.Status.Code;
48
import io.grpc.SynchronizationContext;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.ObjectPool;
51
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
52
import io.grpc.xds.Filter.ClientInterceptorBuilder;
53
import io.grpc.xds.Filter.FilterConfig;
54
import io.grpc.xds.Filter.NamedFilterConfig;
55
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
56
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
57
import io.grpc.xds.VirtualHost.Route;
58
import io.grpc.xds.VirtualHost.Route.RouteAction;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
61
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
62
import io.grpc.xds.VirtualHost.Route.RouteMatch;
63
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
64
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
65
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
66
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
67
import io.grpc.xds.client.XdsClient;
68
import io.grpc.xds.client.XdsClient.ResourceWatcher;
69
import io.grpc.xds.client.XdsLogger;
70
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
71
import java.net.URI;
72
import java.util.ArrayList;
73
import java.util.Collections;
74
import java.util.HashMap;
75
import java.util.HashSet;
76
import java.util.List;
77
import java.util.Locale;
78
import java.util.Map;
79
import java.util.Objects;
80
import java.util.Set;
81
import java.util.concurrent.ConcurrentHashMap;
82
import java.util.concurrent.ConcurrentMap;
83
import java.util.concurrent.ScheduledExecutorService;
84
import java.util.concurrent.atomic.AtomicInteger;
85
import javax.annotation.Nullable;
86

87
/**
88
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
89
 *
90
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
91
 * protocol to retrieve service information and produce a service config to the caller.
92
 *
93
 * @see XdsNameResolverProvider
94
 */
95
final class XdsNameResolver extends NameResolver {
96

97
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
98
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
99
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
100
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
101
  static final CallOptions.Key<Boolean> AUTO_HOST_REWRITE_KEY =
1✔
102
      CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY");
1✔
103
  @VisibleForTesting
104
  static boolean enableTimeout =
1✔
105
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
106
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
107

108
  private final InternalLogId logId;
109
  private final XdsLogger logger;
110
  @Nullable
111
  private final String targetAuthority;
112
  private final String target;
113
  private final String serviceAuthority;
114
  // Encoded version of the service authority as per 
115
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
116
  private final String encodedServiceAuthority;
117
  private final String overrideAuthority;
118
  private final ServiceConfigParser serviceConfigParser;
119
  private final SynchronizationContext syncContext;
120
  private final ScheduledExecutorService scheduler;
121
  private final XdsClientPoolFactory xdsClientPoolFactory;
122
  private final ThreadSafeRandom random;
123
  private final FilterRegistry filterRegistry;
124
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
125
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
126
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
127
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
128
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
129
  private final long randomChannelId;
130
  private final MetricRecorder metricRecorder;
131

132
  private volatile RoutingConfig routingConfig = RoutingConfig.empty;
1✔
133
  private Listener2 listener;
134
  private ObjectPool<XdsClient> xdsClientPool;
135
  private XdsClient xdsClient;
136
  private CallCounterProvider callCounterProvider;
137
  private ResolveState resolveState;
138
  // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
139
  // XdsClient instead of here.
140
  private boolean receivedConfig;
141

142
  /**
   * Creates a resolver wired to the default collaborators: the default shared xDS client pool
   * provider, the shared {@link ThreadSafeRandomImpl} instance and the default filter registry.
   * The target authority is the authority component of {@code targetUri}.
   */
  XdsNameResolver(
      URI targetUri,
      String name,
      @Nullable String overrideAuthority,
      ServiceConfigParser serviceConfigParser,
      SynchronizationContext syncContext,
      ScheduledExecutorService scheduler,
      @Nullable Map<String, ?> bootstrapOverride,
      MetricRecorder metricRecorder) {
    this(
        targetUri,
        targetUri.getAuthority(),
        name,
        overrideAuthority,
        serviceConfigParser,
        syncContext,
        scheduler,
        SharedXdsClientPoolProvider.getDefaultProvider(),
        ThreadSafeRandomImpl.instance,
        FilterRegistry.getDefaultRegistry(),
        bootstrapOverride,
        metricRecorder);
  }
1✔
153

154
  @VisibleForTesting
155
  XdsNameResolver(
156
      URI targetUri, @Nullable String targetAuthority, String name,
157
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
158
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
159
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
160
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
161
      MetricRecorder metricRecorder) {
1✔
162
    this.targetAuthority = targetAuthority;
1✔
163
    target = targetUri.toString();
1✔
164

165
    // The name might have multiple slashes so encode it before verifying.
166
    serviceAuthority = checkNotNull(name, "name");
1✔
167
    this.encodedServiceAuthority = 
1✔
168
      GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
1✔
169

170
    this.overrideAuthority = overrideAuthority;
1✔
171
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
172
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
173
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
174
    this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
1✔
175
            "xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
1✔
176
    this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
1✔
177
    this.random = checkNotNull(random, "random");
1✔
178
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
179
    this.metricRecorder = metricRecorder;
1✔
180
    randomChannelId = random.nextLong();
1✔
181
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
182
    logger = XdsLogger.withLogId(logId);
1✔
183
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
184
  }
1✔
185

186
  @Override
187
  public String getServiceAuthority() {
188
    return encodedServiceAuthority;
1✔
189
  }
190

191
  @Override
192
  public void start(Listener2 listener) {
193
    this.listener = checkNotNull(listener, "listener");
1✔
194
    try {
195
      xdsClientPool = xdsClientPoolFactory.getOrCreate(target, metricRecorder);
1✔
196
    } catch (Exception e) {
1✔
197
      listener.onError(
1✔
198
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
199
      return;
1✔
200
    }
1✔
201
    xdsClient = xdsClientPool.getObject();
1✔
202
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
203
    String listenerNameTemplate;
204
    if (targetAuthority == null) {
1✔
205
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
206
    } else {
207
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
208
      if (authorityInfo == null) {
1✔
209
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
210
            "invalid target URI: target authority not found in the bootstrap"));
211
        return;
1✔
212
      }
213
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
214
    }
215
    String replacement = serviceAuthority;
1✔
216
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
217
      replacement = XdsClient.percentEncodePath(replacement);
1✔
218
    }
219
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
220
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
221
        ) {
222
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
223
          "invalid listener resource URI for service authority: " + serviceAuthority));
224
      return;
×
225
    }
226
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
227
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
228
    resolveState = new ResolveState(ldsResourceName);
1✔
229

230
    resolveState.start();
1✔
231
  }
1✔
232

233
  private static String expandPercentS(String template, String replacement) {
234
    return template.replace("%s", replacement);
1✔
235
  }
236

237
  @Override
238
  public void shutdown() {
239
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
240
    if (resolveState != null) {
1✔
241
      resolveState.stop();
1✔
242
    }
243
    if (xdsClient != null) {
1✔
244
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
245
    }
246
  }
1✔
247

248
  @VisibleForTesting
249
  static Map<String, ?> generateServiceConfigWithMethodConfig(
250
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
251
    if (timeoutNano == null
1✔
252
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
253
      return Collections.emptyMap();
1✔
254
    }
255
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
256
    methodConfig.put(
1✔
257
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
258
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
259
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
260
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
261
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
262
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
263
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
264
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
265
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
266
        codes.add(code.name());
1✔
267
      }
1✔
268
      rawRetryPolicy.put(
1✔
269
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
270
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
271
        rawRetryPolicy.put(
×
272
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
273
      }
274
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
275
    }
276
    if (timeoutNano != null) {
1✔
277
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
278
      methodConfig.put("timeout", timeout);
1✔
279
    }
280
    return Collections.singletonMap(
1✔
281
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
282
  }
283

284
  @VisibleForTesting
285
  XdsClient getXdsClient() {
286
    return xdsClient;
1✔
287
  }
288

289
  // called in syncContext
290
  private void updateResolutionResult() {
291
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
292

293
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
294
    for (String name : clusterRefs.keySet()) {
1✔
295
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
296
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
297
    }
1✔
298
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
299
        "loadBalancingConfig",
300
        ImmutableList.of(ImmutableMap.of(
1✔
301
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
302
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
303

304
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
305
      logger.log(
×
306
          XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
×
307
    }
308
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
309
    Attributes attrs =
310
        Attributes.newBuilder()
1✔
311
            .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
1✔
312
            .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
313
            .set(InternalConfigSelector.KEY, configSelector)
1✔
314
            .build();
1✔
315
    ResolutionResult result =
316
        ResolutionResult.newBuilder()
1✔
317
            .setAttributes(attrs)
1✔
318
            .setServiceConfig(parsedServiceConfig)
1✔
319
            .build();
1✔
320
    listener.onResult(result);
1✔
321
    receivedConfig = true;
1✔
322
  }
1✔
323

324
  /**
325
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
326
   * case-insensitive.
327
   *
328
   * <p>Wildcard pattern rules:
329
   * <ol>
330
   * <li>A single asterisk (*) matches any domain.</li>
331
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
332
   *     but not both.</li>
333
   * </ol>
334
   */
335
  @VisibleForTesting
336
  static boolean matchHostName(String hostName, String pattern) {
337
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
338
        "Invalid host name");
339
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
340
        "Invalid pattern/domain name");
341

342
    hostName = hostName.toLowerCase(Locale.US);
1✔
343
    pattern = pattern.toLowerCase(Locale.US);
1✔
344
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
345

346
    if (!pattern.contains("*")) {
1✔
347
      // Not a wildcard pattern -- hostName and pattern must match exactly.
348
      return hostName.equals(pattern);
1✔
349
    }
350
    // Wildcard pattern
351

352
    if (pattern.length() == 1) {
1✔
353
      return true;
×
354
    }
355

356
    int index = pattern.indexOf('*');
1✔
357

358
    // At most one asterisk (*) is allowed.
359
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
360
      return false;
×
361
    }
362

363
    // Asterisk can only match prefix or suffix.
364
    if (index != 0 && index != pattern.length() - 1) {
1✔
365
      return false;
×
366
    }
367

368
    // HostName must be at least as long as the pattern because asterisk has to
369
    // match one or more characters.
370
    if (hostName.length() < pattern.length()) {
1✔
371
      return false;
1✔
372
    }
373

374
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
375
      // Prefix matching fails.
376
      return true;
1✔
377
    }
378

379
    // Pattern matches hostname if suffix matching succeeds.
380
    return index == pattern.length() - 1
1✔
381
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
382
  }
383

384
  private final class ConfigSelector extends InternalConfigSelector {
1✔
385
    @Override
386
    public Result selectConfig(PickSubchannelArgs args) {
387
      String cluster = null;
1✔
388
      ClientInterceptor filters = null; // null iff cluster is null
1✔
389
      RouteData selectedRoute = null;
1✔
390
      RoutingConfig routingCfg;
391
      Metadata headers = args.getHeaders();
1✔
392
      do {
393
        routingCfg = routingConfig;
1✔
394
        for (RouteData route : routingCfg.routes) {
1✔
395
          if (RoutingUtils.matchRoute(
1✔
396
                  route.routeMatch, "/" + args.getMethodDescriptor().getFullMethodName(),
1✔
397
                  headers, random)) {
1✔
398
            selectedRoute = route;
1✔
399
            break;
1✔
400
          }
401
        }
1✔
402
        if (selectedRoute == null) {
1✔
403
          return Result.forError(
1✔
404
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
405
        }
406
        if (selectedRoute.routeAction == null) {
1✔
407
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
408
              "Could not route RPC to Route with non-forwarding action"));
409
        }
410
        RouteAction action = selectedRoute.routeAction;
1✔
411
        if (action.cluster() != null) {
1✔
412
          cluster = prefixedClusterName(action.cluster());
1✔
413
          filters = selectedRoute.filterChoices.get(0);
1✔
414
        } else if (action.weightedClusters() != null) {
1✔
415
          long totalWeight = 0;
1✔
416
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
417
            totalWeight += weightedCluster.weight();
1✔
418
          }
1✔
419
          long select = random.nextLong(totalWeight);
1✔
420
          long accumulator = 0;
1✔
421
          for (int i = 0; i < action.weightedClusters().size(); i++) {
1✔
422
            ClusterWeight weightedCluster = action.weightedClusters().get(i);
1✔
423
            accumulator += weightedCluster.weight();
1✔
424
            if (select < accumulator) {
1✔
425
              cluster = prefixedClusterName(weightedCluster.name());
1✔
426
              filters = selectedRoute.filterChoices.get(i);
1✔
427
              break;
1✔
428
            }
429
          }
430
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
431
          cluster =
1✔
432
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
433
          filters = selectedRoute.filterChoices.get(0);
1✔
434
        }
435
      } while (!retainCluster(cluster));
1✔
436
      Long timeoutNanos = null;
1✔
437
      if (enableTimeout) {
1✔
438
        if (selectedRoute != null) {
1✔
439
          timeoutNanos = selectedRoute.routeAction.timeoutNano();
1✔
440
        }
441
        if (timeoutNanos == null) {
1✔
442
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
443
        }
444
        if (timeoutNanos <= 0) {
1✔
445
          timeoutNanos = null;
1✔
446
        }
447
      }
448
      RetryPolicy retryPolicy =
449
          selectedRoute == null ? null : selectedRoute.routeAction.retryPolicy();
1✔
450
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
451
      Map<String, ?> rawServiceConfig =
1✔
452
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
453
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
454
      Object config = parsedServiceConfig.getConfig();
1✔
455
      if (config == null) {
1✔
456
        releaseCluster(cluster);
×
457
        return Result.forError(
×
458
            parsedServiceConfig.getError().augmentDescription(
×
459
                "Failed to parse service config (method config)"));
460
      }
461
      final String finalCluster = cluster;
1✔
462
      final long hash = generateHash(selectedRoute.routeAction.hashPolicies(), headers);
1✔
463
      RouteData finalSelectedRoute = selectedRoute;
1✔
464
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
465
        @Override
466
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
467
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
468
            final Channel next) {
469
          CallOptions callOptionsForCluster =
1✔
470
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
471
                  .withOption(RPC_HASH_KEY, hash);
1✔
472
          if (finalSelectedRoute.routeAction.autoHostRewrite()) {
1✔
473
            callOptionsForCluster = callOptionsForCluster.withOption(AUTO_HOST_REWRITE_KEY, true);
1✔
474
          }
475
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
476
              next.newCall(method, callOptionsForCluster)) {
1✔
477
            @Override
478
            public void start(Listener<RespT> listener, Metadata headers) {
479
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
480
                boolean committed;
481

482
                @Override
483
                public void onHeaders(Metadata headers) {
484
                  committed = true;
1✔
485
                  releaseCluster(finalCluster);
1✔
486
                  delegate().onHeaders(headers);
1✔
487
                }
1✔
488

489
                @Override
490
                public void onClose(Status status, Metadata trailers) {
491
                  if (!committed) {
1✔
492
                    releaseCluster(finalCluster);
1✔
493
                  }
494
                  delegate().onClose(status, trailers);
1✔
495
                }
1✔
496
              };
497
              delegate().start(listener, headers);
1✔
498
            }
1✔
499
          };
500
        }
501
      }
502

503
      return
1✔
504
          Result.newBuilder()
1✔
505
              .setConfig(config)
1✔
506
              .setInterceptor(combineInterceptors(
1✔
507
                  ImmutableList.of(filters, new ClusterSelectionInterceptor())))
1✔
508
              .build();
1✔
509
    }
510

511
    private boolean retainCluster(String cluster) {
512
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
513
      if (clusterRefState == null) {
1✔
514
        return false;
×
515
      }
516
      AtomicInteger refCount = clusterRefState.refCount;
1✔
517
      int count;
518
      do {
519
        count = refCount.get();
1✔
520
        if (count == 0) {
1✔
521
          return false;
×
522
        }
523
      } while (!refCount.compareAndSet(count, count + 1));
1✔
524
      return true;
1✔
525
    }
526

527
    private void releaseCluster(final String cluster) {
528
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
529
      if (count == 0) {
1✔
530
        syncContext.execute(new Runnable() {
1✔
531
          @Override
532
          public void run() {
533
            if (clusterRefs.get(cluster).refCount.get() == 0) {
1✔
534
              clusterRefs.remove(cluster);
1✔
535
              updateResolutionResult();
1✔
536
            }
537
          }
1✔
538
        });
539
      }
540
    }
1✔
541

542
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
543
      Long hash = null;
1✔
544
      for (HashPolicy policy : hashPolicies) {
1✔
545
        Long newHash = null;
1✔
546
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
547
          String value = getHeaderValue(headers, policy.headerName());
1✔
548
          if (value != null) {
1✔
549
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
550
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
551
            }
552
            newHash = hashFunc.hashAsciiString(value);
1✔
553
          }
554
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
555
          newHash = hashFunc.hashLong(randomChannelId);
1✔
556
        }
557
        if (newHash != null ) {
1✔
558
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
559
          // and preserves all of the entropy.
560
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
561
          hash = oldHash ^ newHash;
1✔
562
        }
563
        // If the policy is a terminal policy and a hash has been generated, ignore
564
        // the rest of the hash policies.
565
        if (policy.isTerminal() && hash != null) {
1✔
566
          break;
×
567
        }
568
      }
1✔
569
      return hash == null ? random.nextLong() : hash;
1✔
570
    }
571
  }
572

573
  static final class PassthroughClientInterceptor implements ClientInterceptor {
1✔
574
    @Override
575
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
576
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
577
      return next.newCall(method, callOptions);
1✔
578
    }
579
  }
580

581
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
582
    if (interceptors.size() == 0) {
1✔
583
      return new PassthroughClientInterceptor();
1✔
584
    }
585
    if (interceptors.size() == 1) {
1✔
586
      return interceptors.get(0);
1✔
587
    }
588
    return new ClientInterceptor() {
1✔
589
      @Override
590
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
591
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
592
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
593
        return next.newCall(method, callOptions);
1✔
594
      }
595
    };
596
  }
597

598
  @Nullable
599
  private static String getHeaderValue(Metadata headers, String headerName) {
600
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
601
      return null;
×
602
    }
603
    if (headerName.equals("content-type")) {
1✔
604
      return "application/grpc";
×
605
    }
606
    Metadata.Key<String> key;
607
    try {
608
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
609
    } catch (IllegalArgumentException e) {
×
610
      return null;
×
611
    }
1✔
612
    Iterable<String> values = headers.getAll(key);
1✔
613
    return values == null ? null : Joiner.on(",").join(values);
1✔
614
  }
615

616
  private static String prefixedClusterName(String name) {
617
    return "cluster:" + name;
1✔
618
  }
619

620
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
621
    return "cluster_specifier_plugin:" + pluginName;
1✔
622
  }
623

624
  private static final class FailingConfigSelector extends InternalConfigSelector {
625
    private final Result result;
626

627
    public FailingConfigSelector(Status error) {
1✔
628
      this.result = Result.forError(error);
1✔
629
    }
1✔
630

631
    @Override
632
    public Result selectConfig(PickSubchannelArgs args) {
633
      return result;
1✔
634
    }
635
  }
636

637
  private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
638
    private final ConfigOrError emptyServiceConfig =
1✔
639
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
640
    private final String ldsResourceName;
641
    private boolean stopped;
642
    @Nullable
643
    private Set<String> existingClusters;  // clusters to which new requests can be routed
644
    @Nullable
645
    private RouteDiscoveryState routeDiscoveryState;
646

647
    ResolveState(String ldsResourceName) {
1✔
648
      this.ldsResourceName = ldsResourceName;
1✔
649
    }
1✔
650

651
    @Override
652
    public void onChanged(final XdsListenerResource.LdsUpdate update) {
653
      if (stopped) {
1✔
654
        return;
×
655
      }
656
      logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
1✔
657
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
658
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
659
      String rdsName = httpConnectionManager.rdsName();
1✔
660
      cleanUpRouteDiscoveryState();
1✔
661
      if (virtualHosts != null) {
1✔
662
        updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
663
            httpConnectionManager.httpFilterConfigs());
1✔
664
      } else {
665
        routeDiscoveryState = new RouteDiscoveryState(
1✔
666
            rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
667
            httpConnectionManager.httpFilterConfigs());
1✔
668
        logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
669
        xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
670
            rdsName, routeDiscoveryState, syncContext);
1✔
671
      }
672
    }
1✔
673

674
    @Override
675
    public void onError(final Status error) {
676
      if (stopped || receivedConfig) {
1✔
677
        return;
×
678
      }
679
      listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
680
          String.format("Unable to load LDS %s. xDS server returned: %s: %s",
1✔
681
          ldsResourceName, error.getCode(), error.getDescription())));
1✔
682
    }
1✔
683

684
    @Override
685
    public void onResourceDoesNotExist(final String resourceName) {
686
      if (stopped) {
1✔
687
        return;
×
688
      }
689
      String error = "LDS resource does not exist: " + resourceName;
1✔
690
      logger.log(XdsLogLevel.INFO, error);
1✔
691
      cleanUpRouteDiscoveryState();
1✔
692
      cleanUpRoutes(error);
1✔
693
    }
1✔
694

695
    private void start() {
696
      logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
1✔
697
      xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
1✔
698
          ldsResourceName, this, syncContext);
1✔
699
    }
1✔
700

701
    private void stop() {
702
      logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
1✔
703
      stopped = true;
1✔
704
      cleanUpRouteDiscoveryState();
1✔
705
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
706
    }
1✔
707

708
    // called in syncContext
709
    private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
710
        @Nullable List<NamedFilterConfig> filterConfigs) {
711
      String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
712
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
1✔
713
      if (virtualHost == null) {
1✔
714
        String error = "Failed to find virtual host matching hostname: " + authority;
1✔
715
        logger.log(XdsLogLevel.WARNING, error);
1✔
716
        cleanUpRoutes(error);
1✔
717
        return;
1✔
718
      }
719

720
      List<Route> routes = virtualHost.routes();
1✔
721
      ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
1✔
722

723
      // Populate all clusters to which requests can be routed to through the virtual host.
724
      Set<String> clusters = new HashSet<>();
1✔
725
      // uniqueName -> clusterName
726
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
727
      // uniqueName -> pluginConfig
728
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
729
      for (Route route : routes) {
1✔
730
        RouteAction action = route.routeAction();
1✔
731
        String prefixedName;
732
        if (action == null) {
1✔
733
          routesData.add(new RouteData(route.routeMatch(), null, ImmutableList.of()));
1✔
734
        } else if (action.cluster() != null) {
1✔
735
          prefixedName = prefixedClusterName(action.cluster());
1✔
736
          clusters.add(prefixedName);
1✔
737
          clusterNameMap.put(prefixedName, action.cluster());
1✔
738
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
739
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
740
        } else if (action.weightedClusters() != null) {
1✔
741
          ImmutableList.Builder<ClientInterceptor> filterList = ImmutableList.builder();
1✔
742
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
743
            prefixedName = prefixedClusterName(weightedCluster.name());
1✔
744
            clusters.add(prefixedName);
1✔
745
            clusterNameMap.put(prefixedName, weightedCluster.name());
1✔
746
            filterList.add(createFilters(filterConfigs, virtualHost, route, weightedCluster));
1✔
747
          }
1✔
748
          routesData.add(
1✔
749
              new RouteData(route.routeMatch(), route.routeAction(), filterList.build()));
1✔
750
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
751
          PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
752
          if (pluginConfig instanceof RlsPluginConfig) {
1✔
753
            prefixedName = prefixedClusterSpecifierPluginName(
1✔
754
                action.namedClusterSpecifierPluginConfig().name());
1✔
755
            clusters.add(prefixedName);
1✔
756
            rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
757
          }
758
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
759
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
760
        }
761
      }
1✔
762

763
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
764
      boolean shouldUpdateResult = existingClusters == null;
1✔
765
      Set<String> addedClusters =
766
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
767
      Set<String> deletedClusters =
768
          existingClusters == null
1✔
769
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
770
      existingClusters = clusters;
1✔
771
      for (String cluster : addedClusters) {
1✔
772
        if (clusterRefs.containsKey(cluster)) {
1✔
773
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
774
        } else {
775
          if (clusterNameMap.containsKey(cluster)) {
1✔
776
            clusterRefs.put(
1✔
777
                cluster,
778
                ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
1✔
779
          }
780
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
781
            clusterRefs.put(
1✔
782
                cluster,
783
                ClusterRefState.forRlsPlugin(
1✔
784
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
785
          }
786
          shouldUpdateResult = true;
1✔
787
        }
788
      }
1✔
789
      for (String cluster : clusters) {
1✔
790
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
791
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
792
          ClusterRefState newClusterRefState =
1✔
793
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
794
          clusterRefs.put(cluster, newClusterRefState);
1✔
795
          shouldUpdateResult = true;
1✔
796
        }
797
      }
1✔
798
      // Update service config to include newly added clusters.
799
      if (shouldUpdateResult) {
1✔
800
        updateResolutionResult();
1✔
801
      }
802
      // Make newly added clusters selectable by config selector and deleted clusters no longer
803
      // selectable.
804
      routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routesData.build());
1✔
805
      shouldUpdateResult = false;
1✔
806
      for (String cluster : deletedClusters) {
1✔
807
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
808
        if (count == 0) {
1✔
809
          clusterRefs.remove(cluster);
1✔
810
          shouldUpdateResult = true;
1✔
811
        }
812
      }
1✔
813
      if (shouldUpdateResult) {
1✔
814
        updateResolutionResult();
1✔
815
      }
816
    }
1✔
817

818
    private ClientInterceptor createFilters(
819
        @Nullable List<NamedFilterConfig> filterConfigs,
820
        VirtualHost virtualHost,
821
        Route route,
822
        @Nullable ClusterWeight weightedCluster) {
823
      if (filterConfigs == null) {
1✔
824
        return new PassthroughClientInterceptor();
1✔
825
      }
826
      Map<String, FilterConfig> selectedOverrideConfigs =
1✔
827
          new HashMap<>(virtualHost.filterConfigOverrides());
1✔
828
      selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
829
      if (weightedCluster != null) {
1✔
830
        selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
831
      }
832
      ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
1✔
833
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
834
        FilterConfig filterConfig = namedFilter.filterConfig;
1✔
835
        Filter filter = filterRegistry.get(filterConfig.typeUrl());
1✔
836
        if (filter instanceof ClientInterceptorBuilder) {
1✔
837
          ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter)
1✔
838
              .buildClientInterceptor(
1✔
839
                  filterConfig, selectedOverrideConfigs.get(namedFilter.name),
1✔
840
                  scheduler);
1✔
841
          if (interceptor != null) {
1✔
842
            filterInterceptors.add(interceptor);
1✔
843
          }
844
        }
845
      }
1✔
846
      return combineInterceptors(filterInterceptors.build());
1✔
847
    }
848

849
    private void cleanUpRoutes(String error) {
850
      if (existingClusters != null) {
1✔
851
        for (String cluster : existingClusters) {
1✔
852
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
853
          if (count == 0) {
1✔
854
            clusterRefs.remove(cluster);
1✔
855
          }
856
        }
1✔
857
        existingClusters = null;
1✔
858
      }
859
      routingConfig = RoutingConfig.empty;
1✔
860
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
861
      // the config selector handles the error message itself. Once the LB API allows providing
862
      // failure information for addresses yet still providing a service config, the config seector
863
      // could be avoided.
864
      String errorWithNodeId =
1✔
865
          error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
866
      listener.onResult(ResolutionResult.newBuilder()
1✔
867
          .setAttributes(Attributes.newBuilder()
1✔
868
            .set(InternalConfigSelector.KEY,
1✔
869
              new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId)))
1✔
870
            .build())
1✔
871
          .setServiceConfig(emptyServiceConfig)
1✔
872
          .build());
1✔
873
      receivedConfig = true;
1✔
874
    }
1✔
875

876
    private void cleanUpRouteDiscoveryState() {
877
      if (routeDiscoveryState != null) {
1✔
878
        String rdsName = routeDiscoveryState.resourceName;
1✔
879
        logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
1✔
880
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
881
            routeDiscoveryState);
882
        routeDiscoveryState = null;
1✔
883
      }
884
    }
1✔
885

886
    /**
887
     * Discovery state for RouteConfiguration resource. One instance for each Listener resource
888
     * update.
889
     */
890
    private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
891
      private final String resourceName;
892
      private final long httpMaxStreamDurationNano;
893
      @Nullable
894
      private final List<NamedFilterConfig> filterConfigs;
895

896
      private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
897
          @Nullable List<NamedFilterConfig> filterConfigs) {
1✔
898
        this.resourceName = resourceName;
1✔
899
        this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
1✔
900
        this.filterConfigs = filterConfigs;
1✔
901
      }
1✔
902

903
      @Override
904
      public void onChanged(final RdsUpdate update) {
905
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
906
          return;
×
907
        }
908
        logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
1✔
909
        updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
1✔
910
      }
1✔
911

912
      @Override
913
      public void onError(final Status error) {
914
        if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
1✔
915
          return;
×
916
        }
917
        listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
918
            String.format("Unable to load RDS %s. xDS server returned: %s: %s",
1✔
919
            resourceName, error.getCode(), error.getDescription())));
1✔
920
      }
1✔
921

922
      @Override
923
      public void onResourceDoesNotExist(final String resourceName) {
924
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
925
          return;
×
926
        }
927
        String error = "RDS resource does not exist: " + resourceName;
1✔
928
        logger.log(XdsLogLevel.INFO, error);
1✔
929
        cleanUpRoutes(error);
1✔
930
      }
1✔
931
    }
932
  }
933

934
  /**
935
   * VirtualHost-level configuration for request routing.
936
   */
937
  private static class RoutingConfig {
938
    private final long fallbackTimeoutNano;
939
    final ImmutableList<RouteData> routes;
940

941
    private static RoutingConfig empty = new RoutingConfig(0, ImmutableList.of());
1✔
942

943
    private RoutingConfig(long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
1✔
944
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
945
      this.routes = checkNotNull(routes, "routes");
1✔
946
    }
1✔
947
  }
948

949
  static final class RouteData {
950
    final RouteMatch routeMatch;
951
    /** null implies non-forwarding action. */
952
    @Nullable
953
    final RouteAction routeAction;
954
    /**
955
     * Only one of these interceptors should be used per-RPC. There are only multiple values in the
956
     * list for weighted clusters, in which case the order of the list mirrors the weighted
957
     * clusters.
958
     */
959
    final ImmutableList<ClientInterceptor> filterChoices;
960

961
    RouteData(RouteMatch routeMatch, @Nullable RouteAction routeAction, ClientInterceptor filter) {
962
      this(routeMatch, routeAction, ImmutableList.of(filter));
1✔
963
    }
1✔
964

965
    RouteData(
966
        RouteMatch routeMatch,
967
        @Nullable RouteAction routeAction,
968
        ImmutableList<ClientInterceptor> filterChoices) {
1✔
969
      this.routeMatch = checkNotNull(routeMatch, "routeMatch");
1✔
970
      checkArgument(
1✔
971
          routeAction == null || !filterChoices.isEmpty(),
1✔
972
          "filter may be empty only for non-forwarding action");
973
      this.routeAction = routeAction;
1✔
974
      if (routeAction != null && routeAction.weightedClusters() != null) {
1✔
975
        checkArgument(
1✔
976
            routeAction.weightedClusters().size() == filterChoices.size(),
1✔
977
            "filter choices must match size of weighted clusters");
978
      }
979
      for (ClientInterceptor filter : filterChoices) {
1✔
980
        checkNotNull(filter, "entry in filterChoices is null");
1✔
981
      }
1✔
982
      this.filterChoices = checkNotNull(filterChoices, "filterChoices");
1✔
983
    }
1✔
984
  }
985

986
  private static class ClusterRefState {
987
    final AtomicInteger refCount;
988
    @Nullable
989
    final String traditionalCluster;
990
    @Nullable
991
    final RlsPluginConfig rlsPluginConfig;
992

993
    private ClusterRefState(
994
        AtomicInteger refCount, @Nullable String traditionalCluster,
995
        @Nullable RlsPluginConfig rlsPluginConfig) {
1✔
996
      this.refCount = refCount;
1✔
997
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
998
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
999
      this.traditionalCluster = traditionalCluster;
1✔
1000
      this.rlsPluginConfig = rlsPluginConfig;
1✔
1001
    }
1✔
1002

1003
    private Map<String, ?> toLbPolicy() {
1004
      if (traditionalCluster != null) {
1✔
1005
        return ImmutableMap.of(
1✔
1006
            XdsLbPolicies.CDS_POLICY_NAME,
1007
            ImmutableMap.of("cluster", traditionalCluster));
1✔
1008
      } else {
1009
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
1010
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
1011
            .put(
1✔
1012
                "childPolicy",
1013
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
1✔
1014
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
1015
            .buildOrThrow();
1✔
1016
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
1017
      }
1018
    }
1019

1020
    static ClusterRefState forCluster(AtomicInteger refCount, String name) {
1021
      return new ClusterRefState(refCount, name, null);
1✔
1022
    }
1023

1024
    static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
1025
      return new ClusterRefState(refCount, null, rlsPluginConfig);
1✔
1026
    }
1027
  }
1028
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc