• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19702

18 Feb 2025 06:47PM UTC coverage: 88.587% (-0.005%) from 88.592%
#19702

push

github

web-flow
xds: Change how xDS filters are created by introducing Filter.Provider (#11883)

This is the first step towards supporting filter state retention in
Java. The mechanism will be similar to the one described in
[A83](https://github.com/grpc/proposal/blob/master/A83-xds-gcp-authn-filter.md#filter-call-credentials-cache)
for C-core, and will serve the same purpose. However, the
implementation details are very different due to the different nature
of xDS HTTP filter support in C-core and Java.

In Java, xDS HTTP filters are backed by classes implementing
`io.grpc.xds.Filter`, from here just called "Filters". To support
Filter state retention (next PR), Java's xDS implementation must be
able to create unique Filter instances:
- Per HCM
  `envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager`
- Per filter name as specified in
  `envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter.name`

This PR **does not** implement Filter state retention, but lays the
groundwork for it by changing how filters are registered and
instantiated. To achieve this, all existing Filter classes had to be
updated to the new instantiation mechanism described below.

Prior to this PR, Filters had no lifecycle. FilterRegistry
provided singleton instances for a given typeUrl. This PR introduces
a new interface `Filter.Provider`, which instantiates Filter classes.
All functionality that doesn't need an instance of a Filter is moved
to the Filter.Provider. This includes parsing filter config proto
into FilterConfig and determining the filter kind
(client-side, server-side, or both).

This PR is limited to refactoring, and there are no changes to the
existing behavior. Note that all Filter Providers still return
singleton Filter instances. However, with this PR, it is now possible
to create Providers that return a new Filter instance each time
`newInstance` is called.

34252 of 38665 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.73
/../xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.auto.value.AutoValue;
24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.InternalServerInterceptors;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.MetricRecorder;
33
import io.grpc.Server;
34
import io.grpc.ServerBuilder;
35
import io.grpc.ServerCall;
36
import io.grpc.ServerCall.Listener;
37
import io.grpc.ServerCallHandler;
38
import io.grpc.ServerInterceptor;
39
import io.grpc.ServerServiceDefinition;
40
import io.grpc.Status;
41
import io.grpc.StatusException;
42
import io.grpc.SynchronizationContext;
43
import io.grpc.SynchronizationContext.ScheduledHandle;
44
import io.grpc.internal.GrpcUtil;
45
import io.grpc.internal.ObjectPool;
46
import io.grpc.internal.SharedResourceHolder;
47
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
48
import io.grpc.xds.Filter.FilterConfig;
49
import io.grpc.xds.Filter.NamedFilterConfig;
50
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
51
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
52
import io.grpc.xds.VirtualHost.Route;
53
import io.grpc.xds.XdsListenerResource.LdsUpdate;
54
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
55
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
56
import io.grpc.xds.client.XdsClient;
57
import io.grpc.xds.client.XdsClient.ResourceWatcher;
58
import io.grpc.xds.internal.security.SslContextProviderSupplier;
59
import java.io.IOException;
60
import java.net.SocketAddress;
61
import java.util.ArrayList;
62
import java.util.HashMap;
63
import java.util.HashSet;
64
import java.util.List;
65
import java.util.Map;
66
import java.util.Set;
67
import java.util.concurrent.CountDownLatch;
68
import java.util.concurrent.ExecutionException;
69
import java.util.concurrent.ScheduledExecutorService;
70
import java.util.concurrent.TimeUnit;
71
import java.util.concurrent.atomic.AtomicBoolean;
72
import java.util.concurrent.atomic.AtomicReference;
73
import java.util.logging.Level;
74
import java.util.logging.Logger;
75
import javax.annotation.Nullable;
76

77
final class XdsServerWrapper extends Server {
78
  private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
1✔
79

80
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
81
      new Thread.UncaughtExceptionHandler() {
1✔
82
        @Override
83
        public void uncaughtException(Thread t, Throwable e) {
84
          logger.log(Level.SEVERE, "Exception!" + e);
×
85
          // TODO(chengyuanzhang): implement cleanup.
86
        }
×
87
      });
88

89
  public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
90
      ATTR_SERVER_ROUTING_CONFIG =
1✔
91
      Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
1✔
92

93
  @VisibleForTesting
94
  static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
1✔
95
  private final String listenerAddress;
96
  private final ServerBuilder<?> delegateBuilder;
97
  private boolean sharedTimeService;
98
  private final ScheduledExecutorService timeService;
99
  private final FilterRegistry filterRegistry;
100
  private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
1✔
101
  private final XdsClientPoolFactory xdsClientPoolFactory;
102
  private final XdsServingStatusListener listener;
103
  private final FilterChainSelectorManager filterChainSelectorManager;
104
  private final AtomicBoolean started = new AtomicBoolean(false);
1✔
105
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
106
  private boolean isServing;
107
  private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
1✔
108
  private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
1✔
109
  private boolean initialStarted;
110
  private ScheduledHandle restartTimer;
111
  private ObjectPool<XdsClient> xdsClientPool;
112
  private XdsClient xdsClient;
113
  private DiscoveryState discoveryState;
114
  private volatile Server delegate;
115

116
  XdsServerWrapper(
117
      String listenerAddress,
118
      ServerBuilder<?> delegateBuilder,
119
      XdsServingStatusListener listener,
120
      FilterChainSelectorManager filterChainSelectorManager,
121
      XdsClientPoolFactory xdsClientPoolFactory,
122
      FilterRegistry filterRegistry) {
123
    this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager,
1✔
124
        xdsClientPoolFactory, filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
1✔
125
    sharedTimeService = true;
1✔
126
  }
1✔
127

128
  @VisibleForTesting
129
  XdsServerWrapper(
130
          String listenerAddress,
131
          ServerBuilder<?> delegateBuilder,
132
          XdsServingStatusListener listener,
133
          FilterChainSelectorManager filterChainSelectorManager,
134
          XdsClientPoolFactory xdsClientPoolFactory,
135
          FilterRegistry filterRegistry,
136
          ScheduledExecutorService timeService) {
1✔
137
    this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
1✔
138
    this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
1✔
139
    this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
1✔
140
    this.listener = checkNotNull(listener, "listener");
1✔
141
    this.filterChainSelectorManager
1✔
142
        = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
1✔
143
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
1✔
144
    this.timeService = checkNotNull(timeService, "timeService");
1✔
145
    this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
1✔
146
    this.delegate = delegateBuilder.build();
1✔
147
  }
1✔
148

149
  @Override
150
  public Server start() throws IOException {
151
    checkState(started.compareAndSet(false, true), "Already started");
1✔
152
    syncContext.execute(new Runnable() {
1✔
153
      @Override
154
      public void run() {
155
        internalStart();
1✔
156
      }
1✔
157
    });
158
    Exception exception;
159
    try {
160
      exception = initialStartFuture.get();
1✔
161
    } catch (InterruptedException | ExecutionException e) {
×
162
      throw new RuntimeException(e);
×
163
    }
1✔
164
    if (exception != null) {
1✔
165
      throw (exception instanceof IOException) ? (IOException) exception :
1✔
166
              new IOException(exception);
1✔
167
    }
168
    return this;
1✔
169
  }
170

171
  private void internalStart() {
172
    try {
173
      xdsClientPool = xdsClientPoolFactory.getOrCreate("#server", new MetricRecorder() {});
1✔
174
    } catch (Exception e) {
×
175
      StatusException statusException = Status.UNAVAILABLE.withDescription(
×
176
              "Failed to initialize xDS").withCause(e).asException();
×
177
      listener.onNotServing(statusException);
×
178
      initialStartFuture.set(statusException);
×
179
      return;
×
180
    }
1✔
181
    xdsClient = xdsClientPool.getObject();
1✔
182
    String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
1✔
183
    if (listenerTemplate == null) {
1✔
184
      StatusException statusException =
1✔
185
          Status.UNAVAILABLE.withDescription(
1✔
186
              "Can only support xDS v3 with listener resource name template").asException();
1✔
187
      listener.onNotServing(statusException);
1✔
188
      initialStartFuture.set(statusException);
1✔
189
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
190
      return;
1✔
191
    }
192
    String replacement = listenerAddress;
1✔
193
    if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
1✔
194
      replacement = XdsClient.percentEncodePath(replacement);
1✔
195
    }
196
    discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
1✔
197
  }
1✔
198

199
  @Override
200
  public Server shutdown() {
201
    if (!shutdown.compareAndSet(false, true)) {
1✔
202
      return this;
1✔
203
    }
204
    syncContext.execute(new Runnable() {
1✔
205
      @Override
206
      public void run() {
207
        if (!delegate.isShutdown()) {
1✔
208
          delegate.shutdown();
1✔
209
        }
210
        internalShutdown();
1✔
211
      }
1✔
212
    });
213
    return this;
1✔
214
  }
215

216
  @Override
217
  public Server shutdownNow() {
218
    if (!shutdown.compareAndSet(false, true)) {
1✔
219
      return this;
1✔
220
    }
221
    syncContext.execute(new Runnable() {
1✔
222
      @Override
223
      public void run() {
224
        if (!delegate.isShutdown()) {
1✔
225
          delegate.shutdownNow();
1✔
226
        }
227
        internalShutdown();
1✔
228
        initialStartFuture.set(new IOException("server is forcefully shut down"));
1✔
229
      }
1✔
230
    });
231
    return this;
1✔
232
  }
233

234
  // Must run in SynchronizationContext
235
  private void internalShutdown() {
236
    logger.log(Level.FINER, "Shutting down XdsServerWrapper");
1✔
237
    if (discoveryState != null) {
1✔
238
      discoveryState.shutdown();
1✔
239
    }
240
    if (xdsClient != null) {
1✔
241
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
242
    }
243
    if (restartTimer != null) {
1✔
244
      restartTimer.cancel();
1✔
245
    }
246
    if (sharedTimeService) {
1✔
247
      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
1✔
248
    }
249
    isServing = false;
1✔
250
    internalTerminationLatch.countDown();
1✔
251
  }
1✔
252

253
  @Override
254
  public boolean isShutdown() {
255
    return shutdown.get();
1✔
256
  }
257

258
  @Override
259
  public boolean isTerminated() {
260
    return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
1✔
261
  }
262

263
  @Override
264
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
265
    long startTime = System.nanoTime();
1✔
266
    if (!internalTerminationLatch.await(timeout, unit)) {
1✔
267
      return false;
×
268
    }
269
    long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
1✔
270
    return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
1✔
271
  }
272

273
  @Override
274
  public void awaitTermination() throws InterruptedException {
275
    internalTerminationLatch.await();
1✔
276
    delegate.awaitTermination();
1✔
277
  }
1✔
278

279
  @Override
280
  public int getPort() {
281
    return delegate.getPort();
1✔
282
  }
283

284
  @Override
285
  public List<? extends SocketAddress> getListenSockets() {
286
    return delegate.getListenSockets();
1✔
287
  }
288

289
  @Override
290
  public List<ServerServiceDefinition> getServices() {
291
    return delegate.getServices();
×
292
  }
293

294
  @Override
295
  public List<ServerServiceDefinition> getImmutableServices() {
296
    return delegate.getImmutableServices();
×
297
  }
298

299
  @Override
300
  public List<ServerServiceDefinition> getMutableServices() {
301
    return delegate.getMutableServices();
×
302
  }
303

304
  // Must run in SynchronizationContext
305
  private void startDelegateServer() {
306
    if (restartTimer != null && restartTimer.isPending()) {
1✔
307
      return;
×
308
    }
309
    if (isServing) {
1✔
310
      return;
1✔
311
    }
312
    if (delegate.isShutdown()) {
1✔
313
      delegate = delegateBuilder.build();
1✔
314
    }
315
    try {
316
      delegate.start();
1✔
317
      listener.onServing();
1✔
318
      isServing = true;
1✔
319
      if (!initialStarted) {
1✔
320
        initialStarted = true;
1✔
321
        initialStartFuture.set(null);
1✔
322
      }
323
      logger.log(Level.FINER, "Delegate server started.");
1✔
324
    } catch (IOException e) {
1✔
325
      logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
1✔
326
      if (!initialStarted) {
1✔
327
        initialStarted = true;
1✔
328
        initialStartFuture.set(e);
1✔
329
      } else {
330
        listener.onNotServing(e);
1✔
331
      }
332
      restartTimer = syncContext.schedule(
1✔
333
        new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
334
    }
1✔
335
  }
1✔
336

337
  private final class RestartTask implements Runnable {
1✔
338
    @Override
339
    public void run() {
340
      startDelegateServer();
1✔
341
    }
1✔
342
  }
343

344
  private final class DiscoveryState implements ResourceWatcher<LdsUpdate> {
345
    private final String resourceName;
346
    // RDS resource name is the key.
347
    private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
1✔
348
    // Track pending RDS resources using rds name.
349
    private final Set<String> pendingRds = new HashSet<>();
1✔
350
    // Most recently discovered filter chains.
351
    private List<FilterChain> filterChains = new ArrayList<>();
1✔
352
    // Most recently discovered default filter chain.
353
    @Nullable
354
    private FilterChain defaultFilterChain;
355
    private boolean stopped;
356
    private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef 
1✔
357
        = new HashMap<>();
358
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
359
      @Override
360
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
361
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
362
        return next.startCall(call, headers);
1✔
363
      }
364
    };
365

366
    private DiscoveryState(String resourceName) {
1✔
367
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
368
      xdsClient.watchXdsResource(
1✔
369
          XdsListenerResource.getInstance(), resourceName, this, syncContext);
1✔
370
    }
1✔
371

372
    @Override
373
    public void onChanged(final LdsUpdate update) {
374
      if (stopped) {
1✔
375
        return;
×
376
      }
377
      logger.log(Level.FINEST, "Received Lds update {0}", update);
1✔
378
      checkNotNull(update.listener(), "update");
1✔
379
      if (!pendingRds.isEmpty()) {
1✔
380
        // filter chain state has not yet been applied to filterChainSelectorManager and there
381
        // are two sets of sslContextProviderSuppliers, so we release the old ones.
382
        releaseSuppliersInFlight();
1✔
383
        pendingRds.clear();
1✔
384
      }
385
      filterChains = update.listener().filterChains();
1✔
386
      defaultFilterChain = update.listener().defaultFilterChain();
1✔
387
      List<FilterChain> allFilterChains = filterChains;
1✔
388
      if (defaultFilterChain != null) {
1✔
389
        allFilterChains = new ArrayList<>(filterChains);
1✔
390
        allFilterChains.add(defaultFilterChain);
1✔
391
      }
392
      Set<String> allRds = new HashSet<>();
1✔
393
      for (FilterChain filterChain : allFilterChains) {
1✔
394
        HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
395
        if (hcm.virtualHosts() == null) {
1✔
396
          RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
1✔
397
          if (rdsState == null) {
1✔
398
            rdsState = new RouteDiscoveryState(hcm.rdsName());
1✔
399
            routeDiscoveryStates.put(hcm.rdsName(), rdsState);
1✔
400
            xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
401
                hcm.rdsName(), rdsState, syncContext);
1✔
402
          }
403
          if (rdsState.isPending) {
1✔
404
            pendingRds.add(hcm.rdsName());
1✔
405
          }
406
          allRds.add(hcm.rdsName());
1✔
407
        }
408
      }
1✔
409
      for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
1✔
410
        if (!allRds.contains(entry.getKey())) {
1✔
411
          xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
1✔
412
              entry.getKey(), entry.getValue());
1✔
413
        }
414
      }
1✔
415
      routeDiscoveryStates.keySet().retainAll(allRds);
1✔
416
      if (pendingRds.isEmpty()) {
1✔
417
        updateSelector();
1✔
418
      }
419
    }
1✔
420

421
    @Override
422
    public void onResourceDoesNotExist(final String resourceName) {
423
      if (stopped) {
1✔
424
        return;
×
425
      }
426
      StatusException statusException = Status.UNAVAILABLE.withDescription(
1✔
427
          String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
1✔
428
              xdsClient.getBootstrapInfo().node().getId())).asException();
1✔
429
      handleConfigNotFound(statusException);
1✔
430
    }
1✔
431

432
    @Override
433
    public void onError(final Status error) {
434
      if (stopped) {
1✔
435
        return;
×
436
      }
437
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
438
      Status errorWithNodeId = error.withDescription(
1✔
439
          description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
440
      logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
1✔
441
      if (!isServing) {
1✔
442
        listener.onNotServing(errorWithNodeId.asException());
1✔
443
      }
444
    }
1✔
445

446
    private void shutdown() {
447
      stopped = true;
1✔
448
      cleanUpRouteDiscoveryStates();
1✔
449
      logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
1✔
450
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this);
1✔
451
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
452
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
453
      for (SslContextProviderSupplier s: toRelease) {
1✔
454
        s.close();
1✔
455
      }
1✔
456
      releaseSuppliersInFlight();
1✔
457
    }
1✔
458

459
    private void updateSelector() {
460
      // This is regenerated in generateRoutingConfig() calls below.
461
      savedRdsRoutingConfigRef.clear();
1✔
462

463
      // Prepare server routing config map.
464
      ImmutableMap.Builder<FilterChain, AtomicReference<ServerRoutingConfig>> routingConfigs =
465
          ImmutableMap.builder();
1✔
466
      for (FilterChain filterChain: filterChains) {
1✔
467
        routingConfigs.put(filterChain, generateRoutingConfig(filterChain));
1✔
468
      }
1✔
469

470
      // Prepare the new selector.
471
      FilterChainSelector selector;
472
      if (defaultFilterChain != null) {
1✔
473
        selector = new FilterChainSelector(
1✔
474
            routingConfigs.build(),
1✔
475
            defaultFilterChain.sslContextProviderSupplier(),
1✔
476
            generateRoutingConfig(defaultFilterChain));
1✔
477
      } else {
478
        selector = new FilterChainSelector(routingConfigs.build(), null, new AtomicReference<>());
1✔
479
      }
480

481
      // Prepare the list of current selector's resources to close later.
482
      List<SslContextProviderSupplier> oldSslSuppliers = getSuppliersInUse();
1✔
483

484
      // Swap the selectors, initiate a graceful shutdown of the old one.
485
      logger.log(Level.FINEST, "Updating selector {0}", selector);
1✔
486
      filterChainSelectorManager.updateSelector(selector);
1✔
487

488
      // Release old resources.
489
      for (SslContextProviderSupplier supplier: oldSslSuppliers) {
1✔
490
        supplier.close();
1✔
491
      }
1✔
492

493
      // Now that we have valid Transport Socket config, we can start/restart listening on a port.
494
      startDelegateServer();
1✔
495
    }
1✔
496

497
    private AtomicReference<ServerRoutingConfig> generateRoutingConfig(FilterChain filterChain) {
498
      HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
499
      ImmutableMap<Route, ServerInterceptor> interceptors;
500

501
      // Inlined routes.
502
      if (hcm.virtualHosts() != null) {
1✔
503
        interceptors = generatePerRouteInterceptors(hcm.httpFilterConfigs(), hcm.virtualHosts());
1✔
504
        return new AtomicReference<>(ServerRoutingConfig.create(hcm.virtualHosts(), interceptors));
1✔
505
      }
506

507
      // Routes from RDS.
508
      RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
1✔
509
      checkNotNull(rds, "rds");
1✔
510

511
      ServerRoutingConfig routingConfig;
512
      ImmutableList<VirtualHost> savedVhosts = rds.savedVirtualHosts;
1✔
513
      if (savedVhosts != null) {
1✔
514
        interceptors = generatePerRouteInterceptors(hcm.httpFilterConfigs(), savedVhosts);
1✔
515
        routingConfig = ServerRoutingConfig.create(savedVhosts, interceptors);
1✔
516
      } else {
517
        routingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
518
      }
519

520
      AtomicReference<ServerRoutingConfig> routingConfigRef = new AtomicReference<>(routingConfig);
1✔
521
      savedRdsRoutingConfigRef.put(filterChain, routingConfigRef);
1✔
522
      return routingConfigRef;
1✔
523
    }
524

525
    private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
526
        @Nullable List<NamedFilterConfig> filterConfigs, List<VirtualHost> virtualHosts) {
527
      // This should always be called from the sync context.
528
      // Ideally we'd want to throw otherwise, but this breaks the tests now.
529
      // syncContext.throwIfNotInThisSynchronizationContext();
530

531
      ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
1✔
532
          new ImmutableMap.Builder<>();
533

534
      for (VirtualHost virtualHost : virtualHosts) {
1✔
535
        for (Route route : virtualHost.routes()) {
1✔
536
          // Short circuit.
537
          if (filterConfigs == null) {
1✔
538
            perRouteInterceptors.put(route, noopInterceptor);
×
539
            continue;
×
540
          }
541

542
          // Override vhost filter configs with more specific per-route configs.
543
          Map<String, FilterConfig> perRouteOverrides = ImmutableMap.<String, FilterConfig>builder()
1✔
544
              .putAll(virtualHost.filterConfigOverrides())
1✔
545
              .putAll(route.filterConfigOverrides())
1✔
546
              .buildKeepingLast();
1✔
547

548
          // Interceptors for this vhost/route combo.
549
          List<ServerInterceptor> interceptors = new ArrayList<>(filterConfigs.size());
1✔
550

551
          for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
552
            FilterConfig config = namedFilter.filterConfig;
1✔
553
            String name = namedFilter.name;
1✔
554
            String typeUrl = config.typeUrl();
1✔
555

556
            Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
557
            if (provider == null || !provider.isServerFilter()) {
1✔
558
              logger.warning("HttpFilter[" + name + "]: not supported on server-side: " + typeUrl);
×
559
              continue;
×
560
            }
561

562
            Filter filter = provider.newInstance();
1✔
563
            ServerInterceptor interceptor =
1✔
564
                filter.buildServerInterceptor(config, perRouteOverrides.get(name));
1✔
565
            if (interceptor != null) {
1✔
566
              interceptors.add(interceptor);
1✔
567
            }
568
          }
1✔
569

570
          // Combine interceptors produced by different filters into a single one that executes
571
          // them sequentially. The order is preserved.
572
          perRouteInterceptors.put(route, combineInterceptors(interceptors));
1✔
573
        }
1✔
574
      }
1✔
575

576
      return perRouteInterceptors.buildOrThrow();
1✔
577
    }
578

579
    private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
580
      if (interceptors.isEmpty()) {
1✔
581
        return noopInterceptor;
1✔
582
      }
583
      if (interceptors.size() == 1) {
1✔
584
        return interceptors.get(0);
×
585
      }
586
      return new ServerInterceptor() {
1✔
587
        @Override
588
        public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
589
            Metadata headers, ServerCallHandler<ReqT, RespT> next) {
590
          // intercept forward
591
          for (int i = interceptors.size() - 1; i >= 0; i--) {
1✔
592
            next = InternalServerInterceptors.interceptCallHandlerCreate(
1✔
593
                interceptors.get(i), next);
1✔
594
          }
595
          return next.startCall(call, headers);
1✔
596
        }
597
      };
598
    }
599

600
    private void handleConfigNotFound(StatusException exception) {
601
      cleanUpRouteDiscoveryStates();
1✔
602
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
603
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
604
      for (SslContextProviderSupplier s: toRelease) {
1✔
605
        s.close();
1✔
606
      }
1✔
607
      if (restartTimer != null) {
1✔
608
        restartTimer.cancel();
1✔
609
      }
610
      if (!delegate.isShutdown()) {
1✔
611
        delegate.shutdown();  // let in-progress calls finish
1✔
612
      }
613
      isServing = false;
1✔
614
      listener.onNotServing(exception);
1✔
615
    }
1✔
616

617
    private void cleanUpRouteDiscoveryStates() {
618
      for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
1✔
619
        String rdsName = rdsState.resourceName;
1✔
620
        logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
1✔
621
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
622
            rdsState);
623
      }
1✔
624
      routeDiscoveryStates.clear();
1✔
625
      savedRdsRoutingConfigRef.clear();
1✔
626
    }
1✔
627

628
    private List<SslContextProviderSupplier> getSuppliersInUse() {
629
      List<SslContextProviderSupplier> toRelease = new ArrayList<>();
1✔
630
      FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
1✔
631
      if (selector != null) {
1✔
632
        for (FilterChain f: selector.getRoutingConfigs().keySet()) {
1✔
633
          if (f.sslContextProviderSupplier() != null) {
1✔
634
            toRelease.add(f.sslContextProviderSupplier());
1✔
635
          }
636
        }
1✔
637
        SslContextProviderSupplier defaultSupplier =
1✔
638
                selector.getDefaultSslContextProviderSupplier();
1✔
639
        if (defaultSupplier != null) {
1✔
640
          toRelease.add(defaultSupplier);
1✔
641
        }
642
      }
643
      return toRelease;
1✔
644
    }
645

646
    private void releaseSuppliersInFlight() {
647
      SslContextProviderSupplier supplier;
648
      for (FilterChain filterChain : filterChains) {
1✔
649
        supplier = filterChain.sslContextProviderSupplier();
1✔
650
        if (supplier != null) {
1✔
651
          supplier.close();
1✔
652
        }
653
      }
1✔
654
      if (defaultFilterChain != null
1✔
655
              && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
1✔
656
        supplier.close();
1✔
657
      }
658
    }
1✔
659

660
    /**
     * Watches a single RDS resource and keeps the routing config of every filter chain that
     * references it up to date. Each callback hands its work to {@code syncContext.execute}.
     */
    private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
661
      private final String resourceName;
662
      // Virtual hosts from the latest onChanged(); reset to null by onResourceDoesNotExist().
      private ImmutableList<VirtualHost> savedVirtualHosts;
663
      // Stays true until maybeUpdateSelector() runs for the first time.
      private boolean isPending = true;
1✔
664

665
      private RouteDiscoveryState(String resourceName) {
1✔
666
        this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
667
      }
1✔
668

669
      /**
       * Stores the virtual hosts carried by {@code update}, recomputes the routing config of the
       * filter chains that use this resource, and marks this resource as no longer pending.
       */
      @Override
670
      public void onChanged(final RdsUpdate update) {
671
        syncContext.execute(new Runnable() {
1✔
672
          @Override
673
          public void run() {
674
            // Ignore callbacks for a resource that is no longer present in routeDiscoveryStates.
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
675
              return;
1✔
676
            }
677
            // Nothing is saved although an earlier callback already cleared isPending, i.e. this
            // update follows an onError() or onResourceDoesNotExist() callback.
            if (savedVirtualHosts == null && !isPending) {
1✔
678
              logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
1✔
679
            }
680
            savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
1✔
681
            updateRdsRoutingConfig();
1✔
682
            maybeUpdateSelector();
1✔
683
          }
1✔
684
        });
685
      }
1✔
686

687
      /**
       * Clears the saved virtual hosts, so the affected filter chains are switched to
       * {@code ServerRoutingConfig.FAILING_ROUTING_CONFIG}, and marks this resource as no longer
       * pending.
       *
       * <p>Note that the {@code resourceName} parameter shadows the field of the same name inside
       * this method, so the check and the log statement below use the parameter value.
       */
      @Override
688
      public void onResourceDoesNotExist(final String resourceName) {
689
        syncContext.execute(new Runnable() {
1✔
690
          @Override
691
          public void run() {
692
            // Ignore callbacks for a resource that is no longer present in routeDiscoveryStates.
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
693
              return;
×
694
            }
695
            logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
1✔
696
            savedVirtualHosts = null;
1✔
697
            updateRdsRoutingConfig();
1✔
698
            maybeUpdateSelector();
1✔
699
          }
1✔
700
        });
701
      }
1✔
702

703
      /**
       * Logs {@code error} with the xDS node ID appended to its description and marks this
       * resource as no longer pending. The saved virtual hosts and the routing configs are not
       * modified here.
       */
      @Override
704
      public void onError(final Status error) {
705
        syncContext.execute(new Runnable() {
1✔
706
          @Override
707
          public void run() {
708
            // Ignore callbacks for a resource that is no longer present in routeDiscoveryStates.
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
709
              return;
×
710
            }
711
            // Keep any existing description (followed by a space) ahead of the node ID suffix.
            String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
712
            Status errorWithNodeId = error.withDescription(
1✔
713
                    description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
714
            logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
1✔
715
                    new Object[]{resourceName, errorWithNodeId});
1✔
716
            maybeUpdateSelector();
1✔
717
          }
1✔
718
        });
719
      }
1✔
720

721
      /**
       * Recomputes the routing config of every filter chain in {@code savedRdsRoutingConfigRef}
       * whose HttpConnectionManager names this RDS resource, and stores the result into the
       * reference kept for that filter chain.
       */
      private void updateRdsRoutingConfig() {
722
        for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
1✔
723
          if (resourceName.equals(filterChain.httpConnectionManager().rdsName())) {
1✔
724
            ServerRoutingConfig updatedRoutingConfig;
725
            // Without saved virtual hosts the chain gets the failing config.
            if (savedVirtualHosts == null) {
1✔
726
              updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
727
            } else {
728
              // Per-route interceptors are rebuilt from this chain's HTTP filter configs.
              ImmutableMap<Route, ServerInterceptor> updatedInterceptors =
1✔
729
                  generatePerRouteInterceptors(
1✔
730
                      filterChain.httpConnectionManager().httpFilterConfigs(),
1✔
731
                      savedVirtualHosts);
732
              updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts,
1✔
733
                  updatedInterceptors);
734
            }
735
            logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
1✔
736
                new Object[]{filterChain.name(), updatedRoutingConfig});
1✔
737
            savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
1✔
738
          }
739
        }
1✔
740
      }
1✔
741

742
      // Update the selector to use the most recently updated configs only after all rds have been
743
      // discovered for the first time. Later changes on rds will be applied through virtual host
744
      // list atomic ref.
745
      private void maybeUpdateSelector() {
746
        isPending = false;
1✔
747
        // True only when this call removed resourceName from pendingRds and nothing is left in it.
        boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
1✔
748
        if (isLastPending) {
1✔
749
          updateSelector();
1✔
750
        }
751
      }
1✔
752
    }
753
  }
754

755
  /**
   * Server interceptor that selects a route for each call from the routing config found in the
   * call attributes and delegates to the interceptor registered for that route. Calls for which
   * no usable config, virtual host, or route is found are closed with {@code UNAVAILABLE}.
   */
  @VisibleForTesting
756
  final class ConfigApplyingInterceptor implements ServerInterceptor {
1✔
757
    // Fallback used when the selected route has no entry in the per-route interceptor map; it
    // starts the call on the next handler directly.
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
758
      @Override
759
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
760
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
761
        return next.startCall(call, headers);
×
762
      }
763
    };
764

765
    /**
     * Routes {@code call} using the config stored under {@code ATTR_SERVER_ROUTING_CONFIG}.
     *
     * <p>On any routing failure the call is closed with {@code UNAVAILABLE} and a listener with
     * no overridden callbacks is returned; otherwise the result of the selected route's
     * interceptor (or of the no-op interceptor) is returned.
     */
    @Override
766
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
767
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
768
      AtomicReference<ServerRoutingConfig> routingConfigRef =
1✔
769
          call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
1✔
770
      // Both the attribute and the value it holds may be absent.
      ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
1✔
771
          routingConfigRef.get();
1✔
772
      // The failing config is recognized by reference identity, not by equals().
      if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
1✔
773
        String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
1✔
774
        call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
1✔
775
        return new Listener<ReqT>() {};
1✔
776
      }
777
      // Pick the virtual host based on the call's authority.
      List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
1✔
778
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
779
          virtualHosts, call.getAuthority());
1✔
780
      if (virtualHost == null) {
1✔
781
        call.close(
1✔
782
            Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
1✔
783
            new Metadata());
784
        return new Listener<ReqT>() {};
1✔
785
      }
786
      // The first route for which RoutingUtils.matchRoute returns true is selected.
      Route selectedRoute = null;
1✔
787
      MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
1✔
788
      for (Route route : virtualHost.routes()) {
1✔
789
        if (RoutingUtils.matchRoute(
1✔
790
            route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
1✔
791
          selectedRoute = route;
1✔
792
          break;
1✔
793
        }
794
      }
1✔
795
      if (selectedRoute == null) {
1✔
796
        call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
1✔
797
            new Metadata());
798
        return new ServerCall.Listener<ReqT>() {};
1✔
799
      }
800
      // A route that carries a route action is rejected.
      if (selectedRoute.routeAction() != null) {
1✔
801
        call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
1✔
802
            + "route: only Route.non_forwarding_action should be allowed."), new Metadata());
803
        return new ServerCall.Listener<ReqT>() {};
1✔
804
      }
805
      // Use the interceptor prebuilt for the selected route when one exists.
      ServerInterceptor routeInterceptor = noopInterceptor;
1✔
806
      Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
1✔
807
      if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
1✔
808
        routeInterceptor = perRouteInterceptors.get(selectedRoute);
1✔
809
      }
810
      return routeInterceptor.interceptCall(call, headers, next);
1✔
811
    }
812
  }
813

814
  /**
815
   * The HttpConnectionManager level configuration: a list of virtual hosts together with the
   * server interceptors prebuilt for their routes.
816
   */
817
  @AutoValue
818
  abstract static class ServerRoutingConfig {
1✔
819
    // Config built from an empty virtual host list and an empty interceptor map.
    // ConfigApplyingInterceptor compares against this instance by reference.
    @VisibleForTesting
820
    static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
1✔
821
        ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
1✔
822

823
    // Virtual hosts that calls are matched against.
    abstract ImmutableList<VirtualHost> virtualHosts();
824

825
    // Prebuilt per route server interceptors from http filter configs.
826
    abstract ImmutableMap<Route, ServerInterceptor> interceptors();
827

828
    /**
829
     * Creates a server routing configuration. Both arguments are passed through
     * {@code checkNotNull} before the AutoValue-generated instance is built.
830
     * */
831
    public static ServerRoutingConfig create(
832
        ImmutableList<VirtualHost> virtualHosts,
833
        ImmutableMap<Route, ServerInterceptor> interceptors) {
834
      checkNotNull(virtualHosts, "virtualHosts");
1✔
835
      checkNotNull(interceptors, "interceptors");
1✔
836
      return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
1✔
837
    }
838
  }
839
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc