• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19762

03 Apr 2025 05:52AM UTC coverage: 88.598% (-0.005%) from 88.603%
#19762

push

github

web-flow
xds: listener type validation (#11933)

34732 of 39202 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.06
/../xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.auto.value.AutoValue;
24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import com.google.common.net.HostAndPort;
28
import com.google.common.net.InetAddresses;
29
import com.google.common.util.concurrent.SettableFuture;
30
import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol;
31
import io.grpc.Attributes;
32
import io.grpc.InternalServerInterceptors;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.MetricRecorder;
36
import io.grpc.Server;
37
import io.grpc.ServerBuilder;
38
import io.grpc.ServerCall;
39
import io.grpc.ServerCall.Listener;
40
import io.grpc.ServerCallHandler;
41
import io.grpc.ServerInterceptor;
42
import io.grpc.ServerServiceDefinition;
43
import io.grpc.Status;
44
import io.grpc.StatusException;
45
import io.grpc.SynchronizationContext;
46
import io.grpc.SynchronizationContext.ScheduledHandle;
47
import io.grpc.internal.GrpcUtil;
48
import io.grpc.internal.ObjectPool;
49
import io.grpc.internal.SharedResourceHolder;
50
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
51
import io.grpc.xds.Filter.FilterConfig;
52
import io.grpc.xds.Filter.NamedFilterConfig;
53
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
54
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
55
import io.grpc.xds.VirtualHost.Route;
56
import io.grpc.xds.XdsListenerResource.LdsUpdate;
57
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
58
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
59
import io.grpc.xds.client.XdsClient;
60
import io.grpc.xds.client.XdsClient.ResourceWatcher;
61
import io.grpc.xds.internal.security.SslContextProviderSupplier;
62
import java.io.IOException;
63
import java.net.InetAddress;
64
import java.net.SocketAddress;
65
import java.util.ArrayList;
66
import java.util.HashMap;
67
import java.util.HashSet;
68
import java.util.List;
69
import java.util.Map;
70
import java.util.Set;
71
import java.util.concurrent.CountDownLatch;
72
import java.util.concurrent.ExecutionException;
73
import java.util.concurrent.ScheduledExecutorService;
74
import java.util.concurrent.TimeUnit;
75
import java.util.concurrent.atomic.AtomicBoolean;
76
import java.util.concurrent.atomic.AtomicReference;
77
import java.util.logging.Level;
78
import java.util.logging.Logger;
79
import javax.annotation.Nullable;
80

81
final class XdsServerWrapper extends Server {
82
  private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
1✔
83

84
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
85
      new Thread.UncaughtExceptionHandler() {
1✔
86
        @Override
87
        public void uncaughtException(Thread t, Throwable e) {
88
          logger.log(Level.SEVERE, "Exception!" + e);
×
89
          // TODO(chengyuanzhang): implement cleanup.
90
        }
×
91
      });
92

93
  public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
94
      ATTR_SERVER_ROUTING_CONFIG =
1✔
95
      Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
1✔
96

97
  @VisibleForTesting
98
  static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
1✔
99
  private final String listenerAddress;
100
  private final ServerBuilder<?> delegateBuilder;
101
  private boolean sharedTimeService;
102
  private final ScheduledExecutorService timeService;
103
  private final FilterRegistry filterRegistry;
104
  private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
1✔
105
  private final XdsClientPoolFactory xdsClientPoolFactory;
106
  private final XdsServingStatusListener listener;
107
  private final FilterChainSelectorManager filterChainSelectorManager;
108
  private final AtomicBoolean started = new AtomicBoolean(false);
1✔
109
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
110
  private boolean isServing;
111
  private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
1✔
112
  private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
1✔
113
  private boolean initialStarted;
114
  private ScheduledHandle restartTimer;
115
  private ObjectPool<XdsClient> xdsClientPool;
116
  private XdsClient xdsClient;
117
  private DiscoveryState discoveryState;
118
  private volatile Server delegate;
119

120
  // Must be accessed in syncContext.
121
  // Filter instances are unique per Server, per FilterChain, and per filter's name+typeUrl.
122
  // FilterChain.name -> <NamedFilterConfig.filterStateKey -> filter_instance>.
123
  private final HashMap<String, HashMap<String, Filter>> activeFilters = new HashMap<>();
1✔
124
  // Default filter chain Filter instances are unique per Server, and per filter's name+typeUrl.
125
  // NamedFilterConfig.filterStateKey -> filter_instance.
126
  private final HashMap<String, Filter> activeFiltersDefaultChain = new HashMap<>();
1✔
127

128
  XdsServerWrapper(
129
      String listenerAddress,
130
      ServerBuilder<?> delegateBuilder,
131
      XdsServingStatusListener listener,
132
      FilterChainSelectorManager filterChainSelectorManager,
133
      XdsClientPoolFactory xdsClientPoolFactory,
134
      FilterRegistry filterRegistry) {
135
    this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager,
1✔
136
        xdsClientPoolFactory, filterRegistry, SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
1✔
137
    sharedTimeService = true;
1✔
138
  }
1✔
139

140
  @VisibleForTesting
141
  XdsServerWrapper(
142
          String listenerAddress,
143
          ServerBuilder<?> delegateBuilder,
144
          XdsServingStatusListener listener,
145
          FilterChainSelectorManager filterChainSelectorManager,
146
          XdsClientPoolFactory xdsClientPoolFactory,
147
          FilterRegistry filterRegistry,
148
          ScheduledExecutorService timeService) {
1✔
149
    this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
1✔
150
    this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
1✔
151
    this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
1✔
152
    this.listener = checkNotNull(listener, "listener");
1✔
153
    this.filterChainSelectorManager
1✔
154
        = checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
1✔
155
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
1✔
156
    this.timeService = checkNotNull(timeService, "timeService");
1✔
157
    this.filterRegistry = checkNotNull(filterRegistry,"filterRegistry");
1✔
158
    this.delegate = delegateBuilder.build();
1✔
159
  }
1✔
160

161
  @Override
162
  public Server start() throws IOException {
163
    checkState(started.compareAndSet(false, true), "Already started");
1✔
164
    syncContext.execute(new Runnable() {
1✔
165
      @Override
166
      public void run() {
167
        internalStart();
1✔
168
      }
1✔
169
    });
170
    Exception exception;
171
    try {
172
      exception = initialStartFuture.get();
1✔
173
    } catch (InterruptedException | ExecutionException e) {
×
174
      throw new RuntimeException(e);
×
175
    }
1✔
176
    if (exception != null) {
1✔
177
      throw (exception instanceof IOException) ? (IOException) exception :
1✔
178
              new IOException(exception);
1✔
179
    }
180
    return this;
1✔
181
  }
182

183
  private void internalStart() {
184
    try {
185
      xdsClientPool = xdsClientPoolFactory.getOrCreate("#server", new MetricRecorder() {});
1✔
186
    } catch (Exception e) {
×
187
      StatusException statusException = Status.UNAVAILABLE.withDescription(
×
188
              "Failed to initialize xDS").withCause(e).asException();
×
189
      listener.onNotServing(statusException);
×
190
      initialStartFuture.set(statusException);
×
191
      return;
×
192
    }
1✔
193
    xdsClient = xdsClientPool.getObject();
1✔
194
    String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
1✔
195
    if (listenerTemplate == null) {
1✔
196
      StatusException statusException =
1✔
197
          Status.UNAVAILABLE.withDescription(
1✔
198
              "Can only support xDS v3 with listener resource name template").asException();
1✔
199
      listener.onNotServing(statusException);
1✔
200
      initialStartFuture.set(statusException);
1✔
201
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
202
      return;
1✔
203
    }
204
    String replacement = listenerAddress;
1✔
205
    if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
1✔
206
      replacement = XdsClient.percentEncodePath(replacement);
1✔
207
    }
208
    discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
1✔
209
  }
1✔
210

211
  @Override
212
  public Server shutdown() {
213
    if (!shutdown.compareAndSet(false, true)) {
1✔
214
      return this;
1✔
215
    }
216
    syncContext.execute(new Runnable() {
1✔
217
      @Override
218
      public void run() {
219
        if (!delegate.isShutdown()) {
1✔
220
          delegate.shutdown();
1✔
221
        }
222
        internalShutdown();
1✔
223
      }
1✔
224
    });
225
    return this;
1✔
226
  }
227

228
  @Override
229
  public Server shutdownNow() {
230
    if (!shutdown.compareAndSet(false, true)) {
1✔
231
      return this;
1✔
232
    }
233
    syncContext.execute(new Runnable() {
1✔
234
      @Override
235
      public void run() {
236
        if (!delegate.isShutdown()) {
1✔
237
          delegate.shutdownNow();
1✔
238
        }
239
        internalShutdown();
1✔
240
        initialStartFuture.set(new IOException("server is forcefully shut down"));
1✔
241
      }
1✔
242
    });
243
    return this;
1✔
244
  }
245

246
  // Must run in SynchronizationContext
247
  private void internalShutdown() {
248
    logger.log(Level.FINER, "Shutting down XdsServerWrapper");
1✔
249
    if (discoveryState != null) {
1✔
250
      discoveryState.shutdown();
1✔
251
    }
252
    if (xdsClient != null) {
1✔
253
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
254
    }
255
    if (restartTimer != null) {
1✔
256
      restartTimer.cancel();
1✔
257
    }
258
    if (sharedTimeService) {
1✔
259
      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
1✔
260
    }
261
    isServing = false;
1✔
262
    internalTerminationLatch.countDown();
1✔
263
  }
1✔
264

265
  @Override
266
  public boolean isShutdown() {
267
    return shutdown.get();
1✔
268
  }
269

270
  @Override
271
  public boolean isTerminated() {
272
    return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
1✔
273
  }
274

275
  @Override
276
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
277
    long startTime = System.nanoTime();
1✔
278
    if (!internalTerminationLatch.await(timeout, unit)) {
1✔
279
      return false;
×
280
    }
281
    long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
1✔
282
    return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
1✔
283
  }
284

285
  @Override
286
  public void awaitTermination() throws InterruptedException {
287
    internalTerminationLatch.await();
1✔
288
    delegate.awaitTermination();
1✔
289
  }
1✔
290

291
  @Override
292
  public int getPort() {
293
    return delegate.getPort();
1✔
294
  }
295

296
  @Override
297
  public List<? extends SocketAddress> getListenSockets() {
298
    return delegate.getListenSockets();
1✔
299
  }
300

301
  @Override
302
  public List<ServerServiceDefinition> getServices() {
303
    return delegate.getServices();
×
304
  }
305

306
  @Override
307
  public List<ServerServiceDefinition> getImmutableServices() {
308
    return delegate.getImmutableServices();
×
309
  }
310

311
  @Override
312
  public List<ServerServiceDefinition> getMutableServices() {
313
    return delegate.getMutableServices();
×
314
  }
315

316
  // Must run in SynchronizationContext
317
  private void startDelegateServer() {
318
    if (restartTimer != null && restartTimer.isPending()) {
1✔
319
      return;
×
320
    }
321
    if (isServing) {
1✔
322
      return;
1✔
323
    }
324
    if (delegate.isShutdown()) {
1✔
325
      delegate = delegateBuilder.build();
1✔
326
    }
327
    try {
328
      delegate.start();
1✔
329
      listener.onServing();
1✔
330
      isServing = true;
1✔
331
      if (!initialStarted) {
1✔
332
        initialStarted = true;
1✔
333
        initialStartFuture.set(null);
1✔
334
      }
335
      logger.log(Level.FINER, "Delegate server started.");
1✔
336
    } catch (IOException e) {
1✔
337
      logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
1✔
338
      if (!initialStarted) {
1✔
339
        initialStarted = true;
1✔
340
        initialStartFuture.set(e);
1✔
341
      } else {
342
        listener.onNotServing(e);
1✔
343
      }
344
      restartTimer = syncContext.schedule(
1✔
345
        new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
346
    }
1✔
347
  }
1✔
348

349
  private final class RestartTask implements Runnable {
1✔
350
    @Override
351
    public void run() {
352
      startDelegateServer();
1✔
353
    }
1✔
354
  }
355

356
  private final class DiscoveryState implements ResourceWatcher<LdsUpdate> {
357
    private final String resourceName;
358
    // RDS resource name is the key.
359
    private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
1✔
360
    // Track pending RDS resources using rds name.
361
    private final Set<String> pendingRds = new HashSet<>();
1✔
362
    // Most recently discovered filter chains.
363
    private List<FilterChain> filterChains = new ArrayList<>();
1✔
364
    // Most recently discovered default filter chain.
365
    @Nullable
366
    private FilterChain defaultFilterChain;
367
    private boolean stopped;
368
    private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef 
1✔
369
        = new HashMap<>();
370
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
371
      @Override
372
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
373
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
374
        return next.startCall(call, headers);
1✔
375
      }
376
    };
377

378
    private DiscoveryState(String resourceName) {
1✔
379
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
380
      xdsClient.watchXdsResource(
1✔
381
          XdsListenerResource.getInstance(), resourceName, this, syncContext);
1✔
382
    }
1✔
383

384
    @Override
385
    public void onChanged(final LdsUpdate update) {
386
      if (stopped) {
1✔
387
        return;
×
388
      }
389
      logger.log(Level.FINEST, "Received Lds update {0}", update);
1✔
390
      if (update.listener() == null) {
1✔
391
        onResourceDoesNotExist("Non-API");
1✔
392
        return;
1✔
393
      }
394

395
      String ldsAddress = update.listener().address();
1✔
396
      if (ldsAddress == null || update.listener().protocol() != Protocol.TCP
1✔
397
          || !ipAddressesMatch(ldsAddress)) {
1✔
398
        handleConfigNotFoundOrMismatch(
1✔
399
            Status.UNKNOWN.withDescription(
1✔
400
                String.format(
1✔
401
                    "Listener address mismatch: expected %s, but got %s.",
402
                    listenerAddress, ldsAddress)).asException());
1✔
403
        return;
1✔
404
      }
405
      if (!pendingRds.isEmpty()) {
1✔
406
        // filter chain state has not yet been applied to filterChainSelectorManager and there
407
        // are two sets of sslContextProviderSuppliers, so we release the old ones.
408
        releaseSuppliersInFlight();
1✔
409
        pendingRds.clear();
1✔
410
      }
411

412
      filterChains = update.listener().filterChains();
1✔
413
      defaultFilterChain = update.listener().defaultFilterChain();
1✔
414
      // Filters are loaded even if the server isn't serving yet.
415
      updateActiveFilters();
1✔
416

417
      List<FilterChain> allFilterChains = filterChains;
1✔
418
      if (defaultFilterChain != null) {
1✔
419
        allFilterChains = new ArrayList<>(filterChains);
1✔
420
        allFilterChains.add(defaultFilterChain);
1✔
421
      }
422

423
      Set<String> allRds = new HashSet<>();
1✔
424
      for (FilterChain filterChain : allFilterChains) {
1✔
425
        HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
426
        if (hcm.virtualHosts() == null) {
1✔
427
          RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
1✔
428
          if (rdsState == null) {
1✔
429
            rdsState = new RouteDiscoveryState(hcm.rdsName());
1✔
430
            routeDiscoveryStates.put(hcm.rdsName(), rdsState);
1✔
431
            xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
432
                hcm.rdsName(), rdsState, syncContext);
1✔
433
          }
434
          if (rdsState.isPending) {
1✔
435
            pendingRds.add(hcm.rdsName());
1✔
436
          }
437
          allRds.add(hcm.rdsName());
1✔
438
        }
439
      }
1✔
440

441
      for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
1✔
442
        if (!allRds.contains(entry.getKey())) {
1✔
443
          xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
1✔
444
              entry.getKey(), entry.getValue());
1✔
445
        }
446
      }
1✔
447
      routeDiscoveryStates.keySet().retainAll(allRds);
1✔
448
      if (pendingRds.isEmpty()) {
1✔
449
        updateSelector();
1✔
450
      }
451
    }
1✔
452

453
    private boolean ipAddressesMatch(String ldsAddress) {
454
      HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
1✔
455
      HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
1✔
456
      if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
1✔
457
          || ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
1✔
458
        return false;
1✔
459
      }
460
      InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
1✔
461
      InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
1✔
462
      return listenerIp.equals(ldsIp);
1✔
463
    }
464

465
    @Override
466
    public void onResourceDoesNotExist(final String resourceName) {
467
      if (stopped) {
1✔
468
        return;
×
469
      }
470
      StatusException statusException = Status.UNAVAILABLE.withDescription(
1✔
471
          String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
1✔
472
              xdsClient.getBootstrapInfo().node().getId())).asException();
1✔
473
      handleConfigNotFoundOrMismatch(statusException);
1✔
474
    }
1✔
475

476
    @Override
477
    public void onError(final Status error) {
478
      if (stopped) {
1✔
479
        return;
×
480
      }
481
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
482
      Status errorWithNodeId = error.withDescription(
1✔
483
          description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
484
      logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
1✔
485
      if (!isServing) {
1✔
486
        listener.onNotServing(errorWithNodeId.asException());
1✔
487
      }
488
    }
1✔
489

490
    private void shutdown() {
491
      stopped = true;
1✔
492
      cleanUpRouteDiscoveryStates();
1✔
493
      logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
1✔
494
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this);
1✔
495
      shutdownActiveFilters();
1✔
496
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
497
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
498
      for (SslContextProviderSupplier s: toRelease) {
1✔
499
        s.close();
1✔
500
      }
1✔
501
      releaseSuppliersInFlight();
1✔
502
    }
1✔
503

504
    private void updateSelector() {
505
      // This is regenerated in generateRoutingConfig() calls below.
506
      savedRdsRoutingConfigRef.clear();
1✔
507

508
      // Prepare server routing config map.
509
      ImmutableMap.Builder<FilterChain, AtomicReference<ServerRoutingConfig>> routingConfigs =
510
          ImmutableMap.builder();
1✔
511
      for (FilterChain filterChain: filterChains) {
1✔
512
        HashMap<String, Filter> chainFilters = activeFilters.get(filterChain.name());
1✔
513
        routingConfigs.put(filterChain, generateRoutingConfig(filterChain, chainFilters));
1✔
514
      }
1✔
515

516
      // Prepare the new selector.
517
      FilterChainSelector selector;
518
      if (defaultFilterChain != null) {
1✔
519
        selector = new FilterChainSelector(
1✔
520
            routingConfigs.build(),
1✔
521
            defaultFilterChain.sslContextProviderSupplier(),
1✔
522
            generateRoutingConfig(defaultFilterChain, activeFiltersDefaultChain));
1✔
523
      } else {
524
        selector = new FilterChainSelector(routingConfigs.build());
1✔
525
      }
526

527
      // Prepare the list of current selector's resources to close later.
528
      List<SslContextProviderSupplier> oldSslSuppliers = getSuppliersInUse();
1✔
529

530
      // Swap the selectors, initiate a graceful shutdown of the old one.
531
      logger.log(Level.FINEST, "Updating selector {0}", selector);
1✔
532
      filterChainSelectorManager.updateSelector(selector);
1✔
533

534
      // Release old resources.
535
      for (SslContextProviderSupplier supplier: oldSslSuppliers) {
1✔
536
        supplier.close();
1✔
537
      }
1✔
538

539
      // Now that we have valid Transport Socket config, we can start/restart listening on a port.
540
      startDelegateServer();
1✔
541
    }
1✔
542

543
    // called in syncContext
544
    private void updateActiveFilters() {
545
      Set<String> removedChains = new HashSet<>(activeFilters.keySet());
1✔
546
      for (FilterChain filterChain: filterChains) {
1✔
547
        removedChains.remove(filterChain.name());
1✔
548
        updateActiveFiltersForChain(
1✔
549
            activeFilters.computeIfAbsent(filterChain.name(), k -> new HashMap<>()),
1✔
550
            filterChain.httpConnectionManager().httpFilterConfigs());
1✔
551
      }
1✔
552

553
      // Shutdown all filters of chains missing from the LDS.
554
      for (String chainToShutdown : removedChains) {
1✔
555
        HashMap<String, Filter> filtersToShutdown = activeFilters.get(chainToShutdown);
1✔
556
        checkNotNull(filtersToShutdown, "filtersToShutdown of chain %s", chainToShutdown);
1✔
557
        updateActiveFiltersForChain(filtersToShutdown, null);
1✔
558
        activeFilters.remove(chainToShutdown);
1✔
559
      }
1✔
560

561
      // Default chain.
562
      ImmutableList<NamedFilterConfig> defaultChainConfigs = null;
1✔
563
      if (defaultFilterChain != null) {
1✔
564
        defaultChainConfigs = defaultFilterChain.httpConnectionManager().httpFilterConfigs();
1✔
565
      }
566
      updateActiveFiltersForChain(activeFiltersDefaultChain, defaultChainConfigs);
1✔
567
    }
1✔
568

569
    // called in syncContext
570
    private void shutdownActiveFilters() {
571
      for (HashMap<String, Filter> chainFilters : activeFilters.values()) {
1✔
572
        checkNotNull(chainFilters, "chainFilters");
1✔
573
        updateActiveFiltersForChain(chainFilters, null);
1✔
574
      }
1✔
575
      activeFilters.clear();
1✔
576
      updateActiveFiltersForChain(activeFiltersDefaultChain, null);
1✔
577
    }
1✔
578

579
    // called in syncContext
580
    private void updateActiveFiltersForChain(
581
        Map<String, Filter> chainFilters, @Nullable List<NamedFilterConfig> filterConfigs) {
582
      if (filterConfigs == null) {
1✔
583
        filterConfigs = ImmutableList.of();
1✔
584
      }
585

586
      Set<String> filtersToShutdown = new HashSet<>(chainFilters.keySet());
1✔
587
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
588
        String typeUrl = namedFilter.filterConfig.typeUrl();
1✔
589
        String filterKey = namedFilter.filterStateKey();
1✔
590

591
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
592
        checkNotNull(provider, "provider %s", typeUrl);
1✔
593
        Filter filter = chainFilters.computeIfAbsent(
1✔
594
            filterKey, k -> provider.newInstance(namedFilter.name));
1✔
595
        checkNotNull(filter, "filter %s", filterKey);
1✔
596
        filtersToShutdown.remove(filterKey);
1✔
597
      }
1✔
598

599
      // Shutdown filters not present in current HCM.
600
      for (String filterKey : filtersToShutdown) {
1✔
601
        Filter filterToShutdown = chainFilters.remove(filterKey);
1✔
602
        checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
1✔
603
        filterToShutdown.close();
1✔
604
      }
1✔
605
    }
1✔
606

607
    private AtomicReference<ServerRoutingConfig> generateRoutingConfig(
608
        FilterChain filterChain, Map<String, Filter> chainFilters) {
609
      HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
610
      ServerRoutingConfig routingConfig;
611

612
      // Inlined routes.
613
      ImmutableList<VirtualHost> vhosts = hcm.virtualHosts();
1✔
614
      if (vhosts != null) {
1✔
615
        routingConfig = ServerRoutingConfig.create(vhosts,
1✔
616
            generatePerRouteInterceptors(hcm.httpFilterConfigs(), vhosts, chainFilters));
1✔
617
        return new AtomicReference<>(routingConfig);
1✔
618
      }
619

620
      // Routes from RDS.
621
      RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
1✔
622
      checkNotNull(rds, "rds");
1✔
623

624
      ImmutableList<VirtualHost> savedVhosts = rds.savedVirtualHosts;
1✔
625
      if (savedVhosts != null) {
1✔
626
        routingConfig = ServerRoutingConfig.create(savedVhosts,
1✔
627
            generatePerRouteInterceptors(hcm.httpFilterConfigs(), savedVhosts, chainFilters));
1✔
628
      } else {
629
        routingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
630
      }
631
      AtomicReference<ServerRoutingConfig> routingConfigRef = new AtomicReference<>(routingConfig);
1✔
632
      savedRdsRoutingConfigRef.put(filterChain, routingConfigRef);
1✔
633
      return routingConfigRef;
1✔
634
    }
635

636
    private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
637
        @Nullable List<NamedFilterConfig> filterConfigs,
638
        List<VirtualHost> virtualHosts,
639
        Map<String, Filter> chainFilters) {
640
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
641

642
      checkNotNull(chainFilters, "chainFilters");
1✔
643
      ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
1✔
644
          new ImmutableMap.Builder<>();
645

646
      for (VirtualHost virtualHost : virtualHosts) {
1✔
647
        for (Route route : virtualHost.routes()) {
1✔
648
          // Short circuit.
649
          if (filterConfigs == null) {
1✔
650
            perRouteInterceptors.put(route, noopInterceptor);
×
651
            continue;
×
652
          }
653

654
          // Override vhost filter configs with more specific per-route configs.
655
          Map<String, FilterConfig> perRouteOverrides = ImmutableMap.<String, FilterConfig>builder()
1✔
656
              .putAll(virtualHost.filterConfigOverrides())
1✔
657
              .putAll(route.filterConfigOverrides())
1✔
658
              .buildKeepingLast();
1✔
659

660
          // Interceptors for this vhost/route combo.
661
          List<ServerInterceptor> interceptors = new ArrayList<>(filterConfigs.size());
1✔
662
          for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
663
            String name = namedFilter.name;
1✔
664
            FilterConfig config = namedFilter.filterConfig;
1✔
665
            FilterConfig overrideConfig = perRouteOverrides.get(name);
1✔
666
            String filterKey = namedFilter.filterStateKey();
1✔
667

668
            Filter filter = chainFilters.get(filterKey);
1✔
669
            checkNotNull(filter, "chainFilters.get(%s)", filterKey);
1✔
670
            ServerInterceptor interceptor = filter.buildServerInterceptor(config, overrideConfig);
1✔
671

672
            if (interceptor != null) {
1✔
673
              interceptors.add(interceptor);
1✔
674
            }
675
          }
1✔
676

677
          // Combine interceptors produced by different filters into a single one that executes
678
          // them sequentially. The order is preserved.
679
          perRouteInterceptors.put(route, combineInterceptors(interceptors));
1✔
680
        }
1✔
681
      }
1✔
682

683
      return perRouteInterceptors.buildOrThrow();
1✔
684
    }
685

686
    private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
687
      if (interceptors.isEmpty()) {
1✔
688
        return noopInterceptor;
1✔
689
      }
690
      if (interceptors.size() == 1) {
1✔
691
        return interceptors.get(0);
×
692
      }
693
      return new ServerInterceptor() {
1✔
694
        @Override
695
        public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
696
            Metadata headers, ServerCallHandler<ReqT, RespT> next) {
697
          // intercept forward
698
          for (int i = interceptors.size() - 1; i >= 0; i--) {
1✔
699
            next = InternalServerInterceptors.interceptCallHandlerCreate(
1✔
700
                interceptors.get(i), next);
1✔
701
          }
702
          return next.startCall(call, headers);
1✔
703
        }
704
      };
705
    }
706

707
    private void handleConfigNotFoundOrMismatch(StatusException exception) {
708
      cleanUpRouteDiscoveryStates();
1✔
709
      shutdownActiveFilters();
1✔
710
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
711
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
712
      for (SslContextProviderSupplier s: toRelease) {
1✔
713
        s.close();
1✔
714
      }
1✔
715
      if (restartTimer != null) {
1✔
716
        restartTimer.cancel();
1✔
717
      }
718
      if (!delegate.isShutdown()) {
1✔
719
        delegate.shutdown();  // let in-progress calls finish
1✔
720
      }
721
      isServing = false;
1✔
722
      listener.onNotServing(exception);
1✔
723
    }
1✔
724

725
    private void cleanUpRouteDiscoveryStates() {
726
      for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
1✔
727
        String rdsName = rdsState.resourceName;
1✔
728
        logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
1✔
729
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
730
            rdsState);
731
      }
1✔
732
      routeDiscoveryStates.clear();
1✔
733
      savedRdsRoutingConfigRef.clear();
1✔
734
    }
1✔
735

736
    private List<SslContextProviderSupplier> getSuppliersInUse() {
737
      List<SslContextProviderSupplier> toRelease = new ArrayList<>();
1✔
738
      FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
1✔
739
      if (selector != null) {
1✔
740
        for (FilterChain f: selector.getRoutingConfigs().keySet()) {
1✔
741
          if (f.sslContextProviderSupplier() != null) {
1✔
742
            toRelease.add(f.sslContextProviderSupplier());
1✔
743
          }
744
        }
1✔
745
        SslContextProviderSupplier defaultSupplier =
1✔
746
                selector.getDefaultSslContextProviderSupplier();
1✔
747
        if (defaultSupplier != null) {
1✔
748
          toRelease.add(defaultSupplier);
1✔
749
        }
750
      }
751
      return toRelease;
1✔
752
    }
753

754
    private void releaseSuppliersInFlight() {
755
      SslContextProviderSupplier supplier;
756
      for (FilterChain filterChain : filterChains) {
1✔
757
        supplier = filterChain.sslContextProviderSupplier();
1✔
758
        if (supplier != null) {
1✔
759
          supplier.close();
1✔
760
        }
761
      }
1✔
762
      if (defaultFilterChain != null
1✔
763
              && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
1✔
764
        supplier.close();
1✔
765
      }
766
    }
1✔
767

768
    private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
769
      private final String resourceName;
770
      private ImmutableList<VirtualHost> savedVirtualHosts;
771
      private boolean isPending = true;
1✔
772

773
      private RouteDiscoveryState(String resourceName) {
1✔
774
        this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
775
      }
1✔
776

777
      @Override
778
      public void onChanged(final RdsUpdate update) {
779
        syncContext.execute(new Runnable() {
1✔
780
          @Override
781
          public void run() {
782
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
783
              return;
1✔
784
            }
785
            if (savedVirtualHosts == null && !isPending) {
1✔
786
              logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
1✔
787
            }
788
            savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
1✔
789
            updateRdsRoutingConfig();
1✔
790
            maybeUpdateSelector();
1✔
791
          }
1✔
792
        });
793
      }
1✔
794

795
      @Override
796
      public void onResourceDoesNotExist(final String resourceName) {
797
        syncContext.execute(new Runnable() {
1✔
798
          @Override
799
          public void run() {
800
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
801
              return;
×
802
            }
803
            logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
1✔
804
            savedVirtualHosts = null;
1✔
805
            updateRdsRoutingConfig();
1✔
806
            maybeUpdateSelector();
1✔
807
          }
1✔
808
        });
809
      }
1✔
810

811
      @Override
812
      public void onError(final Status error) {
813
        syncContext.execute(new Runnable() {
1✔
814
          @Override
815
          public void run() {
816
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
817
              return;
×
818
            }
819
            String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
820
            Status errorWithNodeId = error.withDescription(
1✔
821
                    description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
822
            logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
1✔
823
                    new Object[]{resourceName, errorWithNodeId});
1✔
824
            maybeUpdateSelector();
1✔
825
          }
1✔
826
        });
827
      }
1✔
828

829
      private void updateRdsRoutingConfig() {
830
        for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
1✔
831
          HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
832
          if (!resourceName.equals(hcm.rdsName())) {
1✔
833
            continue;
1✔
834
          }
835

836
          ServerRoutingConfig updatedRoutingConfig;
837
          if (savedVirtualHosts == null) {
1✔
838
            updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
839
          } else {
840
            HashMap<String, Filter> chainFilters = activeFilters.get(filterChain.name());
1✔
841
            ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
1✔
842
                hcm.httpFilterConfigs(), savedVirtualHosts, chainFilters);
1✔
843
            updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts, interceptors);
1✔
844
          }
845

846
          logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
1✔
847
              new Object[]{filterChain.name(), updatedRoutingConfig});
1✔
848
          savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
1✔
849
        }
1✔
850
      }
1✔
851

852
      // Update the selector to use the most recently updated configs only after all rds have been
853
      // discovered for the first time. Later changes on rds will be applied through virtual host
854
      // list atomic ref.
855
      private void maybeUpdateSelector() {
856
        isPending = false;
1✔
857
        boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
1✔
858
        if (isLastPending) {
1✔
859
          updateSelector();
1✔
860
        }
861
      }
1✔
862
    }
863
  }
864

865
  @VisibleForTesting
866
  final class ConfigApplyingInterceptor implements ServerInterceptor {
1✔
867
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
868
      @Override
869
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
870
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
871
        return next.startCall(call, headers);
×
872
      }
873
    };
874

875
    @Override
876
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
877
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
878
      AtomicReference<ServerRoutingConfig> routingConfigRef =
1✔
879
          call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
1✔
880
      ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
1✔
881
          routingConfigRef.get();
1✔
882
      if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
1✔
883
        String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
1✔
884
        call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
1✔
885
        return new Listener<ReqT>() {};
1✔
886
      }
887
      List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
1✔
888
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
889
          virtualHosts, call.getAuthority());
1✔
890
      if (virtualHost == null) {
1✔
891
        call.close(
1✔
892
            Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
1✔
893
            new Metadata());
894
        return new Listener<ReqT>() {};
1✔
895
      }
896
      Route selectedRoute = null;
1✔
897
      MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
1✔
898
      for (Route route : virtualHost.routes()) {
1✔
899
        if (RoutingUtils.matchRoute(
1✔
900
            route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
1✔
901
          selectedRoute = route;
1✔
902
          break;
1✔
903
        }
904
      }
1✔
905
      if (selectedRoute == null) {
1✔
906
        call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
1✔
907
            new Metadata());
908
        return new ServerCall.Listener<ReqT>() {};
1✔
909
      }
910
      if (selectedRoute.routeAction() != null) {
1✔
911
        call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
1✔
912
            + "route: only Route.non_forwarding_action should be allowed."), new Metadata());
913
        return new ServerCall.Listener<ReqT>() {};
1✔
914
      }
915
      ServerInterceptor routeInterceptor = noopInterceptor;
1✔
916
      Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
1✔
917
      if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
1✔
918
        routeInterceptor = perRouteInterceptors.get(selectedRoute);
1✔
919
      }
920
      return routeInterceptor.interceptCall(call, headers, next);
1✔
921
    }
922
  }
923

924
  /**
925
   * The HttpConnectionManager level configuration.
926
   */
927
  @AutoValue
928
  abstract static class ServerRoutingConfig {
1✔
929
    @VisibleForTesting
930
    static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
1✔
931
        ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
1✔
932

933
    abstract ImmutableList<VirtualHost> virtualHosts();
934

935
    // Prebuilt per route server interceptors from http filter configs.
936
    abstract ImmutableMap<Route, ServerInterceptor> interceptors();
937

938
    /**
939
     * Server routing configuration.
940
     * */
941
    public static ServerRoutingConfig create(
942
        ImmutableList<VirtualHost> virtualHosts,
943
        ImmutableMap<Route, ServerInterceptor> interceptors) {
944
      checkNotNull(virtualHosts, "virtualHosts");
1✔
945
      checkNotNull(interceptors, "interceptors");
1✔
946
      return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
1✔
947
    }
948
  }
949
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc