• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19717

06 Mar 2025 06:32PM UTC coverage: 88.525% (+0.03%) from 88.494%
#19717

push

github

web-flow
xds: Support filter state retention

This PR adds support filter state retention in Java. The mechanism
will be similar to the one described in [A83]
(https://github.com/grpc/proposal/blob/master/A83-xds-gcp-authn-filter.md#filter-call-credentials-cache)
for C-core, and will serve the same purpose. However, the
implementation details are very different due to the different nature
of xDS HTTP filter support in C-core and Java.

### Filter instance lifecycle
#### xDS gRPC clients
New filter instances are created per combination of:
1. `XdsNameResolver` instance,
2. Filter name+typeUrl as configured in 
   HttpConnectionManager (HCM) http_filters.

Existing client-side filter instances are shutdown:
- A single a filter instance is shutdown when an LDS update contains
  HCM that is missing filter configuration for name+typeUrl
  combination of this instance.
- All filter instances when watched LDS resource is missing from an
  LDS update.
- All filter instances name resolver shutdown.

#### xDS-enabled gRPC servers
New filter instances are created per combination of:
1. Server instance,
2. FilterChain name,
3. Filter name+typeUrl as configured in FilterChain's HCM.http_filters

Filter instances of Default Filter Chain is tracked separately per:
1. Server instance,
2. Filter name+typeUrl in default_filter_chain's HCM.http_filters.

Existing server-side filter instances are shutdown:
- A single a filter instance is shutdown when an LDS update contains
  FilterChain with HCM.http_filters that is missing configuration for
  filter name+typeUrl.
- All filter instances associated with the FilterChain when an LDS
  update no longer contains FilterChain's name.
- All filter instances when watched LDS resource is missing from an
  LDS update.
- All filter instances on server shutdown.

### Related
- Part 1: #11883

34577 of 39059 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.19
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.MetricRecorder;
45
import io.grpc.NameResolver;
46
import io.grpc.Status;
47
import io.grpc.Status.Code;
48
import io.grpc.SynchronizationContext;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.ObjectPool;
51
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
52
import io.grpc.xds.Filter.FilterConfig;
53
import io.grpc.xds.Filter.NamedFilterConfig;
54
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
55
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
56
import io.grpc.xds.VirtualHost.Route;
57
import io.grpc.xds.VirtualHost.Route.RouteAction;
58
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
61
import io.grpc.xds.VirtualHost.Route.RouteMatch;
62
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
63
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
64
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
65
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
66
import io.grpc.xds.client.XdsClient;
67
import io.grpc.xds.client.XdsClient.ResourceWatcher;
68
import io.grpc.xds.client.XdsLogger;
69
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
70
import java.net.URI;
71
import java.util.ArrayList;
72
import java.util.Collections;
73
import java.util.HashMap;
74
import java.util.HashSet;
75
import java.util.List;
76
import java.util.Locale;
77
import java.util.Map;
78
import java.util.Objects;
79
import java.util.Set;
80
import java.util.concurrent.ConcurrentHashMap;
81
import java.util.concurrent.ConcurrentMap;
82
import java.util.concurrent.ScheduledExecutorService;
83
import java.util.concurrent.atomic.AtomicInteger;
84
import javax.annotation.Nullable;
85

86
/**
87
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
88
 *
89
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
90
 * protocol to retrieve service information and produce a service config to the caller.
91
 *
92
 * @see XdsNameResolverProvider
93
 */
94
final class XdsNameResolver extends NameResolver {
95

96
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
97
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
98
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
99
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
100
  static final CallOptions.Key<Boolean> AUTO_HOST_REWRITE_KEY =
1✔
101
      CallOptions.Key.create("io.grpc.xds.AUTO_HOST_REWRITE_KEY");
1✔
102
  @VisibleForTesting
103
  static boolean enableTimeout =
1✔
104
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
105
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
106

107
  private final InternalLogId logId;
108
  private final XdsLogger logger;
109
  @Nullable
110
  private final String targetAuthority;
111
  private final String target;
112
  private final String serviceAuthority;
113
  // Encoded version of the service authority as per 
114
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
115
  private final String encodedServiceAuthority;
116
  private final String overrideAuthority;
117
  private final ServiceConfigParser serviceConfigParser;
118
  private final SynchronizationContext syncContext;
119
  private final ScheduledExecutorService scheduler;
120
  private final XdsClientPoolFactory xdsClientPoolFactory;
121
  private final ThreadSafeRandom random;
122
  private final FilterRegistry filterRegistry;
123
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
124
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
125
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
126
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
127
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
128
  private final long randomChannelId;
129
  private final MetricRecorder metricRecorder;
130
  // Must be accessed in syncContext.
131
  // Filter instances are unique per channel, and per filter (name+typeUrl).
132
  // NamedFilterConfig.filterStateKey -> filter_instance.
133
  private final HashMap<String, Filter> activeFilters = new HashMap<>();
1✔
134

135
  private volatile RoutingConfig routingConfig = RoutingConfig.EMPTY;
1✔
136
  private Listener2 listener;
137
  private ObjectPool<XdsClient> xdsClientPool;
138
  private XdsClient xdsClient;
139
  private CallCounterProvider callCounterProvider;
140
  private ResolveState resolveState;
141
  // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
142
  // XdsClient instead of here.
143
  private boolean receivedConfig;
144

145
  XdsNameResolver(
146
      URI targetUri, String name, @Nullable String overrideAuthority,
147
      ServiceConfigParser serviceConfigParser,
148
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
149
      @Nullable Map<String, ?> bootstrapOverride,
150
      MetricRecorder metricRecorder) {
151
    this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
1✔
152
        syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
1✔
153
        ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride,
1✔
154
        metricRecorder);
155
  }
1✔
156

157
  @VisibleForTesting
158
  XdsNameResolver(
159
      URI targetUri, @Nullable String targetAuthority, String name,
160
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
161
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
162
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
163
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride,
164
      MetricRecorder metricRecorder) {
1✔
165
    this.targetAuthority = targetAuthority;
1✔
166
    target = targetUri.toString();
1✔
167

168
    // The name might have multiple slashes so encode it before verifying.
169
    serviceAuthority = checkNotNull(name, "name");
1✔
170
    this.encodedServiceAuthority = 
1✔
171
      GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));
1✔
172

173
    this.overrideAuthority = overrideAuthority;
1✔
174
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
175
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
176
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
177
    this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
1✔
178
            "xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
1✔
179
    this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
1✔
180
    this.random = checkNotNull(random, "random");
1✔
181
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
182
    this.metricRecorder = metricRecorder;
1✔
183
    randomChannelId = random.nextLong();
1✔
184
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
185
    logger = XdsLogger.withLogId(logId);
1✔
186
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
187
  }
1✔
188

189
  @Override
190
  public String getServiceAuthority() {
191
    return encodedServiceAuthority;
1✔
192
  }
193

194
  @Override
195
  public void start(Listener2 listener) {
196
    this.listener = checkNotNull(listener, "listener");
1✔
197
    try {
198
      xdsClientPool = xdsClientPoolFactory.getOrCreate(target, metricRecorder);
1✔
199
    } catch (Exception e) {
1✔
200
      listener.onError(
1✔
201
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
202
      return;
1✔
203
    }
1✔
204
    xdsClient = xdsClientPool.getObject();
1✔
205
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
206
    String listenerNameTemplate;
207
    if (targetAuthority == null) {
1✔
208
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
209
    } else {
210
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
211
      if (authorityInfo == null) {
1✔
212
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
213
            "invalid target URI: target authority not found in the bootstrap"));
214
        return;
1✔
215
      }
216
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
217
    }
218
    String replacement = serviceAuthority;
1✔
219
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
220
      replacement = XdsClient.percentEncodePath(replacement);
1✔
221
    }
222
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
223
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
224
        ) {
225
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
226
          "invalid listener resource URI for service authority: " + serviceAuthority));
227
      return;
×
228
    }
229
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
230
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
231
    resolveState = new ResolveState(ldsResourceName);
1✔
232

233
    resolveState.start();
1✔
234
  }
1✔
235

236
  private static String expandPercentS(String template, String replacement) {
237
    return template.replace("%s", replacement);
1✔
238
  }
239

240
  @Override
241
  public void shutdown() {
242
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
243
    if (resolveState != null) {
1✔
244
      resolveState.stop();
1✔
245
    }
246
    if (xdsClient != null) {
1✔
247
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
248
    }
249
  }
1✔
250

251
  @VisibleForTesting
252
  static Map<String, ?> generateServiceConfigWithMethodConfig(
253
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
254
    if (timeoutNano == null
1✔
255
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
256
      return Collections.emptyMap();
1✔
257
    }
258
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
259
    methodConfig.put(
1✔
260
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
261
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
262
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
263
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
264
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
265
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
266
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
267
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
268
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
269
        codes.add(code.name());
1✔
270
      }
1✔
271
      rawRetryPolicy.put(
1✔
272
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
273
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
274
        rawRetryPolicy.put(
×
275
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
276
      }
277
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
278
    }
279
    if (timeoutNano != null) {
1✔
280
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
281
      methodConfig.put("timeout", timeout);
1✔
282
    }
283
    return Collections.singletonMap(
1✔
284
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
285
  }
286

287
  @VisibleForTesting
288
  XdsClient getXdsClient() {
289
    return xdsClient;
1✔
290
  }
291

292
  // called in syncContext
293
  private void updateResolutionResult() {
294
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
295

296
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
297
    for (String name : clusterRefs.keySet()) {
1✔
298
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
299
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
300
    }
1✔
301
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
302
        "loadBalancingConfig",
303
        ImmutableList.of(ImmutableMap.of(
1✔
304
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
305
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
306

307
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
308
      logger.log(
×
309
          XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
×
310
    }
311
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
312
    Attributes attrs =
313
        Attributes.newBuilder()
1✔
314
            .set(XdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
1✔
315
            .set(XdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
316
            .set(InternalConfigSelector.KEY, configSelector)
1✔
317
            .build();
1✔
318
    ResolutionResult result =
319
        ResolutionResult.newBuilder()
1✔
320
            .setAttributes(attrs)
1✔
321
            .setServiceConfig(parsedServiceConfig)
1✔
322
            .build();
1✔
323
    listener.onResult(result);
1✔
324
    receivedConfig = true;
1✔
325
  }
1✔
326

327
  /**
328
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
329
   * case-insensitive.
330
   *
331
   * <p>Wildcard pattern rules:
332
   * <ol>
333
   * <li>A single asterisk (*) matches any domain.</li>
334
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
335
   *     but not both.</li>
336
   * </ol>
337
   */
338
  @VisibleForTesting
339
  static boolean matchHostName(String hostName, String pattern) {
340
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
341
        "Invalid host name");
342
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
343
        "Invalid pattern/domain name");
344

345
    hostName = hostName.toLowerCase(Locale.US);
1✔
346
    pattern = pattern.toLowerCase(Locale.US);
1✔
347
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
348

349
    if (!pattern.contains("*")) {
1✔
350
      // Not a wildcard pattern -- hostName and pattern must match exactly.
351
      return hostName.equals(pattern);
1✔
352
    }
353
    // Wildcard pattern
354

355
    if (pattern.length() == 1) {
1✔
356
      return true;
×
357
    }
358

359
    int index = pattern.indexOf('*');
1✔
360

361
    // At most one asterisk (*) is allowed.
362
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
363
      return false;
×
364
    }
365

366
    // Asterisk can only match prefix or suffix.
367
    if (index != 0 && index != pattern.length() - 1) {
1✔
368
      return false;
×
369
    }
370

371
    // HostName must be at least as long as the pattern because asterisk has to
372
    // match one or more characters.
373
    if (hostName.length() < pattern.length()) {
1✔
374
      return false;
1✔
375
    }
376

377
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
378
      // Prefix matching fails.
379
      return true;
1✔
380
    }
381

382
    // Pattern matches hostname if suffix matching succeeds.
383
    return index == pattern.length() - 1
1✔
384
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
385
  }
386

387
  private final class ConfigSelector extends InternalConfigSelector {
1✔
388
    @Override
389
    public Result selectConfig(PickSubchannelArgs args) {
390
      RoutingConfig routingCfg;
391
      RouteData selectedRoute;
392
      String cluster;
393
      ClientInterceptor filters;
394
      Metadata headers = args.getHeaders();
1✔
395
      String path = "/" + args.getMethodDescriptor().getFullMethodName();
1✔
396
      do {
397
        routingCfg = routingConfig;
1✔
398
        selectedRoute = null;
1✔
399
        for (RouteData route : routingCfg.routes) {
1✔
400
          if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
1✔
401
            selectedRoute = route;
1✔
402
            break;
1✔
403
          }
404
        }
1✔
405
        if (selectedRoute == null) {
1✔
406
          return Result.forError(
1✔
407
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
408
        }
409
        if (selectedRoute.routeAction == null) {
1✔
410
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
411
              "Could not route RPC to Route with non-forwarding action"));
412
        }
413
        RouteAction action = selectedRoute.routeAction;
1✔
414
        if (action.cluster() != null) {
1✔
415
          cluster = prefixedClusterName(action.cluster());
1✔
416
          filters = selectedRoute.filterChoices.get(0);
1✔
417
        } else if (action.weightedClusters() != null) {
1✔
418
          // XdsRouteConfigureResource verifies the total weight will not be 0 or exceed uint32
419
          long totalWeight = 0;
1✔
420
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
421
            totalWeight += weightedCluster.weight();
1✔
422
          }
1✔
423
          long select = random.nextLong(totalWeight);
1✔
424
          long accumulator = 0;
1✔
425
          for (int i = 0; ; i++) {
1✔
426
            ClusterWeight weightedCluster = action.weightedClusters().get(i);
1✔
427
            accumulator += weightedCluster.weight();
1✔
428
            if (select < accumulator) {
1✔
429
              cluster = prefixedClusterName(weightedCluster.name());
1✔
430
              filters = selectedRoute.filterChoices.get(i);
1✔
431
              break;
1✔
432
            }
433
          }
434
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
435
          cluster =
1✔
436
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
437
          filters = selectedRoute.filterChoices.get(0);
1✔
438
        } else {
439
          // updateRoutes() discards routes with unknown actions
440
          throw new AssertionError();
×
441
        }
442
      } while (!retainCluster(cluster));
1✔
443

444
      final RouteAction routeAction = selectedRoute.routeAction;
1✔
445
      Long timeoutNanos = null;
1✔
446
      if (enableTimeout) {
1✔
447
        timeoutNanos = routeAction.timeoutNano();
1✔
448
        if (timeoutNanos == null) {
1✔
449
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
450
        }
451
        if (timeoutNanos <= 0) {
1✔
452
          timeoutNanos = null;
1✔
453
        }
454
      }
455
      RetryPolicy retryPolicy = routeAction.retryPolicy();
1✔
456
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
457
      Map<String, ?> rawServiceConfig =
1✔
458
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
459
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
460
      Object config = parsedServiceConfig.getConfig();
1✔
461
      if (config == null) {
1✔
462
        releaseCluster(cluster);
×
463
        return Result.forError(
×
464
            parsedServiceConfig.getError().augmentDescription(
×
465
                "Failed to parse service config (method config)"));
466
      }
467
      final String finalCluster = cluster;
1✔
468
      final long hash = generateHash(routeAction.hashPolicies(), headers);
1✔
469
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
470
        @Override
471
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
472
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
473
            final Channel next) {
474
          CallOptions callOptionsForCluster =
1✔
475
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
476
                  .withOption(RPC_HASH_KEY, hash);
1✔
477
          if (routeAction.autoHostRewrite()) {
1✔
478
            callOptionsForCluster = callOptionsForCluster.withOption(AUTO_HOST_REWRITE_KEY, true);
1✔
479
          }
480
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
481
              next.newCall(method, callOptionsForCluster)) {
1✔
482
            @Override
483
            public void start(Listener<RespT> listener, Metadata headers) {
484
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
485
                boolean committed;
486

487
                @Override
488
                public void onHeaders(Metadata headers) {
489
                  committed = true;
1✔
490
                  releaseCluster(finalCluster);
1✔
491
                  delegate().onHeaders(headers);
1✔
492
                }
1✔
493

494
                @Override
495
                public void onClose(Status status, Metadata trailers) {
496
                  if (!committed) {
1✔
497
                    releaseCluster(finalCluster);
1✔
498
                  }
499
                  delegate().onClose(status, trailers);
1✔
500
                }
1✔
501
              };
502
              delegate().start(listener, headers);
1✔
503
            }
1✔
504
          };
505
        }
506
      }
507

508
      return
1✔
509
          Result.newBuilder()
1✔
510
              .setConfig(config)
1✔
511
              .setInterceptor(combineInterceptors(
1✔
512
                  ImmutableList.of(filters, new ClusterSelectionInterceptor())))
1✔
513
              .build();
1✔
514
    }
515

516
    private boolean retainCluster(String cluster) {
517
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
518
      if (clusterRefState == null) {
1✔
519
        return false;
×
520
      }
521
      AtomicInteger refCount = clusterRefState.refCount;
1✔
522
      int count;
523
      do {
524
        count = refCount.get();
1✔
525
        if (count == 0) {
1✔
526
          return false;
×
527
        }
528
      } while (!refCount.compareAndSet(count, count + 1));
1✔
529
      return true;
1✔
530
    }
531

532
    private void releaseCluster(final String cluster) {
533
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
534
      if (count == 0) {
1✔
535
        syncContext.execute(new Runnable() {
1✔
536
          @Override
537
          public void run() {
538
            if (clusterRefs.get(cluster).refCount.get() == 0) {
1✔
539
              clusterRefs.remove(cluster);
1✔
540
              updateResolutionResult();
1✔
541
            }
542
          }
1✔
543
        });
544
      }
545
    }
1✔
546

547
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
548
      Long hash = null;
1✔
549
      for (HashPolicy policy : hashPolicies) {
1✔
550
        Long newHash = null;
1✔
551
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
552
          String value = getHeaderValue(headers, policy.headerName());
1✔
553
          if (value != null) {
1✔
554
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
555
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
556
            }
557
            newHash = hashFunc.hashAsciiString(value);
1✔
558
          }
559
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
560
          newHash = hashFunc.hashLong(randomChannelId);
1✔
561
        }
562
        if (newHash != null ) {
1✔
563
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
564
          // and preserves all of the entropy.
565
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
566
          hash = oldHash ^ newHash;
1✔
567
        }
568
        // If the policy is a terminal policy and a hash has been generated, ignore
569
        // the rest of the hash policies.
570
        if (policy.isTerminal() && hash != null) {
1✔
571
          break;
×
572
        }
573
      }
1✔
574
      return hash == null ? random.nextLong() : hash;
1✔
575
    }
576
  }
577

578
  static final class PassthroughClientInterceptor implements ClientInterceptor {
1✔
579
    @Override
580
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
581
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
582
      return next.newCall(method, callOptions);
1✔
583
    }
584
  }
585

586
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
587
    if (interceptors.size() == 0) {
1✔
588
      return new PassthroughClientInterceptor();
1✔
589
    }
590
    if (interceptors.size() == 1) {
1✔
591
      return interceptors.get(0);
1✔
592
    }
593
    return new ClientInterceptor() {
1✔
594
      @Override
595
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
596
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
597
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
598
        return next.newCall(method, callOptions);
1✔
599
      }
600
    };
601
  }
602

603
  @Nullable
604
  private static String getHeaderValue(Metadata headers, String headerName) {
605
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
606
      return null;
×
607
    }
608
    if (headerName.equals("content-type")) {
1✔
609
      return "application/grpc";
×
610
    }
611
    Metadata.Key<String> key;
612
    try {
613
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
614
    } catch (IllegalArgumentException e) {
×
615
      return null;
×
616
    }
1✔
617
    Iterable<String> values = headers.getAll(key);
1✔
618
    return values == null ? null : Joiner.on(",").join(values);
1✔
619
  }
620

621
  private static String prefixedClusterName(String name) {
622
    return "cluster:" + name;
1✔
623
  }
624

625
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
626
    return "cluster_specifier_plugin:" + pluginName;
1✔
627
  }
628

629
  private static final class FailingConfigSelector extends InternalConfigSelector {
630
    private final Result result;
631

632
    public FailingConfigSelector(Status error) {
1✔
633
      this.result = Result.forError(error);
1✔
634
    }
1✔
635

636
    @Override
637
    public Result selectConfig(PickSubchannelArgs args) {
638
      return result;
1✔
639
    }
640
  }
641

642
  private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
643
    private final ConfigOrError emptyServiceConfig =
1✔
644
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
645
    private final String ldsResourceName;
646
    private boolean stopped;
647
    @Nullable
648
    private Set<String> existingClusters;  // clusters to which new requests can be routed
649
    @Nullable
650
    private RouteDiscoveryState routeDiscoveryState;
651

652
    ResolveState(String ldsResourceName) {
1✔
653
      this.ldsResourceName = ldsResourceName;
1✔
654
    }
1✔
655

656
    @Override
657
    public void onChanged(final XdsListenerResource.LdsUpdate update) {
658
      if (stopped) {
1✔
659
        return;
×
660
      }
661
      logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
1✔
662
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
663
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
664
      String rdsName = httpConnectionManager.rdsName();
1✔
665
      ImmutableList<NamedFilterConfig> filterConfigs = httpConnectionManager.httpFilterConfigs();
1✔
666
      long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano();
1✔
667

668
      // Create/update HCM-bound state.
669
      cleanUpRouteDiscoveryState();
1✔
670
      updateActiveFilters(filterConfigs);
1✔
671

672
      // Routes specified directly in LDS.
673
      if (virtualHosts != null) {
1✔
674
        updateRoutes(virtualHosts, streamDurationNano, filterConfigs);
1✔
675
        return;
1✔
676
      }
677
      // Routes provided by RDS.
678
      routeDiscoveryState = new RouteDiscoveryState(rdsName, streamDurationNano, filterConfigs);
1✔
679
      logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
680
      xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
681
          rdsName, routeDiscoveryState, syncContext);
1✔
682
    }
1✔
683

684
    @Override
685
    public void onError(final Status error) {
686
      if (stopped || receivedConfig) {
1✔
687
        return;
×
688
      }
689
      listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
690
          String.format("Unable to load LDS %s. xDS server returned: %s: %s",
1✔
691
          ldsResourceName, error.getCode(), error.getDescription())));
1✔
692
    }
1✔
693

694
    @Override
695
    public void onResourceDoesNotExist(final String resourceName) {
696
      if (stopped) {
1✔
697
        return;
×
698
      }
699
      String error = "LDS resource does not exist: " + resourceName;
1✔
700
      logger.log(XdsLogLevel.INFO, error);
1✔
701
      cleanUpRouteDiscoveryState();
1✔
702
      updateActiveFilters(null);
1✔
703
      cleanUpRoutes(error);
1✔
704
    }
1✔
705

706
    private void start() {
707
      logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
1✔
708
      xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
1✔
709
          ldsResourceName, this, syncContext);
1✔
710
    }
1✔
711

712
    private void stop() {
713
      logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
1✔
714
      stopped = true;
1✔
715
      cleanUpRouteDiscoveryState();
1✔
716
      updateActiveFilters(null);
1✔
717
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
718
    }
1✔
719

720
    // called in syncContext
721
    private void updateActiveFilters(@Nullable List<NamedFilterConfig> filterConfigs) {
722
      if (filterConfigs == null) {
1✔
723
        filterConfigs = ImmutableList.of();
1✔
724
      }
725
      Set<String> filtersToShutdown = new HashSet<>(activeFilters.keySet());
1✔
726
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
727
        String typeUrl = namedFilter.filterConfig.typeUrl();
1✔
728
        String filterKey = namedFilter.filterStateKey();
1✔
729

730
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
731
        checkNotNull(provider, "provider %s", typeUrl);
1✔
732
        Filter filter = activeFilters.computeIfAbsent(filterKey, k -> provider.newInstance());
1✔
733
        checkNotNull(filter, "filter %s", filterKey);
1✔
734
        filtersToShutdown.remove(filterKey);
1✔
735
      }
1✔
736

737
      // Shutdown filters not present in current HCM.
738
      for (String filterKey : filtersToShutdown) {
1✔
739
        Filter filterToShutdown = activeFilters.remove(filterKey);
1✔
740
        checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
1✔
741
        filterToShutdown.close();
1✔
742
      }
1✔
743
    }
1✔
744

745
    // called in syncContext
746
    private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
747
        @Nullable List<NamedFilterConfig> filterConfigs) {
748
      String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
749
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
1✔
750
      if (virtualHost == null) {
1✔
751
        String error = "Failed to find virtual host matching hostname: " + authority;
1✔
752
        logger.log(XdsLogLevel.WARNING, error);
1✔
753
        cleanUpRoutes(error);
1✔
754
        return;
1✔
755
      }
756

757
      List<Route> routes = virtualHost.routes();
1✔
758
      ImmutableList.Builder<RouteData> routesData = ImmutableList.builder();
1✔
759

760
      // Populate all clusters to which requests can be routed to through the virtual host.
761
      Set<String> clusters = new HashSet<>();
1✔
762
      // uniqueName -> clusterName
763
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
764
      // uniqueName -> pluginConfig
765
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
766
      for (Route route : routes) {
1✔
767
        RouteAction action = route.routeAction();
1✔
768
        String prefixedName;
769
        if (action == null) {
1✔
770
          routesData.add(new RouteData(route.routeMatch(), null, ImmutableList.of()));
1✔
771
        } else if (action.cluster() != null) {
1✔
772
          prefixedName = prefixedClusterName(action.cluster());
1✔
773
          clusters.add(prefixedName);
1✔
774
          clusterNameMap.put(prefixedName, action.cluster());
1✔
775
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
776
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
777
        } else if (action.weightedClusters() != null) {
1✔
778
          ImmutableList.Builder<ClientInterceptor> filterList = ImmutableList.builder();
1✔
779
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
780
            prefixedName = prefixedClusterName(weightedCluster.name());
1✔
781
            clusters.add(prefixedName);
1✔
782
            clusterNameMap.put(prefixedName, weightedCluster.name());
1✔
783
            filterList.add(createFilters(filterConfigs, virtualHost, route, weightedCluster));
1✔
784
          }
1✔
785
          routesData.add(
1✔
786
              new RouteData(route.routeMatch(), route.routeAction(), filterList.build()));
1✔
787
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
788
          PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
789
          if (pluginConfig instanceof RlsPluginConfig) {
1✔
790
            prefixedName = prefixedClusterSpecifierPluginName(
1✔
791
                action.namedClusterSpecifierPluginConfig().name());
1✔
792
            clusters.add(prefixedName);
1✔
793
            rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
794
          }
795
          ClientInterceptor filters = createFilters(filterConfigs, virtualHost, route, null);
1✔
796
          routesData.add(new RouteData(route.routeMatch(), route.routeAction(), filters));
1✔
797
        } else {
798
          // Discard route
799
        }
800
      }
1✔
801

802
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
803
      boolean shouldUpdateResult = existingClusters == null;
1✔
804
      Set<String> addedClusters =
805
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
806
      Set<String> deletedClusters =
807
          existingClusters == null
1✔
808
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
809
      existingClusters = clusters;
1✔
810
      for (String cluster : addedClusters) {
1✔
811
        if (clusterRefs.containsKey(cluster)) {
1✔
812
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
813
        } else {
814
          if (clusterNameMap.containsKey(cluster)) {
1✔
815
            clusterRefs.put(
1✔
816
                cluster,
817
                ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
1✔
818
          }
819
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
820
            clusterRefs.put(
1✔
821
                cluster,
822
                ClusterRefState.forRlsPlugin(
1✔
823
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
824
          }
825
          shouldUpdateResult = true;
1✔
826
        }
827
      }
1✔
828
      for (String cluster : clusters) {
1✔
829
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
830
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
831
          ClusterRefState newClusterRefState =
1✔
832
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
833
          clusterRefs.put(cluster, newClusterRefState);
1✔
834
          shouldUpdateResult = true;
1✔
835
        }
836
      }
1✔
837
      // Update service config to include newly added clusters.
838
      if (shouldUpdateResult) {
1✔
839
        updateResolutionResult();
1✔
840
      }
841
      // Make newly added clusters selectable by config selector and deleted clusters no longer
842
      // selectable.
843
      routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routesData.build());
1✔
844
      shouldUpdateResult = false;
1✔
845
      for (String cluster : deletedClusters) {
1✔
846
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
847
        if (count == 0) {
1✔
848
          clusterRefs.remove(cluster);
1✔
849
          shouldUpdateResult = true;
1✔
850
        }
851
      }
1✔
852
      if (shouldUpdateResult) {
1✔
853
        updateResolutionResult();
1✔
854
      }
855
    }
1✔
856

857
    private ClientInterceptor createFilters(
858
        @Nullable List<NamedFilterConfig> filterConfigs,
859
        VirtualHost virtualHost,
860
        Route route,
861
        @Nullable ClusterWeight weightedCluster) {
862
      if (filterConfigs == null) {
1✔
863
        return new PassthroughClientInterceptor();
1✔
864
      }
865

866
      Map<String, FilterConfig> selectedOverrideConfigs =
1✔
867
          new HashMap<>(virtualHost.filterConfigOverrides());
1✔
868
      selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
869
      if (weightedCluster != null) {
1✔
870
        selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
871
      }
872

873
      ImmutableList.Builder<ClientInterceptor> filterInterceptors = ImmutableList.builder();
1✔
874
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
875
        String name = namedFilter.name;
1✔
876
        FilterConfig config = namedFilter.filterConfig;
1✔
877
        FilterConfig overrideConfig = selectedOverrideConfigs.get(name);
1✔
878
        String filterKey = namedFilter.filterStateKey();
1✔
879

880
        Filter filter = activeFilters.get(filterKey);
1✔
881
        checkNotNull(filter, "activeFilters.get(%s)", filterKey);
1✔
882
        ClientInterceptor interceptor =
1✔
883
            filter.buildClientInterceptor(config, overrideConfig, scheduler);
1✔
884

885
        if (interceptor != null) {
1✔
886
          filterInterceptors.add(interceptor);
1✔
887
        }
888
      }
1✔
889

890
      // Combine interceptors produced by different filters into a single one that executes
891
      // them sequentially. The order is preserved.
892
      return combineInterceptors(filterInterceptors.build());
1✔
893
    }
894

895
    private void cleanUpRoutes(String error) {
896
      if (existingClusters != null) {
1✔
897
        for (String cluster : existingClusters) {
1✔
898
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
899
          if (count == 0) {
1✔
900
            clusterRefs.remove(cluster);
1✔
901
          }
902
        }
1✔
903
        existingClusters = null;
1✔
904
      }
905
      routingConfig = RoutingConfig.EMPTY;
1✔
906
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
907
      // the config selector handles the error message itself. Once the LB API allows providing
908
      // failure information for addresses yet still providing a service config, the config seector
909
      // could be avoided.
910
      String errorWithNodeId =
1✔
911
          error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
912
      listener.onResult(ResolutionResult.newBuilder()
1✔
913
          .setAttributes(Attributes.newBuilder()
1✔
914
            .set(InternalConfigSelector.KEY,
1✔
915
              new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId)))
1✔
916
            .build())
1✔
917
          .setServiceConfig(emptyServiceConfig)
1✔
918
          .build());
1✔
919
      receivedConfig = true;
1✔
920
    }
1✔
921

922
    private void cleanUpRouteDiscoveryState() {
923
      if (routeDiscoveryState != null) {
1✔
924
        String rdsName = routeDiscoveryState.resourceName;
1✔
925
        logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
1✔
926
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
927
            routeDiscoveryState);
928
        routeDiscoveryState = null;
1✔
929
      }
930
    }
1✔
931

932
    /**
933
     * Discovery state for RouteConfiguration resource. One instance for each Listener resource
934
     * update.
935
     */
936
    private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
937
      private final String resourceName;
938
      private final long httpMaxStreamDurationNano;
939
      @Nullable
940
      private final List<NamedFilterConfig> filterConfigs;
941

942
      private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
943
          @Nullable List<NamedFilterConfig> filterConfigs) {
1✔
944
        this.resourceName = resourceName;
1✔
945
        this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
1✔
946
        this.filterConfigs = filterConfigs;
1✔
947
      }
1✔
948

949
      @Override
950
      public void onChanged(final RdsUpdate update) {
951
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
952
          return;
×
953
        }
954
        logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
1✔
955
        updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
1✔
956
      }
1✔
957

958
      @Override
959
      public void onError(final Status error) {
960
        if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
1✔
961
          return;
×
962
        }
963
        listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
964
            String.format("Unable to load RDS %s. xDS server returned: %s: %s",
1✔
965
            resourceName, error.getCode(), error.getDescription())));
1✔
966
      }
1✔
967

968
      @Override
969
      public void onResourceDoesNotExist(final String resourceName) {
970
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
971
          return;
×
972
        }
973
        String error = "RDS resource does not exist: " + resourceName;
1✔
974
        logger.log(XdsLogLevel.INFO, error);
1✔
975
        cleanUpRoutes(error);
1✔
976
      }
1✔
977
    }
978
  }
979

980
  /**
981
   * VirtualHost-level configuration for request routing.
982
   */
983
  private static class RoutingConfig {
984
    private final long fallbackTimeoutNano;
985
    final ImmutableList<RouteData> routes;
986

987
    private static final RoutingConfig EMPTY = new RoutingConfig(0, ImmutableList.of());
1✔
988

989
    private RoutingConfig(long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
1✔
990
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
991
      this.routes = checkNotNull(routes, "routes");
1✔
992
    }
1✔
993
  }
994

995
  static final class RouteData {
996
    final RouteMatch routeMatch;
997
    /** null implies non-forwarding action. */
998
    @Nullable
999
    final RouteAction routeAction;
1000
    /**
1001
     * Only one of these interceptors should be used per-RPC. There are only multiple values in the
1002
     * list for weighted clusters, in which case the order of the list mirrors the weighted
1003
     * clusters.
1004
     */
1005
    final ImmutableList<ClientInterceptor> filterChoices;
1006

1007
    RouteData(RouteMatch routeMatch, @Nullable RouteAction routeAction, ClientInterceptor filter) {
1008
      this(routeMatch, routeAction, ImmutableList.of(filter));
1✔
1009
    }
1✔
1010

1011
    RouteData(
1012
        RouteMatch routeMatch,
1013
        @Nullable RouteAction routeAction,
1014
        ImmutableList<ClientInterceptor> filterChoices) {
1✔
1015
      this.routeMatch = checkNotNull(routeMatch, "routeMatch");
1✔
1016
      checkArgument(
1✔
1017
          routeAction == null || !filterChoices.isEmpty(),
1✔
1018
          "filter may be empty only for non-forwarding action");
1019
      this.routeAction = routeAction;
1✔
1020
      if (routeAction != null && routeAction.weightedClusters() != null) {
1✔
1021
        checkArgument(
1✔
1022
            routeAction.weightedClusters().size() == filterChoices.size(),
1✔
1023
            "filter choices must match size of weighted clusters");
1024
      }
1025
      for (ClientInterceptor filter : filterChoices) {
1✔
1026
        checkNotNull(filter, "entry in filterChoices is null");
1✔
1027
      }
1✔
1028
      this.filterChoices = checkNotNull(filterChoices, "filterChoices");
1✔
1029
    }
1✔
1030
  }
1031

1032
  private static class ClusterRefState {
1033
    final AtomicInteger refCount;
1034
    @Nullable
1035
    final String traditionalCluster;
1036
    @Nullable
1037
    final RlsPluginConfig rlsPluginConfig;
1038

1039
    private ClusterRefState(
1040
        AtomicInteger refCount, @Nullable String traditionalCluster,
1041
        @Nullable RlsPluginConfig rlsPluginConfig) {
1✔
1042
      this.refCount = refCount;
1✔
1043
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
1044
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
1045
      this.traditionalCluster = traditionalCluster;
1✔
1046
      this.rlsPluginConfig = rlsPluginConfig;
1✔
1047
    }
1✔
1048

1049
    private Map<String, ?> toLbPolicy() {
1050
      if (traditionalCluster != null) {
1✔
1051
        return ImmutableMap.of(
1✔
1052
            XdsLbPolicies.CDS_POLICY_NAME,
1053
            ImmutableMap.of("cluster", traditionalCluster));
1✔
1054
      } else {
1055
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
1056
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
1057
            .put(
1✔
1058
                "childPolicy",
1059
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
1✔
1060
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
1061
            .buildOrThrow();
1✔
1062
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
1063
      }
1064
    }
1065

1066
    static ClusterRefState forCluster(AtomicInteger refCount, String name) {
1067
      return new ClusterRefState(refCount, name, null);
1✔
1068
    }
1069

1070
    static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
1071
      return new ClusterRefState(refCount, null, rlsPluginConfig);
1✔
1072
    }
1073
  }
1074
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc