• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19437

23 Aug 2024 08:05PM UTC coverage: 84.514% (+0.01%) from 84.503%
#19437

push

github

web-flow
Xds client split (#11484)

33415 of 39538 relevant lines covered (84.51%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.81
/../xds/src/main/java/io/grpc/xds/XdsNameResolver.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Joiner;
25
import com.google.common.base.Strings;
26
import com.google.common.collect.ImmutableList;
27
import com.google.common.collect.ImmutableMap;
28
import com.google.common.collect.Sets;
29
import com.google.gson.Gson;
30
import com.google.protobuf.util.Durations;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.Channel;
34
import io.grpc.ClientCall;
35
import io.grpc.ClientInterceptor;
36
import io.grpc.ClientInterceptors;
37
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
38
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
39
import io.grpc.InternalConfigSelector;
40
import io.grpc.InternalLogId;
41
import io.grpc.LoadBalancer.PickSubchannelArgs;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.NameResolver;
45
import io.grpc.Status;
46
import io.grpc.Status.Code;
47
import io.grpc.SynchronizationContext;
48
import io.grpc.internal.GrpcUtil;
49
import io.grpc.internal.ObjectPool;
50
import io.grpc.xds.ClusterSpecifierPlugin.PluginConfig;
51
import io.grpc.xds.Filter.ClientInterceptorBuilder;
52
import io.grpc.xds.Filter.FilterConfig;
53
import io.grpc.xds.Filter.NamedFilterConfig;
54
import io.grpc.xds.RouteLookupServiceClusterSpecifierPlugin.RlsPluginConfig;
55
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
56
import io.grpc.xds.VirtualHost.Route;
57
import io.grpc.xds.VirtualHost.Route.RouteAction;
58
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
59
import io.grpc.xds.VirtualHost.Route.RouteAction.HashPolicy;
60
import io.grpc.xds.VirtualHost.Route.RouteAction.RetryPolicy;
61
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
62
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
63
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
64
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
65
import io.grpc.xds.client.XdsClient;
66
import io.grpc.xds.client.XdsClient.ResourceWatcher;
67
import io.grpc.xds.client.XdsLogger;
68
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
69
import java.net.URI;
70
import java.util.ArrayList;
71
import java.util.Collections;
72
import java.util.HashMap;
73
import java.util.HashSet;
74
import java.util.List;
75
import java.util.Locale;
76
import java.util.Map;
77
import java.util.Objects;
78
import java.util.Set;
79
import java.util.concurrent.ConcurrentHashMap;
80
import java.util.concurrent.ConcurrentMap;
81
import java.util.concurrent.ScheduledExecutorService;
82
import java.util.concurrent.atomic.AtomicInteger;
83
import javax.annotation.Nullable;
84

85
/**
86
 * A {@link NameResolver} for resolving gRPC target names with "xds:" scheme.
87
 *
88
 * <p>Resolving a gRPC target involves contacting the control plane management server via xDS
89
 * protocol to retrieve service information and produce a service config to the caller.
90
 *
91
 * @see XdsNameResolverProvider
92
 */
93
final class XdsNameResolver extends NameResolver {
94

95
  static final CallOptions.Key<String> CLUSTER_SELECTION_KEY =
1✔
96
      CallOptions.Key.create("io.grpc.xds.CLUSTER_SELECTION_KEY");
1✔
97
  static final CallOptions.Key<Long> RPC_HASH_KEY =
1✔
98
      CallOptions.Key.create("io.grpc.xds.RPC_HASH_KEY");
1✔
99
  @VisibleForTesting
100
  static boolean enableTimeout =
1✔
101
      Strings.isNullOrEmpty(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"))
1✔
102
          || Boolean.parseBoolean(System.getenv("GRPC_XDS_EXPERIMENTAL_ENABLE_TIMEOUT"));
1✔
103

104
  private final InternalLogId logId;
105
  private final XdsLogger logger;
106
  @Nullable
107
  private final String targetAuthority;
108
  private final String target;
109
  private final String serviceAuthority;
110
  // Encoded version of the service authority as per 
111
  // https://datatracker.ietf.org/doc/html/rfc3986#section-3.2.
112
  private final String encodedServiceAuthority;
113
  private final String overrideAuthority;
114
  private final ServiceConfigParser serviceConfigParser;
115
  private final SynchronizationContext syncContext;
116
  private final ScheduledExecutorService scheduler;
117
  private final XdsClientPoolFactory xdsClientPoolFactory;
118
  private final ThreadSafeRandom random;
119
  private final FilterRegistry filterRegistry;
120
  private final XxHash64 hashFunc = XxHash64.INSTANCE;
1✔
121
  // Clusters (with reference counts) to which new/existing requests can be/are routed.
122
  // put()/remove() must be called in SyncContext, and get() can be called in any thread.
123
  private final ConcurrentMap<String, ClusterRefState> clusterRefs = new ConcurrentHashMap<>();
1✔
124
  private final ConfigSelector configSelector = new ConfigSelector();
1✔
125
  private final long randomChannelId;
126

127
  private volatile RoutingConfig routingConfig = RoutingConfig.empty;
1✔
128
  private Listener2 listener;
129
  private ObjectPool<XdsClient> xdsClientPool;
130
  private XdsClient xdsClient;
131
  private CallCounterProvider callCounterProvider;
132
  private ResolveState resolveState;
133
  // Workaround for https://github.com/grpc/grpc-java/issues/8886 . This should be handled in
134
  // XdsClient instead of here.
135
  private boolean receivedConfig;
136

137
  /**
   * Creates a resolver wired with the process-wide defaults.
   *
   * <p>Delegates to the full constructor using {@code targetUri.getAuthority()} as the target
   * authority, the default {@link SharedXdsClientPoolProvider}, the shared
   * {@link ThreadSafeRandomImpl} instance and the default {@link FilterRegistry}.
   *
   * @param targetUri the channel target URI
   * @param name the service authority to resolve; must not be {@code null}
   * @param overrideAuthority authority to use instead of {@code name} for virtual host matching,
   *     or {@code null}
   * @param serviceConfigParser parser for generated service configs
   * @param syncContext synchronization context in which resolver state is mutated
   * @param scheduler executor handed to filters when building client interceptors
   * @param bootstrapOverride bootstrap configuration overriding the default, or {@code null}
   */
  XdsNameResolver(
      URI targetUri, String name, @Nullable String overrideAuthority,
      ServiceConfigParser serviceConfigParser,
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
      @Nullable Map<String, ?> bootstrapOverride) {
    this(targetUri, targetUri.getAuthority(), name, overrideAuthority, serviceConfigParser,
        syncContext, scheduler, SharedXdsClientPoolProvider.getDefaultProvider(),
        ThreadSafeRandomImpl.instance, FilterRegistry.getDefaultRegistry(), bootstrapOverride);
  }
1✔
146

147
  /**
   * Creates a resolver with every collaborator supplied explicitly.
   *
   * <p>When {@code bootstrapOverride} is non-null the supplied {@code xdsClientPoolFactory} is
   * ignored and a fresh {@link SharedXdsClientPoolProvider} is used instead; in either case the
   * override (possibly {@code null}) is passed to the chosen factory.
   *
   * @param targetUri the channel target URI; its string form is used to look up the client pool
   * @param targetAuthority authority component of the target, or {@code null} to use the
   *     bootstrap's default listener resource name template
   * @param name the service authority to resolve; must not be {@code null}
   * @param overrideAuthority authority to use instead of {@code name} for virtual host matching,
   *     or {@code null}
   * @param serviceConfigParser parser for generated service configs
   * @param syncContext synchronization context in which resolver state is mutated
   * @param scheduler executor handed to filters when building client interceptors
   * @param xdsClientPoolFactory source of the {@link XdsClient} pool (unused when
   *     {@code bootstrapOverride} is non-null)
   * @param random randomness source for cluster selection, hashing fallback and the channel id
   * @param filterRegistry registry used to look up HTTP filters by type URL
   * @param bootstrapOverride bootstrap configuration overriding the default, or {@code null}
   */
  @VisibleForTesting
  XdsNameResolver(
      URI targetUri, @Nullable String targetAuthority, String name,
      @Nullable String overrideAuthority, ServiceConfigParser serviceConfigParser,
      SynchronizationContext syncContext, ScheduledExecutorService scheduler,
      XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
      FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
    this.targetAuthority = targetAuthority;
    target = targetUri.toString();

    // The name might have multiple slashes so encode it before verifying.
    serviceAuthority = checkNotNull(name, "name");
    this.encodedServiceAuthority =
        GrpcUtil.checkAuthority(GrpcUtil.AuthorityEscaper.encodeAuthority(serviceAuthority));

    this.overrideAuthority = overrideAuthority;
    this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.scheduler = checkNotNull(scheduler, "scheduler");
    // A bootstrap override gets its own pool provider so it does not affect the supplied one.
    this.xdsClientPoolFactory = bootstrapOverride == null
        ? checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory")
        : new SharedXdsClientPoolProvider();
    this.xdsClientPoolFactory.setBootstrapOverride(bootstrapOverride);
    this.random = checkNotNull(random, "random");
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
    // Drawn once per resolver; used as the input for CHANNEL_ID hash policies.
    randomChannelId = random.nextLong();
    logId = InternalLogId.allocate("xds-resolver", name);
    logger = XdsLogger.withLogId(logId);
    logger.log(XdsLogLevel.INFO, "Created resolver for {0}", name);
  }
1✔
176

177
  @Override
178
  public String getServiceAuthority() {
179
    return encodedServiceAuthority;
1✔
180
  }
181

182
  @Override
183
  public void start(Listener2 listener) {
184
    this.listener = checkNotNull(listener, "listener");
1✔
185
    try {
186
      xdsClientPool = xdsClientPoolFactory.getOrCreate(target);
1✔
187
    } catch (Exception e) {
1✔
188
      listener.onError(
1✔
189
          Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
1✔
190
      return;
1✔
191
    }
1✔
192
    xdsClient = xdsClientPool.getObject();
1✔
193
    BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
1✔
194
    String listenerNameTemplate;
195
    if (targetAuthority == null) {
1✔
196
      listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
1✔
197
    } else {
198
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
1✔
199
      if (authorityInfo == null) {
1✔
200
        listener.onError(Status.INVALID_ARGUMENT.withDescription(
1✔
201
            "invalid target URI: target authority not found in the bootstrap"));
202
        return;
1✔
203
      }
204
      listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
1✔
205
    }
206
    String replacement = serviceAuthority;
1✔
207
    if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
1✔
208
      replacement = XdsClient.percentEncodePath(replacement);
1✔
209
    }
210
    String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
1✔
211
    if (!XdsClient.isResourceNameValid(ldsResourceName, XdsListenerResource.getInstance().typeUrl())
1✔
212
        ) {
213
      listener.onError(Status.INVALID_ARGUMENT.withDescription(
×
214
          "invalid listener resource URI for service authority: " + serviceAuthority));
215
      return;
×
216
    }
217
    ldsResourceName = XdsClient.canonifyResourceName(ldsResourceName);
1✔
218
    callCounterProvider = SharedCallCounterMap.getInstance();
1✔
219
    resolveState = new ResolveState(ldsResourceName);
1✔
220
    resolveState.start();
1✔
221
  }
1✔
222

223
  private static String expandPercentS(String template, String replacement) {
224
    return template.replace("%s", replacement);
1✔
225
  }
226

227
  @Override
228
  public void shutdown() {
229
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
230
    if (resolveState != null) {
1✔
231
      resolveState.stop();
1✔
232
    }
233
    if (xdsClient != null) {
1✔
234
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
235
    }
236
  }
1✔
237

238
  @VisibleForTesting
239
  static Map<String, ?> generateServiceConfigWithMethodConfig(
240
      @Nullable Long timeoutNano, @Nullable RetryPolicy retryPolicy) {
241
    if (timeoutNano == null
1✔
242
        && (retryPolicy == null || retryPolicy.retryableStatusCodes().isEmpty())) {
1✔
243
      return Collections.emptyMap();
1✔
244
    }
245
    ImmutableMap.Builder<String, Object> methodConfig = ImmutableMap.builder();
1✔
246
    methodConfig.put(
1✔
247
        "name", Collections.singletonList(Collections.emptyMap()));
1✔
248
    if (retryPolicy != null && !retryPolicy.retryableStatusCodes().isEmpty()) {
1✔
249
      ImmutableMap.Builder<String, Object> rawRetryPolicy = ImmutableMap.builder();
1✔
250
      rawRetryPolicy.put("maxAttempts", (double) retryPolicy.maxAttempts());
1✔
251
      rawRetryPolicy.put("initialBackoff", Durations.toString(retryPolicy.initialBackoff()));
1✔
252
      rawRetryPolicy.put("maxBackoff", Durations.toString(retryPolicy.maxBackoff()));
1✔
253
      rawRetryPolicy.put("backoffMultiplier", 2D);
1✔
254
      List<String> codes = new ArrayList<>(retryPolicy.retryableStatusCodes().size());
1✔
255
      for (Code code : retryPolicy.retryableStatusCodes()) {
1✔
256
        codes.add(code.name());
1✔
257
      }
1✔
258
      rawRetryPolicy.put(
1✔
259
          "retryableStatusCodes", Collections.unmodifiableList(codes));
1✔
260
      if (retryPolicy.perAttemptRecvTimeout() != null) {
1✔
261
        rawRetryPolicy.put(
×
262
            "perAttemptRecvTimeout", Durations.toString(retryPolicy.perAttemptRecvTimeout()));
×
263
      }
264
      methodConfig.put("retryPolicy", rawRetryPolicy.buildOrThrow());
1✔
265
    }
266
    if (timeoutNano != null) {
1✔
267
      String timeout = timeoutNano / 1_000_000_000.0 + "s";
1✔
268
      methodConfig.put("timeout", timeout);
1✔
269
    }
270
    return Collections.singletonMap(
1✔
271
        "methodConfig", Collections.singletonList(methodConfig.buildOrThrow()));
1✔
272
  }
273

274
  @VisibleForTesting
275
  XdsClient getXdsClient() {
276
    return xdsClient;
1✔
277
  }
278

279
  // called in syncContext
280
  private void updateResolutionResult() {
281
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
282

283
    ImmutableMap.Builder<String, Object> childPolicy = new ImmutableMap.Builder<>();
1✔
284
    for (String name : clusterRefs.keySet()) {
1✔
285
      Map<String, ?> lbPolicy = clusterRefs.get(name).toLbPolicy();
1✔
286
      childPolicy.put(name, ImmutableMap.of("lbPolicy", ImmutableList.of(lbPolicy)));
1✔
287
    }
1✔
288
    Map<String, ?> rawServiceConfig = ImmutableMap.of(
1✔
289
        "loadBalancingConfig",
290
        ImmutableList.of(ImmutableMap.of(
1✔
291
            XdsLbPolicies.CLUSTER_MANAGER_POLICY_NAME,
292
            ImmutableMap.of("childPolicy", childPolicy.buildOrThrow()))));
1✔
293

294
    if (logger.isLoggable(XdsLogLevel.INFO)) {
1✔
295
      logger.log(
×
296
          XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
×
297
    }
298
    ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
299
    Attributes attrs =
300
        Attributes.newBuilder()
1✔
301
            .set(InternalXdsAttributes.XDS_CLIENT_POOL, xdsClientPool)
1✔
302
            .set(InternalXdsAttributes.CALL_COUNTER_PROVIDER, callCounterProvider)
1✔
303
            .set(InternalConfigSelector.KEY, configSelector)
1✔
304
            .build();
1✔
305
    ResolutionResult result =
306
        ResolutionResult.newBuilder()
1✔
307
            .setAttributes(attrs)
1✔
308
            .setServiceConfig(parsedServiceConfig)
1✔
309
            .build();
1✔
310
    listener.onResult(result);
1✔
311
    receivedConfig = true;
1✔
312
  }
1✔
313

314
  /**
315
   * Returns {@code true} iff {@code hostName} matches the domain name {@code pattern}, compared
316
   * case-insensitively.
317
   *
318
   * <p>Wildcard pattern rules:
319
   * <ol>
320
   * <li>A single asterisk (*) matches any domain.</li>
321
   * <li>Asterisk (*) is only permitted in the left-most or the right-most part of the pattern,
322
   *     but not both.</li>
323
   * </ol>
324
   */
325
  @VisibleForTesting
326
  static boolean matchHostName(String hostName, String pattern) {
327
    checkArgument(hostName.length() != 0 && !hostName.startsWith(".") && !hostName.endsWith("."),
1✔
328
        "Invalid host name");
329
    checkArgument(pattern.length() != 0 && !pattern.startsWith(".") && !pattern.endsWith("."),
1✔
330
        "Invalid pattern/domain name");
331

332
    hostName = hostName.toLowerCase(Locale.US);
1✔
333
    pattern = pattern.toLowerCase(Locale.US);
1✔
334
    // hostName and pattern are now in lower case -- domain names are case-insensitive.
335

336
    if (!pattern.contains("*")) {
1✔
337
      // Not a wildcard pattern -- hostName and pattern must match exactly.
338
      return hostName.equals(pattern);
1✔
339
    }
340
    // Wildcard pattern
341

342
    if (pattern.length() == 1) {
1✔
343
      return true;
×
344
    }
345

346
    int index = pattern.indexOf('*');
1✔
347

348
    // At most one asterisk (*) is allowed.
349
    if (pattern.indexOf('*', index + 1) != -1) {
1✔
350
      return false;
×
351
    }
352

353
    // Asterisk can only match prefix or suffix.
354
    if (index != 0 && index != pattern.length() - 1) {
1✔
355
      return false;
×
356
    }
357

358
    // HostName must be at least as long as the pattern because asterisk has to
359
    // match one or more characters.
360
    if (hostName.length() < pattern.length()) {
1✔
361
      return false;
1✔
362
    }
363

364
    if (index == 0 && hostName.endsWith(pattern.substring(1))) {
1✔
365
      // Prefix matching fails.
366
      return true;
1✔
367
    }
368

369
    // Pattern matches hostname if suffix matching succeeds.
370
    return index == pattern.length() - 1
1✔
371
        && hostName.startsWith(pattern.substring(0, pattern.length() - 1));
1✔
372
  }
373

374
  private final class ConfigSelector extends InternalConfigSelector {
1✔
375
    @Override
376
    public Result selectConfig(PickSubchannelArgs args) {
377
      String cluster = null;
1✔
378
      Route selectedRoute = null;
1✔
379
      RoutingConfig routingCfg;
380
      Map<String, FilterConfig> selectedOverrideConfigs;
381
      List<ClientInterceptor> filterInterceptors = new ArrayList<>();
1✔
382
      Metadata headers = args.getHeaders();
1✔
383
      do {
384
        routingCfg = routingConfig;
1✔
385
        selectedOverrideConfigs = new HashMap<>(routingCfg.virtualHostOverrideConfig);
1✔
386
        for (Route route : routingCfg.routes) {
1✔
387
          if (RoutingUtils.matchRoute(
1✔
388
                  route.routeMatch(), "/" + args.getMethodDescriptor().getFullMethodName(),
1✔
389
              headers, random)) {
1✔
390
            selectedRoute = route;
1✔
391
            selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
392
            break;
1✔
393
          }
394
        }
1✔
395
        if (selectedRoute == null) {
1✔
396
          return Result.forError(
1✔
397
              Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"));
1✔
398
        }
399
        if (selectedRoute.routeAction() == null) {
1✔
400
          return Result.forError(Status.UNAVAILABLE.withDescription(
1✔
401
              "Could not route RPC to Route with non-forwarding action"));
402
        }
403
        RouteAction action = selectedRoute.routeAction();
1✔
404
        if (action.cluster() != null) {
1✔
405
          cluster = prefixedClusterName(action.cluster());
1✔
406
        } else if (action.weightedClusters() != null) {
1✔
407
          long totalWeight = 0;
1✔
408
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
409
            totalWeight += weightedCluster.weight();
1✔
410
          }
1✔
411
          long select = random.nextLong(totalWeight);
1✔
412
          long accumulator = 0;
1✔
413
          for (ClusterWeight weightedCluster : action.weightedClusters()) {
1✔
414
            accumulator += weightedCluster.weight();
1✔
415
            if (select < accumulator) {
1✔
416
              cluster = prefixedClusterName(weightedCluster.name());
1✔
417
              selectedOverrideConfigs.putAll(weightedCluster.filterConfigOverrides());
1✔
418
              break;
1✔
419
            }
420
          }
1✔
421
        } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
422
          cluster =
1✔
423
              prefixedClusterSpecifierPluginName(action.namedClusterSpecifierPluginConfig().name());
1✔
424
        }
425
      } while (!retainCluster(cluster));
1✔
426
      Long timeoutNanos = null;
1✔
427
      if (enableTimeout) {
1✔
428
        if (selectedRoute != null) {
1✔
429
          timeoutNanos = selectedRoute.routeAction().timeoutNano();
1✔
430
        }
431
        if (timeoutNanos == null) {
1✔
432
          timeoutNanos = routingCfg.fallbackTimeoutNano;
1✔
433
        }
434
        if (timeoutNanos <= 0) {
1✔
435
          timeoutNanos = null;
1✔
436
        }
437
      }
438
      RetryPolicy retryPolicy =
439
          selectedRoute == null ? null : selectedRoute.routeAction().retryPolicy();
1✔
440
      // TODO(chengyuanzhang): avoid service config generation and parsing for each call.
441
      Map<String, ?> rawServiceConfig =
1✔
442
          generateServiceConfigWithMethodConfig(timeoutNanos, retryPolicy);
1✔
443
      ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
1✔
444
      Object config = parsedServiceConfig.getConfig();
1✔
445
      if (config == null) {
1✔
446
        releaseCluster(cluster);
×
447
        return Result.forError(
×
448
            parsedServiceConfig.getError().augmentDescription(
×
449
                "Failed to parse service config (method config)"));
450
      }
451
      if (routingCfg.filterChain != null) {
1✔
452
        for (NamedFilterConfig namedFilter : routingCfg.filterChain) {
1✔
453
          FilterConfig filterConfig = namedFilter.filterConfig;
1✔
454
          Filter filter = filterRegistry.get(filterConfig.typeUrl());
1✔
455
          if (filter instanceof ClientInterceptorBuilder) {
1✔
456
            ClientInterceptor interceptor = ((ClientInterceptorBuilder) filter)
1✔
457
                .buildClientInterceptor(
1✔
458
                    filterConfig, selectedOverrideConfigs.get(namedFilter.name),
1✔
459
                    args, scheduler);
1✔
460
            if (interceptor != null) {
1✔
461
              filterInterceptors.add(interceptor);
1✔
462
            }
463
          }
464
        }
1✔
465
      }
466
      final String finalCluster = cluster;
1✔
467
      final long hash = generateHash(selectedRoute.routeAction().hashPolicies(), headers);
1✔
468
      class ClusterSelectionInterceptor implements ClientInterceptor {
1✔
469
        @Override
470
        public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
471
            final MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
472
            final Channel next) {
473
          final CallOptions callOptionsForCluster =
1✔
474
              callOptions.withOption(CLUSTER_SELECTION_KEY, finalCluster)
1✔
475
                  .withOption(RPC_HASH_KEY, hash);
1✔
476
          return new SimpleForwardingClientCall<ReqT, RespT>(
1✔
477
              next.newCall(method, callOptionsForCluster)) {
1✔
478
            @Override
479
            public void start(Listener<RespT> listener, Metadata headers) {
480
              listener = new SimpleForwardingClientCallListener<RespT>(listener) {
1✔
481
                boolean committed;
482

483
                @Override
484
                public void onHeaders(Metadata headers) {
485
                  committed = true;
1✔
486
                  releaseCluster(finalCluster);
1✔
487
                  delegate().onHeaders(headers);
1✔
488
                }
1✔
489

490
                @Override
491
                public void onClose(Status status, Metadata trailers) {
492
                  if (!committed) {
1✔
493
                    releaseCluster(finalCluster);
1✔
494
                  }
495
                  delegate().onClose(status, trailers);
1✔
496
                }
1✔
497
              };
498
              delegate().start(listener, headers);
1✔
499
            }
1✔
500
          };
501
        }
502
      }
503

504
      filterInterceptors.add(new ClusterSelectionInterceptor());
1✔
505
      return
1✔
506
          Result.newBuilder()
1✔
507
              .setConfig(config)
1✔
508
              .setInterceptor(combineInterceptors(filterInterceptors))
1✔
509
              .build();
1✔
510
    }
511

512
    private boolean retainCluster(String cluster) {
513
      ClusterRefState clusterRefState = clusterRefs.get(cluster);
1✔
514
      if (clusterRefState == null) {
1✔
515
        return false;
×
516
      }
517
      AtomicInteger refCount = clusterRefState.refCount;
1✔
518
      int count;
519
      do {
520
        count = refCount.get();
1✔
521
        if (count == 0) {
1✔
522
          return false;
×
523
        }
524
      } while (!refCount.compareAndSet(count, count + 1));
1✔
525
      return true;
1✔
526
    }
527

528
    private void releaseCluster(final String cluster) {
529
      int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
530
      if (count == 0) {
1✔
531
        syncContext.execute(new Runnable() {
1✔
532
          @Override
533
          public void run() {
534
            if (clusterRefs.get(cluster).refCount.get() == 0) {
1✔
535
              clusterRefs.remove(cluster);
1✔
536
              updateResolutionResult();
1✔
537
            }
538
          }
1✔
539
        });
540
      }
541
    }
1✔
542

543
    private long generateHash(List<HashPolicy> hashPolicies, Metadata headers) {
544
      Long hash = null;
1✔
545
      for (HashPolicy policy : hashPolicies) {
1✔
546
        Long newHash = null;
1✔
547
        if (policy.type() == HashPolicy.Type.HEADER) {
1✔
548
          String value = getHeaderValue(headers, policy.headerName());
1✔
549
          if (value != null) {
1✔
550
            if (policy.regEx() != null && policy.regExSubstitution() != null) {
1✔
551
              value = policy.regEx().matcher(value).replaceAll(policy.regExSubstitution());
1✔
552
            }
553
            newHash = hashFunc.hashAsciiString(value);
1✔
554
          }
555
        } else if (policy.type() == HashPolicy.Type.CHANNEL_ID) {
1✔
556
          newHash = hashFunc.hashLong(randomChannelId);
1✔
557
        }
558
        if (newHash != null ) {
1✔
559
          // Rotating the old value prevents duplicate hash rules from cancelling each other out
560
          // and preserves all of the entropy.
561
          long oldHash = hash != null ? ((hash << 1L) | (hash >> 63L)) : 0;
1✔
562
          hash = oldHash ^ newHash;
1✔
563
        }
564
        // If the policy is a terminal policy and a hash has been generated, ignore
565
        // the rest of the hash policies.
566
        if (policy.isTerminal() && hash != null) {
1✔
567
          break;
×
568
        }
569
      }
1✔
570
      return hash == null ? random.nextLong() : hash;
1✔
571
    }
572
  }
573

574
  private static ClientInterceptor combineInterceptors(final List<ClientInterceptor> interceptors) {
575
    checkArgument(!interceptors.isEmpty(), "empty interceptors");
1✔
576
    if (interceptors.size() == 1) {
1✔
577
      return interceptors.get(0);
1✔
578
    }
579
    return new ClientInterceptor() {
1✔
580
      @Override
581
      public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
582
          MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
583
        next = ClientInterceptors.interceptForward(next, interceptors);
1✔
584
        return next.newCall(method, callOptions);
1✔
585
      }
586
    };
587
  }
588

589
  @Nullable
590
  private static String getHeaderValue(Metadata headers, String headerName) {
591
    if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
1✔
592
      return null;
×
593
    }
594
    if (headerName.equals("content-type")) {
1✔
595
      return "application/grpc";
×
596
    }
597
    Metadata.Key<String> key;
598
    try {
599
      key = Metadata.Key.of(headerName, Metadata.ASCII_STRING_MARSHALLER);
1✔
600
    } catch (IllegalArgumentException e) {
×
601
      return null;
×
602
    }
1✔
603
    Iterable<String> values = headers.getAll(key);
1✔
604
    return values == null ? null : Joiner.on(",").join(values);
1✔
605
  }
606

607
  private static String prefixedClusterName(String name) {
608
    return "cluster:" + name;
1✔
609
  }
610

611
  private static String prefixedClusterSpecifierPluginName(String pluginName) {
612
    return "cluster_specifier_plugin:" + pluginName;
1✔
613
  }
614

615
  private static final class FailingConfigSelector extends InternalConfigSelector {
616
    private final Result result;
617

618
    public FailingConfigSelector(Status error) {
1✔
619
      this.result = Result.forError(error);
1✔
620
    }
1✔
621

622
    @Override
623
    public Result selectConfig(PickSubchannelArgs args) {
624
      return result;
1✔
625
    }
626
  }
627

628
  private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
629
    private final ConfigOrError emptyServiceConfig =
1✔
630
        serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
1✔
631
    private final String ldsResourceName;
632
    private boolean stopped;
633
    @Nullable
634
    private Set<String> existingClusters;  // clusters to which new requests can be routed
635
    @Nullable
636
    private RouteDiscoveryState routeDiscoveryState;
637

638
    /** Creates the watch state for the given LDS resource name; does not start watching. */
    ResolveState(String ldsResourceName) {
      this.ldsResourceName = ldsResourceName;
    }
1✔
641

642
    @Override
643
    public void onChanged(final XdsListenerResource.LdsUpdate update) {
644
      if (stopped) {
1✔
645
        return;
×
646
      }
647
      logger.log(XdsLogLevel.INFO, "Receive LDS resource update: {0}", update);
1✔
648
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
649
      List<VirtualHost> virtualHosts = httpConnectionManager.virtualHosts();
1✔
650
      String rdsName = httpConnectionManager.rdsName();
1✔
651
      cleanUpRouteDiscoveryState();
1✔
652
      if (virtualHosts != null) {
1✔
653
        updateRoutes(virtualHosts, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
654
            httpConnectionManager.httpFilterConfigs());
1✔
655
      } else {
656
        routeDiscoveryState = new RouteDiscoveryState(
1✔
657
            rdsName, httpConnectionManager.httpMaxStreamDurationNano(),
1✔
658
            httpConnectionManager.httpFilterConfigs());
1✔
659
        logger.log(XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
660
        xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
661
            rdsName, routeDiscoveryState, syncContext);
1✔
662
      }
663
    }
1✔
664

665
    @Override
666
    public void onError(final Status error) {
667
      if (stopped || receivedConfig) {
1✔
668
        return;
×
669
      }
670
      listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
671
          String.format("Unable to load LDS %s. xDS server returned: %s: %s",
1✔
672
          ldsResourceName, error.getCode(), error.getDescription())));
1✔
673
    }
1✔
674

675
    @Override
676
    public void onResourceDoesNotExist(final String resourceName) {
677
      if (stopped) {
1✔
678
        return;
×
679
      }
680
      String error = "LDS resource does not exist: " + resourceName;
1✔
681
      logger.log(XdsLogLevel.INFO, error);
1✔
682
      cleanUpRouteDiscoveryState();
1✔
683
      cleanUpRoutes(error);
1✔
684
    }
1✔
685

686
    private void start() {
687
      logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
1✔
688
      xdsClient.watchXdsResource(XdsListenerResource.getInstance(),
1✔
689
          ldsResourceName, this, syncContext);
1✔
690
    }
1✔
691

692
    private void stop() {
693
      logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
1✔
694
      stopped = true;
1✔
695
      cleanUpRouteDiscoveryState();
1✔
696
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), ldsResourceName, this);
1✔
697
    }
1✔
698

699
    // called in syncContext
700
    private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
701
        @Nullable List<NamedFilterConfig> filterConfigs) {
702
      String authority = overrideAuthority != null ? overrideAuthority : encodedServiceAuthority;
1✔
703
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(virtualHosts, authority);
1✔
704
      if (virtualHost == null) {
1✔
705
        String error = "Failed to find virtual host matching hostname: " + authority;
1✔
706
        logger.log(XdsLogLevel.WARNING, error);
1✔
707
        cleanUpRoutes(error);
1✔
708
        return;
1✔
709
      }
710

711
      List<Route> routes = virtualHost.routes();
1✔
712

713
      // Populate all clusters to which requests can be routed to through the virtual host.
714
      Set<String> clusters = new HashSet<>();
1✔
715
      // uniqueName -> clusterName
716
      Map<String, String> clusterNameMap = new HashMap<>();
1✔
717
      // uniqueName -> pluginConfig
718
      Map<String, RlsPluginConfig> rlsPluginConfigMap = new HashMap<>();
1✔
719
      for (Route route : routes) {
1✔
720
        RouteAction action = route.routeAction();
1✔
721
        String prefixedName;
722
        if (action != null) {
1✔
723
          if (action.cluster() != null) {
1✔
724
            prefixedName = prefixedClusterName(action.cluster());
1✔
725
            clusters.add(prefixedName);
1✔
726
            clusterNameMap.put(prefixedName, action.cluster());
1✔
727
          } else if (action.weightedClusters() != null) {
1✔
728
            for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
729
              prefixedName = prefixedClusterName(weighedCluster.name());
1✔
730
              clusters.add(prefixedName);
1✔
731
              clusterNameMap.put(prefixedName, weighedCluster.name());
1✔
732
            }
1✔
733
          } else if (action.namedClusterSpecifierPluginConfig() != null) {
1✔
734
            PluginConfig pluginConfig = action.namedClusterSpecifierPluginConfig().config();
1✔
735
            if (pluginConfig instanceof RlsPluginConfig) {
1✔
736
              prefixedName = prefixedClusterSpecifierPluginName(
1✔
737
                  action.namedClusterSpecifierPluginConfig().name());
1✔
738
              clusters.add(prefixedName);
1✔
739
              rlsPluginConfigMap.put(prefixedName, (RlsPluginConfig) pluginConfig);
1✔
740
            }
741
          }
742
        }
743
      }
1✔
744

745
      // Updates channel's load balancing config whenever the set of selectable clusters changes.
746
      boolean shouldUpdateResult = existingClusters == null;
1✔
747
      Set<String> addedClusters =
748
          existingClusters == null ? clusters : Sets.difference(clusters, existingClusters);
1✔
749
      Set<String> deletedClusters =
750
          existingClusters == null
1✔
751
              ? Collections.emptySet() : Sets.difference(existingClusters, clusters);
1✔
752
      existingClusters = clusters;
1✔
753
      for (String cluster : addedClusters) {
1✔
754
        if (clusterRefs.containsKey(cluster)) {
1✔
755
          clusterRefs.get(cluster).refCount.incrementAndGet();
1✔
756
        } else {
757
          if (clusterNameMap.containsKey(cluster)) {
1✔
758
            clusterRefs.put(
1✔
759
                cluster,
760
                ClusterRefState.forCluster(new AtomicInteger(1), clusterNameMap.get(cluster)));
1✔
761
          }
762
          if (rlsPluginConfigMap.containsKey(cluster)) {
1✔
763
            clusterRefs.put(
1✔
764
                cluster,
765
                ClusterRefState.forRlsPlugin(
1✔
766
                    new AtomicInteger(1), rlsPluginConfigMap.get(cluster)));
1✔
767
          }
768
          shouldUpdateResult = true;
1✔
769
        }
770
      }
1✔
771
      for (String cluster : clusters) {
1✔
772
        RlsPluginConfig rlsPluginConfig = rlsPluginConfigMap.get(cluster);
1✔
773
        if (!Objects.equals(rlsPluginConfig, clusterRefs.get(cluster).rlsPluginConfig)) {
1✔
774
          ClusterRefState newClusterRefState =
1✔
775
              ClusterRefState.forRlsPlugin(clusterRefs.get(cluster).refCount, rlsPluginConfig);
1✔
776
          clusterRefs.put(cluster, newClusterRefState);
1✔
777
          shouldUpdateResult = true;
1✔
778
        }
779
      }
1✔
780
      // Update service config to include newly added clusters.
781
      if (shouldUpdateResult) {
1✔
782
        updateResolutionResult();
1✔
783
      }
784
      // Make newly added clusters selectable by config selector and deleted clusters no longer
785
      // selectable.
786
      routingConfig =
1✔
787
          new RoutingConfig(
788
              httpMaxStreamDurationNano, routes, filterConfigs,
789
              virtualHost.filterConfigOverrides());
1✔
790
      shouldUpdateResult = false;
1✔
791
      for (String cluster : deletedClusters) {
1✔
792
        int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
793
        if (count == 0) {
1✔
794
          clusterRefs.remove(cluster);
1✔
795
          shouldUpdateResult = true;
1✔
796
        }
797
      }
1✔
798
      if (shouldUpdateResult) {
1✔
799
        updateResolutionResult();
1✔
800
      }
801
    }
1✔
802

803
    private void cleanUpRoutes(String error) {
804
      if (existingClusters != null) {
1✔
805
        for (String cluster : existingClusters) {
1✔
806
          int count = clusterRefs.get(cluster).refCount.decrementAndGet();
1✔
807
          if (count == 0) {
1✔
808
            clusterRefs.remove(cluster);
1✔
809
          }
810
        }
1✔
811
        existingClusters = null;
1✔
812
      }
813
      routingConfig = RoutingConfig.empty;
1✔
814
      // Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
815
      // the config selector handles the error message itself. Once the LB API allows providing
816
      // failure information for addresses yet still providing a service config, the config seector
817
      // could be avoided.
818
      listener.onResult(ResolutionResult.newBuilder()
1✔
819
          .setAttributes(Attributes.newBuilder()
1✔
820
            .set(InternalConfigSelector.KEY,
1✔
821
              new FailingConfigSelector(Status.UNAVAILABLE.withDescription(error)))
1✔
822
            .build())
1✔
823
          .setServiceConfig(emptyServiceConfig)
1✔
824
          .build());
1✔
825
      receivedConfig = true;
1✔
826
    }
1✔
827

828
    private void cleanUpRouteDiscoveryState() {
829
      if (routeDiscoveryState != null) {
1✔
830
        String rdsName = routeDiscoveryState.resourceName;
1✔
831
        logger.log(XdsLogLevel.INFO, "Stop watching RDS resource {0}", rdsName);
1✔
832
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
833
            routeDiscoveryState);
834
        routeDiscoveryState = null;
1✔
835
      }
836
    }
1✔
837

838
    /**
839
     * Discovery state for RouteConfiguration resource. One instance for each Listener resource
840
     * update.
841
     */
842
    private class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
843
      private final String resourceName;
844
      private final long httpMaxStreamDurationNano;
845
      @Nullable
846
      private final List<NamedFilterConfig> filterConfigs;
847

848
      private RouteDiscoveryState(String resourceName, long httpMaxStreamDurationNano,
849
          @Nullable List<NamedFilterConfig> filterConfigs) {
1✔
850
        this.resourceName = resourceName;
1✔
851
        this.httpMaxStreamDurationNano = httpMaxStreamDurationNano;
1✔
852
        this.filterConfigs = filterConfigs;
1✔
853
      }
1✔
854

855
      @Override
856
      public void onChanged(final RdsUpdate update) {
857
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
858
          return;
×
859
        }
860
        logger.log(XdsLogLevel.INFO, "Received RDS resource update: {0}", update);
1✔
861
        updateRoutes(update.virtualHosts, httpMaxStreamDurationNano, filterConfigs);
1✔
862
      }
1✔
863

864
      @Override
865
      public void onError(final Status error) {
866
        if (RouteDiscoveryState.this != routeDiscoveryState || receivedConfig) {
1✔
867
          return;
×
868
        }
869
        listener.onError(Status.UNAVAILABLE.withCause(error.getCause()).withDescription(
1✔
870
            String.format("Unable to load RDS %s. xDS server returned: %s: %s",
1✔
871
            resourceName, error.getCode(), error.getDescription())));
1✔
872
      }
1✔
873

874
      @Override
875
      public void onResourceDoesNotExist(final String resourceName) {
876
        if (RouteDiscoveryState.this != routeDiscoveryState) {
1✔
877
          return;
×
878
        }
879
        String error = "RDS resource does not exist: " + resourceName;
1✔
880
        logger.log(XdsLogLevel.INFO, error);
1✔
881
        cleanUpRoutes(error);
1✔
882
      }
1✔
883
    }
884
  }
885

886
  /**
887
   * VirtualHost-level configuration for request routing.
888
   */
889
  private static class RoutingConfig {
890
    private final long fallbackTimeoutNano;
891
    final List<Route> routes;
892
    // Null if HttpFilter is not supported.
893
    @Nullable final List<NamedFilterConfig> filterChain;
894
    final Map<String, FilterConfig> virtualHostOverrideConfig;
895

896
    private static RoutingConfig empty = new RoutingConfig(
1✔
897
        0, Collections.emptyList(), null, Collections.emptyMap());
1✔
898

899
    private RoutingConfig(
900
        long fallbackTimeoutNano, List<Route> routes, @Nullable List<NamedFilterConfig> filterChain,
901
        Map<String, FilterConfig> virtualHostOverrideConfig) {
1✔
902
      this.fallbackTimeoutNano = fallbackTimeoutNano;
1✔
903
      this.routes = routes;
1✔
904
      checkArgument(filterChain == null || !filterChain.isEmpty(), "filterChain is empty");
1✔
905
      this.filterChain = filterChain == null ? null : Collections.unmodifiableList(filterChain);
1✔
906
      this.virtualHostOverrideConfig = Collections.unmodifiableMap(virtualHostOverrideConfig);
1✔
907
    }
1✔
908
  }
909

910
  private static class ClusterRefState {
911
    final AtomicInteger refCount;
912
    @Nullable
913
    final String traditionalCluster;
914
    @Nullable
915
    final RlsPluginConfig rlsPluginConfig;
916

917
    private ClusterRefState(
918
        AtomicInteger refCount, @Nullable String traditionalCluster,
919
        @Nullable RlsPluginConfig rlsPluginConfig) {
1✔
920
      this.refCount = refCount;
1✔
921
      checkArgument(traditionalCluster == null ^ rlsPluginConfig == null,
1✔
922
          "There must be exactly one non-null value in traditionalCluster and pluginConfig");
923
      this.traditionalCluster = traditionalCluster;
1✔
924
      this.rlsPluginConfig = rlsPluginConfig;
1✔
925
    }
1✔
926

927
    private Map<String, ?> toLbPolicy() {
928
      if (traditionalCluster != null) {
1✔
929
        return ImmutableMap.of(
1✔
930
            XdsLbPolicies.CDS_POLICY_NAME,
931
            ImmutableMap.of("cluster", traditionalCluster));
1✔
932
      } else {
933
        ImmutableMap<String, ?> rlsConfig = new ImmutableMap.Builder<String, Object>()
1✔
934
            .put("routeLookupConfig", rlsPluginConfig.config())
1✔
935
            .put(
1✔
936
                "childPolicy",
937
                ImmutableList.of(ImmutableMap.of(XdsLbPolicies.CDS_POLICY_NAME, ImmutableMap.of())))
1✔
938
            .put("childPolicyConfigTargetFieldName", "cluster")
1✔
939
            .buildOrThrow();
1✔
940
        return ImmutableMap.of("rls_experimental", rlsConfig);
1✔
941
      }
942
    }
943

944
    static ClusterRefState forCluster(AtomicInteger refCount, String name) {
945
      return new ClusterRefState(refCount, name, null);
1✔
946
    }
947

948
    static ClusterRefState forRlsPlugin(AtomicInteger refCount, RlsPluginConfig rlsPluginConfig) {
949
      return new ClusterRefState(refCount, null, rlsPluginConfig);
1✔
950
    }
951
  }
952
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc