• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20100

30 Nov 2025 07:31AM UTC coverage: 88.622% (+0.05%) from 88.572%
#20100

push

github

web-flow
xds: gRFC A88 - Changes to XdsClient Watcher APIs (#12446)

Implements
https://github.com/grpc/proposal/blob/master/A88-xds-data-error-handling.md#a88-xds-data-error-handling

35150 of 39663 relevant lines covered (88.62%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.07
/../xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.auto.value.AutoValue;
24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import com.google.common.net.HostAndPort;
28
import com.google.common.net.InetAddresses;
29
import com.google.common.util.concurrent.SettableFuture;
30
import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol;
31
import io.grpc.Attributes;
32
import io.grpc.InternalServerInterceptors;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.MetricRecorder;
36
import io.grpc.Server;
37
import io.grpc.ServerBuilder;
38
import io.grpc.ServerCall;
39
import io.grpc.ServerCall.Listener;
40
import io.grpc.ServerCallHandler;
41
import io.grpc.ServerInterceptor;
42
import io.grpc.ServerServiceDefinition;
43
import io.grpc.Status;
44
import io.grpc.StatusException;
45
import io.grpc.StatusOr;
46
import io.grpc.SynchronizationContext;
47
import io.grpc.SynchronizationContext.ScheduledHandle;
48
import io.grpc.internal.GrpcUtil;
49
import io.grpc.internal.ObjectPool;
50
import io.grpc.internal.SharedResourceHolder;
51
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
52
import io.grpc.xds.Filter.FilterConfig;
53
import io.grpc.xds.Filter.NamedFilterConfig;
54
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
55
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
56
import io.grpc.xds.VirtualHost.Route;
57
import io.grpc.xds.XdsListenerResource.LdsUpdate;
58
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
59
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
60
import io.grpc.xds.client.Bootstrapper.BootstrapInfo;
61
import io.grpc.xds.client.XdsClient;
62
import io.grpc.xds.client.XdsClient.ResourceWatcher;
63
import io.grpc.xds.internal.security.SslContextProviderSupplier;
64
import java.io.IOException;
65
import java.net.InetAddress;
66
import java.net.SocketAddress;
67
import java.util.ArrayList;
68
import java.util.HashMap;
69
import java.util.HashSet;
70
import java.util.List;
71
import java.util.Map;
72
import java.util.Set;
73
import java.util.concurrent.CountDownLatch;
74
import java.util.concurrent.ExecutionException;
75
import java.util.concurrent.ScheduledExecutorService;
76
import java.util.concurrent.TimeUnit;
77
import java.util.concurrent.atomic.AtomicBoolean;
78
import java.util.concurrent.atomic.AtomicReference;
79
import java.util.logging.Level;
80
import java.util.logging.Logger;
81
import javax.annotation.Nullable;
82

83
final class XdsServerWrapper extends Server {
84
  private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
1✔
85

86
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
87
      new Thread.UncaughtExceptionHandler() {
1✔
88
        @Override
89
        public void uncaughtException(Thread t, Throwable e) {
90
          logger.log(Level.SEVERE, "Exception!" + e);
×
91
          // TODO(chengyuanzhang): implement cleanup.
92
        }
×
93
      });
94

95
  public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
96
      ATTR_SERVER_ROUTING_CONFIG =
1✔
97
      Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
1✔
98

99
  @VisibleForTesting
100
  static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
1✔
101
  private final String listenerAddress;
102
  private final ServerBuilder<?> delegateBuilder;
103
  private boolean sharedTimeService;
104
  private final ScheduledExecutorService timeService;
105
  private final FilterRegistry filterRegistry;
106
  private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
1✔
107
  private final XdsClientPoolFactory xdsClientPoolFactory;
108
  private final @Nullable Map<String, ?> bootstrapOverride;
109
  private final XdsServingStatusListener listener;
110
  private final FilterChainSelectorManager filterChainSelectorManager;
111
  private final AtomicBoolean started = new AtomicBoolean(false);
1✔
112
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
113
  private boolean isServing;
114
  private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
1✔
115
  private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
1✔
116
  private boolean initialStarted;
117
  private ScheduledHandle restartTimer;
118
  private ObjectPool<XdsClient> xdsClientPool;
119
  private XdsClient xdsClient;
120
  private DiscoveryState discoveryState;
121
  private volatile Server delegate;
122

123
  // Must be accessed in syncContext.
124
  // Filter instances are unique per Server, per FilterChain, and per filter's name+typeUrl.
125
  // FilterChain.name -> <NamedFilterConfig.filterStateKey -> filter_instance>.
126
  private final HashMap<String, HashMap<String, Filter>> activeFilters = new HashMap<>();
1✔
127
  // Default filter chain Filter instances are unique per Server, and per filter's name+typeUrl.
128
  // NamedFilterConfig.filterStateKey -> filter_instance.
129
  private final HashMap<String, Filter> activeFiltersDefaultChain = new HashMap<>();
1✔
130

131
  /**
   * Creates a wrapper whose timer is obtained from {@link SharedResourceHolder} using
   * {@link GrpcUtil#TIMER_SERVICE}. All other arguments are forwarded unchanged to the
   * constructor that accepts an explicit {@link ScheduledExecutorService}.
   */
  XdsServerWrapper(
      String listenerAddress,
      ServerBuilder<?> delegateBuilder,
      XdsServingStatusListener listener,
      FilterChainSelectorManager filterChainSelectorManager,
      XdsClientPoolFactory xdsClientPoolFactory,
      @Nullable Map<String, ?> bootstrapOverride,
      FilterRegistry filterRegistry) {
    this(listenerAddress, delegateBuilder, listener, filterChainSelectorManager,
        xdsClientPoolFactory, bootstrapOverride, filterRegistry,
        SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
    // Record that the timer is a shared resource so that shutdown knows to release it.
    this.sharedTimeService = true;
  }
1✔
150

151
  /**
   * Creates a wrapper with an explicitly supplied timer service.
   *
   * <p>Every argument except {@code bootstrapOverride} must be non-null. A
   * {@code ConfigApplyingInterceptor} is registered on {@code delegateBuilder}, and the first
   * delegate server is built eagerly (it is not started here).
   */
  @VisibleForTesting
  XdsServerWrapper(
      String listenerAddress,
      ServerBuilder<?> delegateBuilder,
      XdsServingStatusListener listener,
      FilterChainSelectorManager filterChainSelectorManager,
      XdsClientPoolFactory xdsClientPoolFactory,
      @Nullable Map<String, ?> bootstrapOverride,
      FilterRegistry filterRegistry,
      ScheduledExecutorService timeService) {
    this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");

    // The interceptor must be installed on the builder before any delegate is built from it.
    this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
    this.delegateBuilder.intercept(new ConfigApplyingInterceptor());

    this.listener = checkNotNull(listener, "listener");
    this.filterChainSelectorManager =
        checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
    // Nullable by contract: null selects the default bootstrap in internalStart().
    this.bootstrapOverride = bootstrapOverride;
    this.timeService = checkNotNull(timeService, "timeService");
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");

    this.delegate = delegateBuilder.build();
  }
1✔
173

174
  @Override
175
  public Server start() throws IOException {
176
    checkState(started.compareAndSet(false, true), "Already started");
1✔
177
    syncContext.execute(new Runnable() {
1✔
178
      @Override
179
      public void run() {
180
        internalStart();
1✔
181
      }
1✔
182
    });
183
    Exception exception;
184
    try {
185
      exception = initialStartFuture.get();
1✔
186
    } catch (InterruptedException | ExecutionException e) {
×
187
      throw new RuntimeException(e);
×
188
    }
1✔
189
    if (exception != null) {
1✔
190
      throw (exception instanceof IOException) ? (IOException) exception :
1✔
191
              new IOException(exception);
1✔
192
    }
193
    return this;
1✔
194
  }
195

196
  private void internalStart() {
197
    try {
198
      BootstrapInfo bootstrapInfo;
199
      if (bootstrapOverride == null) {
1✔
200
        bootstrapInfo = GrpcBootstrapperImpl.defaultBootstrap();
×
201
      } else {
202
        bootstrapInfo = new GrpcBootstrapperImpl().bootstrap(bootstrapOverride);
1✔
203
      }
204
      xdsClientPool = xdsClientPoolFactory.getOrCreate(
1✔
205
          "#server", bootstrapInfo, new MetricRecorder() {});
1✔
206
    } catch (Exception e) {
×
207
      StatusException statusException = Status.UNAVAILABLE.withDescription(
×
208
              "Failed to initialize xDS").withCause(e).asException();
×
209
      listener.onNotServing(statusException);
×
210
      initialStartFuture.set(statusException);
×
211
      return;
×
212
    }
1✔
213
    xdsClient = xdsClientPool.getObject();
1✔
214
    String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
1✔
215
    if (listenerTemplate == null) {
1✔
216
      StatusException statusException =
1✔
217
          Status.UNAVAILABLE.withDescription(
1✔
218
              "Can only support xDS v3 with listener resource name template").asException();
1✔
219
      listener.onNotServing(statusException);
1✔
220
      initialStartFuture.set(statusException);
1✔
221
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
222
      return;
1✔
223
    }
224
    String replacement = listenerAddress;
1✔
225
    if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
1✔
226
      replacement = XdsClient.percentEncodePath(replacement);
1✔
227
    }
228
    discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
1✔
229
  }
1✔
230

231
  @Override
232
  public Server shutdown() {
233
    if (!shutdown.compareAndSet(false, true)) {
1✔
234
      return this;
1✔
235
    }
236
    syncContext.execute(new Runnable() {
1✔
237
      @Override
238
      public void run() {
239
        if (!delegate.isShutdown()) {
1✔
240
          delegate.shutdown();
1✔
241
        }
242
        internalShutdown();
1✔
243
      }
1✔
244
    });
245
    return this;
1✔
246
  }
247

248
  @Override
249
  public Server shutdownNow() {
250
    if (!shutdown.compareAndSet(false, true)) {
1✔
251
      return this;
1✔
252
    }
253
    syncContext.execute(new Runnable() {
1✔
254
      @Override
255
      public void run() {
256
        if (!delegate.isShutdown()) {
1✔
257
          delegate.shutdownNow();
1✔
258
        }
259
        internalShutdown();
1✔
260
        initialStartFuture.set(new IOException("server is forcefully shut down"));
1✔
261
      }
1✔
262
    });
263
    return this;
1✔
264
  }
265

266
  // Must run in SynchronizationContext
267
  private void internalShutdown() {
268
    logger.log(Level.FINER, "Shutting down XdsServerWrapper");
1✔
269
    if (discoveryState != null) {
1✔
270
      discoveryState.shutdown();
1✔
271
    }
272
    if (xdsClient != null) {
1✔
273
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
274
    }
275
    if (restartTimer != null) {
1✔
276
      restartTimer.cancel();
1✔
277
    }
278
    if (sharedTimeService) {
1✔
279
      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
1✔
280
    }
281
    isServing = false;
1✔
282
    internalTerminationLatch.countDown();
1✔
283
  }
1✔
284

285
  @Override
286
  public boolean isShutdown() {
287
    return shutdown.get();
1✔
288
  }
289

290
  @Override
291
  public boolean isTerminated() {
292
    return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
1✔
293
  }
294

295
  @Override
296
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
297
    long startTime = System.nanoTime();
1✔
298
    if (!internalTerminationLatch.await(timeout, unit)) {
1✔
299
      return false;
×
300
    }
301
    long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
1✔
302
    return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
1✔
303
  }
304

305
  @Override
306
  public void awaitTermination() throws InterruptedException {
307
    internalTerminationLatch.await();
1✔
308
    delegate.awaitTermination();
1✔
309
  }
1✔
310

311
  @Override
312
  public int getPort() {
313
    return delegate.getPort();
1✔
314
  }
315

316
  @Override
317
  public List<? extends SocketAddress> getListenSockets() {
318
    return delegate.getListenSockets();
1✔
319
  }
320

321
  @Override
322
  public List<ServerServiceDefinition> getServices() {
323
    return delegate.getServices();
×
324
  }
325

326
  @Override
327
  public List<ServerServiceDefinition> getImmutableServices() {
328
    return delegate.getImmutableServices();
×
329
  }
330

331
  @Override
332
  public List<ServerServiceDefinition> getMutableServices() {
333
    return delegate.getMutableServices();
×
334
  }
335

336
  // Must run in SynchronizationContext
337
  private void startDelegateServer() {
338
    if (restartTimer != null && restartTimer.isPending()) {
1✔
339
      return;
×
340
    }
341
    if (isServing) {
1✔
342
      return;
1✔
343
    }
344
    if (delegate.isShutdown()) {
1✔
345
      delegate = delegateBuilder.build();
1✔
346
    }
347
    try {
348
      delegate.start();
1✔
349
      listener.onServing();
1✔
350
      isServing = true;
1✔
351
      if (!initialStarted) {
1✔
352
        initialStarted = true;
1✔
353
        initialStartFuture.set(null);
1✔
354
      }
355
      logger.log(Level.FINER, "Delegate server started.");
1✔
356
    } catch (IOException e) {
1✔
357
      logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
1✔
358
      if (!initialStarted) {
1✔
359
        initialStarted = true;
1✔
360
        initialStartFuture.set(e);
1✔
361
      } else {
362
        listener.onNotServing(e);
1✔
363
      }
364
      restartTimer = syncContext.schedule(
1✔
365
        new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
366
    }
1✔
367
  }
1✔
368

369
  private final class RestartTask implements Runnable {
1✔
370
    @Override
371
    public void run() {
372
      startDelegateServer();
1✔
373
    }
1✔
374
  }
375

376
  private final class DiscoveryState implements ResourceWatcher<LdsUpdate> {
377
    private final String resourceName;
378
    // RDS resource name is the key.
379
    private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
1✔
380
    // Track pending RDS resources using rds name.
381
    private final Set<String> pendingRds = new HashSet<>();
1✔
382
    // Most recently discovered filter chains.
383
    private List<FilterChain> filterChains = new ArrayList<>();
1✔
384
    // Most recently discovered default filter chain.
385
    @Nullable
386
    private FilterChain defaultFilterChain;
387
    private boolean stopped;
388
    private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef 
1✔
389
        = new HashMap<>();
390
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
391
      @Override
392
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
393
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
394
        return next.startCall(call, headers);
1✔
395
      }
396
    };
397

398
    /**
     * Creates the state for the given listener resource and immediately registers this object
     * as its watcher, with callbacks delivered on {@code syncContext}.
     */
    private DiscoveryState(String resourceName) {
      checkNotNull(resourceName, "resourceName");
      this.resourceName = resourceName;
      xdsClient.watchXdsResource(
          XdsListenerResource.getInstance(), this.resourceName, this, syncContext);
    }
1✔
403

404
    @Override
405
    public void onResourceChanged(final StatusOr<LdsUpdate> update) {
406
      if (stopped) {
1✔
407
        return;
×
408
      }
409

410
      if (!update.hasValue()) {
1✔
411
        Status status = update.getStatus();
1✔
412
        StatusException statusException = Status.UNAVAILABLE.withDescription(
1✔
413
                String.format("Listener %s unavailable: %s", resourceName, status.getDescription()))
1✔
414
            .withCause(status.asException())
1✔
415
            .asException();
1✔
416
        handleConfigNotFoundOrMismatch(statusException);
1✔
417
        return;
1✔
418
      }
419

420
      final LdsUpdate ldsUpdate = update.getValue();
1✔
421
      logger.log(Level.FINEST, "Received Lds update {0}", ldsUpdate);
1✔
422
      if (ldsUpdate.listener() == null) {
1✔
423
        handleConfigNotFoundOrMismatch(
1✔
424
            Status.NOT_FOUND.withDescription("Listener is null in LdsUpdate").asException());
1✔
425
        return;
1✔
426
      }
427
      String ldsAddress = ldsUpdate.listener().address();
1✔
428
      if (ldsAddress == null || ldsUpdate.listener().protocol() != Protocol.TCP
1✔
429
          || !ipAddressesMatch(ldsAddress)) {
1✔
430
        handleConfigNotFoundOrMismatch(
1✔
431
            Status.UNKNOWN.withDescription(
1✔
432
                String.format(
1✔
433
                    "Listener address mismatch: expected %s, but got %s.",
434
                    listenerAddress, ldsAddress)).asException());
1✔
435
        return;
1✔
436
      }
437

438
      if (!pendingRds.isEmpty()) {
1✔
439
        // filter chain state has not yet been applied to filterChainSelectorManager and there
440
        releaseSuppliersInFlight();
1✔
441
        pendingRds.clear();
1✔
442
      }
443

444
      filterChains = ldsUpdate.listener().filterChains();
1✔
445
      defaultFilterChain = ldsUpdate.listener().defaultFilterChain();
1✔
446
      updateActiveFilters();
1✔
447

448
      List<FilterChain> allFilterChains = filterChains;
1✔
449
      if (defaultFilterChain != null) {
1✔
450
        allFilterChains = new ArrayList<>(filterChains);
1✔
451
        allFilterChains.add(defaultFilterChain);
1✔
452
      }
453

454
      Set<String> allRds = new HashSet<>();
1✔
455
      for (FilterChain filterChain : allFilterChains) {
1✔
456
        HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
457
        if (hcm.virtualHosts() == null) {
1✔
458
          RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
1✔
459
          if (rdsState == null) {
1✔
460
            rdsState = new RouteDiscoveryState(hcm.rdsName());
1✔
461
            routeDiscoveryStates.put(hcm.rdsName(), rdsState);
1✔
462
            xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
463
                hcm.rdsName(), rdsState, syncContext);
1✔
464
          }
465
          if (rdsState.isPending) {
1✔
466
            pendingRds.add(hcm.rdsName());
1✔
467
          }
468
          allRds.add(hcm.rdsName());
1✔
469
        }
470
      }
1✔
471

472
      for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
1✔
473
        if (!allRds.contains(entry.getKey())) {
1✔
474
          xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
1✔
475
              entry.getKey(), entry.getValue());
1✔
476
        }
477
      }
1✔
478
      routeDiscoveryStates.keySet().retainAll(allRds);
1✔
479
      if (pendingRds.isEmpty()) {
1✔
480
        updateSelector();
1✔
481
      }
482
    }
1✔
483

484
    @Override
485
    public void onAmbientError(final Status error) {
486
      if (stopped) {
1✔
487
        return;
×
488
      }
489
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
490
      Status errorWithNodeId = error.withDescription(
1✔
491
          description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
492
      logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
1✔
493

494
      if (!isServing) {
1✔
495
        listener.onNotServing(errorWithNodeId.asException());
×
496
      }
497
    }
1✔
498

499
    private boolean ipAddressesMatch(String ldsAddress) {
500
      HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress);
1✔
501
      HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress);
1✔
502
      if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort()
1✔
503
          || ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) {
1✔
504
        return false;
1✔
505
      }
506
      InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost());
1✔
507
      InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost());
1✔
508
      return listenerIp.equals(ldsIp);
1✔
509
    }
510

511
    private void shutdown() {
512
      stopped = true;
1✔
513
      cleanUpRouteDiscoveryStates();
1✔
514
      logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
1✔
515
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this);
1✔
516
      shutdownActiveFilters();
1✔
517
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
518
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
519
      for (SslContextProviderSupplier s: toRelease) {
1✔
520
        s.close();
1✔
521
      }
1✔
522
      releaseSuppliersInFlight();
1✔
523
    }
1✔
524

525
    private void updateSelector() {
526
      // This is regenerated in generateRoutingConfig() calls below.
527
      savedRdsRoutingConfigRef.clear();
1✔
528

529
      // Prepare server routing config map.
530
      ImmutableMap.Builder<FilterChain, AtomicReference<ServerRoutingConfig>> routingConfigs =
531
          ImmutableMap.builder();
1✔
532
      for (FilterChain filterChain: filterChains) {
1✔
533
        HashMap<String, Filter> chainFilters = activeFilters.get(filterChain.name());
1✔
534
        routingConfigs.put(filterChain, generateRoutingConfig(filterChain, chainFilters));
1✔
535
      }
1✔
536

537
      // Prepare the new selector.
538
      FilterChainSelector selector;
539
      if (defaultFilterChain != null) {
1✔
540
        selector = new FilterChainSelector(
1✔
541
            routingConfigs.build(),
1✔
542
            defaultFilterChain.sslContextProviderSupplier(),
1✔
543
            generateRoutingConfig(defaultFilterChain, activeFiltersDefaultChain));
1✔
544
      } else {
545
        selector = new FilterChainSelector(routingConfigs.build());
1✔
546
      }
547

548
      // Prepare the list of current selector's resources to close later.
549
      List<SslContextProviderSupplier> oldSslSuppliers = getSuppliersInUse();
1✔
550

551
      // Swap the selectors, initiate a graceful shutdown of the old one.
552
      logger.log(Level.FINEST, "Updating selector {0}", selector);
1✔
553
      filterChainSelectorManager.updateSelector(selector);
1✔
554

555
      // Release old resources.
556
      for (SslContextProviderSupplier supplier: oldSslSuppliers) {
1✔
557
        supplier.close();
1✔
558
      }
1✔
559

560
      // Now that we have valid Transport Socket config, we can start/restart listening on a port.
561
      startDelegateServer();
1✔
562
    }
1✔
563

564
    // called in syncContext
565
    private void updateActiveFilters() {
566
      Set<String> removedChains = new HashSet<>(activeFilters.keySet());
1✔
567
      for (FilterChain filterChain: filterChains) {
1✔
568
        removedChains.remove(filterChain.name());
1✔
569
        updateActiveFiltersForChain(
1✔
570
            activeFilters.computeIfAbsent(filterChain.name(), k -> new HashMap<>()),
1✔
571
            filterChain.httpConnectionManager().httpFilterConfigs());
1✔
572
      }
1✔
573

574
      // Shutdown all filters of chains missing from the LDS.
575
      for (String chainToShutdown : removedChains) {
1✔
576
        HashMap<String, Filter> filtersToShutdown = activeFilters.get(chainToShutdown);
1✔
577
        checkNotNull(filtersToShutdown, "filtersToShutdown of chain %s", chainToShutdown);
1✔
578
        updateActiveFiltersForChain(filtersToShutdown, null);
1✔
579
        activeFilters.remove(chainToShutdown);
1✔
580
      }
1✔
581

582
      // Default chain.
583
      ImmutableList<NamedFilterConfig> defaultChainConfigs = null;
1✔
584
      if (defaultFilterChain != null) {
1✔
585
        defaultChainConfigs = defaultFilterChain.httpConnectionManager().httpFilterConfigs();
1✔
586
      }
587
      updateActiveFiltersForChain(activeFiltersDefaultChain, defaultChainConfigs);
1✔
588
    }
1✔
589

590
    // called in syncContext
591
    private void shutdownActiveFilters() {
592
      for (HashMap<String, Filter> chainFilters : activeFilters.values()) {
1✔
593
        checkNotNull(chainFilters, "chainFilters");
1✔
594
        updateActiveFiltersForChain(chainFilters, null);
1✔
595
      }
1✔
596
      activeFilters.clear();
1✔
597
      updateActiveFiltersForChain(activeFiltersDefaultChain, null);
1✔
598
    }
1✔
599

600
    // called in syncContext
601
    private void updateActiveFiltersForChain(
602
        Map<String, Filter> chainFilters, @Nullable List<NamedFilterConfig> filterConfigs) {
603
      if (filterConfigs == null) {
1✔
604
        filterConfigs = ImmutableList.of();
1✔
605
      }
606

607
      Set<String> filtersToShutdown = new HashSet<>(chainFilters.keySet());
1✔
608
      for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
609
        String typeUrl = namedFilter.filterConfig.typeUrl();
1✔
610
        String filterKey = namedFilter.filterStateKey();
1✔
611

612
        Filter.Provider provider = filterRegistry.get(typeUrl);
1✔
613
        checkNotNull(provider, "provider %s", typeUrl);
1✔
614
        Filter filter = chainFilters.computeIfAbsent(
1✔
615
            filterKey, k -> provider.newInstance(namedFilter.name));
1✔
616
        checkNotNull(filter, "filter %s", filterKey);
1✔
617
        filtersToShutdown.remove(filterKey);
1✔
618
      }
1✔
619

620
      // Shutdown filters not present in current HCM.
621
      for (String filterKey : filtersToShutdown) {
1✔
622
        Filter filterToShutdown = chainFilters.remove(filterKey);
1✔
623
        checkNotNull(filterToShutdown, "filterToShutdown %s", filterKey);
1✔
624
        filterToShutdown.close();
1✔
625
      }
1✔
626
    }
1✔
627

628
    private AtomicReference<ServerRoutingConfig> generateRoutingConfig(
629
        FilterChain filterChain, Map<String, Filter> chainFilters) {
630
      HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
631
      ServerRoutingConfig routingConfig;
632

633
      // Inlined routes.
634
      ImmutableList<VirtualHost> vhosts = hcm.virtualHosts();
1✔
635
      if (vhosts != null) {
1✔
636
        routingConfig = ServerRoutingConfig.create(vhosts,
1✔
637
            generatePerRouteInterceptors(hcm.httpFilterConfigs(), vhosts, chainFilters));
1✔
638
        return new AtomicReference<>(routingConfig);
1✔
639
      }
640

641
      // Routes from RDS.
642
      RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
1✔
643
      checkNotNull(rds, "rds");
1✔
644

645
      ImmutableList<VirtualHost> savedVhosts = rds.savedVirtualHosts;
1✔
646
      if (savedVhosts != null) {
1✔
647
        routingConfig = ServerRoutingConfig.create(savedVhosts,
1✔
648
            generatePerRouteInterceptors(hcm.httpFilterConfigs(), savedVhosts, chainFilters));
1✔
649
      } else {
650
        routingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
651
      }
652
      AtomicReference<ServerRoutingConfig> routingConfigRef = new AtomicReference<>(routingConfig);
1✔
653
      savedRdsRoutingConfigRef.put(filterChain, routingConfigRef);
1✔
654
      return routingConfigRef;
1✔
655
    }
656

657
    private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
658
        @Nullable List<NamedFilterConfig> filterConfigs,
659
        List<VirtualHost> virtualHosts,
660
        Map<String, Filter> chainFilters) {
661
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
662

663
      checkNotNull(chainFilters, "chainFilters");
1✔
664
      ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
1✔
665
          new ImmutableMap.Builder<>();
666

667
      for (VirtualHost virtualHost : virtualHosts) {
1✔
668
        for (Route route : virtualHost.routes()) {
1✔
669
          // Short circuit.
670
          if (filterConfigs == null) {
1✔
671
            perRouteInterceptors.put(route, noopInterceptor);
×
672
            continue;
×
673
          }
674

675
          // Override vhost filter configs with more specific per-route configs.
676
          Map<String, FilterConfig> perRouteOverrides = ImmutableMap.<String, FilterConfig>builder()
1✔
677
              .putAll(virtualHost.filterConfigOverrides())
1✔
678
              .putAll(route.filterConfigOverrides())
1✔
679
              .buildKeepingLast();
1✔
680

681
          // Interceptors for this vhost/route combo.
682
          List<ServerInterceptor> interceptors = new ArrayList<>(filterConfigs.size());
1✔
683
          for (NamedFilterConfig namedFilter : filterConfigs) {
1✔
684
            String name = namedFilter.name;
1✔
685
            FilterConfig config = namedFilter.filterConfig;
1✔
686
            FilterConfig overrideConfig = perRouteOverrides.get(name);
1✔
687
            String filterKey = namedFilter.filterStateKey();
1✔
688

689
            Filter filter = chainFilters.get(filterKey);
1✔
690
            checkNotNull(filter, "chainFilters.get(%s)", filterKey);
1✔
691
            ServerInterceptor interceptor = filter.buildServerInterceptor(config, overrideConfig);
1✔
692

693
            if (interceptor != null) {
1✔
694
              interceptors.add(interceptor);
1✔
695
            }
696
          }
1✔
697

698
          // Combine interceptors produced by different filters into a single one that executes
699
          // them sequentially. The order is preserved.
700
          perRouteInterceptors.put(route, combineInterceptors(interceptors));
1✔
701
        }
1✔
702
      }
1✔
703

704
      return perRouteInterceptors.buildOrThrow();
1✔
705
    }
706

707
    private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
708
      if (interceptors.isEmpty()) {
1✔
709
        return noopInterceptor;
1✔
710
      }
711
      if (interceptors.size() == 1) {
1✔
712
        return interceptors.get(0);
×
713
      }
714
      return new ServerInterceptor() {
1✔
715
        @Override
716
        public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
717
            Metadata headers, ServerCallHandler<ReqT, RespT> next) {
718
          // intercept forward
719
          for (int i = interceptors.size() - 1; i >= 0; i--) {
1✔
720
            next = InternalServerInterceptors.interceptCallHandlerCreate(
1✔
721
                interceptors.get(i), next);
1✔
722
          }
723
          return next.startCall(call, headers);
1✔
724
        }
725
      };
726
    }
727

728
    private void handleConfigNotFoundOrMismatch(StatusException exception) {
729
      cleanUpRouteDiscoveryStates();
1✔
730
      shutdownActiveFilters();
1✔
731
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
732
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
733
      for (SslContextProviderSupplier s: toRelease) {
1✔
734
        s.close();
1✔
735
      }
1✔
736
      if (restartTimer != null) {
1✔
737
        restartTimer.cancel();
1✔
738
      }
739
      if (!delegate.isShutdown()) {
1✔
740
        delegate.shutdown();  // let in-progress calls finish
1✔
741
      }
742
      isServing = false;
1✔
743
      listener.onNotServing(exception);
1✔
744
    }
1✔
745

746
    private void cleanUpRouteDiscoveryStates() {
747
      for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
1✔
748
        String rdsName = rdsState.resourceName;
1✔
749
        logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
1✔
750
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
751
            rdsState);
752
      }
1✔
753
      routeDiscoveryStates.clear();
1✔
754
      savedRdsRoutingConfigRef.clear();
1✔
755
    }
1✔
756

757
    private List<SslContextProviderSupplier> getSuppliersInUse() {
758
      List<SslContextProviderSupplier> toRelease = new ArrayList<>();
1✔
759
      FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
1✔
760
      if (selector != null) {
1✔
761
        for (FilterChain f: selector.getRoutingConfigs().keySet()) {
1✔
762
          if (f.sslContextProviderSupplier() != null) {
1✔
763
            toRelease.add(f.sslContextProviderSupplier());
1✔
764
          }
765
        }
1✔
766
        SslContextProviderSupplier defaultSupplier =
1✔
767
                selector.getDefaultSslContextProviderSupplier();
1✔
768
        if (defaultSupplier != null) {
1✔
769
          toRelease.add(defaultSupplier);
1✔
770
        }
771
      }
772
      return toRelease;
1✔
773
    }
774

775
    private void releaseSuppliersInFlight() {
776
      SslContextProviderSupplier supplier;
777
      for (FilterChain filterChain : filterChains) {
1✔
778
        supplier = filterChain.sslContextProviderSupplier();
1✔
779
        if (supplier != null) {
1✔
780
          supplier.close();
1✔
781
        }
782
      }
1✔
783
      if (defaultFilterChain != null
1✔
784
              && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
1✔
785
        supplier.close();
1✔
786
      }
787
    }
1✔
788

789
    private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
790
      private final String resourceName;
791
      private ImmutableList<VirtualHost> savedVirtualHosts;
792
      private boolean isPending = true;
1✔
793

794
      private RouteDiscoveryState(String resourceName) {
1✔
795
        this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
796
      }
1✔
797

798
      @Override
799
      public void onResourceChanged(final StatusOr<RdsUpdate> update) {
800
        syncContext.execute(() -> {
1✔
801
          if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
802
            return; // Watcher has been cancelled.
1✔
803
          }
804

805
          if (update.hasValue()) {
1✔
806
            if (savedVirtualHosts == null && !isPending) {
1✔
807
              logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
1✔
808
            }
809
            savedVirtualHosts = ImmutableList.copyOf(update.getValue().virtualHosts);
1✔
810
          } else {
811
            logger.log(Level.WARNING, "Rds {0} unavailable: {1}",
1✔
812
                new Object[]{resourceName, update.getStatus()});
1✔
813
            savedVirtualHosts = null;
1✔
814
          }
815
          // In both cases, a change has occurred that requires a config update.
816
          updateRdsRoutingConfig();
1✔
817
          maybeUpdateSelector();
1✔
818
        });
1✔
819
      }
1✔
820

821
      @Override
822
      public void onAmbientError(final Status error) {
823
        syncContext.execute(() -> {
1✔
824
          if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
825
            return; // Watcher has been cancelled.
×
826
          }
827
          String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
828
          Status errorWithNodeId = error.withDescription(
1✔
829
              description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
830
          logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
1✔
831
              new Object[]{resourceName, errorWithNodeId});
832

833
          // Per gRFC A88, ambient errors should not trigger a configuration change.
834
          // Therefore, we do NOT call maybeUpdateSelector() here.
835
        });
1✔
836
      }
1✔
837

838
      private void updateRdsRoutingConfig() {
839
        for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
1✔
840
          HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
841
          if (!resourceName.equals(hcm.rdsName())) {
1✔
842
            continue;
1✔
843
          }
844

845
          ServerRoutingConfig updatedRoutingConfig;
846
          if (savedVirtualHosts == null) {
1✔
847
            updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
848
          } else {
849
            HashMap<String, Filter> chainFilters = activeFilters.get(filterChain.name());
1✔
850
            ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
1✔
851
                hcm.httpFilterConfigs(), savedVirtualHosts, chainFilters);
1✔
852
            updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts, interceptors);
1✔
853
          }
854

855
          logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
1✔
856
              new Object[]{filterChain.name(), updatedRoutingConfig});
1✔
857
          savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
1✔
858
        }
1✔
859
      }
1✔
860

861
      // Update the selector to use the most recently updated configs only after all rds have been
862
      // discovered for the first time. Later changes on rds will be applied through virtual host
863
      // list atomic ref.
864
      private void maybeUpdateSelector() {
865
        isPending = false;
1✔
866
        boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
1✔
867
        if (isLastPending) {
1✔
868
          updateSelector();
1✔
869
        }
870
      }
1✔
871
    }
872
  }
873

874
  @VisibleForTesting
875
  final class ConfigApplyingInterceptor implements ServerInterceptor {
1✔
876
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
877
      @Override
878
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
879
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
880
        return next.startCall(call, headers);
×
881
      }
882
    };
883

884
    @Override
885
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
886
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
887
      AtomicReference<ServerRoutingConfig> routingConfigRef =
1✔
888
          call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
1✔
889
      ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
1✔
890
          routingConfigRef.get();
1✔
891
      if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
1✔
892
        String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
1✔
893
        call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
1✔
894
        return new Listener<ReqT>() {};
1✔
895
      }
896
      List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
1✔
897
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
898
          virtualHosts, call.getAuthority());
1✔
899
      if (virtualHost == null) {
1✔
900
        call.close(
1✔
901
            Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
1✔
902
            new Metadata());
903
        return new Listener<ReqT>() {};
1✔
904
      }
905
      Route selectedRoute = null;
1✔
906
      MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
1✔
907
      for (Route route : virtualHost.routes()) {
1✔
908
        if (RoutingUtils.matchRoute(
1✔
909
            route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
1✔
910
          selectedRoute = route;
1✔
911
          break;
1✔
912
        }
913
      }
1✔
914
      if (selectedRoute == null) {
1✔
915
        call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
1✔
916
            new Metadata());
917
        return new ServerCall.Listener<ReqT>() {};
1✔
918
      }
919
      if (selectedRoute.routeAction() != null) {
1✔
920
        call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
1✔
921
            + "route: only Route.non_forwarding_action should be allowed."), new Metadata());
922
        return new ServerCall.Listener<ReqT>() {};
1✔
923
      }
924
      ServerInterceptor routeInterceptor = noopInterceptor;
1✔
925
      Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
1✔
926
      if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
1✔
927
        routeInterceptor = perRouteInterceptors.get(selectedRoute);
1✔
928
      }
929
      return routeInterceptor.interceptCall(call, headers, next);
1✔
930
    }
931
  }
932

933
  /**
934
   * The HttpConnectionManager level configuration.
935
   */
936
  @AutoValue
937
  abstract static class ServerRoutingConfig {
1✔
938
    @VisibleForTesting
939
    static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
1✔
940
        ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
1✔
941

942
    abstract ImmutableList<VirtualHost> virtualHosts();
943

944
    // Prebuilt per route server interceptors from http filter configs.
945
    abstract ImmutableMap<Route, ServerInterceptor> interceptors();
946

947
    /**
948
     * Server routing configuration.
949
     * */
950
    public static ServerRoutingConfig create(
951
        ImmutableList<VirtualHost> virtualHosts,
952
        ImmutableMap<Route, ServerInterceptor> interceptors) {
953
      checkNotNull(virtualHosts, "virtualHosts");
1✔
954
      checkNotNull(interceptors, "interceptors");
1✔
955
      return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
1✔
956
    }
957
  }
958
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc