• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19720

07 Mar 2025 06:33PM UTC coverage: 88.517% (+0.006%) from 88.511%
#19720

push

github

web-flow
xds: Fix cluster selection races when updating config selector

Listener2.onResult() doesn't require running in the sync context, so
when called from the sync context it is guaranteed not to do its
processing immediately (instead, it schedules work into the sync
context).

The code was doing an update dance: 1) update service config to add new
cluster, 2) update config selector to use new cluster, 3) update service
config to remove old clusters. But the onResult() wasn't being processed
immediately, so the actual execution order was 2, 1, 3 which has a small
window where RPCs will fail. But onResult2() does run immediately. And
since ca4819ac6, updateBalancingState() updates the picker immediately.

cleanUpRoutes() was also racy because it updated the routingConfig
before swapping to the new config selector, so RPCs could fail saying
there was no route instead of the useful error message. Even with the
opposite order, some RPCs may be executing the while loop of
selectConfig(), trying to acquire a cluster. The code unreffed the
clusters before updating the routingConfig, so those RPCs could go into
a tight loop until the routingConfig was updated. Also, once the
routingConfig was updated to EMPTY those RPCs would similarly
see the wrong error message. To give the correct error message,
selectConfig() must fail such RPCs directly, and once it can do that
there's no need to stop using the config selector in error cases. This
has the benefit of fewer moving parts and more consistent threading
among cases.

The added test was able to detect the race 2% of the time. The slower
the code/machine, the more reliable the test failed. ca4819ac6 along
with this commit reduced it to 0 failures in 1000 runs.

Discovered when investigating b/394850611

34573 of 39058 relevant lines covered (88.52%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.21
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.MetricRecorder;
45
import io.grpc.NameResolver;
46
import io.grpc.Status;
47
import io.grpc.Status.Code;
48
import io.grpc.SynchronizationContext;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.ObjectPool;
51
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
52
import io.grpc.xds.Filter.FilterConfig;
53
import io.grpc.xds.Filter.NamedFilterConfig;
54
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
55
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
56
import io.grpc.xds.VirtualHost.Route;
57
import io.grpc.xds.VirtualHost.Route.RouteAction;
58
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
61
import io.grpc.xds.VirtualHost.Route.RouteMatch;
62
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
63
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
64
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
65
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
66
import io.grpc.xds.client.XdsClient;
67
import io.grpc.xds.client.XdsClient.ResourceWatcher;
68
import io.grpc.xds.client.XdsLogger;
69
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
70
import java.net.URI;
71
import java.util.ArrayList;
72
import java.util.Collections;
73
import java.util.HashMap;
74
import java.util.HashSet;
75
import java.util.List;
76
import java.util.Locale;
77
import java.util.Map;
78
import java.util.Objects;
79
import java.util.Set;
80
import java.util.concurrent.ConcurrentHashMap;
81
import java.util.concurrent.ConcurrentMap;
82
import java.util.concurrent.ScheduledExecutorService;
83
import java.util.concurrent.atomic.AtomicInteger;
84
import javax.annotation.Nullable;
85

86
/**
87
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
88
 *
89
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
90
 * protocol to retrieve service information and produce a service config to the caller.
91
 *
92
 * @see XdsNameResolverProvider
93
 */
94
final class XdsNameResolver extends NameResolver {
95

96
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
97
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
98
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
99
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
100
  static final CallOptions.Key<Boolean> AUTO_HOST_REWRITE_KEY =
1✔
101
      CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY");
1✔
102
  @VisibleForTesting
103
  static boolean enableTimeout =
1✔
104
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
105
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
106

107
  private final InternalLogId logId;
108
  private final XdsLogger logger;
109
  @Nullable
110
  private final String targetAuthority;
111
  private final String target;
112
  private final String serviceAuthority;
113
  // Encoded version of the service authority as per 
114
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
115
  private final String encodedServiceAuthority;
116
  private final String overrideAuthority;
117
  private final ServiceConfigParser serviceConfigParser;
118
  private final SynchronizationContext syncContext;
119
  private final ScheduledExecutorService scheduler;
120
  private final XdsClientPoolFactory xdsClientPoolFactory;
121
  private final ThreadSafeRandom random;
122
  private final FilterRegistry filterRegistry;
123
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
124
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
125
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
126
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
127
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
128
  private final long randomChannelId;
129
  private final MetricRecorder metricRecorder;
130
  // Must be accessed in syncContext.
131
  // Filter instances are unique per channel, and per filter (name+typeUrl).
132
  // NamedFilterConfig.filterStateKey -> filter_instance.
133
  private final HashMap<String, Filter> activeFilters = new HashMap<>();
1✔
134

135
  private volatile RoutingConfig routingConfig;
136
  private Listener2 listener;
137
  private ObjectPool<XdsClient> xdsClientPool;
138
  private XdsClient xdsClient;
139
  private CallCounterProvider callCounterProvider;
140
  private ResolveState resolveState;
141
  // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
142
  // XdsClient instead of here.
143
  private boolean receivedConfig;
144

145
  XdsNameResolver(
146
      URI targetUri, String name, @Nullable String overrideAuthority,
147
      ServiceConfigParser serviceConfigParser,
148
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
149
      @Nullable Map<String, ?> bootstrapOverride,
150
      MetricRecorder metricRecorder) {
151
    this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
1✔
152
        syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
1✔
153
        ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride,
1✔
154
        metricRecorder);
155
  }
1✔
156

157
  @VisibleForTesting
158
  XdsNameResolver(
159
      URI targetUri, @Nullable String targetAuthority, String name,
160
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
161
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
162
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
163
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
164
      MetricRecorder metricRecorder) {
1✔
165
    this.targetAuthority = targetAuthority;
1✔
166
    target = targetUri.toString();
1✔
167

168
    // The name might have multiple slashes so encode it before verifying.
169
    serviceAuthority = checkNotNull(name, "name");
1✔
170
    this.encodedServiceAuthority = 
1✔
171
      GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
1✔
172

173
    this.overrideAuthority = overrideAuthority;
1✔
174
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
175
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
176
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
177
    this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
1✔
178
            "xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
1✔
179
    this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
1✔
180
    this.random = checkNotNull(random, "random");
1✔
181
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
182
    this.metricRecorder = metricRecorder;
1✔
183
    randomChannelId = random.nextLong();
1✔
184
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
185
    logger = XdsLogger.withLogId(logId);
1✔
186
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
187
  }
1✔
188

189
  @Override
190
  public String getServiceAuthority() {
191
    return encodedServiceAuthority;
1✔
192
  }
193

194
  @Override
195
  public void start(Listener2 listener) {
196
    this.listener = checkNotNull(listener, "listener");
1✔
197
    try {
198
      xdsClientPool = xdsClientPoolFactory.getOrCreate(target, metricRecorder);
1✔
199
    } catch (Exception e) {
1✔
200
      listener.onError(
1✔
201
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
202
      return;
1✔
203
    }
1✔
204
    xdsClient = xdsClientPool.getObject();
1✔
205
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
206
    String listenerNameTemplate;
207
    if (targetAuthority == null) {
1✔
208
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
209
    } else {
210
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
211
      if (authorityInfo == null) {
1✔
212
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
213
            "invalid target URI: target authority not found in the bootstrap"));
214
        return;
1✔
215
      }
216
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
217
    }
218
    String replacement = serviceAuthority;
1✔
219
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
220
      replacement = XdsClient.percentEncodePath(replacement);
1✔
221
    }
222
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
223
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
224
        ) {
225
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
226
          "invalid listener resource URI for service authority: " + serviceAuthority));
227
      return;
×
228
    }
229
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
230
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
231
    resolveState = new ResolveState(ldsResourceName);
1✔
232

233
    resolveState.start();
1✔
234
  }
1✔
235

236
  private static String expandPercentS(String template, String replacement) {
237
    return template.replace("%s", replacement);
1✔
238
  }
239

240
  @Override
241
  public void shutdown() {
242
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
243
    if (resolveState != null) {
1✔
244
      resolveState.stop();
1✔
245
    }
246
    if (xdsClient != null) {
1✔
247
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
248
    }
249
  }
1✔
250

251
  @VisibleForTesting
252
  static Map<String, ?> generateServiceConfigWithMethodConfig(
253
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
254
    if (timeoutNano == null
1✔
255
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
256
      return Collections.emptyMap();
1✔
257
    }
258
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
259
    methodConfig.put(
1✔
260
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
261
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
262
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
263
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
264
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
265
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
266
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
267
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
268
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
269
        codes.add(code.name());
1✔
270
      }
1✔
271
      rawRetryPolicy.put(
1✔
272
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
273
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
274
        rawRetryPolicy.put(
×
275
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
276
      }
277
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
278
    }
279
    if (timeoutNano != null) {
1✔
280
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
281
      methodConfig.put("timeout", timeout);
1✔
282
    }
283
    return Collections.singletonMap(
1✔
284
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
285
  }
286

287
  @VisibleForTesting
288
  XdsClient getXdsClient() {
289
    return xdsClient;
1✔
290
  }
291

292
  // called in syncContext
293
  private void updateResolutionResult() {
294
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
295

296
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
297
    for (String name : clusterRefs.keySet()) {
1✔
298
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
299
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
300
    }
1✔
301
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
302
        "loadBalancingConfig",
303
        ImmutableList.of(ImmutableMap.of(
1✔
304
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
305
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
306

307
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
308
      logger.log(
×
309
          XdsLogLevel.INFO, "Generated service config: {0}", new Gson().toJson(rawServiceConfig));
×
310
    }
311
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
312
    Attributes attrs =
313
        Attributes.newBuilder()
1✔
314
            .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
1✔
315
            .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
316
            .set(InternalConfigSelector.KEY, configSelector)
1✔
317
            .build();
1✔
318
    ResolutionResult result =
319
        ResolutionResult.newBuilder()
1✔
320
            .setAttributes(attrs)
1✔
321
            .setServiceConfig(parsedServiceConfig)
1✔
322
            .build();
1✔
323
    listener.onResult2(result);
1✔
324
    receivedConfig = true;
1✔
325
  }
1✔
326

327
  /**
328
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
329
   * case-insensitive.
330
   *
331
   * <p>Wildcard pattern rules:
332
   * <ol>
333
   * <li>A single asterisk (*) matches any domain.</li>
334
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
335
   *     but not both.</li>
336
   * </ol>
337
   */
338
  @VisibleForTesting
339
  static boolean matchHostName(String hostName, String pattern) {
340
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
341
        "Invalid host name");
342
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
343
        "Invalid pattern/domain name");
344

345
    hostName = hostName.toLowerCase(Locale.US);
1✔
346
    pattern = pattern.toLowerCase(Locale.US);
1✔
347
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
348

349
    if (!pattern.contains("*")) {
1✔
350
      // Not a wildcard pattern -- hostName and pattern must match exactly.
351
      return hostName.equals(pattern);
1✔
352
    }
353
    // Wildcard pattern
354

355
    if (pattern.length() == 1) {
1✔
356
      return true;
×
357
    }
358

359
    int index = pattern.indexOf('*');
1✔
360

361
    // At most one asterisk (*) is allowed.
362
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
363
      return false;
×
364
    }
365

366
    // Asterisk can only match prefix or suffix.
367
    if (index != 0 && index != pattern.length() - 1) {
1✔
368
      return false;
×
369
    }
370

371
    // HostName must be at least as long as the pattern because asterisk has to
372
    // match one or more characters.
373
    if (hostName.length() < pattern.length()) {
1✔
374
      return false;
1✔
375
    }
376

377
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
378
      // Prefix matching fails.
379
      return true;
1✔
380
    }
381

382
    // Pattern matches hostname if suffix matching succeeds.
383
    return index == pattern.length() - 1
1✔
384
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
385
  }
386

387
  private final class ConfigSelector extends InternalConfigSelector {
1✔
388
    @Override
389
    public Result selectConfig(PickSubchannelArgs args) {
390
      RoutingConfig routingCfg;
391
      RouteData selectedRoute;
392
      String cluster;
393
      ClientInterceptor filters;
394
      Metadata headers = args.getHeaders();
1✔
395
      String path = "/" + args.getMethodDescriptor().getFullMethodName();
1✔
396
      do {
397
        routingCfg = routingConfig;
1✔
398
        if (routingCfg.errorStatus != null) {
1✔
399
          return Result.forError(routingCfg.errorStatus);
1✔
400
        }
401
        selectedRoute = null;
1✔
402
        for (RouteData route : routingCfg.routes) {
1✔
403
          if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
1✔
404
            selectedRoute = route;
1✔
405
            break;
1✔
406
          }
407
        }
1✔
408
        if (selectedRoute == null) {
1✔
409
          return Result.forError(
1✔
410
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
411
        }
412
        if (selectedRoute.routeAction == null) {
1✔
413
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
414
              "Could not route RPC to Route with non-forwarding action"));
415
        }
416
        RouteAction action = selectedRoute.routeAction;
1✔
417
        if (action.cluster() != null) {
1✔
418
          cluster = prefixedClusterName(action.cluster());
1✔
419
          filters = selectedRoute.filterChoices.get(0);
1✔
420
        } else if (action.weightedClusters() != null) {
1✔
421
          // XdsRouteConfigureResource verifies the total weight will not be 0 or exceed uint32
422
          long totalWeight = 0;
1✔
423
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
424
            totalWeight += weightedCluster.weight();
1✔
425
          }
1✔
426
          long select = random.nextLong(totalWeight);
1✔
427
          long accumulator = 0;
1✔
428
          for (int i = 0; ; i++) {
1✔
429
            ClusterWeight weightedCluster = action.weightedClusters().get(i);
1✔
430
            accumulator += weightedCluster.weight();
1✔
431
            if (select < accumulator) {
1✔
432
              cluster = prefixedClusterName(weightedCluster.name());
1✔
433
              filters = selectedRoute.filterChoices.get(i);
1✔
434
              break;
1✔
435
            }
436
          }
437
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
438
          cluster =
1✔
439
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
440
          filters = selectedRoute.filterChoices.get(0);
1✔
441
        } else {
442
          // updateRoutes() discards routes with unknown actions
443
          throw new AssertionError();
×
444
        }
445
      } while (!retainCluster(cluster));
1✔
446

447
      final RouteAction routeAction = selectedRoute.routeAction;
1✔
448
      Long timeoutNanos = null;
1✔
449
      if (enableTimeout) {
1✔
450
        timeoutNanos = routeAction.timeoutNano();
1✔
451
        if (timeoutNanos == null) {
1✔
452
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
453
        }
454
        if (timeoutNanos <= 0) {
1✔
455
          timeoutNanos = null;
1✔
456
        }
457
      }
458
      RetryPolicy retryPolicy = routeAction.retryPolicy();
1✔
459
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
460
      Map<String, ?> rawServiceConfig =
1✔
461
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
462
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
463
      Object config = parsedServiceConfig.getConfig();
1✔
464
      if (config == null) {
1✔
465
        releaseCluster(cluster);
×
466
        return Result.forError(
×
467
            parsedServiceConfig.getError().augmentDescription(
×
468
                "Failed to parse service config (method config)"));
469
      }
470
      final String finalCluster = cluster;
1✔
471
      final long hash = generateHash(routeAction.hashPolicies(), headers);
1✔
472
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
473
        @Override
474
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
475
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
476
            final Channel next) {
477
          CallOptions callOptionsForCluster =
1✔
478
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
479
                  .withOption(RPC_HASH_KEY, hash);
1✔
480
          if (routeAction.autoHostRewrite()) {
1✔
481
            callOptionsForCluster = callOptionsForCluster.withOption(AUTO_HOST_REWRITE_KEY, true);
1✔
482
          }
483
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
484
              next.newCall(method, callOptionsForCluster)) {
1✔
485
            @Override
486
            public void start(Listener<RespT> listener, Metadata headers) {
487
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
488
                boolean committed;
489

490
                @Override
491
                public void onHeaders(Metadata headers) {
492
                  committed = true;
1✔
493
                  releaseCluster(finalCluster);
1✔
494
                  delegate().onHeaders(headers);
1✔
495
                }
1✔
496

497
                @Override
498
                public void onClose(Status status, Metadata trailers) {
499
                  if (!committed) {
1✔
500
                    releaseCluster(finalCluster);
1✔
501
                  }
502
                  delegate().onClose(status, trailers);
1✔
503
                }
1✔
504
              };
505
              delegate().start(listener, headers);
1✔
506
            }
1✔
507
          };
508
        }
509
      }
510

511
      return
1✔
512
          Result.newBuilder()
1✔
513
              .setConfig(config)
1✔
514
              .setInterceptor(combineInterceptors(
1✔
515
                  ImmutableList.of(filters, new ClusterSelectionInterceptor())))
1✔
516
              .build();
1✔
517
    }
518

519
    private boolean retainCluster(String cluster) {
520
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
521
      if (clusterRefState == null) {
1✔
522
        return false;
×
523
      }
524
      AtomicInteger refCount = clusterRefState.refCount;
1✔
525
      int count;
526
      do {
527
        count = refCount.get();
1✔
528
        if (count == 0) {
1✔
529
          return false;
×
530
        }
531
      } while (!refCount.compareAndSet(count, count + 1));
1✔
532
      return true;
1✔
533
    }
534

535
    private void releaseCluster(final String cluster) {
536
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
537
      if (count == 0) {
1✔
538
        syncContext.execute(new Runnable() {
1✔
539
          @Override
540
          public void run() {
541
            if (clusterRefs.get(cluster).refCount.get() == 0) {
1✔
542
              clusterRefs.remove(cluster);
1✔
543
              updateResolutionResult();
1✔
544
            }
545
          }
1✔
546
        });
547
      }
548
    }
1✔
549

550
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
551
      Long hash = null;
1✔
552
      for (HashPolicy policy : hashPolicies) {
1✔
553
        Long newHash = null;
1✔
554
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
555
          String value = getHeaderValue(headers, policy.headerName());
1✔
556
          if (value != null) {
1✔
557
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
558
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
559
            }
560
            newHash = hashFunc.hashAsciiString(value);
1✔
561
          }
562
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
563
          newHash = hashFunc.hashLong(randomChannelId);
1✔
564
        }
565
        if (newHash != null ) {
1✔
566
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
567
          // and preserves all of the entropy.
568
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
569
          hash = oldHash ^ newHash;
1✔
570
        }
571
        // If the policy is a terminal policy and a hash has been generated, ignore
572
        // the rest of the hash policies.
573
        if (policy.isTerminal() && hash != null) {
1✔
574
          break;
×
575
        }
576
      }
1✔
577
      return hash == null ? random.nextLong() : hash;
1✔
578
    }
579
  }
580

581
  static final class PassthroughClientInterceptor implements ClientInterceptor {
1✔
582
    @Override
583
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
584
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
585
      return next.newCall(method, callOptions);
1✔
586
    }
587
  }
588

589
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
590
    if (interceptors.size() == 0) {
1✔
591
      return new PassthroughClientInterceptor();
1✔
592
    }
593
    if (interceptors.size() == 1) {
1✔
594
      return interceptors.get(0);
1✔
595
    }
596
    return new ClientInterceptor() {
1✔
597
      @Override
598
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
599
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
600
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
601
        return next.newCall(method, callOptions);
1✔
602
      }
603
    };
604
  }
605

606
  @Nullable
607
  private static String getHeaderValue(Metadata headers, String headerName) {
608
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
609
      return null;
×
610
    }
611
    if (headerName.equals("content-type")) {
1✔
612
      return "application/grpc";
×
613
    }
614
    Metadata.Key<String> key;
615
    try {
616
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
617
    } catch (IllegalArgumentException e) {
×
618
      return null;
×
619
    }
1✔
620
    Iterable<String> values = headers.getAll(key);
1✔
621
    return values == null ? null : Joiner.on(",").join(values);
1✔
622
  }
623

624
  private static String prefixedClusterName(String name) {
625
    return "cluster:" + name;
1✔
626
  }
627

628
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
629
    return "cluster_specifier_plugin:" + pluginName;
1✔
630
  }
631

632
  private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
633
    private final ConfigOrError emptyServiceConfig =
1✔
634
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
635
    private final String ldsResourceName;
636
    private boolean stopped;
637
    @Nullable
638
    private Set<String> existingClusters;  // clusters to which new requests can be routed
639
    @Nullable
640
    private RouteDiscoveryState routeDiscoveryState;
641

642
    ResolveState(String ldsResourceName) {
1✔
643
      this.ldsResourceName = ldsResourceName;
1✔
644
    }
1✔
645

646
    @Override
647
    public void onChanged(final XdsListenerResource.LdsUpdate update) {
648
      if (stopped) {
1✔
649
        return;
×
650
      }
651
      logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
1✔
652
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
653
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
654
      String rdsName = httpConnectionManager.rdsName();
1✔
655
      ImmutableList<NamedFilterConfig> filterConfigs = httpConnectionManager.httpFilterConfigs();
1✔
656
      long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano();
1✔
657

658
      // Create/update HCM-bound state.
659
      cleanUpRouteDiscoveryState();
1✔
660
      updateActiveFilters(filterConfigs);
1✔
661

662
      // Routes specified directly in LDS.
663
      if (virtualHosts != null) {
1✔
664
        updateRoutes(virtualHosts, streamDurationNano, filterConfigs);
1✔
665
        return;
1✔
666
      }
667
      // Routes provided by RDS.
668
      routeDiscoveryState = new RouteDiscoveryState(rdsName, streamDurationNano, filterConfigs);
1✔
669
      logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
670
      xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
671
          rdsName, routeDiscoveryState, syncContext);
1✔
672
    }
1✔
673

674
    @Override
675
    public void onError(final Status error) {
676
      if (stopped || receivedConfig) {
1✔
677
        return;
×
678
      }
679
      listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
680
          String.format("Unable to load LDS %s. xDS server returned: %s: %s",
1✔
681
          ldsResourceName, error.getCode(), error.getDescription())));
1✔
682
    }
1✔
683

684
    @Override
685
    public void onResourceDoesNotExist(final String resourceName) {
686
      if (stopped) {
1✔
687
        return;
×
688
      }
689
      String error = "LDS resource does not exist: " + resourceName;
1✔
690
      logger.log(XdsLogLevel.INFO, error);
1✔
691
      cleanUpRouteDiscoveryState();
1✔
692
      updateActiveFilters(null);
1✔
693
      cleanUpRoutes(error);
1✔
694
    }
1✔
695

696
    private void start() {
697
      logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
1✔
698
      xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
1✔
699
          ldsResourceName, this, syncContext);
1✔
700
    }
1✔
701

702
    private void stop() {
703
      logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
1✔
704
      stopped = true;
1✔
705
      cleanUpRouteDiscoveryState();
1✔
706
      updateActiveFilters(null);
1✔
707
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
708
    }
1✔
709

710
    // called in syncContext
711
    private void updateActiveFilters(@Nullable List<NamedFilterConfig> filterConfigs) {
712
      if (filterConfigs == null) {
1✔
713
        filterConfigs = ImmutableList.of();
1✔
714
      }
715
      Set<String> filtersToShutdown = new HashSet<>(activeFilters.keySet());
1✔
716
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
717
        String typeUrl = namedFilter.filterConfig.typeUrl();
1✔
718
        String filterKey = namedFilter.filterStateKey();
1✔
719

720
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
721
        checkNotNull(provider, "provider %s", typeUrl);
1✔
722
        Filter filter = activeFilters.computeIfAbsent(filterKey, k -> provider.newInstance());
1✔
723
        checkNotNull(filter, "filter %s", filterKey);
1✔
724
        filtersToShutdown.remove(filterKey);
1✔
725
      }
1✔
726

727
      // Shutdown filters not present in current HCM.
728
      for (String filterKey : filtersToShutdown) {
1✔
729
        Filter filterToShutdown = activeFilters.remove(filterKey);
1✔
730
        checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
1✔
731
        filterToShutdown.close();
1✔
732
      }
1✔
733
    }
1✔
734

735
    // called in syncContext
736
    private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
737
        @Nullable List<NamedFilterConfig> filterConfigs) {
738
      String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
739
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
1✔
740
      if (virtualHost == null) {
1✔
741
        String error = "Failed to find virtual host matching hostname: " + authority;
1✔
742
        logger.log(XdsLogLevel.WARNING, error);
1✔
743
        cleanUpRoutes(error);
1✔
744
        return;
1✔
745
      }
746

747
      List<Route> routes = virtualHost.routes();
1✔
748
      ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
1✔
749

750
      // Populate all clusters to which requests can be routed to through the virtual host.
751
      Set<String> clusters = new HashSet<>();
1✔
752
      // uniqueName -> clusterName
753
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
754
      // uniqueName -> pluginConfig
755
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
756
      for (Route route : routes) {
1✔
757
        RouteAction action = route.routeAction();
1✔
758
        String prefixedName;
759
        if (action == null) {
1✔
760
          routesData.add(new RouteData(route.routeMatch(), null, ImmutableList.of()));
1✔
761
        } else if (action.cluster() != null) {
1✔
762
          prefixedName = prefixedClusterName(action.cluster());
1✔
763
          clusters.add(prefixedName);
1✔
764
          clusterNameMap.put(prefixedName, action.cluster());
1✔
765
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
766
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
767
        } else if (action.weightedClusters() != null) {
1✔
768
          ImmutableList.Builder<ClientInterceptor> filterList = ImmutableList.builder();
1✔
769
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
770
            prefixedName = prefixedClusterName(weightedCluster.name());
1✔
771
            clusters.add(prefixedName);
1✔
772
            clusterNameMap.put(prefixedName, weightedCluster.name());
1✔
773
            filterList.add(createFilters(filterConfigs, virtualHost, route, weightedCluster));
1✔
774
          }
1✔
775
          routesData.add(
1✔
776
              new RouteData(route.routeMatch(), route.routeAction(), filterList.build()));
1✔
777
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
778
          PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
779
          if (pluginConfig instanceof RlsPluginConfig) {
1✔
780
            prefixedName = prefixedClusterSpecifierPluginName(
1✔
781
                action.namedClusterSpecifierPluginConfig().name());
1✔
782
            clusters.add(prefixedName);
1✔
783
            rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
784
          }
785
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
786
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
787
        } else {
788
          // Discard route
789
        }
790
      }
1✔
791

792
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
793
      boolean shouldUpdateResult = existingClusters == null;
1✔
794
      Set<String> addedClusters =
795
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
796
      Set<String> deletedClusters =
797
          existingClusters == null
1✔
798
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
799
      existingClusters = clusters;
1✔
800
      for (String cluster : addedClusters) {
1✔
801
        if (clusterRefs.containsKey(cluster)) {
1✔
802
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
803
        } else {
804
          if (clusterNameMap.containsKey(cluster)) {
1✔
805
            clusterRefs.put(
1✔
806
                cluster,
807
                ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
1✔
808
          }
809
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
810
            clusterRefs.put(
1✔
811
                cluster,
812
                ClusterRefState.forRlsPlugin(
1✔
813
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
814
          }
815
          shouldUpdateResult = true;
1✔
816
        }
817
      }
1✔
818
      for (String cluster : clusters) {
1✔
819
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
820
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
821
          ClusterRefState newClusterRefState =
1✔
822
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
823
          clusterRefs.put(cluster, newClusterRefState);
1✔
824
          shouldUpdateResult = true;
1✔
825
        }
826
      }
1✔
827
      // Update service config to include newly added clusters.
828
      if (shouldUpdateResult && routingConfig != null) {
1✔
829
        updateResolutionResult();
1✔
830
        shouldUpdateResult = false;
1✔
831
      }
832
      // Make newly added clusters selectable by config selector and deleted clusters no longer
833
      // selectable.
834
      routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routesData.build());
1✔
835
      for (String cluster : deletedClusters) {
1✔
836
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
837
        if (count == 0) {
1✔
838
          clusterRefs.remove(cluster);
1✔
839
          shouldUpdateResult = true;
1✔
840
        }
841
      }
1✔
842
      if (shouldUpdateResult) {
1✔
843
        updateResolutionResult();
1✔
844
      }
845
    }
1✔
846

847
    private ClientInterceptor createFilters(
848
        @Nullable List<NamedFilterConfig> filterConfigs,
849
        VirtualHost virtualHost,
850
        Route route,
851
        @Nullable ClusterWeight weightedCluster) {
852
      if (filterConfigs == null) {
1✔
853
        return new PassthroughClientInterceptor();
1✔
854
      }
855

856
      Map<String, FilterConfig> selectedOverrideConfigs =
1✔
857
          new HashMap<>(virtualHost.filterConfigOverrides());
1✔
858
      selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
859
      if (weightedCluster != null) {
1✔
860
        selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
861
      }
862

863
      ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
1✔
864
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
865
        String name = namedFilter.name;
1✔
866
        FilterConfig config = namedFilter.filterConfig;
1✔
867
        FilterConfig overrideConfig = selectedOverrideConfigs.get(name);
1✔
868
        String filterKey = namedFilter.filterStateKey();
1✔
869

870
        Filter filter = activeFilters.get(filterKey);
1✔
871
        checkNotNull(filter, "activeFilters.get(%s)", filterKey);
1✔
872
        ClientInterceptor interceptor =
1✔
873
            filter.buildClientInterceptor(config, overrideConfig, scheduler);
1✔
874

875
        if (interceptor != null) {
1✔
876
          filterInterceptors.add(interceptor);
1✔
877
        }
878
      }
1✔
879

880
      // Combine interceptors produced by different filters into a single one that executes
881
      // them sequentially. The order is preserved.
882
      return combineInterceptors(filterInterceptors.build());
1✔
883
    }
884

885
    private void cleanUpRoutes(String error) {
886
      String errorWithNodeId =
1✔
887
          error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
888
      routingConfig = new RoutingConfig(Status.UNAVAILABLE.withDescription(errorWithNodeId));
1✔
889
      if (existingClusters != null) {
1✔
890
        for (String cluster : existingClusters) {
1✔
891
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
892
          if (count == 0) {
1✔
893
            clusterRefs.remove(cluster);
1✔
894
          }
895
        }
1✔
896
        existingClusters = null;
1✔
897
      }
898

899
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
900
      // the config selector handles the error message itself.
901
      listener.onResult2(ResolutionResult.newBuilder()
1✔
902
          .setAttributes(Attributes.newBuilder()
1✔
903
            .set(InternalConfigSelector.KEY, configSelector)
1✔
904
            .build())
1✔
905
          .setServiceConfig(emptyServiceConfig)
1✔
906
          .build());
1✔
907
      receivedConfig = true;
1✔
908
    }
1✔
909

910
    private void cleanUpRouteDiscoveryState() {
911
      if (routeDiscoveryState != null) {
1✔
912
        String rdsName = routeDiscoveryState.resourceName;
1✔
913
        logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
1✔
914
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
915
            routeDiscoveryState);
916
        routeDiscoveryState = null;
1✔
917
      }
918
    }
1✔
919

920
    /**
921
     * Discovery state for RouteConfiguration resource. One instance for each Listener resource
922
     * update.
923
     */
924
    private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
925
      private final String resourceName;
926
      private final long httpMaxStreamDurationNano;
927
      @Nullable
928
      private final List<NamedFilterConfig> filterConfigs;
929

930
      private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
931
          @Nullable List<NamedFilterConfig> filterConfigs) {
1✔
932
        this.resourceName = resourceName;
1✔
933
        this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
1✔
934
        this.filterConfigs = filterConfigs;
1✔
935
      }
1✔
936

937
      @Override
938
      public void onChanged(final RdsUpdate update) {
939
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
940
          return;
×
941
        }
942
        logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
1✔
943
        updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
1✔
944
      }
1✔
945

946
      @Override
947
      public void onError(final Status error) {
948
        if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
1✔
949
          return;
×
950
        }
951
        listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
952
            String.format("Unable to load RDS %s. xDS server returned: %s: %s",
1✔
953
            resourceName, error.getCode(), error.getDescription())));
1✔
954
      }
1✔
955

956
      @Override
957
      public void onResourceDoesNotExist(final String resourceName) {
958
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
959
          return;
×
960
        }
961
        String error = "RDS resource does not exist: " + resourceName;
1✔
962
        logger.log(XdsLogLevel.INFO, error);
1✔
963
        cleanUpRoutes(error);
1✔
964
      }
1✔
965
    }
966
  }
967

968
  /**
969
   * VirtualHost-level configuration for request routing.
970
   */
971
  private static class RoutingConfig {
972
    private final long fallbackTimeoutNano;
973
    final ImmutableList<RouteData> routes;
974
    final Status errorStatus;
975

976
    private RoutingConfig(long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
1✔
977
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
978
      this.routes = checkNotNull(routes, "routes");
1✔
979
      this.errorStatus = null;
1✔
980
    }
1✔
981

982
    private RoutingConfig(Status errorStatus) {
1✔
983
      this.fallbackTimeoutNano = 0;
1✔
984
      this.routes = null;
1✔
985
      this.errorStatus = checkNotNull(errorStatus, "errorStatus");
1✔
986
      checkArgument(!errorStatus.isOk(), "errorStatus should not be okay");
1✔
987
    }
1✔
988
  }
989

990
  static final class RouteData {
991
    final RouteMatch routeMatch;
992
    /** null implies non-forwarding action. */
993
    @Nullable
994
    final RouteAction routeAction;
995
    /**
996
     * Only one of these interceptors should be used per-RPC. There are only multiple values in the
997
     * list for weighted clusters, in which case the order of the list mirrors the weighted
998
     * clusters.
999
     */
1000
    final ImmutableList<ClientInterceptor> filterChoices;
1001

1002
    RouteData(RouteMatch routeMatch, @Nullable RouteAction routeAction, ClientInterceptor filter) {
1003
      this(routeMatch, routeAction, ImmutableList.of(filter));
1✔
1004
    }
1✔
1005

1006
    RouteData(
1007
        RouteMatch routeMatch,
1008
        @Nullable RouteAction routeAction,
1009
        ImmutableList<ClientInterceptor> filterChoices) {
1✔
1010
      this.routeMatch = checkNotNull(routeMatch, "routeMatch");
1✔
1011
      checkArgument(
1✔
1012
          routeAction == null || !filterChoices.isEmpty(),
1✔
1013
          "filter may be empty only for non-forwarding action");
1014
      this.routeAction = routeAction;
1✔
1015
      if (routeAction != null && routeAction.weightedClusters() != null) {
1✔
1016
        checkArgument(
1✔
1017
            routeAction.weightedClusters().size() == filterChoices.size(),
1✔
1018
            "filter choices must match size of weighted clusters");
1019
      }
1020
      for (ClientInterceptor filter : filterChoices) {
1✔
1021
        checkNotNull(filter, "entry in filterChoices is null");
1✔
1022
      }
1✔
1023
      this.filterChoices = checkNotNull(filterChoices, "filterChoices");
1✔
1024
    }
1✔
1025
  }
1026

1027
  private static class ClusterRefState {
1028
    final AtomicInteger refCount;
1029
    @Nullable
1030
    final String traditionalCluster;
1031
    @Nullable
1032
    final RlsPluginConfig rlsPluginConfig;
1033

1034
    private ClusterRefState(
1035
        AtomicInteger refCount, @Nullable String traditionalCluster,
1036
        @Nullable RlsPluginConfig rlsPluginConfig) {
1✔
1037
      this.refCount = refCount;
1✔
1038
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
1039
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
1040
      this.traditionalCluster = traditionalCluster;
1✔
1041
      this.rlsPluginConfig = rlsPluginConfig;
1✔
1042
    }
1✔
1043

1044
    private Map<String, ?> toLbPolicy() {
1045
      if (traditionalCluster != null) {
1✔
1046
        return ImmutableMap.of(
1✔
1047
            XdsLbPolicies.CDS_POLICY_NAME,
1048
            ImmutableMap.of("cluster", traditionalCluster));
1✔
1049
      } else {
1050
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
1051
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
1052
            .put(
1✔
1053
                "childPolicy",
1054
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
1✔
1055
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
1056
            .buildOrThrow();
1✔
1057
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
1058
      }
1059
    }
1060

1061
    static ClusterRefState forCluster(AtomicInteger refCount, String name) {
1062
      return new ClusterRefState(refCount, name, null);
1✔
1063
    }
1064

1065
    static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
1066
      return new ClusterRefState(refCount, null, rlsPluginConfig);
1✔
1067
    }
1068
  }
1069
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc