• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19712

03 Mar 2025 10:28PM UTC coverage: 88.538% (-0.007%) from 88.545%
#19712

push

github

web-flow
xds: ensure server interceptors are created in a sync context (#11930)

`XdsServerWrapper#generatePerRouteInterceptors` was always intended
to be executed within a sync context. This PR ensures that by calling
`syncContext.throwIfNotInThisSynchronizationContext()`.

This change is needed for upcoming xDS filter state retention because
the new tests in XdsServerWrapperTest flake with this NPE:

> `Cannot invoke "io.grpc.xds.client.XdsClient$ResourceWatcher.onChanged(io.grpc.xds.client.XdsClient$ResourceUpdate)" because "this.ldsWatcher" is null`

34435 of 38893 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.75
/../xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.auto.value.AutoValue;
24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.InternalServerInterceptors;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.MetricRecorder;
33
import io.grpc.Server;
34
import io.grpc.ServerBuilder;
35
import io.grpc.ServerCall;
36
import io.grpc.ServerCall.Listener;
37
import io.grpc.ServerCallHandler;
38
import io.grpc.ServerInterceptor;
39
import io.grpc.ServerServiceDefinition;
40
import io.grpc.Status;
41
import io.grpc.StatusException;
42
import io.grpc.SynchronizationContext;
43
import io.grpc.SynchronizationContext.ScheduledHandle;
44
import io.grpc.internal.GrpcUtil;
45
import io.grpc.internal.ObjectPool;
46
import io.grpc.internal.SharedResourceHolder;
47
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
48
import io.grpc.xds.Filter.FilterConfig;
49
import io.grpc.xds.Filter.NamedFilterConfig;
50
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
51
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
52
import io.grpc.xds.VirtualHost.Route;
53
import io.grpc.xds.XdsListenerResource.LdsUpdate;
54
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
55
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
56
import io.grpc.xds.client.XdsClient;
57
import io.grpc.xds.client.XdsClient.ResourceWatcher;
58
import io.grpc.xds.internal.security.SslContextProviderSupplier;
59
import java.io.IOException;
60
import java.net.SocketAddress;
61
import java.util.ArrayList;
62
import java.util.HashMap;
63
import java.util.HashSet;
64
import java.util.List;
65
import java.util.Map;
66
import java.util.Set;
67
import java.util.concurrent.CountDownLatch;
68
import java.util.concurrent.ExecutionException;
69
import java.util.concurrent.ScheduledExecutorService;
70
import java.util.concurrent.TimeUnit;
71
import java.util.concurrent.atomic.AtomicBoolean;
72
import java.util.concurrent.atomic.AtomicReference;
73
import java.util.logging.Level;
74
import java.util.logging.Logger;
75
import javax.annotation.Nullable;
76

77
final class XdsServerWrapper extends Server {
78
  private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
1✔
79

80
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
81
      new Thread.UncaughtExceptionHandler() {
1✔
82
        @Override
83
        public void uncaughtException(Thread t, Throwable e) {
84
          logger.log(Level.SEVERE, "Exception!" + e);
×
85
          // TODO(chengyuanzhang): implement cleanup.
86
        }
×
87
      });
88

89
  public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
90
      ATTR_SERVER_ROUTING_CONFIG =
1✔
91
      Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
1✔
92

93
  @VisibleForTesting
94
  static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
1✔
95
  private final String listenerAddress;
96
  private final ServerBuilder<?> delegateBuilder;
97
  private boolean sharedTimeService;
98
  private final ScheduledExecutorService timeService;
99
  private final FilterRegistry filterRegistry;
100
  private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
1✔
101
  private final XdsClientPoolFactory xdsClientPoolFactory;
102
  private final XdsServingStatusListener listener;
103
  private final FilterChainSelectorManager filterChainSelectorManager;
104
  private final AtomicBoolean started = new AtomicBoolean(false);
1✔
105
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
106
  private boolean isServing;
107
  private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
1✔
108
  private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
1✔
109
  private boolean initialStarted;
110
  private ScheduledHandle restartTimer;
111
  private ObjectPool<XdsClient> xdsClientPool;
112
  private XdsClient xdsClient;
113
  private DiscoveryState discoveryState;
114
  private volatile Server delegate;
115

116
  /**
   * Creates a wrapper whose timer is the shared {@code GrpcUtil.TIMER_SERVICE}, obtained here
   * through {@code SharedResourceHolder}.
   */
  XdsServerWrapper(
      String listenerAddress,
      ServerBuilder<?> delegateBuilder,
      XdsServingStatusListener listener,
      FilterChainSelectorManager filterChainSelectorManager,
      XdsClientPoolFactory xdsClientPoolFactory,
      FilterRegistry filterRegistry) {
    this(
        listenerAddress,
        delegateBuilder,
        listener,
        filterChainSelectorManager,
        xdsClientPoolFactory,
        filterRegistry,
        SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
    // Flag the timer as shared so that internalShutdown() hands it back to the holder.
    sharedTimeService = true;
  }
1✔
127

128
  /**
   * Creates a wrapper that schedules restart attempts on the given {@code timeService}.
   *
   * <p>All arguments must be non-null. A {@link ConfigApplyingInterceptor} is registered on
   * {@code delegateBuilder} and the first delegate server is built right away.
   */
  @VisibleForTesting
  XdsServerWrapper(
      String listenerAddress,
      ServerBuilder<?> delegateBuilder,
      XdsServingStatusListener listener,
      FilterChainSelectorManager filterChainSelectorManager,
      XdsClientPoolFactory xdsClientPoolFactory,
      FilterRegistry filterRegistry,
      ScheduledExecutorService timeService) {
    this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
    this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
    // Install the routing-config interceptor before any delegate is built from the builder.
    delegateBuilder.intercept(new ConfigApplyingInterceptor());
    this.listener = checkNotNull(listener, "listener");
    this.filterChainSelectorManager =
        checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
    this.timeService = checkNotNull(timeService, "timeService");
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
    this.delegate = delegateBuilder.build();
  }
1✔
148

149
  @Override
150
  public Server start() throws IOException {
151
    checkState(started.compareAndSet(false, true), "Already started");
1✔
152
    syncContext.execute(new Runnable() {
1✔
153
      @Override
154
      public void run() {
155
        internalStart();
1✔
156
      }
1✔
157
    });
158
    Exception exception;
159
    try {
160
      exception = initialStartFuture.get();
1✔
161
    } catch (InterruptedException | ExecutionException e) {
×
162
      throw new RuntimeException(e);
×
163
    }
1✔
164
    if (exception != null) {
1✔
165
      throw (exception instanceof IOException) ? (IOException) exception :
1✔
166
              new IOException(exception);
1✔
167
    }
168
    return this;
1✔
169
  }
170

171
  private void internalStart() {
172
    try {
173
      xdsClientPool = xdsClientPoolFactory.getOrCreate("#server", new MetricRecorder() {});
1✔
174
    } catch (Exception e) {
×
175
      StatusException statusException = Status.UNAVAILABLE.withDescription(
×
176
              "Failed to initialize xDS").withCause(e).asException();
×
177
      listener.onNotServing(statusException);
×
178
      initialStartFuture.set(statusException);
×
179
      return;
×
180
    }
1✔
181
    xdsClient = xdsClientPool.getObject();
1✔
182
    String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
1✔
183
    if (listenerTemplate == null) {
1✔
184
      StatusException statusException =
1✔
185
          Status.UNAVAILABLE.withDescription(
1✔
186
              "Can only support xDS v3 with listener resource name template").asException();
1✔
187
      listener.onNotServing(statusException);
1✔
188
      initialStartFuture.set(statusException);
1✔
189
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
190
      return;
1✔
191
    }
192
    String replacement = listenerAddress;
1✔
193
    if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
1✔
194
      replacement = XdsClient.percentEncodePath(replacement);
1✔
195
    }
196
    discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
1✔
197
  }
1✔
198

199
  @Override
200
  public Server shutdown() {
201
    if (!shutdown.compareAndSet(false, true)) {
1✔
202
      return this;
1✔
203
    }
204
    syncContext.execute(new Runnable() {
1✔
205
      @Override
206
      public void run() {
207
        if (!delegate.isShutdown()) {
1✔
208
          delegate.shutdown();
1✔
209
        }
210
        internalShutdown();
1✔
211
      }
1✔
212
    });
213
    return this;
1✔
214
  }
215

216
  @Override
217
  public Server shutdownNow() {
218
    if (!shutdown.compareAndSet(false, true)) {
1✔
219
      return this;
1✔
220
    }
221
    syncContext.execute(new Runnable() {
1✔
222
      @Override
223
      public void run() {
224
        if (!delegate.isShutdown()) {
1✔
225
          delegate.shutdownNow();
1✔
226
        }
227
        internalShutdown();
1✔
228
        initialStartFuture.set(new IOException("server is forcefully shut down"));
1✔
229
      }
1✔
230
    });
231
    return this;
1✔
232
  }
233

234
  // Must run in SynchronizationContext
235
  private void internalShutdown() {
236
    logger.log(Level.FINER, "Shutting down XdsServerWrapper");
1✔
237
    if (discoveryState != null) {
1✔
238
      discoveryState.shutdown();
1✔
239
    }
240
    if (xdsClient != null) {
1✔
241
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
242
    }
243
    if (restartTimer != null) {
1✔
244
      restartTimer.cancel();
1✔
245
    }
246
    if (sharedTimeService) {
1✔
247
      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
1✔
248
    }
249
    isServing = false;
1✔
250
    internalTerminationLatch.countDown();
1✔
251
  }
1✔
252

253
  @Override
254
  public boolean isShutdown() {
255
    return shutdown.get();
1✔
256
  }
257

258
  @Override
259
  public boolean isTerminated() {
260
    return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
1✔
261
  }
262

263
  @Override
264
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
265
    long startTime = System.nanoTime();
1✔
266
    if (!internalTerminationLatch.await(timeout, unit)) {
1✔
267
      return false;
×
268
    }
269
    long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
1✔
270
    return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
1✔
271
  }
272

273
  @Override
274
  public void awaitTermination() throws InterruptedException {
275
    internalTerminationLatch.await();
1✔
276
    delegate.awaitTermination();
1✔
277
  }
1✔
278

279
  @Override
280
  public int getPort() {
281
    return delegate.getPort();
1✔
282
  }
283

284
  @Override
285
  public List<? extends SocketAddress> getListenSockets() {
286
    return delegate.getListenSockets();
1✔
287
  }
288

289
  @Override
290
  public List<ServerServiceDefinition> getServices() {
291
    return delegate.getServices();
×
292
  }
293

294
  @Override
295
  public List<ServerServiceDefinition> getImmutableServices() {
296
    return delegate.getImmutableServices();
×
297
  }
298

299
  @Override
300
  public List<ServerServiceDefinition> getMutableServices() {
301
    return delegate.getMutableServices();
×
302
  }
303

304
  // Must run in SynchronizationContext
305
  private void startDelegateServer() {
306
    if (restartTimer != null && restartTimer.isPending()) {
1✔
307
      return;
×
308
    }
309
    if (isServing) {
1✔
310
      return;
1✔
311
    }
312
    if (delegate.isShutdown()) {
1✔
313
      delegate = delegateBuilder.build();
1✔
314
    }
315
    try {
316
      delegate.start();
1✔
317
      listener.onServing();
1✔
318
      isServing = true;
1✔
319
      if (!initialStarted) {
1✔
320
        initialStarted = true;
1✔
321
        initialStartFuture.set(null);
1✔
322
      }
323
      logger.log(Level.FINER, "Delegate server started.");
1✔
324
    } catch (IOException e) {
1✔
325
      logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
1✔
326
      if (!initialStarted) {
1✔
327
        initialStarted = true;
1✔
328
        initialStartFuture.set(e);
1✔
329
      } else {
330
        listener.onNotServing(e);
1✔
331
      }
332
      restartTimer = syncContext.schedule(
1✔
333
        new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
334
    }
1✔
335
  }
1✔
336

337
  private final class RestartTask implements Runnable {
1✔
338
    @Override
339
    public void run() {
340
      startDelegateServer();
1✔
341
    }
1✔
342
  }
343

344
  private final class DiscoveryState implements ResourceWatcher<LdsUpdate> {
345
    private final String resourceName;
346
    // RDS resource name is the key.
347
    private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
1✔
348
    // Track pending RDS resources using rds name.
349
    private final Set<String> pendingRds = new HashSet<>();
1✔
350
    // Most recently discovered filter chains.
351
    private List<FilterChain> filterChains = new ArrayList<>();
1✔
352
    // Most recently discovered default filter chain.
353
    @Nullable
354
    private FilterChain defaultFilterChain;
355
    private boolean stopped;
356
    private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef 
1✔
357
        = new HashMap<>();
358
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
359
      @Override
360
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
361
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
362
        return next.startCall(call, headers);
1✔
363
      }
364
    };
365

366
    private DiscoveryState(String resourceName) {
1✔
367
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
368
      xdsClient.watchXdsResource(
1✔
369
          XdsListenerResource.getInstance(), resourceName, this, syncContext);
1✔
370
    }
1✔
371

372
    @Override
373
    public void onChanged(final LdsUpdate update) {
374
      if (stopped) {
1✔
375
        return;
×
376
      }
377
      logger.log(Level.FINEST, "Received Lds update {0}", update);
1✔
378
      checkNotNull(update.listener(), "update");
1✔
379
      if (!pendingRds.isEmpty()) {
1✔
380
        // filter chain state has not yet been applied to filterChainSelectorManager and there
381
        // are two sets of sslContextProviderSuppliers, so we release the old ones.
382
        releaseSuppliersInFlight();
1✔
383
        pendingRds.clear();
1✔
384
      }
385
      filterChains = update.listener().filterChains();
1✔
386
      defaultFilterChain = update.listener().defaultFilterChain();
1✔
387
      List<FilterChain> allFilterChains = filterChains;
1✔
388
      if (defaultFilterChain != null) {
1✔
389
        allFilterChains = new ArrayList<>(filterChains);
1✔
390
        allFilterChains.add(defaultFilterChain);
1✔
391
      }
392
      Set<String> allRds = new HashSet<>();
1✔
393
      for (FilterChain filterChain : allFilterChains) {
1✔
394
        HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
395
        if (hcm.virtualHosts() == null) {
1✔
396
          RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
1✔
397
          if (rdsState == null) {
1✔
398
            rdsState = new RouteDiscoveryState(hcm.rdsName());
1✔
399
            routeDiscoveryStates.put(hcm.rdsName(), rdsState);
1✔
400
            xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
401
                hcm.rdsName(), rdsState, syncContext);
1✔
402
          }
403
          if (rdsState.isPending) {
1✔
404
            pendingRds.add(hcm.rdsName());
1✔
405
          }
406
          allRds.add(hcm.rdsName());
1✔
407
        }
408
      }
1✔
409
      for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
1✔
410
        if (!allRds.contains(entry.getKey())) {
1✔
411
          xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
1✔
412
              entry.getKey(), entry.getValue());
1✔
413
        }
414
      }
1✔
415
      routeDiscoveryStates.keySet().retainAll(allRds);
1✔
416
      if (pendingRds.isEmpty()) {
1✔
417
        updateSelector();
1✔
418
      }
419
    }
1✔
420

421
    @Override
422
    public void onResourceDoesNotExist(final String resourceName) {
423
      if (stopped) {
1✔
424
        return;
×
425
      }
426
      StatusException statusException = Status.UNAVAILABLE.withDescription(
1✔
427
          String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
1✔
428
              xdsClient.getBootstrapInfo().node().getId())).asException();
1✔
429
      handleConfigNotFound(statusException);
1✔
430
    }
1✔
431

432
    @Override
433
    public void onError(final Status error) {
434
      if (stopped) {
1✔
435
        return;
×
436
      }
437
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
438
      Status errorWithNodeId = error.withDescription(
1✔
439
          description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
440
      logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
1✔
441
      if (!isServing) {
1✔
442
        listener.onNotServing(errorWithNodeId.asException());
1✔
443
      }
444
    }
1✔
445

446
    private void shutdown() {
447
      stopped = true;
1✔
448
      cleanUpRouteDiscoveryStates();
1✔
449
      logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
1✔
450
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this);
1✔
451
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
452
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
453
      for (SslContextProviderSupplier s: toRelease) {
1✔
454
        s.close();
1✔
455
      }
1✔
456
      releaseSuppliersInFlight();
1✔
457
    }
1✔
458

459
    private void updateSelector() {
460
      // This is regenerated in generateRoutingConfig() calls below.
461
      savedRdsRoutingConfigRef.clear();
1✔
462

463
      // Prepare server routing config map.
464
      ImmutableMap.Builder<FilterChain, AtomicReference<ServerRoutingConfig>> routingConfigs =
465
          ImmutableMap.builder();
1✔
466
      for (FilterChain filterChain: filterChains) {
1✔
467
        routingConfigs.put(filterChain, generateRoutingConfig(filterChain));
1✔
468
      }
1✔
469

470
      // Prepare the new selector.
471
      FilterChainSelector selector;
472
      if (defaultFilterChain != null) {
1✔
473
        selector = new FilterChainSelector(
1✔
474
            routingConfigs.build(),
1✔
475
            defaultFilterChain.sslContextProviderSupplier(),
1✔
476
            generateRoutingConfig(defaultFilterChain));
1✔
477
      } else {
478
        selector = new FilterChainSelector(routingConfigs.build(), null, new AtomicReference<>());
1✔
479
      }
480

481
      // Prepare the list of current selector's resources to close later.
482
      List<SslContextProviderSupplier> oldSslSuppliers = getSuppliersInUse();
1✔
483

484
      // Swap the selectors, initiate a graceful shutdown of the old one.
485
      logger.log(Level.FINEST, "Updating selector {0}", selector);
1✔
486
      filterChainSelectorManager.updateSelector(selector);
1✔
487

488
      // Release old resources.
489
      for (SslContextProviderSupplier supplier: oldSslSuppliers) {
1✔
490
        supplier.close();
1✔
491
      }
1✔
492

493
      // Now that we have valid Transport Socket config, we can start/restart listening on a port.
494
      startDelegateServer();
1✔
495
    }
1✔
496

497
    private AtomicReference<ServerRoutingConfig> generateRoutingConfig(FilterChain filterChain) {
498
      HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
499
      ImmutableMap<Route, ServerInterceptor> interceptors;
500

501
      // Inlined routes.
502
      if (hcm.virtualHosts() != null) {
1✔
503
        interceptors = generatePerRouteInterceptors(hcm.httpFilterConfigs(), hcm.virtualHosts());
1✔
504
        return new AtomicReference<>(ServerRoutingConfig.create(hcm.virtualHosts(), interceptors));
1✔
505
      }
506

507
      // Routes from RDS.
508
      RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
1✔
509
      checkNotNull(rds, "rds");
1✔
510

511
      ServerRoutingConfig routingConfig;
512
      ImmutableList<VirtualHost> savedVhosts = rds.savedVirtualHosts;
1✔
513
      if (savedVhosts != null) {
1✔
514
        interceptors = generatePerRouteInterceptors(hcm.httpFilterConfigs(), savedVhosts);
1✔
515
        routingConfig = ServerRoutingConfig.create(savedVhosts, interceptors);
1✔
516
      } else {
517
        routingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
518
      }
519

520
      AtomicReference<ServerRoutingConfig> routingConfigRef = new AtomicReference<>(routingConfig);
1✔
521
      savedRdsRoutingConfigRef.put(filterChain, routingConfigRef);
1✔
522
      return routingConfigRef;
1✔
523
    }
524

525
    private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
526
        @Nullable List<NamedFilterConfig> filterConfigs, List<VirtualHost> virtualHosts) {
527
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
528

529
      ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
1✔
530
          new ImmutableMap.Builder<>();
531

532
      for (VirtualHost virtualHost : virtualHosts) {
1✔
533
        for (Route route : virtualHost.routes()) {
1✔
534
          // Short circuit.
535
          if (filterConfigs == null) {
1✔
536
            perRouteInterceptors.put(route, noopInterceptor);
×
537
            continue;
×
538
          }
539

540
          // Override vhost filter configs with more specific per-route configs.
541
          Map<String, FilterConfig> perRouteOverrides = ImmutableMap.<String, FilterConfig>builder()
1✔
542
              .putAll(virtualHost.filterConfigOverrides())
1✔
543
              .putAll(route.filterConfigOverrides())
1✔
544
              .buildKeepingLast();
1✔
545

546
          // Interceptors for this vhost/route combo.
547
          List<ServerInterceptor> interceptors = new ArrayList<>(filterConfigs.size());
1✔
548

549
          for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
550
            FilterConfig config = namedFilter.filterConfig;
1✔
551
            String name = namedFilter.name;
1✔
552
            String typeUrl = config.typeUrl();
1✔
553

554
            Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
555
            if (provider == null || !provider.isServerFilter()) {
1✔
556
              logger.warning("HttpFilter[" + name + "]: not supported on server-side: " + typeUrl);
×
557
              continue;
×
558
            }
559

560
            Filter filter = provider.newInstance();
1✔
561
            ServerInterceptor interceptor =
1✔
562
                filter.buildServerInterceptor(config, perRouteOverrides.get(name));
1✔
563
            if (interceptor != null) {
1✔
564
              interceptors.add(interceptor);
1✔
565
            }
566
          }
1✔
567

568
          // Combine interceptors produced by different filters into a single one that executes
569
          // them sequentially. The order is preserved.
570
          perRouteInterceptors.put(route, combineInterceptors(interceptors));
1✔
571
        }
1✔
572
      }
1✔
573

574
      return perRouteInterceptors.buildOrThrow();
1✔
575
    }
576

577
    private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
578
      if (interceptors.isEmpty()) {
1✔
579
        return noopInterceptor;
1✔
580
      }
581
      if (interceptors.size() == 1) {
1✔
582
        return interceptors.get(0);
×
583
      }
584
      return new ServerInterceptor() {
1✔
585
        @Override
586
        public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
587
            Metadata headers, ServerCallHandler<ReqT, RespT> next) {
588
          // intercept forward
589
          for (int i = interceptors.size() - 1; i >= 0; i--) {
1✔
590
            next = InternalServerInterceptors.interceptCallHandlerCreate(
1✔
591
                interceptors.get(i), next);
1✔
592
          }
593
          return next.startCall(call, headers);
1✔
594
        }
595
      };
596
    }
597

598
    private void handleConfigNotFound(StatusException exception) {
599
      cleanUpRouteDiscoveryStates();
1✔
600
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
601
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
602
      for (SslContextProviderSupplier s: toRelease) {
1✔
603
        s.close();
1✔
604
      }
1✔
605
      if (restartTimer != null) {
1✔
606
        restartTimer.cancel();
1✔
607
      }
608
      if (!delegate.isShutdown()) {
1✔
609
        delegate.shutdown();  // let in-progress calls finish
1✔
610
      }
611
      isServing = false;
1✔
612
      listener.onNotServing(exception);
1✔
613
    }
1✔
614

615
    private void cleanUpRouteDiscoveryStates() {
616
      for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
1✔
617
        String rdsName = rdsState.resourceName;
1✔
618
        logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
1✔
619
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
620
            rdsState);
621
      }
1✔
622
      routeDiscoveryStates.clear();
1✔
623
      savedRdsRoutingConfigRef.clear();
1✔
624
    }
1✔
625

626
    private List<SslContextProviderSupplier> getSuppliersInUse() {
627
      List<SslContextProviderSupplier> toRelease = new ArrayList<>();
1✔
628
      FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
1✔
629
      if (selector != null) {
1✔
630
        for (FilterChain f: selector.getRoutingConfigs().keySet()) {
1✔
631
          if (f.sslContextProviderSupplier() != null) {
1✔
632
            toRelease.add(f.sslContextProviderSupplier());
1✔
633
          }
634
        }
1✔
635
        SslContextProviderSupplier defaultSupplier =
1✔
636
                selector.getDefaultSslContextProviderSupplier();
1✔
637
        if (defaultSupplier != null) {
1✔
638
          toRelease.add(defaultSupplier);
1✔
639
        }
640
      }
641
      return toRelease;
1✔
642
    }
643

644
    private void releaseSuppliersInFlight() {
645
      SslContextProviderSupplier supplier;
646
      for (FilterChain filterChain : filterChains) {
1✔
647
        supplier = filterChain.sslContextProviderSupplier();
1✔
648
        if (supplier != null) {
1✔
649
          supplier.close();
1✔
650
        }
651
      }
1✔
652
      if (defaultFilterChain != null
1✔
653
              && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
1✔
654
        supplier.close();
1✔
655
      }
656
    }
1✔
657

658
    private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
659
      private final String resourceName;
660
      private ImmutableList<VirtualHost> savedVirtualHosts;
661
      private boolean isPending = true;
1✔
662

663
      private RouteDiscoveryState(String resourceName) {
1✔
664
        this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
665
      }
1✔
666

667
      @Override
668
      public void onChanged(final RdsUpdate update) {
669
        syncContext.execute(new Runnable() {
1✔
670
          @Override
671
          public void run() {
672
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
673
              return;
1✔
674
            }
675
            if (savedVirtualHosts == null && !isPending) {
1✔
676
              logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
1✔
677
            }
678
            savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
1✔
679
            updateRdsRoutingConfig();
1✔
680
            maybeUpdateSelector();
1✔
681
          }
1✔
682
        });
683
      }
1✔
684

685
      @Override
686
      public void onResourceDoesNotExist(final String resourceName) {
687
        syncContext.execute(new Runnable() {
1✔
688
          @Override
689
          public void run() {
690
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
691
              return;
×
692
            }
693
            logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
1✔
694
            savedVirtualHosts = null;
1✔
695
            updateRdsRoutingConfig();
1✔
696
            maybeUpdateSelector();
1✔
697
          }
1✔
698
        });
699
      }
1✔
700

701
      @Override
702
      public void onError(final Status error) {
703
        syncContext.execute(new Runnable() {
1✔
704
          @Override
705
          public void run() {
706
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
707
              return;
×
708
            }
709
            String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
710
            Status errorWithNodeId = error.withDescription(
1✔
711
                    description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
712
            logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
1✔
713
                    new Object[]{resourceName, errorWithNodeId});
1✔
714
            maybeUpdateSelector();
1✔
715
          }
1✔
716
        });
717
      }
1✔
718

719
      private void updateRdsRoutingConfig() {
720
        for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
1✔
721
          if (resourceName.equals(filterChain.httpConnectionManager().rdsName())) {
1✔
722
            ServerRoutingConfig updatedRoutingConfig;
723
            if (savedVirtualHosts == null) {
1✔
724
              updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
725
            } else {
726
              ImmutableMap<Route, ServerInterceptor> updatedInterceptors =
1✔
727
                  generatePerRouteInterceptors(
1✔
728
                      filterChain.httpConnectionManager().httpFilterConfigs(),
1✔
729
                      savedVirtualHosts);
730
              updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts,
1✔
731
                  updatedInterceptors);
732
            }
733
            logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
1✔
734
                new Object[]{filterChain.name(), updatedRoutingConfig});
1✔
735
            savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
1✔
736
          }
737
        }
1✔
738
      }
1✔
739

740
      // Update the selector to use the most recently updated configs only after all rds have been
741
      // discovered for the first time. Later changes on rds will be applied through virtual host
742
      // list atomic ref.
743
      private void maybeUpdateSelector() {
744
        isPending = false;
1✔
745
        boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
1✔
746
        if (isLastPending) {
1✔
747
          updateSelector();
1✔
748
        }
749
      }
1✔
750
    }
751
  }
752

753
  @VisibleForTesting
754
  final class ConfigApplyingInterceptor implements ServerInterceptor {
1✔
755
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
756
      @Override
757
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
758
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
759
        return next.startCall(call, headers);
×
760
      }
761
    };
762

763
    @Override
764
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
765
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
766
      AtomicReference<ServerRoutingConfig> routingConfigRef =
1✔
767
          call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
1✔
768
      ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
1✔
769
          routingConfigRef.get();
1✔
770
      if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
1✔
771
        String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
1✔
772
        call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
1✔
773
        return new Listener<ReqT>() {};
1✔
774
      }
775
      List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
1✔
776
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
777
          virtualHosts, call.getAuthority());
1✔
778
      if (virtualHost == null) {
1✔
779
        call.close(
1✔
780
            Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
1✔
781
            new Metadata());
782
        return new Listener<ReqT>() {};
1✔
783
      }
784
      Route selectedRoute = null;
1✔
785
      MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
1✔
786
      for (Route route : virtualHost.routes()) {
1✔
787
        if (RoutingUtils.matchRoute(
1✔
788
            route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
1✔
789
          selectedRoute = route;
1✔
790
          break;
1✔
791
        }
792
      }
1✔
793
      if (selectedRoute == null) {
1✔
794
        call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
1✔
795
            new Metadata());
796
        return new ServerCall.Listener<ReqT>() {};
1✔
797
      }
798
      if (selectedRoute.routeAction() != null) {
1✔
799
        call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
1✔
800
            + "route: only Route.non_forwarding_action should be allowed."), new Metadata());
801
        return new ServerCall.Listener<ReqT>() {};
1✔
802
      }
803
      ServerInterceptor routeInterceptor = noopInterceptor;
1✔
804
      Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
1✔
805
      if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
1✔
806
        routeInterceptor = perRouteInterceptors.get(selectedRoute);
1✔
807
      }
808
      return routeInterceptor.interceptCall(call, headers, next);
1✔
809
    }
810
  }
811

812
  /**
813
   * The HttpConnectionManager level configuration.
814
   */
815
  @AutoValue
816
  abstract static class ServerRoutingConfig {
1✔
817
    @VisibleForTesting
818
    static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
1✔
819
        ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
1✔
820

821
    abstract ImmutableList<VirtualHost> virtualHosts();
822

823
    // Prebuilt per route server interceptors from http filter configs.
824
    abstract ImmutableMap<Route, ServerInterceptor> interceptors();
825

826
    /**
827
     * Server routing configuration.
828
     * */
829
    public static ServerRoutingConfig create(
830
        ImmutableList<VirtualHost> virtualHosts,
831
        ImmutableMap<Route, ServerInterceptor> interceptors) {
832
      checkNotNull(virtualHosts, "virtualHosts");
1✔
833
      checkNotNull(interceptors, "interceptors");
1✔
834
      return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
1✔
835
    }
836
  }
837
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc