• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20033

29 Oct 2025 04:43PM UTC coverage: 88.533% (-0.03%) from 88.561%
#20033

push

github

web-flow
xds,googleapis: Allow wrapping NameResolver to inject XdsClient (#12450)

Since there is no longer a single global XdsClient, it makes more sense
to allow things like the c2p name resolver to inject its own bootstrap
even if there is one defined in an environment variable.
GoogleCloudToProdNameResolver can now pass an XdsClient instance to
XdsNameResolver, and SharedXdsClientPoolProvider allows
GoogleCloudToProdNameResolver to choose the bootstrap for that one
specific target.

Since XdsNameResolver is no longer in control of the XdsClient pool the
XdsClient instance is now passed to ClusterImplLb. A channel will now
only access the global XdsClient pool exactly once: in the name
resolver.

BootstrapInfo is purposefully being shared across channels, as we really
want to share things like credentials which can have significant memory
use and may have caches which reduce I/O when shared. That is why
SharedXdsClientPoolProvider receives BootstrapInfo instead of
Map<String,?>.

Verifying BootstrapInfo.server() is not empty was moved from
SharedXdsClientPoolProvider to GrpcBootstrapperImpl so avoid
getOrCreate() throwing an exception in only that one case. It might make
sense to move that to BootstrapperImpl, but that will need more
investigation.

A lot of tests needed updating because XdsClientPoolProvider is no
longer responsible for parsing the bootstrap, so we now need bootstraps
even if XdsClientPoolProvider will ignore it.

This also fixes a bug in GoogleCloudToProdNameResolver where it would
initialize the delegate even when it failed to create the bootstrap.
That would certainly cause all RPCs on the channel to fail because of
the missing bootstrap and it defeated the point of `succeeded == false`
and `refresh()` which was supposed to retry contacting the metadata
server.

The server tests were enhanced to give a useful error when
server.start() throws an exception, as otherwise the real error is lost.

b/442819521

34966 of 39495 relevant lines covered (88.53%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.91
/../xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.auto.value.AutoValue;
24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import com.google.common.net.HostAndPort;
28
import com.google.common.net.InetAddresses;
29
import com.google.common.util.concurrent.SettableFuture;
30
import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol;
31
import io.grpc.Attributes;
32
import io.grpc.InternalServerInterceptors;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.MetricRecorder;
36
import io.grpc.Server;
37
import io.grpc.ServerBuilder;
38
import io.grpc.ServerCall;
39
import io.grpc.ServerCall.Listener;
40
import io.grpc.ServerCallHandler;
41
import io.grpc.ServerInterceptor;
42
import io.grpc.ServerServiceDefinition;
43
import io.grpc.Status;
44
import io.grpc.StatusException;
45
import io.grpc.SynchronizationContext;
46
import io.grpc.SynchronizationContext.ScheduledHandle;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.ObjectPool;
49
import io.grpc.internal.SharedResourceHolder;
50
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
51
import io.grpc.xds.Filter.FilterConfig;
52
import io.grpc.xds.Filter.NamedFilterConfig;
53
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
54
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
55
import io.grpc.xds.VirtualHost.Route;
56
import io.grpc.xds.XdsListenerResource.LdsUpdate;
57
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
58
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
59
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
60
import io.grpc.xds.client.XdsClient;
61
import io.grpc.xds.client.XdsClient.ResourceWatcher;
62
import io.grpc.xds.internal.security.SslContextProviderSupplier;
63
import java.io.IOException;
64
import java.net.InetAddress;
65
import java.net.SocketAddress;
66
import java.util.ArrayList;
67
import java.util.HashMap;
68
import java.util.HashSet;
69
import java.util.List;
70
import java.util.Map;
71
import java.util.Set;
72
import java.util.concurrent.CountDownLatch;
73
import java.util.concurrent.ExecutionException;
74
import java.util.concurrent.ScheduledExecutorService;
75
import java.util.concurrent.TimeUnit;
76
import java.util.concurrent.atomic.AtomicBoolean;
77
import java.util.concurrent.atomic.AtomicReference;
78
import java.util.logging.Level;
79
import java.util.logging.Logger;
80
import javax.annotation.Nullable;
81

82
final class XdsServerWrapper extends Server {
83
  private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
1✔
84

85
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
86
      new Thread.UncaughtExceptionHandler() {
1✔
87
        @Override
88
        public void uncaughtException(Thread t, Throwable e) {
89
          logger.log(Level.SEVERE, "Exception!" + e);
×
90
          // TODO(chengyuanzhang): implement cleanup.
91
        }
×
92
      });
93

94
  public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
95
      ATTR_SERVER_ROUTING_CONFIG =
1✔
96
      Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
1✔
97

98
  @VisibleForTesting
99
  static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
1✔
100
  private final String listenerAddress;
101
  private final ServerBuilder<?> delegateBuilder;
102
  private boolean sharedTimeService;
103
  private final ScheduledExecutorService timeService;
104
  private final FilterRegistry filterRegistry;
105
  private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
1✔
106
  private final XdsClientPoolFactory xdsClientPoolFactory;
107
  private final @Nullable Map<String, ?> bootstrapOverride;
108
  private final XdsServingStatusListener listener;
109
  private final FilterChainSelectorManager filterChainSelectorManager;
110
  private final AtomicBoolean started = new AtomicBoolean(false);
1✔
111
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
112
  private boolean isServing;
113
  private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
1✔
114
  private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
1✔
115
  private boolean initialStarted;
116
  private ScheduledHandle restartTimer;
117
  private ObjectPool<XdsClient> xdsClientPool;
118
  private XdsClient xdsClient;
119
  private DiscoveryState discoveryState;
120
  private volatile Server delegate;
121

122
  // Must be accessed in syncContext.
123
  // Filter instances are unique per Server, per FilterChain, and per filter's name+typeUrl.
124
  // FilterChain.name -> <NamedFilterConfig.filterStateKey -> filter_instance>.
125
  private final HashMap<String, HashMap<String, Filter>> activeFilters = new HashMap<>();
1✔
126
  // Default filter chain Filter instances are unique per Server, and per filter's name+typeUrl.
127
  // NamedFilterConfig.filterStateKey -> filter_instance.
128
  private final HashMap<String, Filter> activeFiltersDefaultChain = new HashMap<>();
1✔
129

130
  XdsServerWrapper(
131
      String listenerAddress,
132
      ServerBuilder<?> delegateBuilder,
133
      XdsServingStatusListener listener,
134
      FilterChainSelectorManager filterChainSelectorManager,
135
      XdsClientPoolFactory xdsClientPoolFactory,
136
      @Nullable Map<String, ?> bootstrapOverride,
137
      FilterRegistry filterRegistry) {
138
    this(
1✔
139
        listenerAddress,
140
        delegateBuilder,
141
        listener,
142
        filterChainSelectorManager,
143
        xdsClientPoolFactory,
144
        bootstrapOverride,
145
        filterRegistry,
146
        SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
1✔
147
    sharedTimeService = true;
1✔
148
  }
1✔
149

150
  @VisibleForTesting
151
  XdsServerWrapper(
152
          String listenerAddress,
153
          ServerBuilder<?> delegateBuilder,
154
          XdsServingStatusListener listener,
155
          FilterChainSelectorManager filterChainSelectorManager,
156
          XdsClientPoolFactory xdsClientPoolFactory,
157
          @Nullable Map<String, ?> bootstrapOverride,
158
          FilterRegistry filterRegistry,
159
          ScheduledExecutorService timeService) {
1✔
160
    this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
1✔
161
    this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
1✔
162
    this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
1✔
163
    this.listener = checkNotNull(listener, "listener");
1✔
164
    this.filterChainSelectorManager
1✔
165
        = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
1✔
166
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
1✔
167
    this.bootstrapOverride = bootstrapOverride;
1✔
168
    this.timeService = checkNotNull(timeService, "timeService");
1✔
169
    this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
1✔
170
    this.delegate = delegateBuilder.build();
1✔
171
  }
1✔
172

173
  @Override
174
  public Server start() throws IOException {
175
    checkState(started.compareAndSet(false, true), "Already started");
1✔
176
    syncContext.execute(new Runnable() {
1✔
177
      @Override
178
      public void run() {
179
        internalStart();
1✔
180
      }
1✔
181
    });
182
    Exception exception;
183
    try {
184
      exception = initialStartFuture.get();
1✔
185
    } catch (InterruptedException | ExecutionException e) {
×
186
      throw new RuntimeException(e);
×
187
    }
1✔
188
    if (exception != null) {
1✔
189
      throw (exception instanceof IOException) ? (IOException) exception :
1✔
190
              new IOException(exception);
1✔
191
    }
192
    return this;
1✔
193
  }
194

195
  private void internalStart() {
196
    try {
197
      BootstrapInfo bootstrapInfo;
198
      if (bootstrapOverride == null) {
1✔
199
        bootstrapInfo = GrpcBootstrapperImpl.defaultBootstrap();
×
200
      } else {
201
        bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride);
1✔
202
      }
203
      xdsClientPool = xdsClientPoolFactory.getOrCreate(
1✔
204
          "#server", bootstrapInfo, new MetricRecorder() {});
1✔
205
    } catch (Exception e) {
×
206
      StatusException statusException = Status.UNAVAILABLE.withDescription(
×
207
              "Failed to initialize xDS").withCause(e).asException();
×
208
      listener.onNotServing(statusException);
×
209
      initialStartFuture.set(statusException);
×
210
      return;
×
211
    }
1✔
212
    xdsClient = xdsClientPool.getObject();
1✔
213
    String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
1✔
214
    if (listenerTemplate == null) {
1✔
215
      StatusException statusException =
1✔
216
          Status.UNAVAILABLE.withDescription(
1✔
217
              "Can only support xDS v3 with listener resource name template").asException();
1✔
218
      listener.onNotServing(statusException);
1✔
219
      initialStartFuture.set(statusException);
1✔
220
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
221
      return;
1✔
222
    }
223
    String replacement = listenerAddress;
1✔
224
    if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
1✔
225
      replacement = XdsClient.percentEncodePath(replacement);
1✔
226
    }
227
    discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
1✔
228
  }
1✔
229

230
  @Override
231
  public Server shutdown() {
232
    if (!shutdown.compareAndSet(false, true)) {
1✔
233
      return this;
1✔
234
    }
235
    syncContext.execute(new Runnable() {
1✔
236
      @Override
237
      public void run() {
238
        if (!delegate.isShutdown()) {
1✔
239
          delegate.shutdown();
1✔
240
        }
241
        internalShutdown();
1✔
242
      }
1✔
243
    });
244
    return this;
1✔
245
  }
246

247
  @Override
248
  public Server shutdownNow() {
249
    if (!shutdown.compareAndSet(false, true)) {
1✔
250
      return this;
1✔
251
    }
252
    syncContext.execute(new Runnable() {
1✔
253
      @Override
254
      public void run() {
255
        if (!delegate.isShutdown()) {
1✔
256
          delegate.shutdownNow();
1✔
257
        }
258
        internalShutdown();
1✔
259
        initialStartFuture.set(new IOException("server is forcefully shut down"));
1✔
260
      }
1✔
261
    });
262
    return this;
1✔
263
  }
264

265
  // Must run in SynchronizationContext
266
  private void internalShutdown() {
267
    logger.log(Level.FINER, "Shutting down XdsServerWrapper");
1✔
268
    if (discoveryState != null) {
1✔
269
      discoveryState.shutdown();
1✔
270
    }
271
    if (xdsClient != null) {
1✔
272
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
273
    }
274
    if (restartTimer != null) {
1✔
275
      restartTimer.cancel();
1✔
276
    }
277
    if (sharedTimeService) {
1✔
278
      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
1✔
279
    }
280
    isServing = false;
1✔
281
    internalTerminationLatch.countDown();
1✔
282
  }
1✔
283

284
  @Override
285
  public boolean isShutdown() {
286
    return shutdown.get();
1✔
287
  }
288

289
  @Override
290
  public boolean isTerminated() {
291
    return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
1✔
292
  }
293

294
  @Override
295
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
296
    long startTime = System.nanoTime();
1✔
297
    if (!internalTerminationLatch.await(timeout, unit)) {
1✔
298
      return false;
×
299
    }
300
    long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
1✔
301
    return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
1✔
302
  }
303

304
  @Override
305
  public void awaitTermination() throws InterruptedException {
306
    internalTerminationLatch.await();
1✔
307
    delegate.awaitTermination();
1✔
308
  }
1✔
309

310
  @Override
311
  public int getPort() {
312
    return delegate.getPort();
1✔
313
  }
314

315
  @Override
316
  public List<? extends SocketAddress> getListenSockets() {
317
    return delegate.getListenSockets();
1✔
318
  }
319

320
  @Override
321
  public List<ServerServiceDefinition> getServices() {
322
    return delegate.getServices();
×
323
  }
324

325
  @Override
326
  public List<ServerServiceDefinition> getImmutableServices() {
327
    return delegate.getImmutableServices();
×
328
  }
329

330
  @Override
331
  public List<ServerServiceDefinition> getMutableServices() {
332
    return delegate.getMutableServices();
×
333
  }
334

335
  // Must run in SynchronizationContext
336
  private void startDelegateServer() {
337
    if (restartTimer != null && restartTimer.isPending()) {
1✔
338
      return;
×
339
    }
340
    if (isServing) {
1✔
341
      return;
1✔
342
    }
343
    if (delegate.isShutdown()) {
1✔
344
      delegate = delegateBuilder.build();
1✔
345
    }
346
    try {
347
      delegate.start();
1✔
348
      listener.onServing();
1✔
349
      isServing = true;
1✔
350
      if (!initialStarted) {
1✔
351
        initialStarted = true;
1✔
352
        initialStartFuture.set(null);
1✔
353
      }
354
      logger.log(Level.FINER, "Delegate server started.");
1✔
355
    } catch (IOException e) {
1✔
356
      logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
1✔
357
      if (!initialStarted) {
1✔
358
        initialStarted = true;
1✔
359
        initialStartFuture.set(e);
1✔
360
      } else {
361
        listener.onNotServing(e);
1✔
362
      }
363
      restartTimer = syncContext.schedule(
1✔
364
        new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
365
    }
1✔
366
  }
1✔
367

368
  private final class RestartTask implements Runnable {
1✔
369
    @Override
370
    public void run() {
371
      startDelegateServer();
1✔
372
    }
1✔
373
  }
374

375
  private final class DiscoveryState implements ResourceWatcher<LdsUpdate> {
376
    private final String resourceName;
377
    // RDS resource name is the key.
378
    private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
1✔
379
    // Track pending RDS resources using rds name.
380
    private final Set<String> pendingRds = new HashSet<>();
1✔
381
    // Most recently discovered filter chains.
382
    private List<FilterChain> filterChains = new ArrayList<>();
1✔
383
    // Most recently discovered default filter chain.
384
    @Nullable
385
    private FilterChain defaultFilterChain;
386
    private boolean stopped;
387
    private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef 
1✔
388
        = new HashMap<>();
389
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
390
      @Override
391
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
392
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
393
        return next.startCall(call, headers);
1✔
394
      }
395
    };
396

397
    private DiscoveryState(String resourceName) {
1✔
398
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
399
      xdsClient.watchXdsResource(
1✔
400
          XdsListenerResource.getInstance(), resourceName, this, syncContext);
1✔
401
    }
1✔
402

403
    @Override
404
    public void onChanged(final LdsUpdate update) {
405
      if (stopped) {
1✔
406
        return;
×
407
      }
408
      logger.log(Level.FINEST, "Received Lds update {0}", update);
1✔
409
      if (update.listener() == null) {
1✔
410
        onResourceDoesNotExist("Non-API");
1✔
411
        return;
1✔
412
      }
413

414
      String ldsAddress = update.listener().address();
1✔
415
      if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
1✔
416
          || !ipAddressesMatch(ldsAddress)) {
1✔
417
        handleConfigNotFoundOrMismatch(
1✔
418
            Status.UNKNOWN.withDescription(
1✔
419
                String.format(
1✔
420
                    "Listener address mismatch: expected %s, but got %s.",
421
                    listenerAddress, ldsAddress)).asException());
1✔
422
        return;
1✔
423
      }
424
      if (!pendingRds.isEmpty()) {
1✔
425
        // filter chain state has not yet been applied to filterChainSelectorManager and there
426
        // are two sets of sslContextProviderSuppliers, so we release the old ones.
427
        releaseSuppliersInFlight();
1✔
428
        pendingRds.clear();
1✔
429
      }
430

431
      filterChains = update.listener().filterChains();
1✔
432
      defaultFilterChain = update.listener().defaultFilterChain();
1✔
433
      // Filters are loaded even if the server isn't serving yet.
434
      updateActiveFilters();
1✔
435

436
      List<FilterChain> allFilterChains = filterChains;
1✔
437
      if (defaultFilterChain != null) {
1✔
438
        allFilterChains = new ArrayList<>(filterChains);
1✔
439
        allFilterChains.add(defaultFilterChain);
1✔
440
      }
441

442
      Set<String> allRds = new HashSet<>();
1✔
443
      for (FilterChain filterChain : allFilterChains) {
1✔
444
        HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
445
        if (hcm.virtualHosts() == null) {
1✔
446
          RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
1✔
447
          if (rdsState == null) {
1✔
448
            rdsState = new RouteDiscoveryState(hcm.rdsName());
1✔
449
            routeDiscoveryStates.put(hcm.rdsName(), rdsState);
1✔
450
            xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
451
                hcm.rdsName(), rdsState, syncContext);
1✔
452
          }
453
          if (rdsState.isPending) {
1✔
454
            pendingRds.add(hcm.rdsName());
1✔
455
          }
456
          allRds.add(hcm.rdsName());
1✔
457
        }
458
      }
1✔
459

460
      for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
1✔
461
        if (!allRds.contains(entry.getKey())) {
1✔
462
          xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
1✔
463
              entry.getKey(), entry.getValue());
1✔
464
        }
465
      }
1✔
466
      routeDiscoveryStates.keySet().retainAll(allRds);
1✔
467
      if (pendingRds.isEmpty()) {
1✔
468
        updateSelector();
1✔
469
      }
470
    }
1✔
471

472
    private boolean ipAddressesMatch(String ldsAddress) {
473
      HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
1✔
474
      HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
1✔
475
      if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
1✔
476
          || ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
1✔
477
        return false;
1✔
478
      }
479
      InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
1✔
480
      InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
1✔
481
      return listenerIp.equals(ldsIp);
1✔
482
    }
483

484
    @Override
485
    public void onResourceDoesNotExist(final String resourceName) {
486
      if (stopped) {
1✔
487
        return;
×
488
      }
489
      StatusException statusException = Status.UNAVAILABLE.withDescription(
1✔
490
          String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
1✔
491
              xdsClient.getBootstrapInfo().node().getId())).asException();
1✔
492
      handleConfigNotFoundOrMismatch(statusException);
1✔
493
    }
1✔
494

495
    @Override
496
    public void onError(final Status error) {
497
      if (stopped) {
1✔
498
        return;
×
499
      }
500
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
501
      Status errorWithNodeId = error.withDescription(
1✔
502
          description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
503
      logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
1✔
504
      if (!isServing) {
1✔
505
        listener.onNotServing(errorWithNodeId.asException());
1✔
506
      }
507
    }
1✔
508

509
    private void shutdown() {
510
      stopped = true;
1✔
511
      cleanUpRouteDiscoveryStates();
1✔
512
      logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
1✔
513
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this);
1✔
514
      shutdownActiveFilters();
1✔
515
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
516
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
517
      for (SslContextProviderSupplier s: toRelease) {
1✔
518
        s.close();
1✔
519
      }
1✔
520
      releaseSuppliersInFlight();
1✔
521
    }
1✔
522

523
    private void updateSelector() {
524
      // This is regenerated in generateRoutingConfig() calls below.
525
      savedRdsRoutingConfigRef.clear();
1✔
526

527
      // Prepare server routing config map.
528
      ImmutableMap.Builder<FilterChain, AtomicReference<ServerRoutingConfig>> routingConfigs =
529
          ImmutableMap.builder();
1✔
530
      for (FilterChain filterChain: filterChains) {
1✔
531
        HashMap<String, Filter> chainFilters = activeFilters.get(filterChain.name());
1✔
532
        routingConfigs.put(filterChain, generateRoutingConfig(filterChain, chainFilters));
1✔
533
      }
1✔
534

535
      // Prepare the new selector.
536
      FilterChainSelector selector;
537
      if (defaultFilterChain != null) {
1✔
538
        selector = new FilterChainSelector(
1✔
539
            routingConfigs.build(),
1✔
540
            defaultFilterChain.sslContextProviderSupplier(),
1✔
541
            generateRoutingConfig(defaultFilterChain, activeFiltersDefaultChain));
1✔
542
      } else {
543
        selector = new FilterChainSelector(routingConfigs.build());
1✔
544
      }
545

546
      // Prepare the list of current selector's resources to close later.
547
      List<SslContextProviderSupplier> oldSslSuppliers = getSuppliersInUse();
1✔
548

549
      // Swap the selectors, initiate a graceful shutdown of the old one.
550
      logger.log(Level.FINEST, "Updating selector {0}", selector);
1✔
551
      filterChainSelectorManager.updateSelector(selector);
1✔
552

553
      // Release old resources.
554
      for (SslContextProviderSupplier supplier: oldSslSuppliers) {
1✔
555
        supplier.close();
1✔
556
      }
1✔
557

558
      // Now that we have valid Transport Socket config, we can start/restart listening on a port.
559
      startDelegateServer();
1✔
560
    }
1✔
561

562
    // called in syncContext
563
    private void updateActiveFilters() {
564
      Set<String> removedChains = new HashSet<>(activeFilters.keySet());
1✔
565
      for (FilterChain filterChain: filterChains) {
1✔
566
        removedChains.remove(filterChain.name());
1✔
567
        updateActiveFiltersForChain(
1✔
568
            activeFilters.computeIfAbsent(filterChain.name(), k -> new HashMap<>()),
1✔
569
            filterChain.httpConnectionManager().httpFilterConfigs());
1✔
570
      }
1✔
571

572
      // Shutdown all filters of chains missing from the LDS.
573
      for (String chainToShutdown : removedChains) {
1✔
574
        HashMap<String, Filter> filtersToShutdown = activeFilters.get(chainToShutdown);
1✔
575
        checkNotNull(filtersToShutdown, "filtersToShutdown of chain %s", chainToShutdown);
1✔
576
        updateActiveFiltersForChain(filtersToShutdown, null);
1✔
577
        activeFilters.remove(chainToShutdown);
1✔
578
      }
1✔
579

580
      // Default chain.
581
      ImmutableList<NamedFilterConfig> defaultChainConfigs = null;
1✔
582
      if (defaultFilterChain != null) {
1✔
583
        defaultChainConfigs = defaultFilterChain.httpConnectionManager().httpFilterConfigs();
1✔
584
      }
585
      updateActiveFiltersForChain(activeFiltersDefaultChain, defaultChainConfigs);
1✔
586
    }
1✔
587

588
    // called in syncContext
589
    private void shutdownActiveFilters() {
590
      for (HashMap<String, Filter> chainFilters : activeFilters.values()) {
1✔
591
        checkNotNull(chainFilters, "chainFilters");
1✔
592
        updateActiveFiltersForChain(chainFilters, null);
1✔
593
      }
1✔
594
      activeFilters.clear();
1✔
595
      updateActiveFiltersForChain(activeFiltersDefaultChain, null);
1✔
596
    }
1✔
597

598
    // called in syncContext
599
    private void updateActiveFiltersForChain(
600
        Map<String, Filter> chainFilters, @Nullable List<NamedFilterConfig> filterConfigs) {
601
      if (filterConfigs == null) {
1✔
602
        filterConfigs = ImmutableList.of();
1✔
603
      }
604

605
      Set<String> filtersToShutdown = new HashSet<>(chainFilters.keySet());
1✔
606
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
607
        String typeUrl = namedFilter.filterConfig.typeUrl();
1✔
608
        String filterKey = namedFilter.filterStateKey();
1✔
609

610
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
611
        checkNotNull(provider, "provider %s", typeUrl);
1✔
612
        Filter filter = chainFilters.computeIfAbsent(
1✔
613
            filterKey, k -> provider.newInstance(namedFilter.name));
1✔
614
        checkNotNull(filter, "filter %s", filterKey);
1✔
615
        filtersToShutdown.remove(filterKey);
1✔
616
      }
1✔
617

618
      // Shutdown filters not present in current HCM.
619
      for (String filterKey : filtersToShutdown) {
1✔
620
        Filter filterToShutdown = chainFilters.remove(filterKey);
1✔
621
        checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
1✔
622
        filterToShutdown.close();
1✔
623
      }
1✔
624
    }
1✔
625

626
    private AtomicReference<ServerRoutingConfig> generateRoutingConfig(
627
        FilterChain filterChain, Map<String, Filter> chainFilters) {
628
      HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
629
      ServerRoutingConfig routingConfig;
630

631
      // Inlined routes.
632
      ImmutableList<VirtualHost> vhosts = hcm.virtualHosts();
1✔
633
      if (vhosts != null) {
1✔
634
        routingConfig = ServerRoutingConfig.create(vhosts,
1✔
635
            generatePerRouteInterceptors(hcm.httpFilterConfigs(), vhosts, chainFilters));
1✔
636
        return new AtomicReference<>(routingConfig);
1✔
637
      }
638

639
      // Routes from RDS.
640
      RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
1✔
641
      checkNotNull(rds, "rds");
1✔
642

643
      ImmutableList<VirtualHost> savedVhosts = rds.savedVirtualHosts;
1✔
644
      if (savedVhosts != null) {
1✔
645
        routingConfig = ServerRoutingConfig.create(savedVhosts,
1✔
646
            generatePerRouteInterceptors(hcm.httpFilterConfigs(), savedVhosts, chainFilters));
1✔
647
      } else {
648
        routingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
649
      }
650
      AtomicReference<ServerRoutingConfig> routingConfigRef = new AtomicReference<>(routingConfig);
1✔
651
      savedRdsRoutingConfigRef.put(filterChain, routingConfigRef);
1✔
652
      return routingConfigRef;
1✔
653
    }
654

655
    private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
656
        @Nullable List<NamedFilterConfig> filterConfigs,
657
        List<VirtualHost> virtualHosts,
658
        Map<String, Filter> chainFilters) {
659
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
660

661
      checkNotNull(chainFilters, "chainFilters");
1✔
662
      ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
1✔
663
          new ImmutableMap.Builder<>();
664

665
      for (VirtualHost virtualHost : virtualHosts) {
1✔
666
        for (Route route : virtualHost.routes()) {
1✔
667
          // Short circuit.
668
          if (filterConfigs == null) {
1✔
669
            perRouteInterceptors.put(route, noopInterceptor);
×
670
            continue;
×
671
          }
672

673
          // Override vhost filter configs with more specific per-route configs.
674
          Map<String, FilterConfig> perRouteOverrides = ImmutableMap.<String, FilterConfig>builder()
1✔
675
              .putAll(virtualHost.filterConfigOverrides())
1✔
676
              .putAll(route.filterConfigOverrides())
1✔
677
              .buildKeepingLast();
1✔
678

679
          // Interceptors for this vhost/route combo.
680
          List<ServerInterceptor> interceptors = new ArrayList<>(filterConfigs.size());
1✔
681
          for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
682
            String name = namedFilter.name;
1✔
683
            FilterConfig config = namedFilter.filterConfig;
1✔
684
            FilterConfig overrideConfig = perRouteOverrides.get(name);
1✔
685
            String filterKey = namedFilter.filterStateKey();
1✔
686

687
            Filter filter = chainFilters.get(filterKey);
1✔
688
            checkNotNull(filter, "chainFilters.get(%s)", filterKey);
1✔
689
            ServerInterceptor interceptor = filter.buildServerInterceptor(config, overrideConfig);
1✔
690

691
            if (interceptor != null) {
1✔
692
              interceptors.add(interceptor);
1✔
693
            }
694
          }
1✔
695

696
          // Combine interceptors produced by different filters into a single one that executes
697
          // them sequentially. The order is preserved.
698
          perRouteInterceptors.put(route, combineInterceptors(interceptors));
1✔
699
        }
1✔
700
      }
1✔
701

702
      return perRouteInterceptors.buildOrThrow();
1✔
703
    }
704

705
    private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
706
      if (interceptors.isEmpty()) {
1✔
707
        return noopInterceptor;
1✔
708
      }
709
      if (interceptors.size() == 1) {
1✔
710
        return interceptors.get(0);
×
711
      }
712
      return new ServerInterceptor() {
1✔
713
        @Override
714
        public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
715
            Metadata headers, ServerCallHandler<ReqT, RespT> next) {
716
          // intercept forward
717
          for (int i = interceptors.size() - 1; i >= 0; i--) {
1✔
718
            next = InternalServerInterceptors.interceptCallHandlerCreate(
1✔
719
                interceptors.get(i), next);
1✔
720
          }
721
          return next.startCall(call, headers);
1✔
722
        }
723
      };
724
    }
725

726
    private void handleConfigNotFoundOrMismatch(StatusException exception) {
727
      cleanUpRouteDiscoveryStates();
1✔
728
      shutdownActiveFilters();
1✔
729
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
730
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
731
      for (SslContextProviderSupplier s: toRelease) {
1✔
732
        s.close();
1✔
733
      }
1✔
734
      if (restartTimer != null) {
1✔
735
        restartTimer.cancel();
1✔
736
      }
737
      if (!delegate.isShutdown()) {
1✔
738
        delegate.shutdown();  // let in-progress calls finish
1✔
739
      }
740
      isServing = false;
1✔
741
      listener.onNotServing(exception);
1✔
742
    }
1✔
743

744
    private void cleanUpRouteDiscoveryStates() {
745
      for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
1✔
746
        String rdsName = rdsState.resourceName;
1✔
747
        logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
1✔
748
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
749
            rdsState);
750
      }
1✔
751
      routeDiscoveryStates.clear();
1✔
752
      savedRdsRoutingConfigRef.clear();
1✔
753
    }
1✔
754

755
    private List<SslContextProviderSupplier> getSuppliersInUse() {
756
      List<SslContextProviderSupplier> toRelease = new ArrayList<>();
1✔
757
      FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
1✔
758
      if (selector != null) {
1✔
759
        for (FilterChain f: selector.getRoutingConfigs().keySet()) {
1✔
760
          if (f.sslContextProviderSupplier() != null) {
1✔
761
            toRelease.add(f.sslContextProviderSupplier());
1✔
762
          }
763
        }
1✔
764
        SslContextProviderSupplier defaultSupplier =
1✔
765
                selector.getDefaultSslContextProviderSupplier();
1✔
766
        if (defaultSupplier != null) {
1✔
767
          toRelease.add(defaultSupplier);
1✔
768
        }
769
      }
770
      return toRelease;
1✔
771
    }
772

773
    private void releaseSuppliersInFlight() {
774
      SslContextProviderSupplier supplier;
775
      for (FilterChain filterChain : filterChains) {
1✔
776
        supplier = filterChain.sslContextProviderSupplier();
1✔
777
        if (supplier != null) {
1✔
778
          supplier.close();
1✔
779
        }
780
      }
1✔
781
      if (defaultFilterChain != null
1✔
782
              && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
1✔
783
        supplier.close();
1✔
784
      }
785
    }
1✔
786

787
    private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
788
      private final String resourceName;
789
      private ImmutableList<VirtualHost> savedVirtualHosts;
790
      private boolean isPending = true;
1✔
791

792
      private RouteDiscoveryState(String resourceName) {
1✔
793
        this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
794
      }
1✔
795

796
      @Override
797
      public void onChanged(final RdsUpdate update) {
798
        syncContext.execute(new Runnable() {
1✔
799
          @Override
800
          public void run() {
801
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
802
              return;
1✔
803
            }
804
            if (savedVirtualHosts == null && !isPending) {
1✔
805
              logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
1✔
806
            }
807
            savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
1✔
808
            updateRdsRoutingConfig();
1✔
809
            maybeUpdateSelector();
1✔
810
          }
1✔
811
        });
812
      }
1✔
813

814
      @Override
815
      public void onResourceDoesNotExist(final String resourceName) {
816
        syncContext.execute(new Runnable() {
1✔
817
          @Override
818
          public void run() {
819
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
820
              return;
×
821
            }
822
            logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
1✔
823
            savedVirtualHosts = null;
1✔
824
            updateRdsRoutingConfig();
1✔
825
            maybeUpdateSelector();
1✔
826
          }
1✔
827
        });
828
      }
1✔
829

830
      @Override
831
      public void onError(final Status error) {
832
        syncContext.execute(new Runnable() {
1✔
833
          @Override
834
          public void run() {
835
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
836
              return;
×
837
            }
838
            String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
839
            Status errorWithNodeId = error.withDescription(
1✔
840
                    description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
841
            logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
1✔
842
                    new Object[]{resourceName, errorWithNodeId});
1✔
843
            maybeUpdateSelector();
1✔
844
          }
1✔
845
        });
846
      }
1✔
847

848
      private void updateRdsRoutingConfig() {
849
        for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
1✔
850
          HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
851
          if (!resourceName.equals(hcm.rdsName())) {
1✔
852
            continue;
1✔
853
          }
854

855
          ServerRoutingConfig updatedRoutingConfig;
856
          if (savedVirtualHosts == null) {
1✔
857
            updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
858
          } else {
859
            HashMap<String, Filter> chainFilters = activeFilters.get(filterChain.name());
1✔
860
            ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
1✔
861
                hcm.httpFilterConfigs(), savedVirtualHosts, chainFilters);
1✔
862
            updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts, interceptors);
1✔
863
          }
864

865
          logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
1✔
866
              new Object[]{filterChain.name(), updatedRoutingConfig});
1✔
867
          savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
1✔
868
        }
1✔
869
      }
1✔
870

871
      // Update the selector to use the most recently updated configs only after all rds have been
872
      // discovered for the first time. Later changes on rds will be applied through virtual host
873
      // list atomic ref.
874
      private void maybeUpdateSelector() {
875
        isPending = false;
1✔
876
        boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
1✔
877
        if (isLastPending) {
1✔
878
          updateSelector();
1✔
879
        }
880
      }
1✔
881
    }
882
  }
883

884
  @VisibleForTesting
885
  final class ConfigApplyingInterceptor implements ServerInterceptor {
1✔
886
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
887
      @Override
888
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
889
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
890
        return next.startCall(call, headers);
×
891
      }
892
    };
893

894
    @Override
895
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
896
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
897
      AtomicReference<ServerRoutingConfig> routingConfigRef =
1✔
898
          call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
1✔
899
      ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
1✔
900
          routingConfigRef.get();
1✔
901
      if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
1✔
902
        String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
1✔
903
        call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
1✔
904
        return new Listener<ReqT>() {};
1✔
905
      }
906
      List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
1✔
907
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
908
          virtualHosts, call.getAuthority());
1✔
909
      if (virtualHost == null) {
1✔
910
        call.close(
1✔
911
            Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
1✔
912
            new Metadata());
913
        return new Listener<ReqT>() {};
1✔
914
      }
915
      Route selectedRoute = null;
1✔
916
      MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
1✔
917
      for (Route route : virtualHost.routes()) {
1✔
918
        if (RoutingUtils.matchRoute(
1✔
919
            route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
1✔
920
          selectedRoute = route;
1✔
921
          break;
1✔
922
        }
923
      }
1✔
924
      if (selectedRoute == null) {
1✔
925
        call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
1✔
926
            new Metadata());
927
        return new ServerCall.Listener<ReqT>() {};
1✔
928
      }
929
      if (selectedRoute.routeAction() != null) {
1✔
930
        call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
1✔
931
            + "route: only Route.non_forwarding_action should be allowed."), new Metadata());
932
        return new ServerCall.Listener<ReqT>() {};
1✔
933
      }
934
      ServerInterceptor routeInterceptor = noopInterceptor;
1✔
935
      Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
1✔
936
      if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
1✔
937
        routeInterceptor = perRouteInterceptors.get(selectedRoute);
1✔
938
      }
939
      return routeInterceptor.interceptCall(call, headers, next);
1✔
940
    }
941
  }
942

943
  /**
944
   * The HttpConnectionManager level configuration.
945
   */
946
  @AutoValue
947
  abstract static class ServerRoutingConfig {
1✔
948
    @VisibleForTesting
949
    static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
1✔
950
        ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
1✔
951

952
    abstract ImmutableList<VirtualHost> virtualHosts();
953

954
    // Prebuilt per route server interceptors from http filter configs.
955
    abstract ImmutableMap<Route, ServerInterceptor> interceptors();
956

957
    /**
958
     * Server routing configuration.
959
     * */
960
    public static ServerRoutingConfig create(
961
        ImmutableList<VirtualHost> virtualHosts,
962
        ImmutableMap<Route, ServerInterceptor> interceptors) {
963
      checkNotNull(virtualHosts, "virtualHosts");
1✔
964
      checkNotNull(interceptors, "interceptors");
1✔
965
      return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
1✔
966
    }
967
  }
968
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc