• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19678

05 Feb 2025 06:37PM UTC coverage: 88.566% (-0.03%) from 88.592%
#19678

push

github

web-flow
xds: Improve XdsNR's selectConfig() variable handling

The variables from the do-while are no longer initialized to let the
compiler verify that the loop sets each. Unnecessary comparisons to null
are also removed and is more obvious as the variables are never set to
null. Added a minor optimization of computing the RPCs path once instead
of once for each route. The variable declarations were also sorted to
match their initialization order.

This does fix an unlikely bug where if the old code could successfully
matched a route but fail to retain the cluster, then when trying a
second time if the route was _not_ matched it would re-use the prior route
and thus infinite-loop failing to retain that same cluster.

It also adds a missing cast to unsigned long for a uint32 weight. The old
code would detect if the _sum_ was negative, but a weight using 32 bits
would have been negative and never selected.

33755 of 38113 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.98
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.MetricRecorder;
45
import io.grpc.NameResolver;
46
import io.grpc.Status;
47
import io.grpc.Status.Code;
48
import io.grpc.SynchronizationContext;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.ObjectPool;
51
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
52
import io.grpc.xds.Filter.ClientInterceptorBuilder;
53
import io.grpc.xds.Filter.FilterConfig;
54
import io.grpc.xds.Filter.NamedFilterConfig;
55
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
56
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
57
import io.grpc.xds.VirtualHost.Route;
58
import io.grpc.xds.VirtualHost.Route.RouteAction;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
61
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
62
import io.grpc.xds.VirtualHost.Route.RouteMatch;
63
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
64
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
65
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
66
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
67
import io.grpc.xds.client.XdsClient;
68
import io.grpc.xds.client.XdsClient.ResourceWatcher;
69
import io.grpc.xds.client.XdsLogger;
70
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
71
import java.net.URI;
72
import java.util.ArrayList;
73
import java.util.Collections;
74
import java.util.HashMap;
75
import java.util.HashSet;
76
import java.util.List;
77
import java.util.Locale;
78
import java.util.Map;
79
import java.util.Objects;
80
import java.util.Set;
81
import java.util.concurrent.ConcurrentHashMap;
82
import java.util.concurrent.ConcurrentMap;
83
import java.util.concurrent.ScheduledExecutorService;
84
import java.util.concurrent.atomic.AtomicInteger;
85
import javax.annotation.Nullable;
86

87
/**
88
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
89
 *
90
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
91
 * protocol to retrieve service information and produce a service config to the caller.
92
 *
93
 * @see XdsNameResolverProvider
94
 */
95
final class XdsNameResolver extends NameResolver {
96

97
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
98
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
99
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
100
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
101
  static final CallOptions.Key<Boolean> AUTO_HOST_REWRITE_KEY =
1✔
102
      CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY");
1✔
103
  @VisibleForTesting
104
  static boolean enableTimeout =
1✔
105
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
106
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
107

108
  private final InternalLogId logId;
109
  private final XdsLogger logger;
110
  @Nullable
111
  private final String targetAuthority;
112
  private final String target;
113
  private final String serviceAuthority;
114
  // Encoded version of the service authority as per 
115
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
116
  private final String encodedServiceAuthority;
117
  private final String overrideAuthority;
118
  private final ServiceConfigParser serviceConfigParser;
119
  private final SynchronizationContext syncContext;
120
  private final ScheduledExecutorService scheduler;
121
  private final XdsClientPoolFactory xdsClientPoolFactory;
122
  private final ThreadSafeRandom random;
123
  private final FilterRegistry filterRegistry;
124
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
125
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
126
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
127
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
128
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
129
  private final long randomChannelId;
130
  private final MetricRecorder metricRecorder;
131

132
  private volatile RoutingConfig routingConfig = RoutingConfig.EMPTY;
1✔
133
  private Listener2 listener;
134
  private ObjectPool<XdsClient> xdsClientPool;
135
  private XdsClient xdsClient;
136
  private CallCounterProvider callCounterProvider;
137
  private ResolveState resolveState;
138
  // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
139
  // XdsClient instead of here.
140
  private boolean receivedConfig;
141

142
  XdsNameResolver(
143
      URI targetUri, String name, @Nullable String overrideAuthority,
144
      ServiceConfigParser serviceConfigParser,
145
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
146
      @Nullable Map<String, ?> bootstrapOverride,
147
      MetricRecorder metricRecorder) {
148
    this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
1✔
149
        syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
1✔
150
        ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride,
1✔
151
        metricRecorder);
152
  }
1✔
153

154
  @VisibleForTesting
155
  XdsNameResolver(
156
      URI targetUri, @Nullable String targetAuthority, String name,
157
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
158
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
159
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
160
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
161
      MetricRecorder metricRecorder) {
1✔
162
    this.targetAuthority = targetAuthority;
1✔
163
    target = targetUri.toString();
1✔
164

165
    // The name might have multiple slashes so encode it before verifying.
166
    serviceAuthority = checkNotNull(name, "name");
1✔
167
    this.encodedServiceAuthority = 
1✔
168
      GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
1✔
169

170
    this.overrideAuthority = overrideAuthority;
1✔
171
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
172
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
173
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
174
    this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
1✔
175
            "xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
1✔
176
    this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
1✔
177
    this.random = checkNotNull(random, "random");
1✔
178
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
179
    this.metricRecorder = metricRecorder;
1✔
180
    randomChannelId = random.nextLong();
1✔
181
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
182
    logger = XdsLogger.withLogId(logId);
1✔
183
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
184
  }
1✔
185

186
  @Override
187
  public String getServiceAuthority() {
188
    return encodedServiceAuthority;
1✔
189
  }
190

191
  @Override
192
  public void start(Listener2 listener) {
193
    this.listener = checkNotNull(listener, "listener");
1✔
194
    try {
195
      xdsClientPool = xdsClientPoolFactory.getOrCreate(target, metricRecorder);
1✔
196
    } catch (Exception e) {
1✔
197
      listener.onError(
1✔
198
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
199
      return;
1✔
200
    }
1✔
201
    xdsClient = xdsClientPool.getObject();
1✔
202
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
203
    String listenerNameTemplate;
204
    if (targetAuthority == null) {
1✔
205
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
206
    } else {
207
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
208
      if (authorityInfo == null) {
1✔
209
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
210
            "invalid target URI: target authority not found in the bootstrap"));
211
        return;
1✔
212
      }
213
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
214
    }
215
    String replacement = serviceAuthority;
1✔
216
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
217
      replacement = XdsClient.percentEncodePath(replacement);
1✔
218
    }
219
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
220
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
221
        ) {
222
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
223
          "invalid listener resource URI for service authority: " + serviceAuthority));
224
      return;
×
225
    }
226
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
227
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
228
    resolveState = new ResolveState(ldsResourceName);
1✔
229

230
    resolveState.start();
1✔
231
  }
1✔
232

233
  private static String expandPercentS(String template, String replacement) {
234
    return template.replace("%s", replacement);
1✔
235
  }
236

237
  @Override
238
  public void shutdown() {
239
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
240
    if (resolveState != null) {
1✔
241
      resolveState.stop();
1✔
242
    }
243
    if (xdsClient != null) {
1✔
244
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
245
    }
246
  }
1✔
247

248
  @VisibleForTesting
249
  static Map<String, ?> generateServiceConfigWithMethodConfig(
250
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
251
    if (timeoutNano == null
1✔
252
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
253
      return Collections.emptyMap();
1✔
254
    }
255
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
256
    methodConfig.put(
1✔
257
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
258
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
259
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
260
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
261
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
262
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
263
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
264
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
265
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
266
        codes.add(code.name());
1✔
267
      }
1✔
268
      rawRetryPolicy.put(
1✔
269
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
270
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
271
        rawRetryPolicy.put(
×
272
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
273
      }
274
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
275
    }
276
    if (timeoutNano != null) {
1✔
277
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
278
      methodConfig.put("timeout", timeout);
1✔
279
    }
280
    return Collections.singletonMap(
1✔
281
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
282
  }
283

284
  @VisibleForTesting
285
  XdsClient getXdsClient() {
286
    return xdsClient;
1✔
287
  }
288

289
  // called in syncContext
290
  private void updateResolutionResult() {
291
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
292

293
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
294
    for (String name : clusterRefs.keySet()) {
1✔
295
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
296
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
297
    }
1✔
298
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
299
        "loadBalancingConfig",
300
        ImmutableList.of(ImmutableMap.of(
1✔
301
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
302
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
303

304
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
305
      logger.log(
×
306
          XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
×
307
    }
308
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
309
    Attributes attrs =
310
        Attributes.newBuilder()
1✔
311
            .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
1✔
312
            .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
313
            .set(InternalConfigSelector.KEY, configSelector)
1✔
314
            .build();
1✔
315
    ResolutionResult result =
316
        ResolutionResult.newBuilder()
1✔
317
            .setAttributes(attrs)
1✔
318
            .setServiceConfig(parsedServiceConfig)
1✔
319
            .build();
1✔
320
    listener.onResult(result);
1✔
321
    receivedConfig = true;
1✔
322
  }
1✔
323

324
  /**
325
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
326
   * case-insensitive.
327
   *
328
   * <p>Wildcard pattern rules:
329
   * <ol>
330
   * <li>A single asterisk (*) matches any domain.</li>
331
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
332
   *     but not both.</li>
333
   * </ol>
334
   */
335
  @VisibleForTesting
336
  static boolean matchHostName(String hostName, String pattern) {
337
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
338
        "Invalid host name");
339
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
340
        "Invalid pattern/domain name");
341

342
    hostName = hostName.toLowerCase(Locale.US);
1✔
343
    pattern = pattern.toLowerCase(Locale.US);
1✔
344
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
345

346
    if (!pattern.contains("*")) {
1✔
347
      // Not a wildcard pattern -- hostName and pattern must match exactly.
348
      return hostName.equals(pattern);
1✔
349
    }
350
    // Wildcard pattern
351

352
    if (pattern.length() == 1) {
1✔
353
      return true;
×
354
    }
355

356
    int index = pattern.indexOf('*');
1✔
357

358
    // At most one asterisk (*) is allowed.
359
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
360
      return false;
×
361
    }
362

363
    // Asterisk can only match prefix or suffix.
364
    if (index != 0 && index != pattern.length() - 1) {
1✔
365
      return false;
×
366
    }
367

368
    // HostName must be at least as long as the pattern because asterisk has to
369
    // match one or more characters.
370
    if (hostName.length() < pattern.length()) {
1✔
371
      return false;
1✔
372
    }
373

374
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
375
      // Prefix matching fails.
376
      return true;
1✔
377
    }
378

379
    // Pattern matches hostname if suffix matching succeeds.
380
    return index == pattern.length() - 1
1✔
381
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
382
  }
383

384
  private final class ConfigSelector extends InternalConfigSelector {
1✔
385
    @Override
386
    public Result selectConfig(PickSubchannelArgs args) {
387
      RoutingConfig routingCfg;
388
      RouteData selectedRoute;
389
      String cluster;
390
      ClientInterceptor filters;
391
      Metadata headers = args.getHeaders();
1✔
392
      String path = "/" + args.getMethodDescriptor().getFullMethodName();
1✔
393
      do {
394
        routingCfg = routingConfig;
1✔
395
        selectedRoute = null;
1✔
396
        for (RouteData route : routingCfg.routes) {
1✔
397
          if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
1✔
398
            selectedRoute = route;
1✔
399
            break;
1✔
400
          }
401
        }
1✔
402
        if (selectedRoute == null) {
1✔
403
          return Result.forError(
1✔
404
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
405
        }
406
        if (selectedRoute.routeAction == null) {
1✔
407
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
408
              "Could not route RPC to Route with non-forwarding action"));
409
        }
410
        RouteAction action = selectedRoute.routeAction;
1✔
411
        if (action.cluster() != null) {
1✔
412
          cluster = prefixedClusterName(action.cluster());
1✔
413
          filters = selectedRoute.filterChoices.get(0);
1✔
414
        } else if (action.weightedClusters() != null) {
1✔
415
          // XdsRouteConfigureResource verifies the total weight will not be 0 or exceed uint32
416
          long totalWeight = 0;
1✔
417
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
418
            totalWeight += weightedCluster.weight();
1✔
419
          }
1✔
420
          long select = random.nextLong(totalWeight);
1✔
421
          long accumulator = 0;
1✔
422
          for (int i = 0; ; i++) {
1✔
423
            ClusterWeight weightedCluster = action.weightedClusters().get(i);
1✔
424
            accumulator += weightedCluster.weight();
1✔
425
            if (select < accumulator) {
1✔
426
              cluster = prefixedClusterName(weightedCluster.name());
1✔
427
              filters = selectedRoute.filterChoices.get(i);
1✔
428
              break;
1✔
429
            }
430
          }
431
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
432
          cluster =
1✔
433
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
434
          filters = selectedRoute.filterChoices.get(0);
1✔
435
        } else {
436
          // updateRoutes() discards routes with unknown actions
437
          throw new AssertionError();
×
438
        }
439
      } while (!retainCluster(cluster));
1✔
440

441
      final RouteAction routeAction = selectedRoute.routeAction;
1✔
442
      Long timeoutNanos = null;
1✔
443
      if (enableTimeout) {
1✔
444
        timeoutNanos = routeAction.timeoutNano();
1✔
445
        if (timeoutNanos == null) {
1✔
446
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
447
        }
448
        if (timeoutNanos <= 0) {
1✔
449
          timeoutNanos = null;
1✔
450
        }
451
      }
452
      RetryPolicy retryPolicy = routeAction.retryPolicy();
1✔
453
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
454
      Map<String, ?> rawServiceConfig =
1✔
455
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
456
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
457
      Object config = parsedServiceConfig.getConfig();
1✔
458
      if (config == null) {
1✔
459
        releaseCluster(cluster);
×
460
        return Result.forError(
×
461
            parsedServiceConfig.getError().augmentDescription(
×
462
                "Failed to parse service config (method config)"));
463
      }
464
      final String finalCluster = cluster;
1✔
465
      final long hash = generateHash(routeAction.hashPolicies(), headers);
1✔
466
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
467
        @Override
468
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
469
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
470
            final Channel next) {
471
          CallOptions callOptionsForCluster =
1✔
472
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
473
                  .withOption(RPC_HASH_KEY, hash);
1✔
474
          if (routeAction.autoHostRewrite()) {
1✔
475
            callOptionsForCluster = callOptionsForCluster.withOption(AUTO_HOST_REWRITE_KEY, true);
1✔
476
          }
477
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
478
              next.newCall(method, callOptionsForCluster)) {
1✔
479
            @Override
480
            public void start(Listener<RespT> listener, Metadata headers) {
481
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
482
                boolean committed;
483

484
                @Override
485
                public void onHeaders(Metadata headers) {
486
                  committed = true;
1✔
487
                  releaseCluster(finalCluster);
1✔
488
                  delegate().onHeaders(headers);
1✔
489
                }
1✔
490

491
                @Override
492
                public void onClose(Status status, Metadata trailers) {
493
                  if (!committed) {
1✔
494
                    releaseCluster(finalCluster);
1✔
495
                  }
496
                  delegate().onClose(status, trailers);
1✔
497
                }
1✔
498
              };
499
              delegate().start(listener, headers);
1✔
500
            }
1✔
501
          };
502
        }
503
      }
504

505
      return
1✔
506
          Result.newBuilder()
1✔
507
              .setConfig(config)
1✔
508
              .setInterceptor(combineInterceptors(
1✔
509
                  ImmutableList.of(filters, new ClusterSelectionInterceptor())))
1✔
510
              .build();
1✔
511
    }
512

513
    private boolean retainCluster(String cluster) {
514
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
515
      if (clusterRefState == null) {
1✔
516
        return false;
×
517
      }
518
      AtomicInteger refCount = clusterRefState.refCount;
1✔
519
      int count;
520
      do {
521
        count = refCount.get();
1✔
522
        if (count == 0) {
1✔
523
          return false;
×
524
        }
525
      } while (!refCount.compareAndSet(count, count + 1));
1✔
526
      return true;
1✔
527
    }
528

529
    private void releaseCluster(final String cluster) {
530
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
531
      if (count == 0) {
1✔
532
        syncContext.execute(new Runnable() {
1✔
533
          @Override
534
          public void run() {
535
            if (clusterRefs.get(cluster).refCount.get() == 0) {
1✔
536
              clusterRefs.remove(cluster);
1✔
537
              updateResolutionResult();
1✔
538
            }
539
          }
1✔
540
        });
541
      }
542
    }
1✔
543

544
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
545
      Long hash = null;
1✔
546
      for (HashPolicy policy : hashPolicies) {
1✔
547
        Long newHash = null;
1✔
548
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
549
          String value = getHeaderValue(headers, policy.headerName());
1✔
550
          if (value != null) {
1✔
551
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
552
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
553
            }
554
            newHash = hashFunc.hashAsciiString(value);
1✔
555
          }
556
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
557
          newHash = hashFunc.hashLong(randomChannelId);
1✔
558
        }
559
        if (newHash != null ) {
1✔
560
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
561
          // and preserves all of the entropy.
562
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
563
          hash = oldHash ^ newHash;
1✔
564
        }
565
        // If the policy is a terminal policy and a hash has been generated, ignore
566
        // the rest of the hash policies.
567
        if (policy.isTerminal() && hash != null) {
1✔
568
          break;
×
569
        }
570
      }
1✔
571
      return hash == null ? random.nextLong() : hash;
1✔
572
    }
573
  }
574

575
  static final class PassthroughClientInterceptor implements ClientInterceptor {
1✔
576
    @Override
577
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
578
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
579
      return next.newCall(method, callOptions);
1✔
580
    }
581
  }
582

583
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
584
    if (interceptors.size() == 0) {
1✔
585
      return new PassthroughClientInterceptor();
1✔
586
    }
587
    if (interceptors.size() == 1) {
1✔
588
      return interceptors.get(0);
1✔
589
    }
590
    return new ClientInterceptor() {
1✔
591
      @Override
592
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
593
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
594
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
595
        return next.newCall(method, callOptions);
1✔
596
      }
597
    };
598
  }
599

600
  @Nullable
601
  private static String getHeaderValue(Metadata headers, String headerName) {
602
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
603
      return null;
×
604
    }
605
    if (headerName.equals("content-type")) {
1✔
606
      return "application/grpc";
×
607
    }
608
    Metadata.Key<String> key;
609
    try {
610
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
611
    } catch (IllegalArgumentException e) {
×
612
      return null;
×
613
    }
1✔
614
    Iterable<String> values = headers.getAll(key);
1✔
615
    return values == null ? null : Joiner.on(",").join(values);
1✔
616
  }
617

618
  private static String prefixedClusterName(String name) {
619
    return "cluster:" + name;
1✔
620
  }
621

622
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
623
    return "cluster_specifier_plugin:" + pluginName;
1✔
624
  }
625

626
  private static final class FailingConfigSelector extends InternalConfigSelector {
627
    private final Result result;
628

629
    public FailingConfigSelector(Status error) {
1✔
630
      this.result = Result.forError(error);
1✔
631
    }
1✔
632

633
    @Override
634
    public Result selectConfig(PickSubchannelArgs args) {
635
      return result;
1✔
636
    }
637
  }
638

639
  private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
640
    private final ConfigOrError emptyServiceConfig =
1✔
641
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
642
    private final String ldsResourceName;
643
    private boolean stopped;
644
    @Nullable
645
    private Set<String> existingClusters;  // clusters to which new requests can be routed
646
    @Nullable
647
    private RouteDiscoveryState routeDiscoveryState;
648

649
    ResolveState(String ldsResourceName) {
1✔
650
      this.ldsResourceName = ldsResourceName;
1✔
651
    }
1✔
652

653
    @Override
654
    public void onChanged(final XdsListenerResource.LdsUpdate update) {
655
      if (stopped) {
1✔
656
        return;
×
657
      }
658
      logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
1✔
659
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
660
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
661
      String rdsName = httpConnectionManager.rdsName();
1✔
662
      cleanUpRouteDiscoveryState();
1✔
663
      if (virtualHosts != null) {
1✔
664
        updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
665
            httpConnectionManager.httpFilterConfigs());
1✔
666
      } else {
667
        routeDiscoveryState = new RouteDiscoveryState(
1✔
668
            rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
669
            httpConnectionManager.httpFilterConfigs());
1✔
670
        logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
671
        xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
672
            rdsName, routeDiscoveryState, syncContext);
1✔
673
      }
674
    }
1✔
675

676
    @Override
677
    public void onError(final Status error) {
678
      if (stopped || receivedConfig) {
1✔
679
        return;
×
680
      }
681
      listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
682
          String.format("Unable to load LDS %s. xDS server returned: %s: %s",
1✔
683
          ldsResourceName, error.getCode(), error.getDescription())));
1✔
684
    }
1✔
685

686
    @Override
687
    public void onResourceDoesNotExist(final String resourceName) {
688
      if (stopped) {
1✔
689
        return;
×
690
      }
691
      String error = "LDS resource does not exist: " + resourceName;
1✔
692
      logger.log(XdsLogLevel.INFO, error);
1✔
693
      cleanUpRouteDiscoveryState();
1✔
694
      cleanUpRoutes(error);
1✔
695
    }
1✔
696

697
    private void start() {
698
      logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
1✔
699
      xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
1✔
700
          ldsResourceName, this, syncContext);
1✔
701
    }
1✔
702

703
    private void stop() {
704
      logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
1✔
705
      stopped = true;
1✔
706
      cleanUpRouteDiscoveryState();
1✔
707
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
708
    }
1✔
709

710
    // called in syncContext
711
    private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
712
        @Nullable List<NamedFilterConfig> filterConfigs) {
713
      String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
714
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
1✔
715
      if (virtualHost == null) {
1✔
716
        String error = "Failed to find virtual host matching hostname: " + authority;
1✔
717
        logger.log(XdsLogLevel.WARNING, error);
1✔
718
        cleanUpRoutes(error);
1✔
719
        return;
1✔
720
      }
721

722
      List<Route> routes = virtualHost.routes();
1✔
723
      ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
1✔
724

725
      // Populate all clusters to which requests can be routed to through the virtual host.
726
      Set<String> clusters = new HashSet<>();
1✔
727
      // uniqueName -> clusterName
728
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
729
      // uniqueName -> pluginConfig
730
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
731
      for (Route route : routes) {
1✔
732
        RouteAction action = route.routeAction();
1✔
733
        String prefixedName;
734
        if (action == null) {
1✔
735
          routesData.add(new RouteData(route.routeMatch(), null, ImmutableList.of()));
1✔
736
        } else if (action.cluster() != null) {
1✔
737
          prefixedName = prefixedClusterName(action.cluster());
1✔
738
          clusters.add(prefixedName);
1✔
739
          clusterNameMap.put(prefixedName, action.cluster());
1✔
740
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
741
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
742
        } else if (action.weightedClusters() != null) {
1✔
743
          ImmutableList.Builder<ClientInterceptor> filterList = ImmutableList.builder();
1✔
744
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
745
            prefixedName = prefixedClusterName(weightedCluster.name());
1✔
746
            clusters.add(prefixedName);
1✔
747
            clusterNameMap.put(prefixedName, weightedCluster.name());
1✔
748
            filterList.add(createFilters(filterConfigs, virtualHost, route, weightedCluster));
1✔
749
          }
1✔
750
          routesData.add(
1✔
751
              new RouteData(route.routeMatch(), route.routeAction(), filterList.build()));
1✔
752
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
753
          PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
754
          if (pluginConfig instanceof RlsPluginConfig) {
1✔
755
            prefixedName = prefixedClusterSpecifierPluginName(
1✔
756
                action.namedClusterSpecifierPluginConfig().name());
1✔
757
            clusters.add(prefixedName);
1✔
758
            rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
759
          }
760
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
761
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
762
        } else {
763
          // Discard route
764
        }
765
      }
1✔
766

767
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
768
      boolean shouldUpdateResult = existingClusters == null;
1✔
769
      Set<String> addedClusters =
770
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
771
      Set<String> deletedClusters =
772
          existingClusters == null
1✔
773
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
774
      existingClusters = clusters;
1✔
775
      for (String cluster : addedClusters) {
1✔
776
        if (clusterRefs.containsKey(cluster)) {
1✔
777
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
778
        } else {
779
          if (clusterNameMap.containsKey(cluster)) {
1✔
780
            clusterRefs.put(
1✔
781
                cluster,
782
                ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
1✔
783
          }
784
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
785
            clusterRefs.put(
1✔
786
                cluster,
787
                ClusterRefState.forRlsPlugin(
1✔
788
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
789
          }
790
          shouldUpdateResult = true;
1✔
791
        }
792
      }
1✔
793
      for (String cluster : clusters) {
1✔
794
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
795
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
796
          ClusterRefState newClusterRefState =
1✔
797
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
798
          clusterRefs.put(cluster, newClusterRefState);
1✔
799
          shouldUpdateResult = true;
1✔
800
        }
801
      }
1✔
802
      // Update service config to include newly added clusters.
803
      if (shouldUpdateResult) {
1✔
804
        updateResolutionResult();
1✔
805
      }
806
      // Make newly added clusters selectable by config selector and deleted clusters no longer
807
      // selectable.
808
      routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routesData.build());
1✔
809
      shouldUpdateResult = false;
1✔
810
      for (String cluster : deletedClusters) {
1✔
811
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
812
        if (count == 0) {
1✔
813
          clusterRefs.remove(cluster);
1✔
814
          shouldUpdateResult = true;
1✔
815
        }
816
      }
1✔
817
      if (shouldUpdateResult) {
1✔
818
        updateResolutionResult();
1✔
819
      }
820
    }
1✔
821

822
    private ClientInterceptor createFilters(
823
        @Nullable List<NamedFilterConfig> filterConfigs,
824
        VirtualHost virtualHost,
825
        Route route,
826
        @Nullable ClusterWeight weightedCluster) {
827
      if (filterConfigs == null) {
1✔
828
        return new PassthroughClientInterceptor();
1✔
829
      }
830
      Map<String, FilterConfig> selectedOverrideConfigs =
1✔
831
          new HashMap<>(virtualHost.filterConfigOverrides());
1✔
832
      selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
833
      if (weightedCluster != null) {
1✔
834
        selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
835
      }
836
      ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
1✔
837
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
838
        FilterConfig filterConfig = namedFilter.filterConfig;
1✔
839
        Filter filter = filterRegistry.get(filterConfig.typeUrl());
1✔
840
        if (filter instanceof ClientInterceptorBuilder) {
1✔
841
          ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter)
1✔
842
              .buildClientInterceptor(
1✔
843
                  filterConfig, selectedOverrideConfigs.get(namedFilter.name),
1✔
844
                  scheduler);
1✔
845
          if (interceptor != null) {
1✔
846
            filterInterceptors.add(interceptor);
1✔
847
          }
848
        }
849
      }
1✔
850
      return combineInterceptors(filterInterceptors.build());
1✔
851
    }
852

853
    private void cleanUpRoutes(String error) {
854
      if (existingClusters != null) {
1✔
855
        for (String cluster : existingClusters) {
1✔
856
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
857
          if (count == 0) {
1✔
858
            clusterRefs.remove(cluster);
1✔
859
          }
860
        }
1✔
861
        existingClusters = null;
1✔
862
      }
863
      routingConfig = RoutingConfig.EMPTY;
1✔
864
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
865
      // the config selector handles the error message itself. Once the LB API allows providing
866
      // failure information for addresses yet still providing a service config, the config seector
867
      // could be avoided.
868
      String errorWithNodeId =
1✔
869
          error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
870
      listener.onResult(ResolutionResult.newBuilder()
1✔
871
          .setAttributes(Attributes.newBuilder()
1✔
872
            .set(InternalConfigSelector.KEY,
1✔
873
              new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId)))
1✔
874
            .build())
1✔
875
          .setServiceConfig(emptyServiceConfig)
1✔
876
          .build());
1✔
877
      receivedConfig = true;
1✔
878
    }
1✔
879

880
    private void cleanUpRouteDiscoveryState() {
881
      if (routeDiscoveryState != null) {
1✔
882
        String rdsName = routeDiscoveryState.resourceName;
1✔
883
        logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
1✔
884
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
885
            routeDiscoveryState);
886
        routeDiscoveryState = null;
1✔
887
      }
888
    }
1✔
889

890
    /**
891
     * Discovery state for RouteConfiguration resource. One instance for each Listener resource
892
     * update.
893
     */
894
    private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
895
      private final String resourceName;
896
      private final long httpMaxStreamDurationNano;
897
      @Nullable
898
      private final List<NamedFilterConfig> filterConfigs;
899

900
      private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
901
          @Nullable List<NamedFilterConfig> filterConfigs) {
1✔
902
        this.resourceName = resourceName;
1✔
903
        this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
1✔
904
        this.filterConfigs = filterConfigs;
1✔
905
      }
1✔
906

907
      @Override
908
      public void onChanged(final RdsUpdate update) {
909
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
910
          return;
×
911
        }
912
        logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
1✔
913
        updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
1✔
914
      }
1✔
915

916
      @Override
917
      public void onError(final Status error) {
918
        if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
1✔
919
          return;
×
920
        }
921
        listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
922
            String.format("Unable to load RDS %s. xDS server returned: %s: %s",
1✔
923
            resourceName, error.getCode(), error.getDescription())));
1✔
924
      }
1✔
925

926
      @Override
927
      public void onResourceDoesNotExist(final String resourceName) {
928
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
929
          return;
×
930
        }
931
        String error = "RDS resource does not exist: " + resourceName;
1✔
932
        logger.log(XdsLogLevel.INFO, error);
1✔
933
        cleanUpRoutes(error);
1✔
934
      }
1✔
935
    }
936
  }
937

938
  /**
939
   * VirtualHost-level configuration for request routing.
940
   */
941
  private static class RoutingConfig {
942
    private final long fallbackTimeoutNano;
943
    final ImmutableList<RouteData> routes;
944

945
    private static final RoutingConfig EMPTY = new RoutingConfig(0, ImmutableList.of());
1✔
946

947
    private RoutingConfig(long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
1✔
948
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
949
      this.routes = checkNotNull(routes, "routes");
1✔
950
    }
1✔
951
  }
952

953
  static final class RouteData {
954
    final RouteMatch routeMatch;
955
    /** null implies non-forwarding action. */
956
    @Nullable
957
    final RouteAction routeAction;
958
    /**
959
     * Only one of these interceptors should be used per-RPC. There are only multiple values in the
960
     * list for weighted clusters, in which case the order of the list mirrors the weighted
961
     * clusters.
962
     */
963
    final ImmutableList<ClientInterceptor> filterChoices;
964

965
    RouteData(RouteMatch routeMatch, @Nullable RouteAction routeAction, ClientInterceptor filter) {
966
      this(routeMatch, routeAction, ImmutableList.of(filter));
1✔
967
    }
1✔
968

969
    RouteData(
970
        RouteMatch routeMatch,
971
        @Nullable RouteAction routeAction,
972
        ImmutableList<ClientInterceptor> filterChoices) {
1✔
973
      this.routeMatch = checkNotNull(routeMatch, "routeMatch");
1✔
974
      checkArgument(
1✔
975
          routeAction == null || !filterChoices.isEmpty(),
1✔
976
          "filter may be empty only for non-forwarding action");
977
      this.routeAction = routeAction;
1✔
978
      if (routeAction != null && routeAction.weightedClusters() != null) {
1✔
979
        checkArgument(
1✔
980
            routeAction.weightedClusters().size() == filterChoices.size(),
1✔
981
            "filter choices must match size of weighted clusters");
982
      }
983
      for (ClientInterceptor filter : filterChoices) {
1✔
984
        checkNotNull(filter, "entry in filterChoices is null");
1✔
985
      }
1✔
986
      this.filterChoices = checkNotNull(filterChoices, "filterChoices");
1✔
987
    }
1✔
988
  }
989

990
  private static class ClusterRefState {
991
    final AtomicInteger refCount;
992
    @Nullable
993
    final String traditionalCluster;
994
    @Nullable
995
    final RlsPluginConfig rlsPluginConfig;
996

997
    private ClusterRefState(
998
        AtomicInteger refCount, @Nullable String traditionalCluster,
999
        @Nullable RlsPluginConfig rlsPluginConfig) {
1✔
1000
      this.refCount = refCount;
1✔
1001
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
1002
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
1003
      this.traditionalCluster = traditionalCluster;
1✔
1004
      this.rlsPluginConfig = rlsPluginConfig;
1✔
1005
    }
1✔
1006

1007
    private Map<String, ?> toLbPolicy() {
1008
      if (traditionalCluster != null) {
1✔
1009
        return ImmutableMap.of(
1✔
1010
            XdsLbPolicies.CDS_POLICY_NAME,
1011
            ImmutableMap.of("cluster", traditionalCluster));
1✔
1012
      } else {
1013
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
1014
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
1015
            .put(
1✔
1016
                "childPolicy",
1017
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
1✔
1018
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
1019
            .buildOrThrow();
1✔
1020
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
1021
      }
1022
    }
1023

1024
    static ClusterRefState forCluster(AtomicInteger refCount, String name) {
1025
      return new ClusterRefState(refCount, name, null);
1✔
1026
    }
1027

1028
    static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
1029
      return new ClusterRefState(refCount, null, rlsPluginConfig);
1✔
1030
    }
1031
  }
1032
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc