• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18720

pending completion
#18720

push

github-actions

web-flow
xds: Encode the service authority in XdsNameResolver (#10207)

Encode the service authority in the xDS name resolver before passing it to GrpcUtil, so that xDS targets whose names contain multiple slashes are handled. Example: xds:///path/to/service:port.

Because the underlying Java URI library currently does not split an encoded authority into host and port correctly, simplify the check to look only for '@': to validate the authority for HTTP, we are only interested in detecting user info.

This change also leads to a few changes in unit tests that relied on this check to reject invalid authorities, which will now be considered valid.

Just like #9376, depending on Guava packages such as URLEscapers or PercentEscapers leads to internal failures (e.g., Unresolvable reference to com.google.common.escape.Escaper from io.grpc.internal.GrpcUtil). To avoid these issues, create an in-house version that is heavily inspired by grpc-go/grpc.

30655 of 34740 relevant lines covered (88.24%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.92
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.NameResolver;
45
import io.grpc.Status;
46
import io.grpc.Status.Code;
47
import io.grpc.SynchronizationContext;
48
import io.grpc.internal.GrpcUtil;
49
import io.grpc.internal.ObjectPool;
50
import io.grpc.xds.Bootstrapper.AuthorityInfo;
51
import io.grpc.xds.Bootstrapper.BootstrapInfo;
52
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
53
import io.grpc.xds.Filter.ClientInterceptorBuilder;
54
import io.grpc.xds.Filter.FilterConfig;
55
import io.grpc.xds.Filter.NamedFilterConfig;
56
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
57
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
58
import io.grpc.xds.VirtualHost.Route;
59
import io.grpc.xds.VirtualHost.Route.RouteAction;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
61
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
62
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
63
import io.grpc.xds.XdsClient.ResourceWatcher;
64
import io.grpc.xds.XdsListenerResource.LdsUpdate;
65
import io.grpc.xds.XdsLogger.XdsLogLevel;
66
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
67
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
68
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
69
import java.util.ArrayList;
70
import java.util.Collections;
71
import java.util.HashMap;
72
import java.util.HashSet;
73
import java.util.List;
74
import java.util.Locale;
75
import java.util.Map;
76
import java.util.Objects;
77
import java.util.Set;
78
import java.util.concurrent.ConcurrentHashMap;
79
import java.util.concurrent.ConcurrentMap;
80
import java.util.concurrent.ScheduledExecutorService;
81
import java.util.concurrent.atomic.AtomicInteger;
82
import javax.annotation.Nullable;
83

84
/**
85
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
86
 *
87
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
88
 * protocol to retrieve service information and produce a service config to the caller.
89
 *
90
 * @see XdsNameResolverProvider
91
 */
92
final class XdsNameResolver extends NameResolver {
93

94
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
95
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
96
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
97
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
98
  @VisibleForTesting
99
  static boolean enableTimeout =
1✔
100
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
101
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
102

103
  private final InternalLogId logId;
104
  private final XdsLogger logger;
105
  @Nullable
106
  private final String targetAuthority;
107
  private final String serviceAuthority;
108
  private final String overrideAuthority;
109
  private final ServiceConfigParser serviceConfigParser;
110
  private final SynchronizationContext syncContext;
111
  private final ScheduledExecutorService scheduler;
112
  private final XdsClientPoolFactory xdsClientPoolFactory;
113
  private final ThreadSafeRandom random;
114
  private final FilterRegistry filterRegistry;
115
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
116
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
117
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
118
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
119
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
120
  private final long randomChannelId;
121

122
  private volatile RoutingConfig routingConfig = RoutingConfig.empty;
1✔
123
  private Listener2 listener;
124
  private ObjectPool<XdsClient> xdsClientPool;
125
  private XdsClient xdsClient;
126
  private CallCounterProvider callCounterProvider;
127
  private ResolveState resolveState;
128
  // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
129
  // XdsClient instead of here.
130
  private boolean receivedConfig;
131

132
  /**
   * Creates a resolver that uses the default {@link SharedXdsClientPoolProvider}, the shared
   * {@link ThreadSafeRandomImpl} instance and the default {@link FilterRegistry}; all remaining
   * arguments are forwarded unchanged to the full constructor.
   */
  XdsNameResolver(
      @Nullable String targetAuthority,
      String name,
      @Nullable String overrideAuthority,
      ServiceConfigParser serviceConfigParser,
      SynchronizationContext syncContext,
      ScheduledExecutorService scheduler,
      @Nullable Map<String, ?> bootstrapOverride) {
    this(
        targetAuthority,
        name,
        overrideAuthority,
        serviceConfigParser,
        syncContext,
        scheduler,
        SharedXdsClientPoolProvider.getDefaultProvider(),
        ThreadSafeRandomImpl.instance,
        FilterRegistry.getDefaultRegistry(),
        bootstrapOverride);
  }
1✔
141

142
  @VisibleForTesting
143
  XdsNameResolver(
144
      @Nullable String targetAuthority, String name, @Nullable String overrideAuthority,
145
      ServiceConfigParser serviceConfigParser,
146
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
147
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
148
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
1✔
149
    this.targetAuthority = targetAuthority;
1✔
150

151
    // The name might have multiple slashes so encode it before verifying.
152
    String authority = GrpcUtil.AuthorityEscaper.encodeAuthority(checkNotNull(name, "name"));
1✔
153
    serviceAuthority = GrpcUtil.checkAuthority(authority);
1✔
154

155
    this.overrideAuthority = overrideAuthority;
1✔
156
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
1✔
157
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
158
    this.scheduler = checkNotNull(scheduler, "scheduler");
1✔
159
    this.xdsClientPoolFactory = bootstrapOverride == null ? checkNotNull(xdsClientPoolFactory,
1✔
160
            "xdsClientPoolFactory") : new SharedXdsClientPoolProvider();
1✔
161
    this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
1✔
162
    this.random = checkNotNull(random, "random");
1✔
163
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
1✔
164
    randomChannelId = random.nextLong();
1✔
165
    logId = InternalLogId.allocate("xds-resolver", name);
1✔
166
    logger = XdsLogger.withLogId(logId);
1✔
167
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
1✔
168
  }
1✔
169

170
  @Override
171
  public String getServiceAuthority() {
172
    return serviceAuthority;
1✔
173
  }
174

175
  @Override
176
  public void start(Listener2 listener) {
177
    this.listener = checkNotNull(listener, "listener");
1✔
178
    try {
179
      xdsClientPool = xdsClientPoolFactory.getOrCreate();
1✔
180
    } catch (Exception e) {
1✔
181
      listener.onError(
1✔
182
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
183
      return;
1✔
184
    }
1✔
185
    xdsClient = xdsClientPool.getObject();
1✔
186
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
187
    String listenerNameTemplate;
188
    if (targetAuthority == null) {
1✔
189
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
190
    } else {
191
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
192
      if (authorityInfo == null) {
1✔
193
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
194
            "invalid target URI: target authority not found in the bootstrap"));
195
        return;
1✔
196
      }
197
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
198
    }
199
    String replacement = serviceAuthority;
1✔
200
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
201
      replacement = XdsClient.percentEncodePath(replacement);
1✔
202
    }
203
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
204
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
205
        ) {
206
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
207
          "invalid listener resource URI for service authority: " + serviceAuthority));
208
      return;
×
209
    }
210
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
211
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
212
    resolveState = new ResolveState(ldsResourceName);
1✔
213
    resolveState.start();
1✔
214
  }
1✔
215

216
  private static String expandPercentS(String template, String replacement) {
217
    return template.replace("%s", replacement);
1✔
218
  }
219

220
  @Override
221
  public void shutdown() {
222
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
223
    if (resolveState != null) {
1✔
224
      resolveState.stop();
1✔
225
    }
226
    if (xdsClient != null) {
1✔
227
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
228
    }
229
  }
1✔
230

231
  @VisibleForTesting
232
  static Map<String, ?> generateServiceConfigWithMethodConfig(
233
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
234
    if (timeoutNano == null
1✔
235
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
236
      return Collections.emptyMap();
1✔
237
    }
238
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
239
    methodConfig.put(
1✔
240
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
241
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
242
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
243
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
244
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
245
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
246
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
247
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
248
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
249
        codes.add(code.name());
1✔
250
      }
1✔
251
      rawRetryPolicy.put(
1✔
252
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
253
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
254
        rawRetryPolicy.put(
×
255
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
256
      }
257
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
258
    }
259
    if (timeoutNano != null) {
1✔
260
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
261
      methodConfig.put("timeout", timeout);
1✔
262
    }
263
    return Collections.singletonMap(
1✔
264
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
265
  }
266

267
  @VisibleForTesting
268
  XdsClient getXdsClient() {
269
    return xdsClient;
1✔
270
  }
271

272
  // called in syncContext
273
  private void updateResolutionResult() {
274
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
275

276
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
277
    for (String name : clusterRefs.keySet()) {
1✔
278
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
279
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
280
    }
1✔
281
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
282
        "loadBalancingConfig",
283
        ImmutableList.of(ImmutableMap.of(
1✔
284
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
285
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
286

287
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
288
      logger.log(
×
289
          XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
×
290
    }
291
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
292
    Attributes attrs =
293
        Attributes.newBuilder()
1✔
294
            .set(InternalXdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
1✔
295
            .set(InternalXdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
296
            .set(InternalConfigSelector.KEY, configSelector)
1✔
297
            .build();
1✔
298
    ResolutionResult result =
299
        ResolutionResult.newBuilder()
1✔
300
            .setAttributes(attrs)
1✔
301
            .setServiceConfig(parsedServiceConfig)
1✔
302
            .build();
1✔
303
    listener.onResult(result);
1✔
304
    receivedConfig = true;
1✔
305
  }
1✔
306

307
  /**
308
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern} with
309
   * case-insensitive.
310
   *
311
   * <p>Wildcard pattern rules:
312
   * <ol>
313
   * <li>A single asterisk (*) matches any domain.</li>
314
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
315
   *     but not both.</li>
316
   * </ol>
317
   */
318
  @VisibleForTesting
319
  static boolean matchHostName(String hostName, String pattern) {
320
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
321
        "Invalid host name");
322
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
323
        "Invalid pattern/domain name");
324

325
    hostName = hostName.toLowerCase(Locale.US);
1✔
326
    pattern = pattern.toLowerCase(Locale.US);
1✔
327
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
328

329
    if (!pattern.contains("*")) {
1✔
330
      // Not a wildcard pattern -- hostName and pattern must match exactly.
331
      return hostName.equals(pattern);
1✔
332
    }
333
    // Wildcard pattern
334

335
    if (pattern.length() == 1) {
1✔
336
      return true;
×
337
    }
338

339
    int index = pattern.indexOf('*');
1✔
340

341
    // At most one asterisk (*) is allowed.
342
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
343
      return false;
×
344
    }
345

346
    // Asterisk can only match prefix or suffix.
347
    if (index != 0 && index != pattern.length() - 1) {
1✔
348
      return false;
×
349
    }
350

351
    // HostName must be at least as long as the pattern because asterisk has to
352
    // match one or more characters.
353
    if (hostName.length() < pattern.length()) {
1✔
354
      return false;
1✔
355
    }
356

357
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
358
      // Prefix matching fails.
359
      return true;
1✔
360
    }
361

362
    // Pattern matches hostname if suffix matching succeeds.
363
    return index == pattern.length() - 1
1✔
364
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
365
  }
366

367
  private final class ConfigSelector extends InternalConfigSelector {
1✔
368
    @Override
369
    public Result selectConfig(PickSubchannelArgs args) {
370
      String cluster = null;
1✔
371
      Route selectedRoute = null;
1✔
372
      RoutingConfig routingCfg;
373
      Map<String, FilterConfig> selectedOverrideConfigs;
374
      List<ClientInterceptor> filterInterceptors = new ArrayList<>();
1✔
375
      Metadata headers = args.getHeaders();
1✔
376
      do {
377
        routingCfg = routingConfig;
1✔
378
        selectedOverrideConfigs = new HashMap<>(routingCfg.virtualHostOverrideConfig);
1✔
379
        for (Route route : routingCfg.routes) {
1✔
380
          if (RoutingUtils.matchRoute(
1✔
381
                  route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(),
1✔
382
              headers, random)) {
1✔
383
            selectedRoute = route;
1✔
384
            selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
385
            break;
1✔
386
          }
387
        }
1✔
388
        if (selectedRoute == null) {
1✔
389
          return Result.forError(
1✔
390
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
391
        }
392
        if (selectedRoute.routeAction() == null) {
1✔
393
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
394
              "Could not route RPC to Route with non-forwarding action"));
395
        }
396
        RouteAction action = selectedRoute.routeAction();
1✔
397
        if (action.cluster() != null) {
1✔
398
          cluster = prefixedClusterName(action.cluster());
1✔
399
        } else if (action.weightedClusters() != null) {
1✔
400
          long totalWeight = 0;
1✔
401
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
402
            totalWeight += weightedCluster.weight();
1✔
403
          }
1✔
404
          long select = random.nextLong(totalWeight);
1✔
405
          long accumulator = 0;
1✔
406
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
407
            accumulator += weightedCluster.weight();
1✔
408
            if (select < accumulator) {
1✔
409
              cluster = prefixedClusterName(weightedCluster.name());
1✔
410
              selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
411
              break;
1✔
412
            }
413
          }
1✔
414
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
415
          cluster =
1✔
416
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
417
        }
418
      } while (!retainCluster(cluster));
1✔
419
      Long timeoutNanos = null;
1✔
420
      if (enableTimeout) {
1✔
421
        if (selectedRoute != null) {
1✔
422
          timeoutNanos = selectedRoute.routeAction().timeoutNano();
1✔
423
        }
424
        if (timeoutNanos == null) {
1✔
425
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
426
        }
427
        if (timeoutNanos <= 0) {
1✔
428
          timeoutNanos = null;
1✔
429
        }
430
      }
431
      RetryPolicy retryPolicy =
432
          selectedRoute == null ? null : selectedRoute.routeAction().retryPolicy();
1✔
433
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
434
      Map<String, ?> rawServiceConfig =
1✔
435
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
436
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
437
      Object config = parsedServiceConfig.getConfig();
1✔
438
      if (config == null) {
1✔
439
        releaseCluster(cluster);
×
440
        return Result.forError(
×
441
            parsedServiceConfig.getError().augmentDescription(
×
442
                "Failed to parse service config (method config)"));
443
      }
444
      if (routingCfg.filterChain != null) {
1✔
445
        for (NamedFilterConfig namedFilter : routingCfg.filterChain) {
1✔
446
          FilterConfig filterConfig = namedFilter.filterConfig;
1✔
447
          Filter filter = filterRegistry.get(filterConfig.typeUrl());
1✔
448
          if (filter instanceof ClientInterceptorBuilder) {
1✔
449
            ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter)
1✔
450
                .buildClientInterceptor(
1✔
451
                    filterConfig, selectedOverrideConfigs.get(namedFilter.name),
1✔
452
                    args, scheduler);
1✔
453
            if (interceptor != null) {
1✔
454
              filterInterceptors.add(interceptor);
1✔
455
            }
456
          }
457
        }
1✔
458
      }
459
      final String finalCluster = cluster;
1✔
460
      final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), headers);
1✔
461
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
462
        @Override
463
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
464
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
465
            final Channel next) {
466
          final CallOptions callOptionsForCluster =
1✔
467
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
468
                  .withOption(RPC_HASH_KEY, hash);
1✔
469
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
470
              next.newCall(method, callOptionsForCluster)) {
1✔
471
            @Override
472
            public void start(Listener<RespT> listener, Metadata headers) {
473
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
474
                boolean committed;
475

476
                @Override
477
                public void onHeaders(Metadata headers) {
478
                  committed = true;
1✔
479
                  releaseCluster(finalCluster);
1✔
480
                  delegate().onHeaders(headers);
1✔
481
                }
1✔
482

483
                @Override
484
                public void onClose(Status status, Metadata trailers) {
485
                  if (!committed) {
1✔
486
                    releaseCluster(finalCluster);
1✔
487
                  }
488
                  delegate().onClose(status, trailers);
1✔
489
                }
1✔
490
              };
491
              delegate().start(listener, headers);
1✔
492
            }
1✔
493
          };
494
        }
495
      }
496

497
      filterInterceptors.add(new ClusterSelectionInterceptor());
1✔
498
      return
1✔
499
          Result.newBuilder()
1✔
500
              .setConfig(config)
1✔
501
              .setInterceptor(combineInterceptors(filterInterceptors))
1✔
502
              .build();
1✔
503
    }
504

505
    private boolean retainCluster(String cluster) {
506
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
507
      if (clusterRefState == null) {
1✔
508
        return false;
×
509
      }
510
      AtomicInteger refCount = clusterRefState.refCount;
1✔
511
      int count;
512
      do {
513
        count = refCount.get();
1✔
514
        if (count == 0) {
1✔
515
          return false;
×
516
        }
517
      } while (!refCount.compareAndSet(count, count + 1));
1✔
518
      return true;
1✔
519
    }
520

521
    private void releaseCluster(final String cluster) {
522
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
523
      if (count == 0) {
1✔
524
        syncContext.execute(new Runnable() {
1✔
525
          @Override
526
          public void run() {
527
            if (clusterRefs.get(cluster).refCount.get() == 0) {
1✔
528
              clusterRefs.remove(cluster);
1✔
529
              updateResolutionResult();
1✔
530
            }
531
          }
1✔
532
        });
533
      }
534
    }
1✔
535

536
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
537
      Long hash = null;
1✔
538
      for (HashPolicy policy : hashPolicies) {
1✔
539
        Long newHash = null;
1✔
540
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
541
          String value = getHeaderValue(headers, policy.headerName());
1✔
542
          if (value != null) {
1✔
543
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
544
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
545
            }
546
            newHash = hashFunc.hashAsciiString(value);
1✔
547
          }
548
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
549
          newHash = hashFunc.hashLong(randomChannelId);
1✔
550
        }
551
        if (newHash != null ) {
1✔
552
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
553
          // and preserves all of the entropy.
554
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
555
          hash = oldHash ^ newHash;
1✔
556
        }
557
        // If the policy is a terminal policy and a hash has been generated, ignore
558
        // the rest of the hash policies.
559
        if (policy.isTerminal() && hash != null) {
1✔
560
          break;
×
561
        }
562
      }
1✔
563
      return hash == null ? random.nextLong() : hash;
1✔
564
    }
565
  }
566

567
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
568
    checkArgument(!interceptors.isEmpty(), "empty interceptors");
1✔
569
    if (interceptors.size() == 1) {
1✔
570
      return interceptors.get(0);
1✔
571
    }
572
    return new ClientInterceptor() {
1✔
573
      @Override
574
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
575
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
576
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
577
        return next.newCall(method, callOptions);
1✔
578
      }
579
    };
580
  }
581

582
  @Nullable
583
  private static String getHeaderValue(Metadata headers, String headerName) {
584
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
585
      return null;
×
586
    }
587
    if (headerName.equals("content-type")) {
1✔
588
      return "application/grpc";
×
589
    }
590
    Metadata.Key<String> key;
591
    try {
592
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
593
    } catch (IllegalArgumentException e) {
×
594
      return null;
×
595
    }
1✔
596
    Iterable<String> values = headers.getAll(key);
1✔
597
    return values == null ? null : Joiner.on(",").join(values);
1✔
598
  }
599

600
  private static String prefixedClusterName(String name) {
601
    return "cluster:" + name;
1✔
602
  }
603

604
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
605
    return "cluster_specifier_plugin:" + pluginName;
1✔
606
  }
607

608
  private static final class FailingConfigSelector extends InternalConfigSelector {
609
    private final Result result;
610

611
    public FailingConfigSelector(Status error) {
1✔
612
      this.result = Result.forError(error);
1✔
613
    }
1✔
614

615
    @Override
616
    public Result selectConfig(PickSubchannelArgs args) {
617
      return result;
1✔
618
    }
619
  }
620

621
  private class ResolveState implements ResourceWatcher<LdsUpdate> {
622
    private final ConfigOrError emptyServiceConfig =
1✔
623
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
624
    private final String ldsResourceName;
625
    private boolean stopped;
626
    @Nullable
627
    private Set<String> existingClusters;  // clusters to which new requests can be routed
628
    @Nullable
629
    private RouteDiscoveryState routeDiscoveryState;
630

631
    /** Creates the state for watching the given LDS resource; the watch starts in start(). */
    ResolveState(String ldsResourceName) {
      this.ldsResourceName = ldsResourceName;
    }
1✔
634

635
    @Override
636
    public void onChanged(final LdsUpdate update) {
637
      syncContext.execute(new Runnable() {
1✔
638
        @Override
639
        public void run() {
640
          if (stopped) {
1✔
641
            return;
×
642
          }
643
          logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
1✔
644
          HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
645
          List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
646
          String rdsName = httpConnectionManager.rdsName();
1✔
647
          cleanUpRouteDiscoveryState();
1✔
648
          if (virtualHosts != null) {
1✔
649
            updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
650
                httpConnectionManager.httpFilterConfigs());
1✔
651
          } else {
652
            routeDiscoveryState = new RouteDiscoveryState(
1✔
653
                rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
654
                httpConnectionManager.httpFilterConfigs());
1✔
655
            logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
656
            xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
657
                rdsName, routeDiscoveryState);
1✔
658
          }
659
        }
1✔
660
      });
661
    }
1✔
662

663
    @Override
664
    public void onError(final Status error) {
665
      syncContext.execute(new Runnable() {
1✔
666
        @Override
667
        public void run() {
668
          if (stopped || receivedConfig) {
1✔
669
            return;
×
670
          }
671
          listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
672
              String.format("Unable to load LDS %s. xDS server returned: %s: %s",
1✔
673
              ldsResourceName, error.getCode(), error.getDescription())));
1✔
674
        }
1✔
675
      });
676
    }
1✔
677

678
    @Override
679
    public void onResourceDoesNotExist(final String resourceName) {
680
      syncContext.execute(new Runnable() {
1✔
681
        @Override
682
        public void run() {
683
          if (stopped) {
1✔
684
            return;
×
685
          }
686
          String error = "LDS resource does not exist: " + resourceName;
1✔
687
          logger.log(XdsLogLevel.INFO, error);
1✔
688
          cleanUpRouteDiscoveryState();
1✔
689
          cleanUpRoutes(error);
1✔
690
        }
1✔
691
      });
692
    }
1✔
693

694
    private void start() {
695
      logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
1✔
696
      xdsClient.watchXdsResource(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
697
    }
1✔
698

699
    private void stop() {
700
      logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
1✔
701
      stopped = true;
1✔
702
      cleanUpRouteDiscoveryState();
1✔
703
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
704
    }
1✔
705

706
    // called in syncContext
707
    private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
708
        @Nullable List<NamedFilterConfig> filterConfigs) {
709
      String authority = overrideAuthority != null ? overrideAuthority : ldsResourceName;
1✔
710
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
1✔
711
      if (virtualHost == null) {
1✔
712
        String error = "Failed to find virtual host matching hostname: " + authority;
1✔
713
        logger.log(XdsLogLevel.WARNING, error);
1✔
714
        cleanUpRoutes(error);
1✔
715
        return;
1✔
716
      }
717

718
      List<Route> routes = virtualHost.routes();
1✔
719

720
      // Populate all clusters to which requests can be routed to through the virtual host.
721
      Set<String> clusters = new HashSet<>();
1✔
722
      // uniqueName -> clusterName
723
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
724
      // uniqueName -> pluginConfig
725
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
726
      for (Route route : routes) {
1✔
727
        RouteAction action = route.routeAction();
1✔
728
        String prefixedName;
729
        if (action != null) {
1✔
730
          if (action.cluster() != null) {
1✔
731
            prefixedName = prefixedClusterName(action.cluster());
1✔
732
            clusters.add(prefixedName);
1✔
733
            clusterNameMap.put(prefixedName, action.cluster());
1✔
734
          } else if (action.weightedClusters() != null) {
1✔
735
            for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
736
              prefixedName = prefixedClusterName(weighedCluster.name());
1✔
737
              clusters.add(prefixedName);
1✔
738
              clusterNameMap.put(prefixedName, weighedCluster.name());
1✔
739
            }
1✔
740
          } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
741
            PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
742
            if (pluginConfig instanceof RlsPluginConfig) {
1✔
743
              prefixedName = prefixedClusterSpecifierPluginName(
1✔
744
                  action.namedClusterSpecifierPluginConfig().name());
1✔
745
              clusters.add(prefixedName);
1✔
746
              rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
747
            }
748
          }
749
        }
750
      }
1✔
751

752
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
753
      boolean shouldUpdateResult = existingClusters == null;
1✔
754
      Set<String> addedClusters =
755
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
756
      Set<String> deletedClusters =
757
          existingClusters == null
1✔
758
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
759
      existingClusters = clusters;
1✔
760
      for (String cluster : addedClusters) {
1✔
761
        if (clusterRefs.containsKey(cluster)) {
1✔
762
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
763
        } else {
764
          if (clusterNameMap.containsKey(cluster)) {
1✔
765
            clusterRefs.put(
1✔
766
                cluster,
767
                ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
1✔
768
          }
769
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
770
            clusterRefs.put(
1✔
771
                cluster,
772
                ClusterRefState.forRlsPlugin(
1✔
773
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
774
          }
775
          shouldUpdateResult = true;
1✔
776
        }
777
      }
1✔
778
      for (String cluster : clusters) {
1✔
779
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
780
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
781
          ClusterRefState newClusterRefState =
1✔
782
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
783
          clusterRefs.put(cluster, newClusterRefState);
1✔
784
          shouldUpdateResult = true;
1✔
785
        }
786
      }
1✔
787
      // Update service config to include newly added clusters.
788
      if (shouldUpdateResult) {
1✔
789
        updateResolutionResult();
1✔
790
      }
791
      // Make newly added clusters selectable by config selector and deleted clusters no longer
792
      // selectable.
793
      routingConfig =
1✔
794
          new RoutingConfig(
795
              httpMaxStreamDurationNano, routes, filterConfigs,
796
              virtualHost.filterConfigOverrides());
1✔
797
      shouldUpdateResult = false;
1✔
798
      for (String cluster : deletedClusters) {
1✔
799
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
800
        if (count == 0) {
1✔
801
          clusterRefs.remove(cluster);
1✔
802
          shouldUpdateResult = true;
1✔
803
        }
804
      }
1✔
805
      if (shouldUpdateResult) {
1✔
806
        updateResolutionResult();
1✔
807
      }
808
    }
1✔
809

810
    private void cleanUpRoutes(String error) {
811
      if (existingClusters != null) {
1✔
812
        for (String cluster : existingClusters) {
1✔
813
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
814
          if (count == 0) {
1✔
815
            clusterRefs.remove(cluster);
1✔
816
          }
817
        }
1✔
818
        existingClusters = null;
1✔
819
      }
820
      routingConfig = RoutingConfig.empty;
1✔
821
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
822
      // the config selector handles the error message itself. Once the LB API allows providing
823
      // failure information for addresses yet still providing a service config, the config seector
824
      // could be avoided.
825
      listener.onResult(ResolutionResult.newBuilder()
1✔
826
          .setAttributes(Attributes.newBuilder()
1✔
827
            .set(InternalConfigSelector.KEY,
1✔
828
              new FailingConfigSelector(Status.UNAVAILABLE.withDescription(error)))
1✔
829
            .build())
1✔
830
          .setServiceConfig(emptyServiceConfig)
1✔
831
          .build());
1✔
832
      receivedConfig = true;
1✔
833
    }
1✔
834

835
    private void cleanUpRouteDiscoveryState() {
836
      if (routeDiscoveryState != null) {
1✔
837
        String rdsName = routeDiscoveryState.resourceName;
1✔
838
        logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
1✔
839
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
840
            routeDiscoveryState);
841
        routeDiscoveryState = null;
1✔
842
      }
843
    }
1✔
844

845
    /**
846
     * Discovery state for RouteConfiguration resource. One instance for each Listener resource
847
     * update.
848
     */
849
    private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
850
      private final String resourceName;
851
      private final long httpMaxStreamDurationNano;
852
      @Nullable
853
      private final List<NamedFilterConfig> filterConfigs;
854

855
      private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
856
          @Nullable List<NamedFilterConfig> filterConfigs) {
1✔
857
        this.resourceName = resourceName;
1✔
858
        this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
1✔
859
        this.filterConfigs = filterConfigs;
1✔
860
      }
1✔
861

862
      @Override
863
      public void onChanged(final RdsUpdate update) {
864
        syncContext.execute(new Runnable() {
1✔
865
          @Override
866
          public void run() {
867
            if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
868
              return;
×
869
            }
870
            logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
1✔
871
            updateRoutes(update.virtualHosts, httpMaxStreamDurationNano,
1✔
872
                filterConfigs);
1✔
873
          }
1✔
874
        });
875
      }
1✔
876

877
      @Override
878
      public void onError(final Status error) {
879
        syncContext.execute(new Runnable() {
1✔
880
          @Override
881
          public void run() {
882
            if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
1✔
883
              return;
×
884
            }
885
            listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
886
                String.format("Unable to load RDS %s. xDS server returned: %s: %s",
1✔
887
                resourceName, error.getCode(), error.getDescription())));
1✔
888
          }
1✔
889
        });
890
      }
1✔
891

892
      @Override
893
      public void onResourceDoesNotExist(final String resourceName) {
894
        syncContext.execute(new Runnable() {
1✔
895
          @Override
896
          public void run() {
897
            if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
898
              return;
×
899
            }
900
            String error = "RDS resource does not exist: " + resourceName;
1✔
901
            logger.log(XdsLogLevel.INFO, error);
1✔
902
            cleanUpRoutes(error);
1✔
903
          }
1✔
904
        });
905
      }
1✔
906
    }
907
  }
908

909
  /**
910
   * VirtualHost-level configuration for request routing.
911
   */
912
  private static class RoutingConfig {
913
    private final long fallbackTimeoutNano;
914
    final List<Route> routes;
915
    // Null if HttpFilter is not supported.
916
    @Nullable final List<NamedFilterConfig> filterChain;
917
    final Map<String, FilterConfig> virtualHostOverrideConfig;
918

919
    private static RoutingConfig empty = new RoutingConfig(
1✔
920
        0, Collections.emptyList(), null, Collections.emptyMap());
1✔
921

922
    private RoutingConfig(
923
        long fallbackTimeoutNano, List<Route> routes, @Nullable List<NamedFilterConfig> filterChain,
924
        Map<String, FilterConfig> virtualHostOverrideConfig) {
1✔
925
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
926
      this.routes = routes;
1✔
927
      checkArgument(filterChain == null || !filterChain.isEmpty(), "filterChain is empty");
1✔
928
      this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain);
1✔
929
      this.virtualHostOverrideConfig = Collections.unmodifiableMap(virtualHostOverrideConfig);
1✔
930
    }
1✔
931
  }
932

933
  private static class ClusterRefState {
934
    final AtomicInteger refCount;
935
    @Nullable
936
    final String traditionalCluster;
937
    @Nullable
938
    final RlsPluginConfig rlsPluginConfig;
939

940
    private ClusterRefState(
941
        AtomicInteger refCount, @Nullable String traditionalCluster,
942
        @Nullable RlsPluginConfig rlsPluginConfig) {
1✔
943
      this.refCount = refCount;
1✔
944
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
945
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
946
      this.traditionalCluster = traditionalCluster;
1✔
947
      this.rlsPluginConfig = rlsPluginConfig;
1✔
948
    }
1✔
949

950
    private Map<String, ?> toLbPolicy() {
951
      if (traditionalCluster != null) {
1✔
952
        return ImmutableMap.of(
1✔
953
            XdsLbPolicies.CDS_POLICY_NAME,
954
            ImmutableMap.of("cluster", traditionalCluster));
1✔
955
      } else {
956
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
957
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
958
            .put(
1✔
959
                "childPolicy",
960
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
1✔
961
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
962
            .buildOrThrow();
1✔
963
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
964
      }
965
    }
966

967
    static ClusterRefState forCluster(AtomicInteger refCount, String name) {
968
      return new ClusterRefState(refCount, name, null);
1✔
969
    }
970

971
    static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
972
      return new ClusterRefState(refCount, null, rlsPluginConfig);
1✔
973
    }
974
  }
975
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc