• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20033

29 Oct 2025 04:43PM UTC coverage: 88.533% (-0.03%) from 88.561%
#20033

push

github

web-flow
xds,googleapis: Allow wrapping NameResolver to inject XdsClient (#12450)

Since there is no longer a single global XdsClient, it makes more sense
to allow things like the c2p name resolver to inject its own bootstrap
even if there is one defined in an environment variable.
GoogleCloudToProdNameResolver can now pass an XdsClient instance to
XdsNameResolver, and SharedXdsClientPoolProvider allows
GoogleCloudToProdNameResolver to choose the bootstrap for that one
specific target.

Since XdsNameResolver is no longer in control of the XdsClient pool the
XdsClient instance is now passed to ClusterImplLb. A channel will now
only access the global XdsClient pool exactly once: in the name
resolver.

BootstrapInfo is purposefully being shared across channels, as we really
want to share things like credentials which can have significant memory
use and may have caches which reduce I/O when shared. That is why
SharedXdsClientPoolProvider receives BootstrapInfo instead of
Map<String,?>.

Verifying BootstrapInfo.server() is not empty was moved from
SharedXdsClientPoolProvider to GrpcBootstrapperImpl so avoid
getOrCreate() throwing an exception in only that one case. It might make
sense to move that to BootstrapperImpl, but that will need more
investigation.

A lot of tests needed updating because XdsClientPoolProvider is no
longer responsible for parsing the bootstrap, so we now need bootstraps
even if XdsClientPoolProvider will ignore it.

This also fixes a bug in GoogleCloudToProdNameResolver where it would
initialize the delegate even when it failed to create the bootstrap.
That would certainly cause all RPCs on the channel to fail because of
the missing bootstrap and it defeated the point of `succeeded == false`
and `refresh()` which was supposed to retry contacting the metadata
server.

The server tests were enhanced to give a useful error when
server.start() throws an exception, as otherwise the real error is lost.

b/442819521

34966 of 39495 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.83
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.MetricRecorder;
45
import io.grpc.NameResolver;
46
import io.grpc.Status;
47
import io.grpc.Status.Code;
48
import io.grpc.StatusOr;
49
import io.grpc.SynchronizationContext;
50
import io.grpc.internal.GrpcUtil;
51
import io.grpc.internal.ObjectPool;
52
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
53
import io.grpc.xds.Filter.FilterConfig;
54
import io.grpc.xds.Filter.NamedFilterConfig;
55
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
56
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
57
import io.grpc.xds.VirtualHost.Route;
58
import io.grpc.xds.VirtualHost.Route.RouteAction;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
61
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
62
import io.grpc.xds.VirtualHost.Route.RouteMatch;
63
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
64
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
65
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
66
import io.grpc.xds.client.XdsClient;
67
import io.grpc.xds.client.XdsInitializationException;
68
import io.grpc.xds.client.XdsLogger;
69
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
70
import java.net.URI;
71
import java.util.ArrayList;
72
import java.util.Collections;
73
import java.util.HashMap;
74
import java.util.HashSet;
75
import java.util.List;
76
import java.util.Locale;
77
import java.util.Map;
78
import java.util.Objects;
79
import java.util.Set;
80
import java.util.concurrent.ConcurrentHashMap;
81
import java.util.concurrent.ConcurrentMap;
82
import java.util.concurrent.ScheduledExecutorService;
83
import java.util.concurrent.atomic.AtomicInteger;
84
import java.util.function.Supplier;
85
import javax.annotation.Nullable;
86

87
/**
88
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
89
 *
90
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
91
 * protocol to retrieve service information and produce a service config to the caller.
92
 *
93
 * @see XdsNameResolverProvider
94
 */
95
final class XdsNameResolver extends NameResolver {
96
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
97
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
98
  static final CallOptions.Key<XdsConfig> XDS_CONFIG_CALL_OPTION_KEY =
1✔
99
      CallOptions.Key.create("io.grpc.xds.XDS_CONFIG_CALL_OPTION_KEY");
1✔
100
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
101
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
102
  static final CallOptions.Key<Boolean> AUTO_HOST_REWRITE_KEY =
1✔
103
      CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY");
1✔
104
  @VisibleForTesting
105
  static boolean enableTimeout =
1✔
106
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
107
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
108

109
  private final InternalLogId logId;
110
  private final XdsLogger logger;
111
  @Nullable
112
  private final String targetAuthority;
113
  private final String target;
114
  private final String serviceAuthority;
115
  // Encoded version of the service authority as per 
116
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
117
  private final String encodedServiceAuthority;
118
  private final String overrideAuthority;
119
  private final ServiceConfigParser serviceConfigParser;
120
  private final SynchronizationContext syncContext;
121
  private final ScheduledExecutorService scheduler;
122
  private final XdsClientPool xdsClientPool;
123
  private final ThreadSafeRandom random;
124
  private final FilterRegistry filterRegistry;
125
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
126
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
127
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
128
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
129
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
130
  private final long randomChannelId;
131
  private final Args nameResolverArgs;
132
  // Must be accessed in syncContext.
133
  // Filter instances are unique per channel, and per filter (name+typeUrl).
134
  // NamedFilterConfig.filterStateKey -> filter_instance.
135
  private final HashMap<String, Filter> activeFilters = new HashMap<>();
1✔
136

137
  private volatile RoutingConfig routingConfig;
138
  private Listener2 listener;
139
  private XdsClient xdsClient;
140
  private CallCounterProvider callCounterProvider;
141
  private ResolveState resolveState;
142

143
  XdsNameResolver(
144
      URI targetUri, String name, @Nullable String overrideAuthority,
145
      ServiceConfigParser serviceConfigParser,
146
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
147
      @Nullable Map<String, ?> bootstrapOverride,
148
      MetricRecorder metricRecorder, Args nameResolverArgs) {
149
    this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
1✔
150
        syncContext, scheduler,
151
        bootstrapOverride == null
1✔
152
          ? SharedXdsClientPoolProvider.getDefaultProvider()
1✔
153
          : new SharedXdsClientPoolProvider(),
1✔
154
        ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride,
1✔
155
        metricRecorder, nameResolverArgs);
156
  }
1✔
157

158
  @VisibleForTesting
159
  XdsNameResolver(
160
      URI targetUri, @Nullable String targetAuthority, String name,
161
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
162
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
163
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
164
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
165
      MetricRecorder metricRecorder, Args nameResolverArgs) {
1✔
166
    this.targetAuthority = targetAuthority;
1✔
167
    target = targetUri.toString();
1✔
168

169
    // The name might have multiple slashes so encode it before verifying.
170
    serviceAuthority = checkNotNull(name, "name");
1✔
171
    this.encodedServiceAuthority = 
1✔
172
      GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
1✔
173

174
    this.overrideAuthority = overrideAuthority;
1✔
175
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
176
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
177
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
178
    Supplier<XdsClient> xdsClientSupplierArg =
1✔
179
        nameResolverArgs.getArg(XdsNameResolverProvider.XDS_CLIENT_SUPPLIER);
1✔
180
    if (xdsClientSupplierArg != null) {
1✔
181
      this.xdsClientPool = new SupplierXdsClientPool(xdsClientSupplierArg);
×
182
    } else {
183
      checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
1✔
184
      this.xdsClientPool = new BootstrappingXdsClientPool(
1✔
185
          xdsClientPoolFactory, target, bootstrapOverride, metricRecorder);
186
    }
187
    this.random = checkNotNull(random, "random");
1✔
188
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
189
    this.nameResolverArgs = checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
190

191
    randomChannelId = random.nextLong();
1✔
192
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
193
    logger = XdsLogger.withLogId(logId);
1✔
194
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
195
  }
1✔
196

197
  @Override
198
  public String getServiceAuthority() {
199
    return encodedServiceAuthority;
1✔
200
  }
201

202
  @Override
203
  public void start(Listener2 listener) {
204
    this.listener = checkNotNull(listener, "listener");
1✔
205
    try {
206
      xdsClient = xdsClientPool.getObject();
1✔
207
    } catch (Exception e) {
1✔
208
      listener.onError(
1✔
209
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
210
      return;
1✔
211
    }
1✔
212
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
213
    String listenerNameTemplate;
214
    if (targetAuthority == null) {
1✔
215
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
216
    } else {
217
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
218
      if (authorityInfo == null) {
1✔
219
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
220
            "invalid target URI: target authority not found in the bootstrap"));
221
        return;
1✔
222
      }
223
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
224
    }
225
    String replacement = serviceAuthority;
1✔
226
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
227
      replacement = XdsClient.percentEncodePath(replacement);
1✔
228
    }
229
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
230
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
231
        ) {
232
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
233
          "invalid listener resource URI for service authority: " + serviceAuthority));
234
      return;
×
235
    }
236
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
237
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
238

239
    resolveState = new ResolveState(ldsResourceName);
1✔
240
    resolveState.start();
1✔
241
  }
1✔
242

243
  @Override
244
  public void refresh() {
245
    if (resolveState != null) {
×
246
      resolveState.refresh();
×
247
    }
248
  }
×
249

250
  private static String expandPercentS(String template, String replacement) {
251
    return template.replace("%s", replacement);
1✔
252
  }
253

254
  @Override
255
  public void shutdown() {
256
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
257
    if (resolveState != null) {
1✔
258
      resolveState.shutdown();
1✔
259
    }
260
    if (xdsClient != null) {
1✔
261
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
262
    }
263
  }
1✔
264

265
  @VisibleForTesting
266
  static Map<String, ?> generateServiceConfigWithMethodConfig(
267
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
268
    if (timeoutNano == null
1✔
269
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
270
      return Collections.emptyMap();
1✔
271
    }
272
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
273
    methodConfig.put(
1✔
274
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
275
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
276
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
277
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
278
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
279
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
280
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
281
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
282
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
283
        codes.add(code.name());
1✔
284
      }
1✔
285
      rawRetryPolicy.put(
1✔
286
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
287
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
288
        rawRetryPolicy.put(
×
289
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
290
      }
291
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
292
    }
293
    if (timeoutNano != null) {
1✔
294
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
295
      methodConfig.put("timeout", timeout);
1✔
296
    }
297
    return Collections.singletonMap(
1✔
298
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
299
  }
300

301
  @VisibleForTesting
302
  XdsClient getXdsClient() {
303
    return xdsClient;
1✔
304
  }
305

306
  // called in syncContext
307
  private void updateResolutionResult(XdsConfig xdsConfig) {
308
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
309

310
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
311
    for (String name : clusterRefs.keySet()) {
1✔
312
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
313
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
314
    }
1✔
315
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
316
        "loadBalancingConfig",
317
        ImmutableList.of(ImmutableMap.of(
1✔
318
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
319
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
320

321
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
322
      logger.log(
×
323
          XdsLogLevel.INFO, "Generated service config: {0}", new Gson().toJson(rawServiceConfig));
×
324
    }
325
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
326
    Attributes attrs =
327
        Attributes.newBuilder()
1✔
328
            .set(XdsAttributes.XDS_CLIENT, xdsClient)
1✔
329
            .set(XdsAttributes.XDS_CONFIG, xdsConfig)
1✔
330
            .set(XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, resolveState.xdsDependencyManager)
1✔
331
            .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
332
            .set(InternalConfigSelector.KEY, configSelector)
1✔
333
            .build();
1✔
334
    ResolutionResult result =
335
        ResolutionResult.newBuilder()
1✔
336
            .setAttributes(attrs)
1✔
337
            .setServiceConfig(parsedServiceConfig)
1✔
338
            .build();
1✔
339
    if (!listener.onResult2(result).isOk()) {
1✔
340
      resolveState.xdsDependencyManager.requestReresolution();
×
341
    }
342
  }
1✔
343

344
  /**
345
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
346
   * case-insensitive.
347
   *
348
   * <p>Wildcard pattern rules:
349
   * <ol>
350
   * <li>A single asterisk (*) matches any domain.</li>
351
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
352
   *     but not both.</li>
353
   * </ol>
354
   */
355
  @VisibleForTesting
356
  static boolean matchHostName(String hostName, String pattern) {
357
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
358
        "Invalid host name");
359
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
360
        "Invalid pattern/domain name");
361

362
    hostName = hostName.toLowerCase(Locale.US);
1✔
363
    pattern = pattern.toLowerCase(Locale.US);
1✔
364
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
365

366
    if (!pattern.contains("*")) {
1✔
367
      // Not a wildcard pattern -- hostName and pattern must match exactly.
368
      return hostName.equals(pattern);
1✔
369
    }
370
    // Wildcard pattern
371

372
    if (pattern.length() == 1) {
1✔
373
      return true;
×
374
    }
375

376
    int index = pattern.indexOf('*');
1✔
377

378
    // At most one asterisk (*) is allowed.
379
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
380
      return false;
×
381
    }
382

383
    // Asterisk can only match prefix or suffix.
384
    if (index != 0 && index != pattern.length() - 1) {
1✔
385
      return false;
×
386
    }
387

388
    // HostName must be at least as long as the pattern because asterisk has to
389
    // match one or more characters.
390
    if (hostName.length() < pattern.length()) {
1✔
391
      return false;
1✔
392
    }
393

394
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
395
      // Prefix matching fails.
396
      return true;
1✔
397
    }
398

399
    // Pattern matches hostname if suffix matching succeeds.
400
    return index == pattern.length() - 1
1✔
401
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
402
  }
403

404
  private final class ConfigSelector extends InternalConfigSelector {
1✔
405
    @Override
406
    public Result selectConfig(PickSubchannelArgs args) {
407
      RoutingConfig routingCfg;
408
      RouteData selectedRoute;
409
      String cluster;
410
      ClientInterceptor filters;
411
      Metadata headers = args.getHeaders();
1✔
412
      String path = "/" + args.getMethodDescriptor().getFullMethodName();
1✔
413
      do {
414
        routingCfg = routingConfig;
1✔
415
        if (routingCfg.errorStatus != null) {
1✔
416
          return Result.forError(routingCfg.errorStatus);
1✔
417
        }
418
        selectedRoute = null;
1✔
419
        for (RouteData route : routingCfg.routes) {
1✔
420
          if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
1✔
421
            selectedRoute = route;
1✔
422
            break;
1✔
423
          }
424
        }
1✔
425
        if (selectedRoute == null) {
1✔
426
          return Result.forError(
1✔
427
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
428
        }
429
        if (selectedRoute.routeAction == null) {
1✔
430
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
431
              "Could not route RPC to Route with non-forwarding action"));
432
        }
433
        RouteAction action = selectedRoute.routeAction;
1✔
434
        if (action.cluster() != null) {
1✔
435
          cluster = prefixedClusterName(action.cluster());
1✔
436
          filters = selectedRoute.filterChoices.get(0);
1✔
437
        } else if (action.weightedClusters() != null) {
1✔
438
          // XdsRouteConfigureResource verifies the total weight will not be 0 or exceed uint32
439
          long totalWeight = 0;
1✔
440
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
441
            totalWeight += weightedCluster.weight();
1✔
442
          }
1✔
443
          long select = random.nextLong(totalWeight);
1✔
444
          long accumulator = 0;
1✔
445
          for (int i = 0; ; i++) {
1✔
446
            ClusterWeight weightedCluster = action.weightedClusters().get(i);
1✔
447
            accumulator += weightedCluster.weight();
1✔
448
            if (select < accumulator) {
1✔
449
              cluster = prefixedClusterName(weightedCluster.name());
1✔
450
              filters = selectedRoute.filterChoices.get(i);
1✔
451
              break;
1✔
452
            }
453
          }
454
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
455
          cluster =
1✔
456
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
457
          filters = selectedRoute.filterChoices.get(0);
1✔
458
        } else {
459
          // updateRoutes() discards routes with unknown actions
460
          throw new AssertionError();
×
461
        }
462
      } while (!retainCluster(cluster));
1✔
463

464
      final RouteAction routeAction = selectedRoute.routeAction;
1✔
465
      Long timeoutNanos = null;
1✔
466
      if (enableTimeout) {
1✔
467
        timeoutNanos = routeAction.timeoutNano();
1✔
468
        if (timeoutNanos == null) {
1✔
469
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
470
        }
471
        if (timeoutNanos <= 0) {
1✔
472
          timeoutNanos = null;
1✔
473
        }
474
      }
475
      RetryPolicy retryPolicy = routeAction.retryPolicy();
1✔
476
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
477
      Map<String, ?> rawServiceConfig =
1✔
478
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
479
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
480
      Object config = parsedServiceConfig.getConfig();
1✔
481
      if (config == null) {
1✔
482
        releaseCluster(cluster);
×
483
        return Result.forError(
×
484
            parsedServiceConfig.getError().augmentDescription(
×
485
                "Failed to parse service config (method config)"));
486
      }
487
      final String finalCluster = cluster;
1✔
488
      final XdsConfig xdsConfig = routingCfg.xdsConfig;
1✔
489
      final long hash = generateHash(routeAction.hashPolicies(), headers);
1✔
490
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
491
        @Override
492
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
493
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
494
            final Channel next) {
495
          CallOptions callOptionsForCluster =
1✔
496
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
497
                  .withOption(XDS_CONFIG_CALL_OPTION_KEY, xdsConfig)
1✔
498
                  .withOption(RPC_HASH_KEY, hash);
1✔
499
          if (routeAction.autoHostRewrite()) {
1✔
500
            callOptionsForCluster = callOptionsForCluster.withOption(AUTO_HOST_REWRITE_KEY, true);
1✔
501
          }
502
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
503
              next.newCall(method, callOptionsForCluster)) {
1✔
504
            @Override
505
            public void start(Listener<RespT> listener, Metadata headers) {
506
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
507
                boolean committed;
508

509
                @Override
510
                public void onHeaders(Metadata headers) {
511
                  committed = true;
1✔
512
                  releaseCluster(finalCluster);
1✔
513
                  delegate().onHeaders(headers);
1✔
514
                }
1✔
515

516
                @Override
517
                public void onClose(Status status, Metadata trailers) {
518
                  if (!committed) {
1✔
519
                    releaseCluster(finalCluster);
1✔
520
                  }
521
                  delegate().onClose(status, trailers);
1✔
522
                }
1✔
523
              };
524
              delegate().start(listener, headers);
1✔
525
            }
1✔
526
          };
527
        }
528
      }
529

530
      return
1✔
531
          Result.newBuilder()
1✔
532
              .setConfig(config)
1✔
533
              .setInterceptor(combineInterceptors(
1✔
534
                  ImmutableList.of(new ClusterSelectionInterceptor(), filters)))
1✔
535
              .build();
1✔
536
    }
537

538
    private boolean retainCluster(String cluster) {
539
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
540
      if (clusterRefState == null) {
1✔
541
        return false;
×
542
      }
543
      AtomicInteger refCount = clusterRefState.refCount;
1✔
544
      int count;
545
      do {
546
        count = refCount.get();
1✔
547
        if (count == 0) {
1✔
548
          return false;
×
549
        }
550
      } while (!refCount.compareAndSet(count, count + 1));
1✔
551
      return true;
1✔
552
    }
553

554
    private void releaseCluster(final String cluster) {
555
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
556
      if (count < 0) {
1✔
557
        throw new AssertionError();
×
558
      }
559
      if (count == 0) {
1✔
560
        syncContext.execute(new Runnable() {
1✔
561
          @Override
562
          public void run() {
563
            if (clusterRefs.get(cluster).refCount.get() != 0) {
1✔
564
              throw new AssertionError();
×
565
            }
566
            clusterRefs.remove(cluster).close();
1✔
567
            if (resolveState.lastConfigOrStatus.hasValue()) {
1✔
568
              updateResolutionResult(resolveState.lastConfigOrStatus.getValue());
1✔
569
            } else {
570
              resolveState.cleanUpRoutes(resolveState.lastConfigOrStatus.getStatus());
×
571
            }
572
          }
1✔
573
        });
574
      }
575
    }
1✔
576

577
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
578
      Long hash = null;
1✔
579
      for (HashPolicy policy : hashPolicies) {
1✔
580
        Long newHash = null;
1✔
581
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
582
          String value = getHeaderValue(headers, policy.headerName());
1✔
583
          if (value != null) {
1✔
584
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
585
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
586
            }
587
            newHash = hashFunc.hashAsciiString(value);
1✔
588
          }
589
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
590
          newHash = hashFunc.hashLong(randomChannelId);
1✔
591
        }
592
        if (newHash != null ) {
1✔
593
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
594
          // and preserves all of the entropy.
595
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
596
          hash = oldHash ^ newHash;
1✔
597
        }
598
        // If the policy is a terminal policy and a hash has been generated, ignore
599
        // the rest of the hash policies.
600
        if (policy.isTerminal() && hash != null) {
1✔
601
          break;
×
602
        }
603
      }
1✔
604
      return hash == null ? random.nextLong() : hash;
1✔
605
    }
606
  }
607

608
  static final class PassthroughClientInterceptor implements ClientInterceptor {
1✔
609
    @Override
610
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
611
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
612
      return next.newCall(method, callOptions);
1✔
613
    }
614
  }
615

616
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
617
    if (interceptors.size() == 0) {
1✔
618
      return new PassthroughClientInterceptor();
1✔
619
    }
620
    if (interceptors.size() == 1) {
1✔
621
      return interceptors.get(0);
1✔
622
    }
623
    return new ClientInterceptor() {
1✔
624
      @Override
625
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
626
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
627
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
628
        return next.newCall(method, callOptions);
1✔
629
      }
630
    };
631
  }
632

633
  @Nullable
634
  private static String getHeaderValue(Metadata headers, String headerName) {
635
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
636
      return null;
×
637
    }
638
    if (headerName.equals("content-type")) {
1✔
639
      return "application/grpc";
×
640
    }
641
    Metadata.Key<String> key;
642
    try {
643
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
644
    } catch (IllegalArgumentException e) {
×
645
      return null;
×
646
    }
1✔
647
    Iterable<String> values = headers.getAll(key);
1✔
648
    return values == null ? null : Joiner.on(",").join(values);
1✔
649
  }
650

651
  private static String prefixedClusterName(String name) {
652
    return "cluster:" + name;
1✔
653
  }
654

655
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
656
    return "cluster_specifier_plugin:" + pluginName;
1✔
657
  }
658

659
  class ResolveState implements XdsDependencyManager.XdsConfigWatcher {
1✔
660
    private final ConfigOrError emptyServiceConfig =
1✔
661
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
662
    private final String authority;
663
    private final XdsDependencyManager xdsDependencyManager;
664
    private boolean stopped;
665
    @Nullable
666
    private Set<String> existingClusters;  // clusters to which new requests can be routed
667
    private StatusOr<XdsConfig> lastConfigOrStatus;
668

669
    private ResolveState(String ldsResourceName) {
1✔
670
      authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
671
      xdsDependencyManager =
1✔
672
          new XdsDependencyManager(xdsClient, syncContext, authority, ldsResourceName,
1✔
673
              nameResolverArgs);
1✔
674
    }
1✔
675

676
    void start() {
677
      xdsDependencyManager.start(this);
1✔
678
    }
1✔
679

680
    void refresh() {
681
      xdsDependencyManager.requestReresolution();
×
682
    }
×
683

684
    private void shutdown() {
685
      if (stopped) {
1✔
686
        return;
×
687
      }
688

689
      stopped = true;
1✔
690
      xdsDependencyManager.shutdown();
1✔
691
      updateActiveFilters(null);
1✔
692
    }
1✔
693

694
    @Override
695
    public void onUpdate(StatusOr<XdsConfig> updateOrStatus) {
696
      if (stopped) {
1✔
697
        return;
×
698
      }
699
      logger.log(XdsLogLevel.INFO, "Receive XDS resource update: {0}", updateOrStatus);
1✔
700

701
      lastConfigOrStatus = updateOrStatus;
1✔
702
      if (!updateOrStatus.hasValue()) {
1✔
703
        updateActiveFilters(null);
1✔
704
        cleanUpRoutes(updateOrStatus.getStatus());
1✔
705
        return;
1✔
706
      }
707

708
      // Process Route
709
      XdsConfig update = updateOrStatus.getValue();
1✔
710
      HttpConnectionManager httpConnectionManager = update.getListener().httpConnectionManager();
1✔
711
      if (httpConnectionManager == null) {
1✔
712
        logger.log(XdsLogLevel.INFO, "API Listener: httpConnectionManager does not exist.");
×
713
        updateActiveFilters(null);
×
714
        cleanUpRoutes(updateOrStatus.getStatus());
×
715
        return;
×
716
      }
717

718
      VirtualHost virtualHost = update.getVirtualHost();
1✔
719
      ImmutableList<NamedFilterConfig> filterConfigs = httpConnectionManager.httpFilterConfigs();
1✔
720
      long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano();
1✔
721

722
      updateActiveFilters(filterConfigs);
1✔
723
      updateRoutes(update, virtualHost, streamDurationNano, filterConfigs);
1✔
724
    }
1✔
725

726
    // called in syncContext
727
    private void updateActiveFilters(@Nullable List<NamedFilterConfig> filterConfigs) {
728
      if (filterConfigs == null) {
1✔
729
        filterConfigs = ImmutableList.of();
1✔
730
      }
731
      Set<String> filtersToShutdown = new HashSet<>(activeFilters.keySet());
1✔
732
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
733
        String typeUrl = namedFilter.filterConfig.typeUrl();
1✔
734
        String filterKey = namedFilter.filterStateKey();
1✔
735

736
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
737
        checkNotNull(provider, "provider %s", typeUrl);
1✔
738
        Filter filter = activeFilters.computeIfAbsent(
1✔
739
            filterKey, k -> provider.newInstance(namedFilter.name));
1✔
740
        checkNotNull(filter, "filter %s", filterKey);
1✔
741
        filtersToShutdown.remove(filterKey);
1✔
742
      }
1✔
743

744
      // Shutdown filters not present in current HCM.
745
      for (String filterKey : filtersToShutdown) {
1✔
746
        Filter filterToShutdown = activeFilters.remove(filterKey);
1✔
747
        checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
1✔
748
        filterToShutdown.close();
1✔
749
      }
1✔
750
    }
1✔
751

752
    private void updateRoutes(
753
        XdsConfig xdsConfig,
754
        @Nullable VirtualHost virtualHost,
755
        long httpMaxStreamDurationNano,
756
        @Nullable List<NamedFilterConfig> filterConfigs) {
757
      List<Route> routes = virtualHost.routes();
1✔
758
      ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
1✔
759

760
      // Populate all clusters to which requests can be routed to through the virtual host.
761
      Set<String> clusters = new HashSet<>();
1✔
762
      // uniqueName -> clusterName
763
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
764
      // uniqueName -> pluginConfig
765
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
766
      for (Route route : routes) {
1✔
767
        RouteAction action = route.routeAction();
1✔
768
        String prefixedName;
769
        if (action == null) {
1✔
770
          routesData.add(new RouteData(route.routeMatch(), null, ImmutableList.of()));
1✔
771
        } else if (action.cluster() != null) {
1✔
772
          prefixedName = prefixedClusterName(action.cluster());
1✔
773
          clusters.add(prefixedName);
1✔
774
          clusterNameMap.put(prefixedName, action.cluster());
1✔
775
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
776
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
777
        } else if (action.weightedClusters() != null) {
1✔
778
          ImmutableList.Builder<ClientInterceptor> filterList = ImmutableList.builder();
1✔
779
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
780
            prefixedName = prefixedClusterName(weightedCluster.name());
1✔
781
            clusters.add(prefixedName);
1✔
782
            clusterNameMap.put(prefixedName, weightedCluster.name());
1✔
783
            filterList.add(createFilters(filterConfigs, virtualHost, route, weightedCluster));
1✔
784
          }
1✔
785
          routesData.add(
1✔
786
              new RouteData(route.routeMatch(), route.routeAction(), filterList.build()));
1✔
787
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
788
          PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
789
          if (pluginConfig instanceof RlsPluginConfig) {
1✔
790
            prefixedName = prefixedClusterSpecifierPluginName(
1✔
791
                action.namedClusterSpecifierPluginConfig().name());
1✔
792
            clusters.add(prefixedName);
1✔
793
            rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
794
          }
795
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
796
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
797
        } else {
798
          // Discard route
799
        }
800
      }
1✔
801

802
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
803
      boolean shouldUpdateResult = existingClusters == null;
1✔
804
      Set<String> addedClusters =
805
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
806
      Set<String> deletedClusters =
807
          existingClusters == null
1✔
808
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
809
      existingClusters = clusters;
1✔
810
      for (String cluster : addedClusters) {
1✔
811
        if (clusterRefs.containsKey(cluster)) {
1✔
812
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
813
        } else {
814
          if (clusterNameMap.containsKey(cluster)) {
1✔
815
            assert cluster.startsWith("cluster:");
1✔
816
            XdsConfig.Subscription subscription =
1✔
817
                xdsDependencyManager.subscribeToCluster(cluster.substring("cluster:".length()));
1✔
818
            clusterRefs.put(
1✔
819
                cluster,
820
                ClusterRefState.forCluster(
1✔
821
                    new AtomicInteger(1), clusterNameMap.get(cluster), subscription));
1✔
822
          }
823
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
824
            clusterRefs.put(
1✔
825
                cluster,
826
                ClusterRefState.forRlsPlugin(
1✔
827
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
828
          }
829
          shouldUpdateResult = true;
1✔
830
        }
831
      }
1✔
832
      for (String cluster : clusters) {
1✔
833
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
834
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
835
          ClusterRefState newClusterRefState =
1✔
836
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
837
          clusterRefs.put(cluster, newClusterRefState);
1✔
838
          shouldUpdateResult = true;
1✔
839
        }
840
      }
1✔
841
      // Update service config to include newly added clusters.
842
      if (shouldUpdateResult && routingConfig != null) {
1✔
843
        updateResolutionResult(xdsConfig);
1✔
844
        shouldUpdateResult = false;
1✔
845
      } else {
846
        // Need to update at least once
847
        shouldUpdateResult = true;
1✔
848
      }
849
      // Make newly added clusters selectable by config selector and deleted clusters no longer
850
      // selectable.
851
      routingConfig = new RoutingConfig(xdsConfig, httpMaxStreamDurationNano, routesData.build());
1✔
852
      for (String cluster : deletedClusters) {
1✔
853
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
854
        if (count == 0) {
1✔
855
          clusterRefs.remove(cluster).close();
1✔
856
          shouldUpdateResult = true;
1✔
857
        }
858
      }
1✔
859
      if (shouldUpdateResult) {
1✔
860
        updateResolutionResult(xdsConfig);
1✔
861
      }
862
    }
1✔
863

864
    private ClientInterceptor createFilters(
865
        @Nullable List<NamedFilterConfig> filterConfigs,
866
        VirtualHost virtualHost,
867
        Route route,
868
        @Nullable ClusterWeight weightedCluster) {
869
      if (filterConfigs == null) {
1✔
870
        return new PassthroughClientInterceptor();
1✔
871
      }
872

873
      Map<String, FilterConfig> selectedOverrideConfigs =
1✔
874
          new HashMap<>(virtualHost.filterConfigOverrides());
1✔
875
      selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
876
      if (weightedCluster != null) {
1✔
877
        selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
878
      }
879

880
      ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
1✔
881
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
882
        String name = namedFilter.name;
1✔
883
        FilterConfig config = namedFilter.filterConfig;
1✔
884
        FilterConfig overrideConfig = selectedOverrideConfigs.get(name);
1✔
885
        String filterKey = namedFilter.filterStateKey();
1✔
886

887
        Filter filter = activeFilters.get(filterKey);
1✔
888
        checkNotNull(filter, "activeFilters.get(%s)", filterKey);
1✔
889
        ClientInterceptor interceptor =
1✔
890
            filter.buildClientInterceptor(config, overrideConfig, scheduler);
1✔
891

892
        if (interceptor != null) {
1✔
893
          filterInterceptors.add(interceptor);
1✔
894
        }
895
      }
1✔
896

897
      // Combine interceptors produced by different filters into a single one that executes
898
      // them sequentially. The order is preserved.
899
      return combineInterceptors(filterInterceptors.build());
1✔
900
    }
901

902
    private void cleanUpRoutes(Status error) {
903
      routingConfig = new RoutingConfig(error);
1✔
904
      if (existingClusters != null) {
1✔
905
        for (String cluster : existingClusters) {
1✔
906
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
907
          if (count == 0) {
1✔
908
            clusterRefs.remove(cluster).close();
1✔
909
          }
910
        }
1✔
911
        existingClusters = null;
1✔
912
      }
913

914
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
915
      // the config selector handles the error message itself.
916
      listener.onResult2(ResolutionResult.newBuilder()
1✔
917
          .setAttributes(Attributes.newBuilder()
1✔
918
            .set(InternalConfigSelector.KEY, configSelector)
1✔
919
            .build())
1✔
920
          .setServiceConfig(emptyServiceConfig)
1✔
921
          .build());
1✔
922
    }
1✔
923
  }
924

925
  /**
926
   * VirtualHost-level configuration for request routing.
927
   */
928
  private static class RoutingConfig {
929
    final XdsConfig xdsConfig;
930
    final long fallbackTimeoutNano;
931
    final ImmutableList<RouteData> routes;
932
    final Status errorStatus;
933

934
    private RoutingConfig(
935
        XdsConfig xdsConfig, long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
1✔
936
      this.xdsConfig = checkNotNull(xdsConfig, "xdsConfig");
1✔
937
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
938
      this.routes = checkNotNull(routes, "routes");
1✔
939
      this.errorStatus = null;
1✔
940
    }
1✔
941

942
    private RoutingConfig(Status errorStatus) {
1✔
943
      this.xdsConfig = null;
1✔
944
      this.fallbackTimeoutNano = 0;
1✔
945
      this.routes = null;
1✔
946
      this.errorStatus = checkNotNull(errorStatus, "errorStatus");
1✔
947
      checkArgument(!errorStatus.isOk(), "errorStatus should not be okay");
1✔
948
    }
1✔
949
  }
950

951
  static final class RouteData {
952
    final RouteMatch routeMatch;
953
    /** null implies non-forwarding action. */
954
    @Nullable
955
    final RouteAction routeAction;
956
    /**
957
     * Only one of these interceptors should be used per-RPC. There are only multiple values in the
958
     * list for weighted clusters, in which case the order of the list mirrors the weighted
959
     * clusters.
960
     */
961
    final ImmutableList<ClientInterceptor> filterChoices;
962

963
    RouteData(RouteMatch routeMatch, @Nullable RouteAction routeAction, ClientInterceptor filter) {
964
      this(routeMatch, routeAction, ImmutableList.of(filter));
1✔
965
    }
1✔
966

967
    RouteData(
968
        RouteMatch routeMatch,
969
        @Nullable RouteAction routeAction,
970
        ImmutableList<ClientInterceptor> filterChoices) {
1✔
971
      this.routeMatch = checkNotNull(routeMatch, "routeMatch");
1✔
972
      checkArgument(
1✔
973
          routeAction == null || !filterChoices.isEmpty(),
1✔
974
          "filter may be empty only for non-forwarding action");
975
      this.routeAction = routeAction;
1✔
976
      if (routeAction != null && routeAction.weightedClusters() != null) {
1✔
977
        checkArgument(
1✔
978
            routeAction.weightedClusters().size() == filterChoices.size(),
1✔
979
            "filter choices must match size of weighted clusters");
980
      }
981
      for (ClientInterceptor filter : filterChoices) {
1✔
982
        checkNotNull(filter, "entry in filterChoices is null");
1✔
983
      }
1✔
984
      this.filterChoices = checkNotNull(filterChoices, "filterChoices");
1✔
985
    }
1✔
986
  }
987

988
  private static class ClusterRefState {
989
    final AtomicInteger refCount;
990
    @Nullable
991
    final String traditionalCluster;
992
    @Nullable
993
    final RlsPluginConfig rlsPluginConfig;
994
    @Nullable
995
    final XdsConfig.Subscription subscription;
996

997
    private ClusterRefState(
998
        AtomicInteger refCount, @Nullable String traditionalCluster,
999
        @Nullable RlsPluginConfig rlsPluginConfig, @Nullable XdsConfig.Subscription subscription) {
1✔
1000
      this.refCount = refCount;
1✔
1001
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
1002
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
1003
      this.traditionalCluster = traditionalCluster;
1✔
1004
      this.rlsPluginConfig = rlsPluginConfig;
1✔
1005
      this.subscription = subscription;
1✔
1006
    }
1✔
1007

1008
    private Map<String, ?> toLbPolicy() {
1009
      if (traditionalCluster != null) {
1✔
1010
        return ImmutableMap.of(
1✔
1011
            XdsLbPolicies.CDS_POLICY_NAME,
1012
            ImmutableMap.of("cluster", traditionalCluster));
1✔
1013
      } else {
1014
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
1015
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
1016
            .put(
1✔
1017
                "childPolicy",
1018
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of(
1✔
1019
                    "is_dynamic", true))))
1✔
1020
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
1021
            .buildOrThrow();
1✔
1022
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
1023
      }
1024
    }
1025

1026
    private void close() {
1027
      if (subscription != null) {
1✔
1028
        subscription.close();
1✔
1029
      }
1030
    }
1✔
1031

1032
    static ClusterRefState forCluster(
1033
        AtomicInteger refCount, String name, XdsConfig.Subscription subscription) {
1034
      return new ClusterRefState(refCount, name, null, checkNotNull(subscription, "subscription"));
1✔
1035
    }
1036

1037
    static ClusterRefState forRlsPlugin(
1038
        AtomicInteger refCount,
1039
        RlsPluginConfig rlsPluginConfig) {
1040
      return new ClusterRefState(refCount, null, rlsPluginConfig, null);
1✔
1041
    }
1042
  }
1043

1044
  /** An ObjectPool, except it can throw an exception. */
1045
  private interface XdsClientPool {
1046
    XdsClient getObject() throws XdsInitializationException;
1047

1048
    XdsClient returnObject(XdsClient xdsClient);
1049
  }
1050

1051
  private static final class BootstrappingXdsClientPool implements XdsClientPool {
1052
    private final XdsClientPoolFactory xdsClientPoolFactory;
1053
    private final String target;
1054
    private final @Nullable Map<String, ?> bootstrapOverride;
1055
    private final @Nullable MetricRecorder metricRecorder;
1056
    private ObjectPool<XdsClient> xdsClientPool;
1057

1058
    BootstrappingXdsClientPool(
1059
        XdsClientPoolFactory xdsClientPoolFactory,
1060
        String target,
1061
        @Nullable Map<String, ?> bootstrapOverride,
1062
        @Nullable MetricRecorder metricRecorder) {
1✔
1063
      this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
1✔
1064
      this.target = checkNotNull(target, "target");
1✔
1065
      this.bootstrapOverride = bootstrapOverride;
1✔
1066
      this.metricRecorder = metricRecorder;
1✔
1067
    }
1✔
1068

1069
    @Override
1070
    public XdsClient getObject() throws XdsInitializationException {
1071
      if (xdsClientPool == null) {
1✔
1072
        BootstrapInfo bootstrapInfo;
1073
        if (bootstrapOverride == null) {
1✔
1074
          bootstrapInfo = GrpcBootstrapperImpl.defaultBootstrap();
×
1075
        } else {
1076
          bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride);
1✔
1077
        }
1078
        this.xdsClientPool =
1✔
1079
            xdsClientPoolFactory.getOrCreate(target, bootstrapInfo, metricRecorder);
1✔
1080
      }
1081
      return xdsClientPool.getObject();
1✔
1082
    }
1083

1084
    @Override
1085
    public XdsClient returnObject(XdsClient xdsClient) {
1086
      return xdsClientPool.returnObject(xdsClient);
1✔
1087
    }
1088
  }
1089

1090
  private static final class SupplierXdsClientPool implements XdsClientPool {
1091
    private final Supplier<XdsClient> xdsClientSupplier;
1092

1093
    SupplierXdsClientPool(Supplier<XdsClient> xdsClientSupplier) {
×
1094
      this.xdsClientSupplier = checkNotNull(xdsClientSupplier, "xdsClientSupplier");
×
1095
    }
×
1096

1097
    @Override
1098
    public XdsClient getObject() throws XdsInitializationException {
1099
      XdsClient xdsClient = xdsClientSupplier.get();
×
1100
      if (xdsClient == null) {
×
1101
        throw new XdsInitializationException("Caller failed to initialize XDS_CLIENT_SUPPLIER");
×
1102
      }
1103
      return xdsClient;
×
1104
    }
1105

1106
    @Override
1107
    public XdsClient returnObject(XdsClient xdsClient) {
1108
      return null;
×
1109
    }
1110
  }
1111
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc