• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20218

25 Mar 2026 03:59AM UTC coverage: 88.706% (+0.04%) from 88.668%
#20218

push

github

jdcormie
xds: Run integration test with both values of RFC 3986 URI flag

35492 of 40011 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.82
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.MetricRecorder;
45
import io.grpc.NameResolver;
46
import io.grpc.Status;
47
import io.grpc.Status.Code;
48
import io.grpc.StatusOr;
49
import io.grpc.SynchronizationContext;
50
import io.grpc.internal.GrpcUtil;
51
import io.grpc.internal.ObjectPool;
52
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
53
import io.grpc.xds.Filter.FilterConfig;
54
import io.grpc.xds.Filter.NamedFilterConfig;
55
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
56
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
57
import io.grpc.xds.VirtualHost.Route;
58
import io.grpc.xds.VirtualHost.Route.RouteAction;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
61
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
62
import io.grpc.xds.VirtualHost.Route.RouteMatch;
63
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
64
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
65
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
66
import io.grpc.xds.client.XdsClient;
67
import io.grpc.xds.client.XdsInitializationException;
68
import io.grpc.xds.client.XdsLogger;
69
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
70
import java.util.ArrayList;
71
import java.util.Collections;
72
import java.util.HashMap;
73
import java.util.HashSet;
74
import java.util.List;
75
import java.util.Locale;
76
import java.util.Map;
77
import java.util.Objects;
78
import java.util.Set;
79
import java.util.concurrent.ConcurrentHashMap;
80
import java.util.concurrent.ConcurrentMap;
81
import java.util.concurrent.ScheduledExecutorService;
82
import java.util.concurrent.atomic.AtomicInteger;
83
import java.util.function.Supplier;
84
import javax.annotation.Nullable;
85

86
/**
87
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
88
 *
89
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
90
 * protocol to retrieve service information and produce a service config to the caller.
91
 *
92
 * @see XdsNameResolverProvider
93
 */
94
final class XdsNameResolver extends NameResolver {
95
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
96
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
97
  static final CallOptions.Key<XdsConfig> XDS_CONFIG_CALL_OPTION_KEY =
1✔
98
      CallOptions.Key.create("io.grpc.xds.XDS_CONFIG_CALL_OPTION_KEY");
1✔
99
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
100
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
101
  static final CallOptions.Key<Boolean> AUTO_HOST_REWRITE_KEY =
1✔
102
      CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY");
1✔
103
  @VisibleForTesting
104
  static boolean enableTimeout =
1✔
105
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
106
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
107

108
  private final InternalLogId logId;
109
  private final XdsLogger logger;
110
  @Nullable
111
  private final String targetAuthority;
112
  private final String serviceAuthority;
113
  // Encoded version of the service authority as per 
114
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
115
  private final String encodedServiceAuthority;
116
  private final String overrideAuthority;
117
  private final ServiceConfigParser serviceConfigParser;
118
  private final SynchronizationContext syncContext;
119
  private final ScheduledExecutorService scheduler;
120
  private final XdsClientPool xdsClientPool;
121
  private final ThreadSafeRandom random;
122
  private final FilterRegistry filterRegistry;
123
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
124
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
125
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
126
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
127
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
128
  private final long randomChannelId;
129
  private final Args nameResolverArgs;
130
  // Must be accessed in syncContext.
131
  // Filter instances are unique per channel, and per filter (name+typeUrl).
132
  // NamedFilterConfig.filterStateKey -> filter_instance.
133
  private final HashMap<String, Filter> activeFilters = new HashMap<>();
1✔
134

135
  private volatile RoutingConfig routingConfig;
136
  private Listener2 listener;
137
  private XdsClient xdsClient;
138
  private CallCounterProvider callCounterProvider;
139
  private ResolveState resolveState;
140

141
  /**
   * Creates a resolver whose xDS client pool factory is selected from {@code bootstrapOverride}:
   * the default {@link SharedXdsClientPoolProvider} when it is null, a newly created provider
   * otherwise. The random source and filter registry are the default instances.
   *
   * @param target the target URI to resolve
   * @param targetAuthority the authority component of {@code target}, possibly the empty string,
   *     or null if {@code target} has no such component
   */
  XdsNameResolver(
      String target,
      @Nullable String targetAuthority,
      String name,
      @Nullable String overrideAuthority,
      ServiceConfigParser serviceConfigParser,
      SynchronizationContext syncContext,
      ScheduledExecutorService scheduler,
      @Nullable Map<String, ?> bootstrapOverride,
      MetricRecorder metricRecorder,
      Args nameResolverArgs) {
    this(
        target,
        targetAuthority,
        name,
        overrideAuthority,
        serviceConfigParser,
        syncContext,
        scheduler,
        (bootstrapOverride != null)
            ? new SharedXdsClientPoolProvider()
            : SharedXdsClientPoolProvider.getDefaultProvider(),
        ThreadSafeRandomImpl.instance,
        FilterRegistry.getDefaultRegistry(),
        bootstrapOverride,
        metricRecorder,
        nameResolverArgs);
  }
1✔
162

163
  @VisibleForTesting
164
  XdsNameResolver(
165
      String target, @Nullable String targetAuthority, String name,
166
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
167
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
168
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
169
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
170
      MetricRecorder metricRecorder, Args nameResolverArgs) {
1✔
171
    this.targetAuthority = targetAuthority;
1✔
172

173
    // The name might have multiple slashes so encode it before verifying.
174
    serviceAuthority = checkNotNull(name, "name");
1✔
175
    this.encodedServiceAuthority = 
1✔
176
      GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
1✔
177

178
    this.overrideAuthority = overrideAuthority;
1✔
179
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
180
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
181
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
182
    Supplier<XdsClient> xdsClientSupplierArg =
1✔
183
        nameResolverArgs.getArg(XdsNameResolverProvider.XDS_CLIENT_SUPPLIER);
1✔
184
    if (xdsClientSupplierArg != null) {
1✔
185
      this.xdsClientPool = new SupplierXdsClientPool(xdsClientSupplierArg);
×
186
    } else {
187
      checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
1✔
188
      this.xdsClientPool = new BootstrappingXdsClientPool(
1✔
189
          xdsClientPoolFactory, target, bootstrapOverride, metricRecorder);
190
    }
191
    this.random = checkNotNull(random, "random");
1✔
192
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
193
    this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
194

195
    randomChannelId = random.nextLong();
1✔
196
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
197
    logger = XdsLogger.withLogId(logId);
1✔
198
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
199
  }
1✔
200

201
  @Override
202
  public String getServiceAuthority() {
203
    return encodedServiceAuthority;
1✔
204
  }
205

206
  @Override
207
  public void start(Listener2 listener) {
208
    this.listener = checkNotNull(listener, "listener");
1✔
209
    try {
210
      xdsClient = xdsClientPool.getObject();
1✔
211
    } catch (Exception e) {
1✔
212
      listener.onError(
1✔
213
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
214
      return;
1✔
215
    }
1✔
216
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
217
    String listenerNameTemplate;
218
    if (targetAuthority == null || targetAuthority.isEmpty()) {
1✔
219
      // Both https://github.com/grpc/proposal/blob/master/A27-xds-global-load-balancing.md and
220
      // A47-xds-federation.md seem to treat an empty authority the same as an undefined one.
221
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
222
    } else {
223
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
224
      if (authorityInfo == null) {
1✔
225
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
226
            "invalid target URI: target authority not found in the bootstrap"));
227
        return;
1✔
228
      }
229
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
230
    }
231
    String replacement = serviceAuthority;
1✔
232
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
233
      replacement = XdsClient.percentEncodePath(replacement);
1✔
234
    }
235
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
236
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
237
        ) {
238
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
239
          "invalid listener resource URI for service authority: " + serviceAuthority));
240
      return;
×
241
    }
242
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
243
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
244

245
    resolveState = new ResolveState(ldsResourceName);
1✔
246
    resolveState.start();
1✔
247
  }
1✔
248

249
  @Override
250
  public void refresh() {
251
    if (resolveState != null) {
×
252
      resolveState.refresh();
×
253
    }
254
  }
×
255

256
  private static String expandPercentS(String template, String replacement) {
257
    return template.replace("%s", replacement);
1✔
258
  }
259

260
  @Override
261
  public void shutdown() {
262
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
263
    if (resolveState != null) {
1✔
264
      resolveState.shutdown();
1✔
265
    }
266
    if (xdsClient != null) {
1✔
267
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
268
    }
269
  }
1✔
270

271
  @VisibleForTesting
272
  static Map<String, ?> generateServiceConfigWithMethodConfig(
273
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
274
    if (timeoutNano == null
1✔
275
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
276
      return Collections.emptyMap();
1✔
277
    }
278
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
279
    methodConfig.put(
1✔
280
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
281
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
282
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
283
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
284
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
285
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
286
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
287
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
288
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
289
        codes.add(code.name());
1✔
290
      }
1✔
291
      rawRetryPolicy.put(
1✔
292
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
293
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
294
        rawRetryPolicy.put(
×
295
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
296
      }
297
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
298
    }
299
    if (timeoutNano != null) {
1✔
300
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
301
      methodConfig.put("timeout", timeout);
1✔
302
    }
303
    return Collections.singletonMap(
1✔
304
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
305
  }
306

307
  @VisibleForTesting
308
  XdsClient getXdsClient() {
309
    return xdsClient;
1✔
310
  }
311

312
  // called in syncContext
313
  private void updateResolutionResult(XdsConfig xdsConfig) {
314
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
315

316
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
317
    for (String name : clusterRefs.keySet()) {
1✔
318
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
319
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
320
    }
1✔
321
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
322
        "loadBalancingConfig",
323
        ImmutableList.of(ImmutableMap.of(
1✔
324
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
325
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
326

327
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
328
      logger.log(
×
329
          XdsLogLevel.INFO, "Generated service config: {0}", new Gson().toJson(rawServiceConfig));
×
330
    }
331
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
332
    Attributes attrs =
333
        Attributes.newBuilder()
1✔
334
            .set(XdsAttributes.XDS_CLIENT, xdsClient)
1✔
335
            .set(XdsAttributes.XDS_CONFIG, xdsConfig)
1✔
336
            .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, resolveState.xdsDependencyManager)
1✔
337
            .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
338
            .set(InternalConfigSelector.KEY, configSelector)
1✔
339
            .build();
1✔
340
    ResolutionResult result =
341
        ResolutionResult.newBuilder()
1✔
342
            .setAttributes(attrs)
1✔
343
            .setServiceConfig(parsedServiceConfig)
1✔
344
            .build();
1✔
345
    if (!listener.onResult2(result).isOk()) {
1✔
346
      resolveState.xdsDependencyManager.requestReresolution();
×
347
    }
348
  }
1✔
349

350
  /**
351
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
352
   * case-insensitive.
353
   *
354
   * <p>Wildcard pattern rules:
355
   * <ol>
356
   * <li>A single asterisk (*) matches any domain.</li>
357
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
358
   *     but not both.</li>
359
   * </ol>
360
   */
361
  @VisibleForTesting
362
  static boolean matchHostName(String hostName, String pattern) {
363
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
364
        "Invalid host name");
365
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
366
        "Invalid pattern/domain name");
367

368
    hostName = hostName.toLowerCase(Locale.US);
1✔
369
    pattern = pattern.toLowerCase(Locale.US);
1✔
370
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
371

372
    if (!pattern.contains("*")) {
1✔
373
      // Not a wildcard pattern -- hostName and pattern must match exactly.
374
      return hostName.equals(pattern);
1✔
375
    }
376
    // Wildcard pattern
377

378
    if (pattern.length() == 1) {
1✔
379
      return true;
×
380
    }
381

382
    int index = pattern.indexOf('*');
1✔
383

384
    // At most one asterisk (*) is allowed.
385
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
386
      return false;
×
387
    }
388

389
    // Asterisk can only match prefix or suffix.
390
    if (index != 0 && index != pattern.length() - 1) {
1✔
391
      return false;
×
392
    }
393

394
    // HostName must be at least as long as the pattern because asterisk has to
395
    // match one or more characters.
396
    if (hostName.length() < pattern.length()) {
1✔
397
      return false;
1✔
398
    }
399

400
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
401
      // Prefix matching fails.
402
      return true;
1✔
403
    }
404

405
    // Pattern matches hostname if suffix matching succeeds.
406
    return index == pattern.length() - 1
1✔
407
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
408
  }
409

410
  private final class ConfigSelector extends InternalConfigSelector {
1✔
411
    @Override
412
    public Result selectConfig(PickSubchannelArgs args) {
413
      RoutingConfig routingCfg;
414
      RouteData selectedRoute;
415
      String cluster;
416
      ClientInterceptor filters;
417
      Metadata headers = args.getHeaders();
1✔
418
      String path = "/" + args.getMethodDescriptor().getFullMethodName();
1✔
419
      do {
420
        routingCfg = routingConfig;
1✔
421
        if (routingCfg.errorStatus != null) {
1✔
422
          return Result.forError(routingCfg.errorStatus);
1✔
423
        }
424
        selectedRoute = null;
1✔
425
        for (RouteData route : routingCfg.routes) {
1✔
426
          if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
1✔
427
            selectedRoute = route;
1✔
428
            break;
1✔
429
          }
430
        }
1✔
431
        if (selectedRoute == null) {
1✔
432
          return Result.forError(
1✔
433
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
434
        }
435
        if (selectedRoute.routeAction == null) {
1✔
436
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
437
              "Could not route RPC to Route with non-forwarding action"));
438
        }
439
        RouteAction action = selectedRoute.routeAction;
1✔
440
        if (action.cluster() != null) {
1✔
441
          cluster = prefixedClusterName(action.cluster());
1✔
442
          filters = selectedRoute.filterChoices.get(0);
1✔
443
        } else if (action.weightedClusters() != null) {
1✔
444
          // XdsRouteConfigureResource verifies the total weight will not be 0 or exceed uint32
445
          long totalWeight = 0;
1✔
446
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
447
            totalWeight += weightedCluster.weight();
1✔
448
          }
1✔
449
          long select = random.nextLong(totalWeight);
1✔
450
          long accumulator = 0;
1✔
451
          for (int i = 0; ; i++) {
1✔
452
            ClusterWeight weightedCluster = action.weightedClusters().get(i);
1✔
453
            accumulator += weightedCluster.weight();
1✔
454
            if (select < accumulator) {
1✔
455
              cluster = prefixedClusterName(weightedCluster.name());
1✔
456
              filters = selectedRoute.filterChoices.get(i);
1✔
457
              break;
1✔
458
            }
459
          }
460
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
461
          cluster =
1✔
462
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
463
          filters = selectedRoute.filterChoices.get(0);
1✔
464
        } else {
465
          // updateRoutes() discards routes with unknown actions
466
          throw new AssertionError();
×
467
        }
468
      } while (!retainCluster(cluster));
1✔
469

470
      final RouteAction routeAction = selectedRoute.routeAction;
1✔
471
      Long timeoutNanos = null;
1✔
472
      if (enableTimeout) {
1✔
473
        timeoutNanos = routeAction.timeoutNano();
1✔
474
        if (timeoutNanos == null) {
1✔
475
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
476
        }
477
        if (timeoutNanos <= 0) {
1✔
478
          timeoutNanos = null;
1✔
479
        }
480
      }
481
      RetryPolicy retryPolicy = routeAction.retryPolicy();
1✔
482
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
483
      Map<String, ?> rawServiceConfig =
1✔
484
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
485
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
486
      Object config = parsedServiceConfig.getConfig();
1✔
487
      if (config == null) {
1✔
488
        releaseCluster(cluster);
×
489
        return Result.forError(
×
490
            parsedServiceConfig.getError().augmentDescription(
×
491
                "Failed to parse service config (method config)"));
492
      }
493
      final String finalCluster = cluster;
1✔
494
      final XdsConfig xdsConfig = routingCfg.xdsConfig;
1✔
495
      final long hash = generateHash(routeAction.hashPolicies(), headers);
1✔
496
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
497
        @Override
498
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
499
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
500
            final Channel next) {
501
          CallOptions callOptionsForCluster =
1✔
502
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
503
                  .withOption(XDS_CONFIG_CALL_OPTION_KEY, xdsConfig)
1✔
504
                  .withOption(RPC_HASH_KEY, hash);
1✔
505
          if (routeAction.autoHostRewrite()) {
1✔
506
            callOptionsForCluster = callOptionsForCluster.withOption(AUTO_HOST_REWRITE_KEY, true);
1✔
507
          }
508
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
509
              next.newCall(method, callOptionsForCluster)) {
1✔
510
            @Override
511
            public void start(Listener<RespT> listener, Metadata headers) {
512
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
513
                boolean committed;
514

515
                @Override
516
                public void onHeaders(Metadata headers) {
517
                  committed = true;
1✔
518
                  releaseCluster(finalCluster);
1✔
519
                  delegate().onHeaders(headers);
1✔
520
                }
1✔
521

522
                @Override
523
                public void onClose(Status status, Metadata trailers) {
524
                  if (!committed) {
1✔
525
                    releaseCluster(finalCluster);
1✔
526
                  }
527
                  delegate().onClose(status, trailers);
1✔
528
                }
1✔
529
              };
530
              delegate().start(listener, headers);
1✔
531
            }
1✔
532
          };
533
        }
534
      }
535

536
      return
1✔
537
          Result.newBuilder()
1✔
538
              .setConfig(config)
1✔
539
              .setInterceptor(combineInterceptors(
1✔
540
                  ImmutableList.of(new ClusterSelectionInterceptor(), filters)))
1✔
541
              .build();
1✔
542
    }
543

544
    private boolean retainCluster(String cluster) {
545
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
546
      if (clusterRefState == null) {
1✔
547
        return false;
×
548
      }
549
      AtomicInteger refCount = clusterRefState.refCount;
1✔
550
      int count;
551
      do {
552
        count = refCount.get();
1✔
553
        if (count == 0) {
1✔
554
          return false;
×
555
        }
556
      } while (!refCount.compareAndSet(count, count + 1));
1✔
557
      return true;
1✔
558
    }
559

560
    private void releaseCluster(final String cluster) {
561
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
562
      if (count < 0) {
1✔
563
        throw new AssertionError();
×
564
      }
565
      if (count == 0) {
1✔
566
        syncContext.execute(new Runnable() {
1✔
567
          @Override
568
          public void run() {
569
            if (clusterRefs.get(cluster).refCount.get() != 0) {
1✔
570
              throw new AssertionError();
×
571
            }
572
            clusterRefs.remove(cluster).close();
1✔
573
            if (resolveState.lastConfigOrStatus.hasValue()) {
1✔
574
              updateResolutionResult(resolveState.lastConfigOrStatus.getValue());
1✔
575
            } else {
576
              resolveState.cleanUpRoutes(resolveState.lastConfigOrStatus.getStatus());
×
577
            }
578
          }
1✔
579
        });
580
      }
581
    }
1✔
582

583
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
584
      Long hash = null;
1✔
585
      for (HashPolicy policy : hashPolicies) {
1✔
586
        Long newHash = null;
1✔
587
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
588
          String value = getHeaderValue(headers, policy.headerName());
1✔
589
          if (value != null) {
1✔
590
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
591
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
592
            }
593
            newHash = hashFunc.hashAsciiString(value);
1✔
594
          }
595
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
596
          newHash = hashFunc.hashLong(randomChannelId);
1✔
597
        }
598
        if (newHash != null ) {
1✔
599
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
600
          // and preserves all of the entropy.
601
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
602
          hash = oldHash ^ newHash;
1✔
603
        }
604
        // If the policy is a terminal policy and a hash has been generated, ignore
605
        // the rest of the hash policies.
606
        if (policy.isTerminal() && hash != null) {
1✔
607
          break;
×
608
        }
609
      }
1✔
610
      return hash == null ? random.nextLong() : hash;
1✔
611
    }
612
  }
613

614
  static final class PassthroughClientInterceptor implements ClientInterceptor {
1✔
615
    @Override
616
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
617
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
618
      return next.newCall(method, callOptions);
1✔
619
    }
620
  }
621

622
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
623
    if (interceptors.size() == 0) {
1✔
624
      return new PassthroughClientInterceptor();
1✔
625
    }
626
    if (interceptors.size() == 1) {
1✔
627
      return interceptors.get(0);
1✔
628
    }
629
    return new ClientInterceptor() {
1✔
630
      @Override
631
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
632
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
633
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
634
        return next.newCall(method, callOptions);
1✔
635
      }
636
    };
637
  }
638

639
  @Nullable
640
  private static String getHeaderValue(Metadata headers, String headerName) {
641
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
642
      return null;
×
643
    }
644
    if (headerName.equals("content-type")) {
1✔
645
      return "application/grpc";
×
646
    }
647
    Metadata.Key<String> key;
648
    try {
649
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
650
    } catch (IllegalArgumentException e) {
×
651
      return null;
×
652
    }
1✔
653
    Iterable<String> values = headers.getAll(key);
1✔
654
    return values == null ? null : Joiner.on(",").join(values);
1✔
655
  }
656

657
  private static String prefixedClusterName(String name) {
658
    return "cluster:" + name;
1✔
659
  }
660

661
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
662
    return "cluster_specifier_plugin:" + pluginName;
1✔
663
  }
664

665
  class ResolveState implements XdsDependencyManager.XdsConfigWatcher {
1✔
666
    private final ConfigOrError emptyServiceConfig =
1✔
667
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
668
    private final String authority;
669
    private final XdsDependencyManager xdsDependencyManager;
670
    private boolean stopped;
671
    @Nullable
672
    private Set<String> existingClusters;  // clusters to which new requests can be routed
673
    private StatusOr<XdsConfig> lastConfigOrStatus;
674

675
    private ResolveState(String ldsResourceName) {
1✔
676
      authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
677
      xdsDependencyManager =
1✔
678
          new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
1✔
679
              nameResolverArgs);
1✔
680
    }
1✔
681

682
    void start() {
683
      xdsDependencyManager.start(this);
1✔
684
    }
1✔
685

686
    void refresh() {
687
      xdsDependencyManager.requestReresolution();
×
688
    }
×
689

690
    private void shutdown() {
691
      if (stopped) {
1✔
692
        return;
×
693
      }
694

695
      stopped = true;
1✔
696
      xdsDependencyManager.shutdown();
1✔
697
      updateActiveFilters(null);
1✔
698
    }
1✔
699

700
    @Override
701
    public void onUpdate(StatusOr<XdsConfig> updateOrStatus) {
702
      if (stopped) {
1✔
703
        return;
×
704
      }
705
      logger.log(XdsLogLevel.INFO, "Receive XDS resource update: {0}", updateOrStatus);
1✔
706

707
      lastConfigOrStatus = updateOrStatus;
1✔
708
      if (!updateOrStatus.hasValue()) {
1✔
709
        updateActiveFilters(null);
1✔
710
        cleanUpRoutes(updateOrStatus.getStatus());
1✔
711
        return;
1✔
712
      }
713

714
      // Process Route
715
      XdsConfig update = updateOrStatus.getValue();
1✔
716
      HttpConnectionManager httpConnectionManager = update.getListener().httpConnectionManager();
1✔
717
      if (httpConnectionManager == null) {
1✔
718
        logger.log(XdsLogLevel.INFO, "API Listener: httpConnectionManager does not exist.");
×
719
        updateActiveFilters(null);
×
720
        cleanUpRoutes(updateOrStatus.getStatus());
×
721
        return;
×
722
      }
723

724
      VirtualHost virtualHost = update.getVirtualHost();
1✔
725
      ImmutableList<NamedFilterConfig> filterConfigs = httpConnectionManager.httpFilterConfigs();
1✔
726
      long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano();
1✔
727

728
      updateActiveFilters(filterConfigs);
1✔
729
      updateRoutes(update, virtualHost, streamDurationNano, filterConfigs);
1✔
730
    }
1✔
731

732
    // called in syncContext
733
    private void updateActiveFilters(@Nullable List<NamedFilterConfig> filterConfigs) {
734
      if (filterConfigs == null) {
1✔
735
        filterConfigs = ImmutableList.of();
1✔
736
      }
737
      Set<String> filtersToShutdown = new HashSet<>(activeFilters.keySet());
1✔
738
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
739
        String typeUrl = namedFilter.filterConfig.typeUrl();
1✔
740
        String filterKey = namedFilter.filterStateKey();
1✔
741

742
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
743
        checkNotNull(provider, "provider %s", typeUrl);
1✔
744
        Filter filter = activeFilters.computeIfAbsent(
1✔
745
            filterKey, k -> provider.newInstance(namedFilter.name));
1✔
746
        checkNotNull(filter, "filter %s", filterKey);
1✔
747
        filtersToShutdown.remove(filterKey);
1✔
748
      }
1✔
749

750
      // Shutdown filters not present in current HCM.
751
      for (String filterKey : filtersToShutdown) {
1✔
752
        Filter filterToShutdown = activeFilters.remove(filterKey);
1✔
753
        checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
1✔
754
        filterToShutdown.close();
1✔
755
      }
1✔
756
    }
1✔
757

758
    private void updateRoutes(
759
        XdsConfig xdsConfig,
760
        @Nullable VirtualHost virtualHost,
761
        long httpMaxStreamDurationNano,
762
        @Nullable List<NamedFilterConfig> filterConfigs) {
763
      List<Route> routes = virtualHost.routes();
1✔
764
      ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
1✔
765

766
      // Populate all clusters to which requests can be routed to through the virtual host.
767
      Set<String> clusters = new HashSet<>();
1✔
768
      // uniqueName -> clusterName
769
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
770
      // uniqueName -> pluginConfig
771
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
772
      for (Route route : routes) {
1✔
773
        RouteAction action = route.routeAction();
1✔
774
        String prefixedName;
775
        if (action == null) {
1✔
776
          routesData.add(new RouteData(route.routeMatch(), null, ImmutableList.of()));
1✔
777
        } else if (action.cluster() != null) {
1✔
778
          prefixedName = prefixedClusterName(action.cluster());
1✔
779
          clusters.add(prefixedName);
1✔
780
          clusterNameMap.put(prefixedName, action.cluster());
1✔
781
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
782
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
783
        } else if (action.weightedClusters() != null) {
1✔
784
          ImmutableList.Builder<ClientInterceptor> filterList = ImmutableList.builder();
1✔
785
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
786
            prefixedName = prefixedClusterName(weightedCluster.name());
1✔
787
            clusters.add(prefixedName);
1✔
788
            clusterNameMap.put(prefixedName, weightedCluster.name());
1✔
789
            filterList.add(createFilters(filterConfigs, virtualHost, route, weightedCluster));
1✔
790
          }
1✔
791
          routesData.add(
1✔
792
              new RouteData(route.routeMatch(), route.routeAction(), filterList.build()));
1✔
793
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
794
          PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
795
          if (pluginConfig instanceof RlsPluginConfig) {
1✔
796
            prefixedName = prefixedClusterSpecifierPluginName(
1✔
797
                action.namedClusterSpecifierPluginConfig().name());
1✔
798
            clusters.add(prefixedName);
1✔
799
            rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
800
          }
801
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
802
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
803
        } else {
804
          // Discard route
805
        }
806
      }
1✔
807

808
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
809
      boolean shouldUpdateResult = existingClusters == null;
1✔
810
      Set<String> addedClusters =
811
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
812
      Set<String> deletedClusters =
813
          existingClusters == null
1✔
814
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
815
      existingClusters = clusters;
1✔
816
      for (String cluster : addedClusters) {
1✔
817
        if (clusterRefs.containsKey(cluster)) {
1✔
818
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
819
        } else {
820
          if (clusterNameMap.containsKey(cluster)) {
1✔
821
            assert cluster.startsWith("cluster:");
1✔
822
            XdsConfig.Subscription subscription =
1✔
823
                xdsDependencyManager.subscribeToCluster(cluster.substring("cluster:".length()));
1✔
824
            clusterRefs.put(
1✔
825
                cluster,
826
                ClusterRefState.forCluster(
1✔
827
                    new AtomicInteger(1), clusterNameMap.get(cluster), subscription));
1✔
828
          }
829
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
830
            clusterRefs.put(
1✔
831
                cluster,
832
                ClusterRefState.forRlsPlugin(
1✔
833
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
834
          }
835
          shouldUpdateResult = true;
1✔
836
        }
837
      }
1✔
838
      for (String cluster : clusters) {
1✔
839
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
840
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
841
          ClusterRefState newClusterRefState =
1✔
842
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
843
          clusterRefs.put(cluster, newClusterRefState);
1✔
844
          shouldUpdateResult = true;
1✔
845
        }
846
      }
1✔
847
      // Update service config to include newly added clusters.
848
      if (shouldUpdateResult && routingConfig != null) {
1✔
849
        updateResolutionResult(xdsConfig);
1✔
850
        shouldUpdateResult = false;
1✔
851
      } else {
852
        // Need to update at least once
853
        shouldUpdateResult = true;
1✔
854
      }
855
      // Make newly added clusters selectable by config selector and deleted clusters no longer
856
      // selectable.
857
      routingConfig = new RoutingConfig(xdsConfig, httpMaxStreamDurationNano, routesData.build());
1✔
858
      for (String cluster : deletedClusters) {
1✔
859
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
860
        if (count == 0) {
1✔
861
          clusterRefs.remove(cluster).close();
1✔
862
          shouldUpdateResult = true;
1✔
863
        }
864
      }
1✔
865
      if (shouldUpdateResult) {
1✔
866
        updateResolutionResult(xdsConfig);
1✔
867
      }
868
    }
1✔
869

870
    private ClientInterceptor createFilters(
871
        @Nullable List<NamedFilterConfig> filterConfigs,
872
        VirtualHost virtualHost,
873
        Route route,
874
        @Nullable ClusterWeight weightedCluster) {
875
      if (filterConfigs == null) {
1✔
876
        return new PassthroughClientInterceptor();
1✔
877
      }
878

879
      Map<String, FilterConfig> selectedOverrideConfigs =
1✔
880
          new HashMap<>(virtualHost.filterConfigOverrides());
1✔
881
      selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
882
      if (weightedCluster != null) {
1✔
883
        selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
884
      }
885

886
      ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
1✔
887
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
888
        String name = namedFilter.name;
1✔
889
        FilterConfig config = namedFilter.filterConfig;
1✔
890
        FilterConfig overrideConfig = selectedOverrideConfigs.get(name);
1✔
891
        String filterKey = namedFilter.filterStateKey();
1✔
892

893
        Filter filter = activeFilters.get(filterKey);
1✔
894
        checkNotNull(filter, "activeFilters.get(%s)", filterKey);
1✔
895
        ClientInterceptor interceptor =
1✔
896
            filter.buildClientInterceptor(config, overrideConfig, scheduler);
1✔
897

898
        if (interceptor != null) {
1✔
899
          filterInterceptors.add(interceptor);
1✔
900
        }
901
      }
1✔
902

903
      // Combine interceptors produced by different filters into a single one that executes
904
      // them sequentially. The order is preserved.
905
      return combineInterceptors(filterInterceptors.build());
1✔
906
    }
907

908
    private void cleanUpRoutes(Status error) {
909
      routingConfig = new RoutingConfig(error);
1✔
910
      if (existingClusters != null) {
1✔
911
        for (String cluster : existingClusters) {
1✔
912
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
913
          if (count == 0) {
1✔
914
            clusterRefs.remove(cluster).close();
1✔
915
          }
916
        }
1✔
917
        existingClusters = null;
1✔
918
      }
919

920
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
921
      // the config selector handles the error message itself.
922
      listener.onResult2(ResolutionResult.newBuilder()
1✔
923
          .setAttributes(Attributes.newBuilder()
1✔
924
            .set(InternalConfigSelector.KEY, configSelector)
1✔
925
            .build())
1✔
926
          .setServiceConfig(emptyServiceConfig)
1✔
927
          .build());
1✔
928
    }
1✔
929
  }
930

931
  /**
932
   * VirtualHost-level configuration for request routing.
933
   */
934
  private static class RoutingConfig {
935
    final XdsConfig xdsConfig;
936
    final long fallbackTimeoutNano;
937
    final ImmutableList<RouteData> routes;
938
    final Status errorStatus;
939

940
    private RoutingConfig(
941
        XdsConfig xdsConfig, long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
1✔
942
      this.xdsConfig = checkNotNull(xdsConfig, "xdsConfig");
1✔
943
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
944
      this.routes = checkNotNull(routes, "routes");
1✔
945
      this.errorStatus = null;
1✔
946
    }
1✔
947

948
    private RoutingConfig(Status errorStatus) {
1✔
949
      this.xdsConfig = null;
1✔
950
      this.fallbackTimeoutNano = 0;
1✔
951
      this.routes = null;
1✔
952
      this.errorStatus = checkNotNull(errorStatus, "errorStatus");
1✔
953
      checkArgument(!errorStatus.isOk(), "errorStatus should not be okay");
1✔
954
    }
1✔
955
  }
956

957
  static final class RouteData {
958
    final RouteMatch routeMatch;
959
    /** null implies non-forwarding action. */
960
    @Nullable
961
    final RouteAction routeAction;
962
    /**
963
     * Only one of these interceptors should be used per-RPC. There are only multiple values in the
964
     * list for weighted clusters, in which case the order of the list mirrors the weighted
965
     * clusters.
966
     */
967
    final ImmutableList<ClientInterceptor> filterChoices;
968

969
    RouteData(RouteMatch routeMatch, @Nullable RouteAction routeAction, ClientInterceptor filter) {
970
      this(routeMatch, routeAction, ImmutableList.of(filter));
1✔
971
    }
1✔
972

973
    RouteData(
974
        RouteMatch routeMatch,
975
        @Nullable RouteAction routeAction,
976
        ImmutableList<ClientInterceptor> filterChoices) {
1✔
977
      this.routeMatch = checkNotNull(routeMatch, "routeMatch");
1✔
978
      checkArgument(
1✔
979
          routeAction == null || !filterChoices.isEmpty(),
1✔
980
          "filter may be empty only for non-forwarding action");
981
      this.routeAction = routeAction;
1✔
982
      if (routeAction != null && routeAction.weightedClusters() != null) {
1✔
983
        checkArgument(
1✔
984
            routeAction.weightedClusters().size() == filterChoices.size(),
1✔
985
            "filter choices must match size of weighted clusters");
986
      }
987
      for (ClientInterceptor filter : filterChoices) {
1✔
988
        checkNotNull(filter, "entry in filterChoices is null");
1✔
989
      }
1✔
990
      this.filterChoices = checkNotNull(filterChoices, "filterChoices");
1✔
991
    }
1✔
992
  }
993

994
  private static class ClusterRefState {
995
    final AtomicInteger refCount;
996
    @Nullable
997
    final String traditionalCluster;
998
    @Nullable
999
    final RlsPluginConfig rlsPluginConfig;
1000
    @Nullable
1001
    final XdsConfig.Subscription subscription;
1002

1003
    private ClusterRefState(
1004
        AtomicInteger refCount, @Nullable String traditionalCluster,
1005
        @Nullable RlsPluginConfig rlsPluginConfig, @Nullable XdsConfig.Subscription subscription) {
1✔
1006
      this.refCount = refCount;
1✔
1007
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
1008
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
1009
      this.traditionalCluster = traditionalCluster;
1✔
1010
      this.rlsPluginConfig = rlsPluginConfig;
1✔
1011
      this.subscription = subscription;
1✔
1012
    }
1✔
1013

1014
    private Map<String, ?> toLbPolicy() {
1015
      if (traditionalCluster != null) {
1✔
1016
        return ImmutableMap.of(
1✔
1017
            XdsLbPolicies.CDS_POLICY_NAME,
1018
            ImmutableMap.of("cluster", traditionalCluster));
1✔
1019
      } else {
1020
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
1021
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
1022
            .put(
1✔
1023
                "childPolicy",
1024
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of(
1✔
1025
                    "is_dynamic", true))))
1✔
1026
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
1027
            .buildOrThrow();
1✔
1028
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
1029
      }
1030
    }
1031

1032
    private void close() {
1033
      if (subscription != null) {
1✔
1034
        subscription.close();
1✔
1035
      }
1036
    }
1✔
1037

1038
    static ClusterRefState forCluster(
1039
        AtomicInteger refCount, String name, XdsConfig.Subscription subscription) {
1040
      return new ClusterRefState(refCount, name, null, checkNotNull(subscription, "subscription"));
1✔
1041
    }
1042

1043
    static ClusterRefState forRlsPlugin(
1044
        AtomicInteger refCount,
1045
        RlsPluginConfig rlsPluginConfig) {
1046
      return new ClusterRefState(refCount, null, rlsPluginConfig, null);
1✔
1047
    }
1048
  }
1049

1050
  /** An ObjectPool, except it can throw an exception. */
1051
  private interface XdsClientPool {
1052
    XdsClient getObject() throws XdsInitializationException;
1053

1054
    XdsClient returnObject(XdsClient xdsClient);
1055
  }
1056

1057
  private static final class BootstrappingXdsClientPool implements XdsClientPool {
1058
    private final XdsClientPoolFactory xdsClientPoolFactory;
1059
    private final String target;
1060
    private final @Nullable Map<String, ?> bootstrapOverride;
1061
    private final @Nullable MetricRecorder metricRecorder;
1062
    private ObjectPool<XdsClient> xdsClientPool;
1063

1064
    BootstrappingXdsClientPool(
1065
        XdsClientPoolFactory xdsClientPoolFactory,
1066
        String target,
1067
        @Nullable Map<String, ?> bootstrapOverride,
1068
        @Nullable MetricRecorder metricRecorder) {
1✔
1069
      this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
1✔
1070
      this.target = checkNotNull(target, "target");
1✔
1071
      this.bootstrapOverride = bootstrapOverride;
1✔
1072
      this.metricRecorder = metricRecorder;
1✔
1073
    }
1✔
1074

1075
    @Override
1076
    public XdsClient getObject() throws XdsInitializationException {
1077
      if (xdsClientPool == null) {
1✔
1078
        BootstrapInfo bootstrapInfo;
1079
        if (bootstrapOverride == null) {
1✔
1080
          bootstrapInfo = GrpcBootstrapperImpl.defaultBootstrap();
×
1081
        } else {
1082
          bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride);
1✔
1083
        }
1084
        this.xdsClientPool =
1✔
1085
            xdsClientPoolFactory.getOrCreate(target, bootstrapInfo, metricRecorder);
1✔
1086
      }
1087
      return xdsClientPool.getObject();
1✔
1088
    }
1089

1090
    @Override
1091
    public XdsClient returnObject(XdsClient xdsClient) {
1092
      return xdsClientPool.returnObject(xdsClient);
1✔
1093
    }
1094
  }
1095

1096
  private static final class SupplierXdsClientPool implements XdsClientPool {
1097
    private final Supplier<XdsClient> xdsClientSupplier;
1098

1099
    SupplierXdsClientPool(Supplier<XdsClient> xdsClientSupplier) {
×
1100
      this.xdsClientSupplier = checkNotNull(xdsClientSupplier, "xdsClientSupplier");
×
1101
    }
×
1102

1103
    @Override
1104
    public XdsClient getObject() throws XdsInitializationException {
1105
      XdsClient xdsClient = xdsClientSupplier.get();
×
1106
      if (xdsClient == null) {
×
1107
        throw new XdsInitializationException("Caller failed to initialize XDS_CLIENT_SUPPLIER");
×
1108
      }
1109
      return xdsClient;
×
1110
    }
1111

1112
    @Override
1113
    public XdsClient returnObject(XdsClient xdsClient) {
1114
      return null;
×
1115
    }
1116
  }
1117
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc