• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19702

18 Feb 2025 06:47PM UTC coverage: 88.587% (-0.005%) from 88.592%
#19702

push

github

web-flow
xds: Change how xDS filters are created by introducing Filter.Provider (#11883)

This is the first step towards supporting filter state retention in
Java. The mechanism will be similar to the one described in [A83]
(https://github.com/grpc/proposal/blob/master/A83-xds-gcp-authn-filter.md#filter-call-credentials-cache)
for C-core, and will serve the same purpose. However, the
implementation details are very different due to the different nature
of xDS HTTP filter support in C-core and Java.

In Java, xDS HTTP filters are backed by classes implementing
`io.grpc.xds.Filter`, from here on just called "Filters". To support
Filter state retention (next PR), Java's xDS implementation must be
able to create unique Filter instances:
- Per HCM
  `envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager`
- Per filter name as specified in
  `envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter.name`

This PR **does not** implement Filter state retention, but lays the
groundwork for it by changing how filters are registered and
instantiated. To achieve this, all existing Filter classes had to be
updated to the new instantiation mechanism described below.

Prior to this PR, Filters had no lifecycle. FilterRegistry
provided singleton instances for a given typeUrl. This PR introduces
a new interface `Filter.Provider`, which instantiates Filter classes.
All functionality that doesn't need an instance of a Filter is moved
to the Filter.Provider. This includes parsing filter config proto
into FilterConfig and determining the filter kind
(client-side, server-side, or both).

This PR is limited to refactoring, and there are no changes to the
existing behavior. Note that all Filter Providers still return
singleton Filter instances. However, with this PR, it is now possible
to create Providers that return a new Filter instance each time
`newInstance` is called.

34252 of 38665 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.81
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.MetricRecorder;
45
import io.grpc.NameResolver;
46
import io.grpc.Status;
47
import io.grpc.Status.Code;
48
import io.grpc.SynchronizationContext;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.ObjectPool;
51
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
52
import io.grpc.xds.Filter.FilterConfig;
53
import io.grpc.xds.Filter.NamedFilterConfig;
54
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
55
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
56
import io.grpc.xds.VirtualHost.Route;
57
import io.grpc.xds.VirtualHost.Route.RouteAction;
58
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
61
import io.grpc.xds.VirtualHost.Route.RouteMatch;
62
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
63
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
64
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
65
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
66
import io.grpc.xds.client.XdsClient;
67
import io.grpc.xds.client.XdsClient.ResourceWatcher;
68
import io.grpc.xds.client.XdsLogger;
69
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
70
import java.net.URI;
71
import java.util.ArrayList;
72
import java.util.Collections;
73
import java.util.HashMap;
74
import java.util.HashSet;
75
import java.util.List;
76
import java.util.Locale;
77
import java.util.Map;
78
import java.util.Objects;
79
import java.util.Set;
80
import java.util.concurrent.ConcurrentHashMap;
81
import java.util.concurrent.ConcurrentMap;
82
import java.util.concurrent.ScheduledExecutorService;
83
import java.util.concurrent.atomic.AtomicInteger;
84
import javax.annotation.Nullable;
85

86
/**
87
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
88
 *
89
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
90
 * protocol to retrieve service information and produce a service config to the caller.
91
 *
92
 * @see XdsNameResolverProvider
93
 */
94
final class XdsNameResolver extends NameResolver {
95

96
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
97
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
98
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
99
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
100
  static final CallOptions.Key<Boolean> AUTO_HOST_REWRITE_KEY =
1✔
101
      CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY");
1✔
102
  @VisibleForTesting
103
  static boolean enableTimeout =
1✔
104
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
105
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
106

107
  private final InternalLogId logId;
108
  private final XdsLogger logger;
109
  @Nullable
110
  private final String targetAuthority;
111
  private final String target;
112
  private final String serviceAuthority;
113
  // Encoded version of the service authority as per 
114
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
115
  private final String encodedServiceAuthority;
116
  private final String overrideAuthority;
117
  private final ServiceConfigParser serviceConfigParser;
118
  private final SynchronizationContext syncContext;
119
  private final ScheduledExecutorService scheduler;
120
  private final XdsClientPoolFactory xdsClientPoolFactory;
121
  private final ThreadSafeRandom random;
122
  private final FilterRegistry filterRegistry;
123
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
124
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
125
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
126
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
127
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
128
  private final long randomChannelId;
129
  private final MetricRecorder metricRecorder;
130

131
  private volatile RoutingConfig routingConfig = RoutingConfig.EMPTY;
1✔
132
  private Listener2 listener;
133
  private ObjectPool<XdsClient> xdsClientPool;
134
  private XdsClient xdsClient;
135
  private CallCounterProvider callCounterProvider;
136
  private ResolveState resolveState;
137
  // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
138
  // XdsClient instead of here.
139
  private boolean receivedConfig;
140

141
  /**
   * Creates a resolver that uses the default collaborators: the authority of {@code targetUri},
   * the default shared xDS client pool provider, the shared {@link ThreadSafeRandomImpl}
   * instance and the default {@link FilterRegistry}. Everything else is forwarded unchanged to
   * the full constructor.
   */
  XdsNameResolver(
      URI targetUri,
      String name,
      @Nullable String overrideAuthority,
      ServiceConfigParser serviceConfigParser,
      SynchronizationContext syncContext,
      ScheduledExecutorService scheduler,
      @Nullable Map<String, ?> bootstrapOverride,
      MetricRecorder metricRecorder) {
    this(
        targetUri,
        targetUri.getAuthority(),
        name,
        overrideAuthority,
        serviceConfigParser,
        syncContext,
        scheduler,
        SharedXdsClientPoolProvider.getDefaultProvider(),
        ThreadSafeRandomImpl.instance,
        FilterRegistry.getDefaultRegistry(),
        bootstrapOverride,
        metricRecorder);
  }
1✔
152

153
  @VisibleForTesting
154
  XdsNameResolver(
155
      URI targetUri, @Nullable String targetAuthority, String name,
156
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
157
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
158
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
159
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
160
      MetricRecorder metricRecorder) {
1✔
161
    this.targetAuthority = targetAuthority;
1✔
162
    target = targetUri.toString();
1✔
163

164
    // The name might have multiple slashes so encode it before verifying.
165
    serviceAuthority = checkNotNull(name, "name");
1✔
166
    this.encodedServiceAuthority = 
1✔
167
      GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
1✔
168

169
    this.overrideAuthority = overrideAuthority;
1✔
170
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
171
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
172
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
173
    this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
1✔
174
            "xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
1✔
175
    this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
1✔
176
    this.random = checkNotNull(random, "random");
1✔
177
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
178
    this.metricRecorder = metricRecorder;
1✔
179
    randomChannelId = random.nextLong();
1✔
180
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
181
    logger = XdsLogger.withLogId(logId);
1✔
182
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
183
  }
1✔
184

185
  @Override
186
  public String getServiceAuthority() {
187
    return encodedServiceAuthority;
1✔
188
  }
189

190
  @Override
191
  public void start(Listener2 listener) {
192
    this.listener = checkNotNull(listener, "listener");
1✔
193
    try {
194
      xdsClientPool = xdsClientPoolFactory.getOrCreate(target, metricRecorder);
1✔
195
    } catch (Exception e) {
1✔
196
      listener.onError(
1✔
197
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
198
      return;
1✔
199
    }
1✔
200
    xdsClient = xdsClientPool.getObject();
1✔
201
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
202
    String listenerNameTemplate;
203
    if (targetAuthority == null) {
1✔
204
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
205
    } else {
206
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
207
      if (authorityInfo == null) {
1✔
208
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
209
            "invalid target URI: target authority not found in the bootstrap"));
210
        return;
1✔
211
      }
212
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
213
    }
214
    String replacement = serviceAuthority;
1✔
215
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
216
      replacement = XdsClient.percentEncodePath(replacement);
1✔
217
    }
218
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
219
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
220
        ) {
221
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
222
          "invalid listener resource URI for service authority: " + serviceAuthority));
223
      return;
×
224
    }
225
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
226
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
227
    resolveState = new ResolveState(ldsResourceName);
1✔
228

229
    resolveState.start();
1✔
230
  }
1✔
231

232
  private static String expandPercentS(String template, String replacement) {
233
    return template.replace("%s", replacement);
1✔
234
  }
235

236
  @Override
237
  public void shutdown() {
238
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
239
    if (resolveState != null) {
1✔
240
      resolveState.stop();
1✔
241
    }
242
    if (xdsClient != null) {
1✔
243
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
244
    }
245
  }
1✔
246

247
  @VisibleForTesting
248
  static Map<String, ?> generateServiceConfigWithMethodConfig(
249
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
250
    if (timeoutNano == null
1✔
251
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
252
      return Collections.emptyMap();
1✔
253
    }
254
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
255
    methodConfig.put(
1✔
256
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
257
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
258
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
259
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
260
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
261
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
262
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
263
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
264
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
265
        codes.add(code.name());
1✔
266
      }
1✔
267
      rawRetryPolicy.put(
1✔
268
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
269
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
270
        rawRetryPolicy.put(
×
271
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
272
      }
273
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
274
    }
275
    if (timeoutNano != null) {
1✔
276
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
277
      methodConfig.put("timeout", timeout);
1✔
278
    }
279
    return Collections.singletonMap(
1✔
280
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
281
  }
282

283
  @VisibleForTesting
284
  XdsClient getXdsClient() {
285
    return xdsClient;
1✔
286
  }
287

288
  // called in syncContext
289
  private void updateResolutionResult() {
290
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
291

292
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
293
    for (String name : clusterRefs.keySet()) {
1✔
294
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
295
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
296
    }
1✔
297
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
298
        "loadBalancingConfig",
299
        ImmutableList.of(ImmutableMap.of(
1✔
300
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
301
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
302

303
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
304
      logger.log(
×
305
          XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
×
306
    }
307
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
308
    Attributes attrs =
309
        Attributes.newBuilder()
1✔
310
            .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
1✔
311
            .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
312
            .set(InternalConfigSelector.KEY, configSelector)
1✔
313
            .build();
1✔
314
    ResolutionResult result =
315
        ResolutionResult.newBuilder()
1✔
316
            .setAttributes(attrs)
1✔
317
            .setServiceConfig(parsedServiceConfig)
1✔
318
            .build();
1✔
319
    listener.onResult(result);
1✔
320
    receivedConfig = true;
1✔
321
  }
1✔
322

323
  /**
324
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
325
   * case-insensitive.
326
   *
327
   * <p>Wildcard pattern rules:
328
   * <ol>
329
   * <li>A single asterisk (*) matches any domain.</li>
330
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
331
   *     but not both.</li>
332
   * </ol>
333
   */
334
  @VisibleForTesting
335
  static boolean matchHostName(String hostName, String pattern) {
336
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
337
        "Invalid host name");
338
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
339
        "Invalid pattern/domain name");
340

341
    hostName = hostName.toLowerCase(Locale.US);
1✔
342
    pattern = pattern.toLowerCase(Locale.US);
1✔
343
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
344

345
    if (!pattern.contains("*")) {
1✔
346
      // Not a wildcard pattern -- hostName and pattern must match exactly.
347
      return hostName.equals(pattern);
1✔
348
    }
349
    // Wildcard pattern
350

351
    if (pattern.length() == 1) {
1✔
352
      return true;
×
353
    }
354

355
    int index = pattern.indexOf('*');
1✔
356

357
    // At most one asterisk (*) is allowed.
358
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
359
      return false;
×
360
    }
361

362
    // Asterisk can only match prefix or suffix.
363
    if (index != 0 && index != pattern.length() - 1) {
1✔
364
      return false;
×
365
    }
366

367
    // HostName must be at least as long as the pattern because asterisk has to
368
    // match one or more characters.
369
    if (hostName.length() < pattern.length()) {
1✔
370
      return false;
1✔
371
    }
372

373
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
374
      // Prefix matching fails.
375
      return true;
1✔
376
    }
377

378
    // Pattern matches hostname if suffix matching succeeds.
379
    return index == pattern.length() - 1
1✔
380
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
381
  }
382

383
  private final class ConfigSelector extends InternalConfigSelector {
1✔
384
    @Override
385
    public Result selectConfig(PickSubchannelArgs args) {
386
      RoutingConfig routingCfg;
387
      RouteData selectedRoute;
388
      String cluster;
389
      ClientInterceptor filters;
390
      Metadata headers = args.getHeaders();
1✔
391
      String path = "/" + args.getMethodDescriptor().getFullMethodName();
1✔
392
      do {
393
        routingCfg = routingConfig;
1✔
394
        selectedRoute = null;
1✔
395
        for (RouteData route : routingCfg.routes) {
1✔
396
          if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
1✔
397
            selectedRoute = route;
1✔
398
            break;
1✔
399
          }
400
        }
1✔
401
        if (selectedRoute == null) {
1✔
402
          return Result.forError(
1✔
403
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
404
        }
405
        if (selectedRoute.routeAction == null) {
1✔
406
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
407
              "Could not route RPC to Route with non-forwarding action"));
408
        }
409
        RouteAction action = selectedRoute.routeAction;
1✔
410
        if (action.cluster() != null) {
1✔
411
          cluster = prefixedClusterName(action.cluster());
1✔
412
          filters = selectedRoute.filterChoices.get(0);
1✔
413
        } else if (action.weightedClusters() != null) {
1✔
414
          // XdsRouteConfigureResource verifies the total weight will not be 0 or exceed uint32
415
          long totalWeight = 0;
1✔
416
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
417
            totalWeight += weightedCluster.weight();
1✔
418
          }
1✔
419
          long select = random.nextLong(totalWeight);
1✔
420
          long accumulator = 0;
1✔
421
          for (int i = 0; ; i++) {
1✔
422
            ClusterWeight weightedCluster = action.weightedClusters().get(i);
1✔
423
            accumulator += weightedCluster.weight();
1✔
424
            if (select < accumulator) {
1✔
425
              cluster = prefixedClusterName(weightedCluster.name());
1✔
426
              filters = selectedRoute.filterChoices.get(i);
1✔
427
              break;
1✔
428
            }
429
          }
430
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
431
          cluster =
1✔
432
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
433
          filters = selectedRoute.filterChoices.get(0);
1✔
434
        } else {
435
          // updateRoutes() discards routes with unknown actions
436
          throw new AssertionError();
×
437
        }
438
      } while (!retainCluster(cluster));
1✔
439

440
      final RouteAction routeAction = selectedRoute.routeAction;
1✔
441
      Long timeoutNanos = null;
1✔
442
      if (enableTimeout) {
1✔
443
        timeoutNanos = routeAction.timeoutNano();
1✔
444
        if (timeoutNanos == null) {
1✔
445
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
446
        }
447
        if (timeoutNanos <= 0) {
1✔
448
          timeoutNanos = null;
1✔
449
        }
450
      }
451
      RetryPolicy retryPolicy = routeAction.retryPolicy();
1✔
452
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
453
      Map<String, ?> rawServiceConfig =
1✔
454
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
455
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
456
      Object config = parsedServiceConfig.getConfig();
1✔
457
      if (config == null) {
1✔
458
        releaseCluster(cluster);
×
459
        return Result.forError(
×
460
            parsedServiceConfig.getError().augmentDescription(
×
461
                "Failed to parse service config (method config)"));
462
      }
463
      final String finalCluster = cluster;
1✔
464
      final long hash = generateHash(routeAction.hashPolicies(), headers);
1✔
465
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
466
        @Override
467
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
468
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
469
            final Channel next) {
470
          CallOptions callOptionsForCluster =
1✔
471
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
472
                  .withOption(RPC_HASH_KEY, hash);
1✔
473
          if (routeAction.autoHostRewrite()) {
1✔
474
            callOptionsForCluster = callOptionsForCluster.withOption(AUTO_HOST_REWRITE_KEY, true);
1✔
475
          }
476
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
477
              next.newCall(method, callOptionsForCluster)) {
1✔
478
            @Override
479
            public void start(Listener<RespT> listener, Metadata headers) {
480
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
481
                boolean committed;
482

483
                @Override
484
                public void onHeaders(Metadata headers) {
485
                  committed = true;
1✔
486
                  releaseCluster(finalCluster);
1✔
487
                  delegate().onHeaders(headers);
1✔
488
                }
1✔
489

490
                @Override
491
                public void onClose(Status status, Metadata trailers) {
492
                  if (!committed) {
1✔
493
                    releaseCluster(finalCluster);
1✔
494
                  }
495
                  delegate().onClose(status, trailers);
1✔
496
                }
1✔
497
              };
498
              delegate().start(listener, headers);
1✔
499
            }
1✔
500
          };
501
        }
502
      }
503

504
      return
1✔
505
          Result.newBuilder()
1✔
506
              .setConfig(config)
1✔
507
              .setInterceptor(combineInterceptors(
1✔
508
                  ImmutableList.of(filters, new ClusterSelectionInterceptor())))
1✔
509
              .build();
1✔
510
    }
511

512
    private boolean retainCluster(String cluster) {
513
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
514
      if (clusterRefState == null) {
1✔
515
        return false;
×
516
      }
517
      AtomicInteger refCount = clusterRefState.refCount;
1✔
518
      int count;
519
      do {
520
        count = refCount.get();
1✔
521
        if (count == 0) {
1✔
522
          return false;
×
523
        }
524
      } while (!refCount.compareAndSet(count, count + 1));
1✔
525
      return true;
1✔
526
    }
527

528
    private void releaseCluster(final String cluster) {
529
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
530
      if (count == 0) {
1✔
531
        syncContext.execute(new Runnable() {
1✔
532
          @Override
533
          public void run() {
534
            if (clusterRefs.get(cluster).refCount.get() == 0) {
1✔
535
              clusterRefs.remove(cluster);
1✔
536
              updateResolutionResult();
1✔
537
            }
538
          }
1✔
539
        });
540
      }
541
    }
1✔
542

543
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
544
      Long hash = null;
1✔
545
      for (HashPolicy policy : hashPolicies) {
1✔
546
        Long newHash = null;
1✔
547
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
548
          String value = getHeaderValue(headers, policy.headerName());
1✔
549
          if (value != null) {
1✔
550
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
551
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
552
            }
553
            newHash = hashFunc.hashAsciiString(value);
1✔
554
          }
555
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
556
          newHash = hashFunc.hashLong(randomChannelId);
1✔
557
        }
558
        if (newHash != null ) {
1✔
559
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
560
          // and preserves all of the entropy.
561
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
562
          hash = oldHash ^ newHash;
1✔
563
        }
564
        // If the policy is a terminal policy and a hash has been generated, ignore
565
        // the rest of the hash policies.
566
        if (policy.isTerminal() && hash != null) {
1✔
567
          break;
×
568
        }
569
      }
1✔
570
      return hash == null ? random.nextLong() : hash;
1✔
571
    }
572
  }
573

574
  static final class PassthroughClientInterceptor implements ClientInterceptor {
1✔
575
    @Override
576
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
577
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
578
      return next.newCall(method, callOptions);
1✔
579
    }
580
  }
581

582
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
583
    if (interceptors.size() == 0) {
1✔
584
      return new PassthroughClientInterceptor();
1✔
585
    }
586
    if (interceptors.size() == 1) {
1✔
587
      return interceptors.get(0);
1✔
588
    }
589
    return new ClientInterceptor() {
1✔
590
      @Override
591
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
592
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
593
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
594
        return next.newCall(method, callOptions);
1✔
595
      }
596
    };
597
  }
598

599
  @Nullable
600
  private static String getHeaderValue(Metadata headers, String headerName) {
601
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
602
      return null;
×
603
    }
604
    if (headerName.equals("content-type")) {
1✔
605
      return "application/grpc";
×
606
    }
607
    Metadata.Key<String> key;
608
    try {
609
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
610
    } catch (IllegalArgumentException e) {
×
611
      return null;
×
612
    }
1✔
613
    Iterable<String> values = headers.getAll(key);
1✔
614
    return values == null ? null : Joiner.on(",").join(values);
1✔
615
  }
616

617
  private static String prefixedClusterName(String name) {
618
    return "cluster:" + name;
1✔
619
  }
620

621
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
622
    return "cluster_specifier_plugin:" + pluginName;
1✔
623
  }
624

625
  private static final class FailingConfigSelector extends InternalConfigSelector {
626
    private final Result result;
627

628
    public FailingConfigSelector(Status error) {
1✔
629
      this.result = Result.forError(error);
1✔
630
    }
1✔
631

632
    @Override
633
    public Result selectConfig(PickSubchannelArgs args) {
634
      return result;
1✔
635
    }
636
  }
637

638
  private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
639
    private final ConfigOrError emptyServiceConfig =
1✔
640
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
641
    private final String ldsResourceName;
642
    private boolean stopped;
643
    @Nullable
644
    private Set<String> existingClusters;  // clusters to which new requests can be routed
645
    @Nullable
646
    private RouteDiscoveryState routeDiscoveryState;
647

648
    ResolveState(String ldsResourceName) {
1✔
649
      this.ldsResourceName = ldsResourceName;
1✔
650
    }
1✔
651

652
    @Override
653
    public void onChanged(final XdsListenerResource.LdsUpdate update) {
654
      if (stopped) {
1✔
655
        return;
×
656
      }
657
      logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
1✔
658
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
659
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
660
      String rdsName = httpConnectionManager.rdsName();
1✔
661
      cleanUpRouteDiscoveryState();
1✔
662
      if (virtualHosts != null) {
1✔
663
        updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
664
            httpConnectionManager.httpFilterConfigs());
1✔
665
      } else {
666
        routeDiscoveryState = new RouteDiscoveryState(
1✔
667
            rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
668
            httpConnectionManager.httpFilterConfigs());
1✔
669
        logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
670
        xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
671
            rdsName, routeDiscoveryState, syncContext);
1✔
672
      }
673
    }
1✔
674

675
    @Override
676
    public void onError(final Status error) {
677
      if (stopped || receivedConfig) {
1✔
678
        return;
×
679
      }
680
      listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
681
          String.format("Unable to load LDS %s. xDS server returned: %s: %s",
1✔
682
          ldsResourceName, error.getCode(), error.getDescription())));
1✔
683
    }
1✔
684

685
    @Override
686
    public void onResourceDoesNotExist(final String resourceName) {
687
      if (stopped) {
1✔
688
        return;
×
689
      }
690
      String error = "LDS resource does not exist: " + resourceName;
1✔
691
      logger.log(XdsLogLevel.INFO, error);
1✔
692
      cleanUpRouteDiscoveryState();
1✔
693
      cleanUpRoutes(error);
1✔
694
    }
1✔
695

696
    private void start() {
697
      logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
1✔
698
      xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
1✔
699
          ldsResourceName, this, syncContext);
1✔
700
    }
1✔
701

702
    private void stop() {
703
      logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
1✔
704
      stopped = true;
1✔
705
      cleanUpRouteDiscoveryState();
1✔
706
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
707
    }
1✔
708

709
    // called in syncContext
710
    private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
711
        @Nullable List<NamedFilterConfig> filterConfigs) {
712
      String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
713
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
1✔
714
      if (virtualHost == null) {
1✔
715
        String error = "Failed to find virtual host matching hostname: " + authority;
1✔
716
        logger.log(XdsLogLevel.WARNING, error);
1✔
717
        cleanUpRoutes(error);
1✔
718
        return;
1✔
719
      }
720

721
      List<Route> routes = virtualHost.routes();
1✔
722
      ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
1✔
723

724
      // Populate all clusters to which requests can be routed to through the virtual host.
725
      Set<String> clusters = new HashSet<>();
1✔
726
      // uniqueName -> clusterName
727
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
728
      // uniqueName -> pluginConfig
729
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
730
      for (Route route : routes) {
1✔
731
        RouteAction action = route.routeAction();
1✔
732
        String prefixedName;
733
        if (action == null) {
1✔
734
          routesData.add(new RouteData(route.routeMatch(), null, ImmutableList.of()));
1✔
735
        } else if (action.cluster() != null) {
1✔
736
          prefixedName = prefixedClusterName(action.cluster());
1✔
737
          clusters.add(prefixedName);
1✔
738
          clusterNameMap.put(prefixedName, action.cluster());
1✔
739
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
740
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
741
        } else if (action.weightedClusters() != null) {
1✔
742
          ImmutableList.Builder<ClientInterceptor> filterList = ImmutableList.builder();
1✔
743
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
744
            prefixedName = prefixedClusterName(weightedCluster.name());
1✔
745
            clusters.add(prefixedName);
1✔
746
            clusterNameMap.put(prefixedName, weightedCluster.name());
1✔
747
            filterList.add(createFilters(filterConfigs, virtualHost, route, weightedCluster));
1✔
748
          }
1✔
749
          routesData.add(
1✔
750
              new RouteData(route.routeMatch(), route.routeAction(), filterList.build()));
1✔
751
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
752
          PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
753
          if (pluginConfig instanceof RlsPluginConfig) {
1✔
754
            prefixedName = prefixedClusterSpecifierPluginName(
1✔
755
                action.namedClusterSpecifierPluginConfig().name());
1✔
756
            clusters.add(prefixedName);
1✔
757
            rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
758
          }
759
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
760
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
761
        } else {
762
          // Discard route
763
        }
764
      }
1✔
765

766
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
767
      boolean shouldUpdateResult = existingClusters == null;
1✔
768
      Set<String> addedClusters =
769
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
770
      Set<String> deletedClusters =
771
          existingClusters == null
1✔
772
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
773
      existingClusters = clusters;
1✔
774
      for (String cluster : addedClusters) {
1✔
775
        if (clusterRefs.containsKey(cluster)) {
1✔
776
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
777
        } else {
778
          if (clusterNameMap.containsKey(cluster)) {
1✔
779
            clusterRefs.put(
1✔
780
                cluster,
781
                ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
1✔
782
          }
783
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
784
            clusterRefs.put(
1✔
785
                cluster,
786
                ClusterRefState.forRlsPlugin(
1✔
787
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
788
          }
789
          shouldUpdateResult = true;
1✔
790
        }
791
      }
1✔
792
      for (String cluster : clusters) {
1✔
793
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
794
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
795
          ClusterRefState newClusterRefState =
1✔
796
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
797
          clusterRefs.put(cluster, newClusterRefState);
1✔
798
          shouldUpdateResult = true;
1✔
799
        }
800
      }
1✔
801
      // Update service config to include newly added clusters.
802
      if (shouldUpdateResult) {
1✔
803
        updateResolutionResult();
1✔
804
      }
805
      // Make newly added clusters selectable by config selector and deleted clusters no longer
806
      // selectable.
807
      routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routesData.build());
1✔
808
      shouldUpdateResult = false;
1✔
809
      for (String cluster : deletedClusters) {
1✔
810
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
811
        if (count == 0) {
1✔
812
          clusterRefs.remove(cluster);
1✔
813
          shouldUpdateResult = true;
1✔
814
        }
815
      }
1✔
816
      if (shouldUpdateResult) {
1✔
817
        updateResolutionResult();
1✔
818
      }
819
    }
1✔
820

821
    private ClientInterceptor createFilters(
822
        @Nullable List<NamedFilterConfig> filterConfigs,
823
        VirtualHost virtualHost,
824
        Route route,
825
        @Nullable ClusterWeight weightedCluster) {
826
      if (filterConfigs == null) {
1✔
827
        return new PassthroughClientInterceptor();
1✔
828
      }
829

830
      Map<String, FilterConfig> selectedOverrideConfigs =
1✔
831
          new HashMap<>(virtualHost.filterConfigOverrides());
1✔
832
      selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
833
      if (weightedCluster != null) {
1✔
834
        selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
835
      }
836

837
      ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
1✔
838
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
839
        FilterConfig config = namedFilter.filterConfig;
1✔
840
        String name = namedFilter.name;
1✔
841
        String typeUrl = config.typeUrl();
1✔
842

843
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
844
        if (provider == null || !provider.isClientFilter()) {
1✔
845
          continue;
×
846
        }
847

848
        Filter filter = provider.newInstance();
1✔
849

850
        ClientInterceptor interceptor =
1✔
851
            filter.buildClientInterceptor(config, selectedOverrideConfigs.get(name), scheduler);
1✔
852
        if (interceptor != null) {
1✔
853
          filterInterceptors.add(interceptor);
1✔
854
        }
855
      }
1✔
856

857
      // Combine interceptors produced by different filters into a single one that executes
858
      // them sequentially. The order is preserved.
859
      return combineInterceptors(filterInterceptors.build());
1✔
860
    }
861

862
    private void cleanUpRoutes(String error) {
863
      if (existingClusters != null) {
1✔
864
        for (String cluster : existingClusters) {
1✔
865
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
866
          if (count == 0) {
1✔
867
            clusterRefs.remove(cluster);
1✔
868
          }
869
        }
1✔
870
        existingClusters = null;
1✔
871
      }
872
      routingConfig = RoutingConfig.EMPTY;
1✔
873
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
874
      // the config selector handles the error message itself. Once the LB API allows providing
875
      // failure information for addresses yet still providing a service config, the config seector
876
      // could be avoided.
877
      String errorWithNodeId =
1✔
878
          error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
879
      listener.onResult(ResolutionResult.newBuilder()
1✔
880
          .setAttributes(Attributes.newBuilder()
1✔
881
            .set(InternalConfigSelector.KEY,
1✔
882
              new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId)))
1✔
883
            .build())
1✔
884
          .setServiceConfig(emptyServiceConfig)
1✔
885
          .build());
1✔
886
      receivedConfig = true;
1✔
887
    }
1✔
888

889
    private void cleanUpRouteDiscoveryState() {
890
      if (routeDiscoveryState != null) {
1✔
891
        String rdsName = routeDiscoveryState.resourceName;
1✔
892
        logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
1✔
893
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
894
            routeDiscoveryState);
895
        routeDiscoveryState = null;
1✔
896
      }
897
    }
1✔
898

899
    /**
900
     * Discovery state for RouteConfiguration resource. One instance for each Listener resource
901
     * update.
902
     */
903
    private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
904
      private final String resourceName;
905
      private final long httpMaxStreamDurationNano;
906
      @Nullable
907
      private final List<NamedFilterConfig> filterConfigs;
908

909
      private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
910
          @Nullable List<NamedFilterConfig> filterConfigs) {
1✔
911
        this.resourceName = resourceName;
1✔
912
        this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
1✔
913
        this.filterConfigs = filterConfigs;
1✔
914
      }
1✔
915

916
      @Override
917
      public void onChanged(final RdsUpdate update) {
918
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
919
          return;
×
920
        }
921
        logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
1✔
922
        updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
1✔
923
      }
1✔
924

925
      @Override
926
      public void onError(final Status error) {
927
        if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
1✔
928
          return;
×
929
        }
930
        listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
931
            String.format("Unable to load RDS %s. xDS server returned: %s: %s",
1✔
932
            resourceName, error.getCode(), error.getDescription())));
1✔
933
      }
1✔
934

935
      @Override
936
      public void onResourceDoesNotExist(final String resourceName) {
937
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
938
          return;
×
939
        }
940
        String error = "RDS resource does not exist: " + resourceName;
1✔
941
        logger.log(XdsLogLevel.INFO, error);
1✔
942
        cleanUpRoutes(error);
1✔
943
      }
1✔
944
    }
945
  }
946

947
  /**
948
   * VirtualHost-level configuration for request routing.
949
   */
950
  private static class RoutingConfig {
951
    private final long fallbackTimeoutNano;
952
    final ImmutableList<RouteData> routes;
953

954
    private static final RoutingConfig EMPTY = new RoutingConfig(0, ImmutableList.of());
1✔
955

956
    private RoutingConfig(long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
1✔
957
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
958
      this.routes = checkNotNull(routes, "routes");
1✔
959
    }
1✔
960
  }
961

962
  static final class RouteData {
963
    final RouteMatch routeMatch;
964
    /** null implies non-forwarding action. */
965
    @Nullable
966
    final RouteAction routeAction;
967
    /**
968
     * Only one of these interceptors should be used per-RPC. There are only multiple values in the
969
     * list for weighted clusters, in which case the order of the list mirrors the weighted
970
     * clusters.
971
     */
972
    final ImmutableList<ClientInterceptor> filterChoices;
973

974
    RouteData(RouteMatch routeMatch, @Nullable RouteAction routeAction, ClientInterceptor filter) {
975
      this(routeMatch, routeAction, ImmutableList.of(filter));
1✔
976
    }
1✔
977

978
    RouteData(
979
        RouteMatch routeMatch,
980
        @Nullable RouteAction routeAction,
981
        ImmutableList<ClientInterceptor> filterChoices) {
1✔
982
      this.routeMatch = checkNotNull(routeMatch, "routeMatch");
1✔
983
      checkArgument(
1✔
984
          routeAction == null || !filterChoices.isEmpty(),
1✔
985
          "filter may be empty only for non-forwarding action");
986
      this.routeAction = routeAction;
1✔
987
      if (routeAction != null && routeAction.weightedClusters() != null) {
1✔
988
        checkArgument(
1✔
989
            routeAction.weightedClusters().size() == filterChoices.size(),
1✔
990
            "filter choices must match size of weighted clusters");
991
      }
992
      for (ClientInterceptor filter : filterChoices) {
1✔
993
        checkNotNull(filter, "entry in filterChoices is null");
1✔
994
      }
1✔
995
      this.filterChoices = checkNotNull(filterChoices, "filterChoices");
1✔
996
    }
1✔
997
  }
998

999
  private static class ClusterRefState {
1000
    final AtomicInteger refCount;
1001
    @Nullable
1002
    final String traditionalCluster;
1003
    @Nullable
1004
    final RlsPluginConfig rlsPluginConfig;
1005

1006
    private ClusterRefState(
1007
        AtomicInteger refCount, @Nullable String traditionalCluster,
1008
        @Nullable RlsPluginConfig rlsPluginConfig) {
1✔
1009
      this.refCount = refCount;
1✔
1010
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
1011
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
1012
      this.traditionalCluster = traditionalCluster;
1✔
1013
      this.rlsPluginConfig = rlsPluginConfig;
1✔
1014
    }
1✔
1015

1016
    private Map<String, ?> toLbPolicy() {
1017
      if (traditionalCluster != null) {
1✔
1018
        return ImmutableMap.of(
1✔
1019
            XdsLbPolicies.CDS_POLICY_NAME,
1020
            ImmutableMap.of("cluster", traditionalCluster));
1✔
1021
      } else {
1022
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
1023
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
1024
            .put(
1✔
1025
                "childPolicy",
1026
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
1✔
1027
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
1028
            .buildOrThrow();
1✔
1029
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
1030
      }
1031
    }
1032

1033
    static ClusterRefState forCluster(AtomicInteger refCount, String name) {
1034
      return new ClusterRefState(refCount, name, null);
1✔
1035
    }
1036

1037
    static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
1038
      return new ClusterRefState(refCount, null, rlsPluginConfig);
1✔
1039
    }
1040
  }
1041
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc