• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19563

26 Nov 2024 12:47AM UTC coverage: 88.559% (-0.02%) from 88.582%
#19563

push

github

web-flow
xds: Add counter and gauge metrics  (#11661)

Adds the following xDS client metrics defined in [A78](https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient).

Counters
- grpc.xds_client.server_failure
- grpc.xds_client.resource_updates_valid
- grpc.xds_client.resource_updates_invalid

Gauges
- grpc.xds_client.connected
- grpc.xds_client.resources

33368 of 37679 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.22
/../xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22

23
import com.google.auto.value.AutoValue;
24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import com.google.common.collect.ImmutableMap;
27
import com.google.common.util.concurrent.SettableFuture;
28
import io.grpc.Attributes;
29
import io.grpc.InternalServerInterceptors;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.MetricRecorder;
33
import io.grpc.Server;
34
import io.grpc.ServerBuilder;
35
import io.grpc.ServerCall;
36
import io.grpc.ServerCall.Listener;
37
import io.grpc.ServerCallHandler;
38
import io.grpc.ServerInterceptor;
39
import io.grpc.ServerServiceDefinition;
40
import io.grpc.Status;
41
import io.grpc.StatusException;
42
import io.grpc.SynchronizationContext;
43
import io.grpc.SynchronizationContext.ScheduledHandle;
44
import io.grpc.internal.GrpcUtil;
45
import io.grpc.internal.ObjectPool;
46
import io.grpc.internal.SharedResourceHolder;
47
import io.grpc.xds.EnvoyServerProtoData.FilterChain;
48
import io.grpc.xds.Filter.FilterConfig;
49
import io.grpc.xds.Filter.NamedFilterConfig;
50
import io.grpc.xds.Filter.ServerInterceptorBuilder;
51
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
52
import io.grpc.xds.ThreadSafeRandom.ThreadSafeRandomImpl;
53
import io.grpc.xds.VirtualHost.Route;
54
import io.grpc.xds.XdsListenerResource.LdsUpdate;
55
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
56
import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener;
57
import io.grpc.xds.client.XdsClient;
58
import io.grpc.xds.client.XdsClient.ResourceWatcher;
59
import io.grpc.xds.internal.security.SslContextProviderSupplier;
60
import java.io.IOException;
61
import java.net.SocketAddress;
62
import java.util.ArrayList;
63
import java.util.Collections;
64
import java.util.HashMap;
65
import java.util.HashSet;
66
import java.util.List;
67
import java.util.Map;
68
import java.util.Set;
69
import java.util.concurrent.CountDownLatch;
70
import java.util.concurrent.ExecutionException;
71
import java.util.concurrent.ScheduledExecutorService;
72
import java.util.concurrent.TimeUnit;
73
import java.util.concurrent.atomic.AtomicBoolean;
74
import java.util.concurrent.atomic.AtomicReference;
75
import java.util.logging.Level;
76
import java.util.logging.Logger;
77
import javax.annotation.Nullable;
78

79
final class XdsServerWrapper extends Server {
80
  private static final Logger logger = Logger.getLogger(XdsServerWrapper.class.getName());
1✔
81

82
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
83
      new Thread.UncaughtExceptionHandler() {
1✔
84
        @Override
85
        public void uncaughtException(Thread t, Throwable e) {
86
          logger.log(Level.SEVERE, "Exception!" + e);
×
87
          // TODO(chengyuanzhang): implement cleanup.
88
        }
×
89
      });
90

91
  public static final Attributes.Key<AtomicReference<ServerRoutingConfig>>
92
      ATTR_SERVER_ROUTING_CONFIG =
1✔
93
      Attributes.Key.create("io.grpc.xds.ServerWrapper.serverRoutingConfig");
1✔
94

95
  @VisibleForTesting
96
  static final long RETRY_DELAY_NANOS = TimeUnit.MINUTES.toNanos(1);
1✔
97
  private final String listenerAddress;
98
  private final ServerBuilder<?> delegateBuilder;
99
  private boolean sharedTimeService;
100
  private final ScheduledExecutorService timeService;
101
  private final FilterRegistry filterRegistry;
102
  private final ThreadSafeRandom random = ThreadSafeRandomImpl.instance;
1✔
103
  private final XdsClientPoolFactory xdsClientPoolFactory;
104
  private final XdsServingStatusListener listener;
105
  private final FilterChainSelectorManager filterChainSelectorManager;
106
  private final AtomicBoolean started = new AtomicBoolean(false);
1✔
107
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
1✔
108
  private boolean isServing;
109
  private final CountDownLatch internalTerminationLatch = new CountDownLatch(1);
1✔
110
  private final SettableFuture<Exception> initialStartFuture = SettableFuture.create();
1✔
111
  private boolean initialStarted;
112
  private ScheduledHandle restartTimer;
113
  private ObjectPool<XdsClient> xdsClientPool;
114
  private XdsClient xdsClient;
115
  private DiscoveryState discoveryState;
116
  private volatile Server delegate;
117

118
  /**
   * Creates a wrapper whose timer service is taken from {@link SharedResourceHolder}.
   *
   * <p>The instance is flagged as using a shared timer service so that
   * {@code internalShutdown()} releases it back to the holder.
   */
  XdsServerWrapper(
      String listenerAddress,
      ServerBuilder<?> delegateBuilder,
      XdsServingStatusListener listener,
      FilterChainSelectorManager filterChainSelectorManager,
      XdsClientPoolFactory xdsClientPoolFactory,
      FilterRegistry filterRegistry) {
    this(
        listenerAddress,
        delegateBuilder,
        listener,
        filterChainSelectorManager,
        xdsClientPoolFactory,
        filterRegistry,
        SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE));
    // The timer above came from SharedResourceHolder; remember to release it on shutdown.
    this.sharedTimeService = true;
  }
1✔
129

130
  /**
   * Creates a wrapper that schedules work on the supplied timer service.
   *
   * <p>Every argument is required to be non-null. A {@link ConfigApplyingInterceptor} is
   * registered on the delegate builder and the first delegate server is built here; it is not
   * started until {@link #start()} is called.
   */
  @VisibleForTesting
  XdsServerWrapper(
      String listenerAddress,
      ServerBuilder<?> delegateBuilder,
      XdsServingStatusListener listener,
      FilterChainSelectorManager filterChainSelectorManager,
      XdsClientPoolFactory xdsClientPoolFactory,
      FilterRegistry filterRegistry,
      ScheduledExecutorService timeService) {
    this.listenerAddress = checkNotNull(listenerAddress, "listenerAddress");
    this.delegateBuilder = checkNotNull(delegateBuilder, "delegateBuilder");
    // Install the routing-config interceptor before any delegate server is built.
    this.delegateBuilder.intercept(new ConfigApplyingInterceptor());
    this.listener = checkNotNull(listener, "listener");
    this.filterChainSelectorManager =
        checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
    this.timeService = checkNotNull(timeService, "timeService");
    this.filterRegistry = checkNotNull(filterRegistry, "filterRegistry");
    this.delegate = this.delegateBuilder.build();
  }
1✔
150

151
  @Override
152
  public Server start() throws IOException {
153
    checkState(started.compareAndSet(false, true), "Already started");
1✔
154
    syncContext.execute(new Runnable() {
1✔
155
      @Override
156
      public void run() {
157
        internalStart();
1✔
158
      }
1✔
159
    });
160
    Exception exception;
161
    try {
162
      exception = initialStartFuture.get();
1✔
163
    } catch (InterruptedException | ExecutionException e) {
×
164
      throw new RuntimeException(e);
×
165
    }
1✔
166
    if (exception != null) {
1✔
167
      throw (exception instanceof IOException) ? (IOException) exception :
1✔
168
              new IOException(exception);
1✔
169
    }
170
    return this;
1✔
171
  }
172

173
  private void internalStart() {
174
    try {
175
      // TODO(dnvindhya): Add "#server" as "grpc.target" attribute value for
176
      // xDS enabled servers.
177
      xdsClientPool = xdsClientPoolFactory.getOrCreate("", new MetricRecorder() {});
1✔
178
    } catch (Exception e) {
×
179
      StatusException statusException = Status.UNAVAILABLE.withDescription(
×
180
              "Failed to initialize xDS").withCause(e).asException();
×
181
      listener.onNotServing(statusException);
×
182
      initialStartFuture.set(statusException);
×
183
      return;
×
184
    }
1✔
185
    xdsClient = xdsClientPool.getObject();
1✔
186
    String listenerTemplate = xdsClient.getBootstrapInfo().serverListenerResourceNameTemplate();
1✔
187
    if (listenerTemplate == null) {
1✔
188
      StatusException statusException =
1✔
189
          Status.UNAVAILABLE.withDescription(
1✔
190
              "Can only support xDS v3 with listener resource name template").asException();
1✔
191
      listener.onNotServing(statusException);
1✔
192
      initialStartFuture.set(statusException);
1✔
193
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
194
      return;
1✔
195
    }
196
    String replacement = listenerAddress;
1✔
197
    if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
1✔
198
      replacement = XdsClient.percentEncodePath(replacement);
1✔
199
    }
200
    discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
1✔
201
  }
1✔
202

203
  @Override
204
  public Server shutdown() {
205
    if (!shutdown.compareAndSet(false, true)) {
1✔
206
      return this;
1✔
207
    }
208
    syncContext.execute(new Runnable() {
1✔
209
      @Override
210
      public void run() {
211
        if (!delegate.isShutdown()) {
1✔
212
          delegate.shutdown();
1✔
213
        }
214
        internalShutdown();
1✔
215
      }
1✔
216
    });
217
    return this;
1✔
218
  }
219

220
  @Override
221
  public Server shutdownNow() {
222
    if (!shutdown.compareAndSet(false, true)) {
1✔
223
      return this;
1✔
224
    }
225
    syncContext.execute(new Runnable() {
1✔
226
      @Override
227
      public void run() {
228
        if (!delegate.isShutdown()) {
1✔
229
          delegate.shutdownNow();
1✔
230
        }
231
        internalShutdown();
1✔
232
        initialStartFuture.set(new IOException("server is forcefully shut down"));
1✔
233
      }
1✔
234
    });
235
    return this;
1✔
236
  }
237

238
  // Must run in SynchronizationContext
239
  private void internalShutdown() {
240
    logger.log(Level.FINER, "Shutting down XdsServerWrapper");
1✔
241
    if (discoveryState != null) {
1✔
242
      discoveryState.shutdown();
1✔
243
    }
244
    if (xdsClient != null) {
1✔
245
      xdsClient = xdsClientPool.returnObject(xdsClient);
1✔
246
    }
247
    if (restartTimer != null) {
1✔
248
      restartTimer.cancel();
1✔
249
    }
250
    if (sharedTimeService) {
1✔
251
      SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeService);
1✔
252
    }
253
    isServing = false;
1✔
254
    internalTerminationLatch.countDown();
1✔
255
  }
1✔
256

257
  @Override
258
  public boolean isShutdown() {
259
    return shutdown.get();
1✔
260
  }
261

262
  @Override
263
  public boolean isTerminated() {
264
    return internalTerminationLatch.getCount() == 0 && delegate.isTerminated();
1✔
265
  }
266

267
  @Override
268
  public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
269
    long startTime = System.nanoTime();
1✔
270
    if (!internalTerminationLatch.await(timeout, unit)) {
1✔
271
      return false;
×
272
    }
273
    long remainingTime = unit.toNanos(timeout) - (System.nanoTime() - startTime);
1✔
274
    return delegate.awaitTermination(remainingTime, TimeUnit.NANOSECONDS);
1✔
275
  }
276

277
  @Override
278
  public void awaitTermination() throws InterruptedException {
279
    internalTerminationLatch.await();
1✔
280
    delegate.awaitTermination();
1✔
281
  }
1✔
282

283
  @Override
284
  public int getPort() {
285
    return delegate.getPort();
1✔
286
  }
287

288
  @Override
289
  public List<? extends SocketAddress> getListenSockets() {
290
    return delegate.getListenSockets();
1✔
291
  }
292

293
  @Override
294
  public List<ServerServiceDefinition> getServices() {
295
    return delegate.getServices();
×
296
  }
297

298
  @Override
299
  public List<ServerServiceDefinition> getImmutableServices() {
300
    return delegate.getImmutableServices();
×
301
  }
302

303
  @Override
304
  public List<ServerServiceDefinition> getMutableServices() {
305
    return delegate.getMutableServices();
×
306
  }
307

308
  // Must run in SynchronizationContext
309
  private void startDelegateServer() {
310
    if (restartTimer != null && restartTimer.isPending()) {
1✔
311
      return;
×
312
    }
313
    if (isServing) {
1✔
314
      return;
1✔
315
    }
316
    if (delegate.isShutdown()) {
1✔
317
      delegate = delegateBuilder.build();
1✔
318
    }
319
    try {
320
      delegate.start();
1✔
321
      listener.onServing();
1✔
322
      isServing = true;
1✔
323
      if (!initialStarted) {
1✔
324
        initialStarted = true;
1✔
325
        initialStartFuture.set(null);
1✔
326
      }
327
      logger.log(Level.FINER, "Delegate server started.");
1✔
328
    } catch (IOException e) {
1✔
329
      logger.log(Level.FINE, "Fail to start delegate server: {0}", e);
1✔
330
      if (!initialStarted) {
1✔
331
        initialStarted = true;
1✔
332
        initialStartFuture.set(e);
1✔
333
      } else {
334
        listener.onNotServing(e);
1✔
335
      }
336
      restartTimer = syncContext.schedule(
1✔
337
        new RestartTask(), RETRY_DELAY_NANOS, TimeUnit.NANOSECONDS, timeService);
338
    }
1✔
339
  }
1✔
340

341
  private final class RestartTask implements Runnable {
1✔
342
    @Override
343
    public void run() {
344
      startDelegateServer();
1✔
345
    }
1✔
346
  }
347

348
  private final class DiscoveryState implements ResourceWatcher<LdsUpdate> {
349
    private final String resourceName;
350
    // RDS resource name is the key.
351
    private final Map<String, RouteDiscoveryState> routeDiscoveryStates = new HashMap<>();
1✔
352
    // Track pending RDS resources using rds name.
353
    private final Set<String> pendingRds = new HashSet<>();
1✔
354
    // Most recently discovered filter chains.
355
    private List<FilterChain> filterChains = new ArrayList<>();
1✔
356
    // Most recently discovered default filter chain.
357
    @Nullable
358
    private FilterChain defaultFilterChain;
359
    private boolean stopped;
360
    private final Map<FilterChain, AtomicReference<ServerRoutingConfig>> savedRdsRoutingConfigRef 
1✔
361
        = new HashMap<>();
362
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
363
      @Override
364
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
365
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
366
        return next.startCall(call, headers);
1✔
367
      }
368
    };
369

370
    /**
     * Records the LDS resource name and immediately registers this object as its watcher,
     * with callbacks delivered on the synchronization context.
     */
    private DiscoveryState(String resourceName) {
      checkNotNull(resourceName, "resourceName");
      this.resourceName = resourceName;
      xdsClient.watchXdsResource(
          XdsListenerResource.getInstance(), this.resourceName, this, syncContext);
    }
1✔
375

376
    @Override
377
    public void onChanged(final LdsUpdate update) {
378
      if (stopped) {
1✔
379
        return;
×
380
      }
381
      logger.log(Level.FINEST, "Received Lds update {0}", update);
1✔
382
      checkNotNull(update.listener(), "update");
1✔
383
      if (!pendingRds.isEmpty()) {
1✔
384
        // filter chain state has not yet been applied to filterChainSelectorManager and there
385
        // are two sets of sslContextProviderSuppliers, so we release the old ones.
386
        releaseSuppliersInFlight();
1✔
387
        pendingRds.clear();
1✔
388
      }
389
      filterChains = update.listener().filterChains();
1✔
390
      defaultFilterChain = update.listener().defaultFilterChain();
1✔
391
      List<FilterChain> allFilterChains = filterChains;
1✔
392
      if (defaultFilterChain != null) {
1✔
393
        allFilterChains = new ArrayList<>(filterChains);
1✔
394
        allFilterChains.add(defaultFilterChain);
1✔
395
      }
396
      Set<String> allRds = new HashSet<>();
1✔
397
      for (FilterChain filterChain : allFilterChains) {
1✔
398
        HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
399
        if (hcm.virtualHosts() == null) {
1✔
400
          RouteDiscoveryState rdsState = routeDiscoveryStates.get(hcm.rdsName());
1✔
401
          if (rdsState == null) {
1✔
402
            rdsState = new RouteDiscoveryState(hcm.rdsName());
1✔
403
            routeDiscoveryStates.put(hcm.rdsName(), rdsState);
1✔
404
            xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(),
1✔
405
                hcm.rdsName(), rdsState, syncContext);
1✔
406
          }
407
          if (rdsState.isPending) {
1✔
408
            pendingRds.add(hcm.rdsName());
1✔
409
          }
410
          allRds.add(hcm.rdsName());
1✔
411
        }
412
      }
1✔
413
      for (Map.Entry<String, RouteDiscoveryState> entry: routeDiscoveryStates.entrySet()) {
1✔
414
        if (!allRds.contains(entry.getKey())) {
1✔
415
          xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(),
1✔
416
              entry.getKey(), entry.getValue());
1✔
417
        }
418
      }
1✔
419
      routeDiscoveryStates.keySet().retainAll(allRds);
1✔
420
      if (pendingRds.isEmpty()) {
1✔
421
        updateSelector();
1✔
422
      }
423
    }
1✔
424

425
    @Override
426
    public void onResourceDoesNotExist(final String resourceName) {
427
      if (stopped) {
1✔
428
        return;
×
429
      }
430
      StatusException statusException = Status.UNAVAILABLE.withDescription(
1✔
431
          String.format("Listener %s unavailable, xDS node ID: %s", resourceName,
1✔
432
              xdsClient.getBootstrapInfo().node().getId())).asException();
1✔
433
      handleConfigNotFound(statusException);
1✔
434
    }
1✔
435

436
    @Override
437
    public void onError(final Status error) {
438
      if (stopped) {
1✔
439
        return;
×
440
      }
441
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
442
      Status errorWithNodeId = error.withDescription(
1✔
443
          description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
444
      logger.log(Level.FINE, "Error from XdsClient", errorWithNodeId);
1✔
445
      if (!isServing) {
1✔
446
        listener.onNotServing(errorWithNodeId.asException());
1✔
447
      }
448
    }
1✔
449

450
    private void shutdown() {
451
      stopped = true;
1✔
452
      cleanUpRouteDiscoveryStates();
1✔
453
      logger.log(Level.FINE, "Stop watching LDS resource {0}", resourceName);
1✔
454
      xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(), resourceName, this);
1✔
455
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
456
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
457
      for (SslContextProviderSupplier s: toRelease) {
1✔
458
        s.close();
1✔
459
      }
1✔
460
      releaseSuppliersInFlight();
1✔
461
    }
1✔
462

463
    private void updateSelector() {
464
      Map<FilterChain, AtomicReference<ServerRoutingConfig>> filterChainRouting = new HashMap<>();
1✔
465
      savedRdsRoutingConfigRef.clear();
1✔
466
      for (FilterChain filterChain: filterChains) {
1✔
467
        filterChainRouting.put(filterChain, generateRoutingConfig(filterChain));
1✔
468
      }
1✔
469
      FilterChainSelector selector = new FilterChainSelector(
1✔
470
          Collections.unmodifiableMap(filterChainRouting),
1✔
471
          defaultFilterChain == null ? null : defaultFilterChain.sslContextProviderSupplier(),
1✔
472
          defaultFilterChain == null ? new AtomicReference<ServerRoutingConfig>() :
1✔
473
              generateRoutingConfig(defaultFilterChain));
1✔
474
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
475
      logger.log(Level.FINEST, "Updating selector {0}", selector);
1✔
476
      filterChainSelectorManager.updateSelector(selector);
1✔
477
      for (SslContextProviderSupplier e: toRelease) {
1✔
478
        e.close();
1✔
479
      }
1✔
480
      startDelegateServer();
1✔
481
    }
1✔
482

483
    private AtomicReference<ServerRoutingConfig> generateRoutingConfig(FilterChain filterChain) {
484
      HttpConnectionManager hcm = filterChain.httpConnectionManager();
1✔
485
      if (hcm.virtualHosts() != null) {
1✔
486
        ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
1✔
487
                hcm.httpFilterConfigs(), hcm.virtualHosts());
1✔
488
        return new AtomicReference<>(ServerRoutingConfig.create(hcm.virtualHosts(),interceptors));
1✔
489
      } else {
490
        RouteDiscoveryState rds = routeDiscoveryStates.get(hcm.rdsName());
1✔
491
        checkNotNull(rds, "rds");
1✔
492
        AtomicReference<ServerRoutingConfig> serverRoutingConfigRef = new AtomicReference<>();
1✔
493
        if (rds.savedVirtualHosts != null) {
1✔
494
          ImmutableMap<Route, ServerInterceptor> interceptors = generatePerRouteInterceptors(
1✔
495
              hcm.httpFilterConfigs(), rds.savedVirtualHosts);
1✔
496
          ServerRoutingConfig serverRoutingConfig =
1✔
497
              ServerRoutingConfig.create(rds.savedVirtualHosts, interceptors);
1✔
498
          serverRoutingConfigRef.set(serverRoutingConfig);
1✔
499
        } else {
1✔
500
          serverRoutingConfigRef.set(ServerRoutingConfig.FAILING_ROUTING_CONFIG);
1✔
501
        }
502
        savedRdsRoutingConfigRef.put(filterChain, serverRoutingConfigRef);
1✔
503
        return serverRoutingConfigRef;
1✔
504
      }
505
    }
506

507
    private ImmutableMap<Route, ServerInterceptor> generatePerRouteInterceptors(
508
        List<NamedFilterConfig> namedFilterConfigs, List<VirtualHost> virtualHosts) {
509
      ImmutableMap.Builder<Route, ServerInterceptor> perRouteInterceptors =
1✔
510
          new ImmutableMap.Builder<>();
511
      for (VirtualHost virtualHost : virtualHosts) {
1✔
512
        for (Route route : virtualHost.routes()) {
1✔
513
          List<ServerInterceptor> filterInterceptors = new ArrayList<>();
1✔
514
          Map<String, FilterConfig> selectedOverrideConfigs =
1✔
515
              new HashMap<>(virtualHost.filterConfigOverrides());
1✔
516
          selectedOverrideConfigs.putAll(route.filterConfigOverrides());
1✔
517
          if (namedFilterConfigs != null) {
1✔
518
            for (NamedFilterConfig namedFilterConfig : namedFilterConfigs) {
1✔
519
              FilterConfig filterConfig = namedFilterConfig.filterConfig;
1✔
520
              Filter filter = filterRegistry.get(filterConfig.typeUrl());
1✔
521
              if (filter instanceof ServerInterceptorBuilder) {
1✔
522
                ServerInterceptor interceptor =
1✔
523
                    ((ServerInterceptorBuilder) filter).buildServerInterceptor(
1✔
524
                        filterConfig, selectedOverrideConfigs.get(namedFilterConfig.name));
1✔
525
                if (interceptor != null) {
1✔
526
                  filterInterceptors.add(interceptor);
1✔
527
                }
528
              } else {
1✔
529
                logger.log(Level.WARNING, "HttpFilterConfig(type URL: "
×
530
                    + filterConfig.typeUrl() + ") is not supported on server-side. "
×
531
                    + "Probably a bug at ClientXdsClient verification.");
532
              }
533
            }
1✔
534
          }
535
          ServerInterceptor interceptor = combineInterceptors(filterInterceptors);
1✔
536
          perRouteInterceptors.put(route, interceptor);
1✔
537
        }
1✔
538
      }
1✔
539
      return perRouteInterceptors.buildOrThrow();
1✔
540
    }
541

542
    private ServerInterceptor combineInterceptors(final List<ServerInterceptor> interceptors) {
543
      if (interceptors.isEmpty()) {
1✔
544
        return noopInterceptor;
1✔
545
      }
546
      if (interceptors.size() == 1) {
1✔
547
        return interceptors.get(0);
×
548
      }
549
      return new ServerInterceptor() {
1✔
550
        @Override
551
        public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
552
            Metadata headers, ServerCallHandler<ReqT, RespT> next) {
553
          // intercept forward
554
          for (int i = interceptors.size() - 1; i >= 0; i--) {
1✔
555
            next = InternalServerInterceptors.interceptCallHandlerCreate(
1✔
556
                interceptors.get(i), next);
1✔
557
          }
558
          return next.startCall(call, headers);
1✔
559
        }
560
      };
561
    }
562

563
    private void handleConfigNotFound(StatusException exception) {
564
      cleanUpRouteDiscoveryStates();
1✔
565
      List<SslContextProviderSupplier> toRelease = getSuppliersInUse();
1✔
566
      filterChainSelectorManager.updateSelector(FilterChainSelector.NO_FILTER_CHAIN);
1✔
567
      for (SslContextProviderSupplier s: toRelease) {
1✔
568
        s.close();
1✔
569
      }
1✔
570
      if (restartTimer != null) {
1✔
571
        restartTimer.cancel();
1✔
572
      }
573
      if (!delegate.isShutdown()) {
1✔
574
        delegate.shutdown();  // let in-progress calls finish
1✔
575
      }
576
      isServing = false;
1✔
577
      listener.onNotServing(exception);
1✔
578
    }
1✔
579

580
    private void cleanUpRouteDiscoveryStates() {
581
      for (RouteDiscoveryState rdsState : routeDiscoveryStates.values()) {
1✔
582
        String rdsName = rdsState.resourceName;
1✔
583
        logger.log(Level.FINE, "Stop watching RDS resource {0}", rdsName);
1✔
584
        xdsClient.cancelXdsResourceWatch(XdsRouteConfigureResource.getInstance(), rdsName,
1✔
585
            rdsState);
586
      }
1✔
587
      routeDiscoveryStates.clear();
1✔
588
      savedRdsRoutingConfigRef.clear();
1✔
589
    }
1✔
590

591
    private List<SslContextProviderSupplier> getSuppliersInUse() {
592
      List<SslContextProviderSupplier> toRelease = new ArrayList<>();
1✔
593
      FilterChainSelector selector = filterChainSelectorManager.getSelectorToUpdateSelector();
1✔
594
      if (selector != null) {
1✔
595
        for (FilterChain f: selector.getRoutingConfigs().keySet()) {
1✔
596
          if (f.sslContextProviderSupplier() != null) {
1✔
597
            toRelease.add(f.sslContextProviderSupplier());
1✔
598
          }
599
        }
1✔
600
        SslContextProviderSupplier defaultSupplier =
1✔
601
                selector.getDefaultSslContextProviderSupplier();
1✔
602
        if (defaultSupplier != null) {
1✔
603
          toRelease.add(defaultSupplier);
1✔
604
        }
605
      }
606
      return toRelease;
1✔
607
    }
608

609
    private void releaseSuppliersInFlight() {
610
      SslContextProviderSupplier supplier;
611
      for (FilterChain filterChain : filterChains) {
1✔
612
        supplier = filterChain.sslContextProviderSupplier();
1✔
613
        if (supplier != null) {
1✔
614
          supplier.close();
1✔
615
        }
616
      }
1✔
617
      if (defaultFilterChain != null
1✔
618
              && (supplier = defaultFilterChain.sslContextProviderSupplier()) != null) {
1✔
619
        supplier.close();
1✔
620
      }
621
    }
1✔
622

623
    private final class RouteDiscoveryState implements ResourceWatcher<RdsUpdate> {
624
      private final String resourceName;
625
      private ImmutableList<VirtualHost> savedVirtualHosts;
626
      private boolean isPending = true;
1✔
627

628
      private RouteDiscoveryState(String resourceName) {
1✔
629
        this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
630
      }
1✔
631

632
      @Override
633
      public void onChanged(final RdsUpdate update) {
634
        syncContext.execute(new Runnable() {
1✔
635
          @Override
636
          public void run() {
637
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
638
              return;
1✔
639
            }
640
            if (savedVirtualHosts == null && !isPending) {
1✔
641
              logger.log(Level.WARNING, "Received valid Rds {0} configuration.", resourceName);
1✔
642
            }
643
            savedVirtualHosts = ImmutableList.copyOf(update.virtualHosts);
1✔
644
            updateRdsRoutingConfig();
1✔
645
            maybeUpdateSelector();
1✔
646
          }
1✔
647
        });
648
      }
1✔
649

650
      @Override
651
      public void onResourceDoesNotExist(final String resourceName) {
652
        syncContext.execute(new Runnable() {
1✔
653
          @Override
654
          public void run() {
655
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
656
              return;
×
657
            }
658
            logger.log(Level.WARNING, "Rds {0} unavailable", resourceName);
1✔
659
            savedVirtualHosts = null;
1✔
660
            updateRdsRoutingConfig();
1✔
661
            maybeUpdateSelector();
1✔
662
          }
1✔
663
        });
664
      }
1✔
665

666
      @Override
667
      public void onError(final Status error) {
668
        syncContext.execute(new Runnable() {
1✔
669
          @Override
670
          public void run() {
671
            if (!routeDiscoveryStates.containsKey(resourceName)) {
1✔
672
              return;
×
673
            }
674
            String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
675
            Status errorWithNodeId = error.withDescription(
1✔
676
                    description + "xDS node ID: " + xdsClient.getBootstrapInfo().node().getId());
1✔
677
            logger.log(Level.WARNING, "Error loading RDS resource {0} from XdsClient: {1}.",
1✔
678
                    new Object[]{resourceName, errorWithNodeId});
1✔
679
            maybeUpdateSelector();
1✔
680
          }
1✔
681
        });
682
      }
1✔
683

684
      private void updateRdsRoutingConfig() {
685
        for (FilterChain filterChain : savedRdsRoutingConfigRef.keySet()) {
1✔
686
          if (resourceName.equals(filterChain.httpConnectionManager().rdsName())) {
1✔
687
            ServerRoutingConfig updatedRoutingConfig;
688
            if (savedVirtualHosts == null) {
1✔
689
              updatedRoutingConfig = ServerRoutingConfig.FAILING_ROUTING_CONFIG;
1✔
690
            } else {
691
              ImmutableMap<Route, ServerInterceptor> updatedInterceptors =
1✔
692
                  generatePerRouteInterceptors(
1✔
693
                      filterChain.httpConnectionManager().httpFilterConfigs(),
1✔
694
                      savedVirtualHosts);
695
              updatedRoutingConfig = ServerRoutingConfig.create(savedVirtualHosts,
1✔
696
                  updatedInterceptors);
697
            }
698
            logger.log(Level.FINEST, "Updating filter chain {0} rds routing config: {1}",
1✔
699
                new Object[]{filterChain.name(), updatedRoutingConfig});
1✔
700
            savedRdsRoutingConfigRef.get(filterChain).set(updatedRoutingConfig);
1✔
701
          }
702
        }
1✔
703
      }
1✔
704

705
      // Update the selector to use the most recently updated configs only after all rds have been
706
      // discovered for the first time. Later changes on rds will be applied through virtual host
707
      // list atomic ref.
708
      private void maybeUpdateSelector() {
709
        isPending = false;
1✔
710
        boolean isLastPending = pendingRds.remove(resourceName) && pendingRds.isEmpty();
1✔
711
        if (isLastPending) {
1✔
712
          updateSelector();
1✔
713
        }
714
      }
1✔
715
    }
716
  }
717

718
  @VisibleForTesting
719
  final class ConfigApplyingInterceptor implements ServerInterceptor {
1✔
720
    private final ServerInterceptor noopInterceptor = new ServerInterceptor() {
1✔
721
      @Override
722
      public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
723
          Metadata headers, ServerCallHandler<ReqT, RespT> next) {
724
        return next.startCall(call, headers);
×
725
      }
726
    };
727

728
    @Override
729
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
730
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
731
      AtomicReference<ServerRoutingConfig> routingConfigRef =
1✔
732
          call.getAttributes().get(ATTR_SERVER_ROUTING_CONFIG);
1✔
733
      ServerRoutingConfig routingConfig = routingConfigRef == null ? null :
1✔
734
          routingConfigRef.get();
1✔
735
      if (routingConfig == null || routingConfig == ServerRoutingConfig.FAILING_ROUTING_CONFIG) {
1✔
736
        String errorMsg = "Missing or broken xDS routing config: RDS config unavailable.";
1✔
737
        call.close(Status.UNAVAILABLE.withDescription(errorMsg), new Metadata());
1✔
738
        return new Listener<ReqT>() {};
1✔
739
      }
740
      List<VirtualHost> virtualHosts = routingConfig.virtualHosts();
1✔
741
      VirtualHost virtualHost = RoutingUtils.findVirtualHostForHostName(
1✔
742
          virtualHosts, call.getAuthority());
1✔
743
      if (virtualHost == null) {
1✔
744
        call.close(
1✔
745
            Status.UNAVAILABLE.withDescription("Could not find xDS virtual host matching RPC"),
1✔
746
            new Metadata());
747
        return new Listener<ReqT>() {};
1✔
748
      }
749
      Route selectedRoute = null;
1✔
750
      MethodDescriptor<ReqT, RespT> method = call.getMethodDescriptor();
1✔
751
      for (Route route : virtualHost.routes()) {
1✔
752
        if (RoutingUtils.matchRoute(
1✔
753
            route.routeMatch(), "/" + method.getFullMethodName(), headers, random)) {
1✔
754
          selectedRoute = route;
1✔
755
          break;
1✔
756
        }
757
      }
1✔
758
      if (selectedRoute == null) {
1✔
759
        call.close(Status.UNAVAILABLE.withDescription("Could not find xDS route matching RPC"),
1✔
760
            new Metadata());
761
        return new ServerCall.Listener<ReqT>() {};
1✔
762
      }
763
      if (selectedRoute.routeAction() != null) {
1✔
764
        call.close(Status.UNAVAILABLE.withDescription("Invalid xDS route action for matching "
1✔
765
            + "route: only Route.non_forwarding_action should be allowed."), new Metadata());
766
        return new ServerCall.Listener<ReqT>() {};
1✔
767
      }
768
      ServerInterceptor routeInterceptor = noopInterceptor;
1✔
769
      Map<Route, ServerInterceptor> perRouteInterceptors = routingConfig.interceptors();
1✔
770
      if (perRouteInterceptors != null && perRouteInterceptors.get(selectedRoute) != null) {
1✔
771
        routeInterceptor = perRouteInterceptors.get(selectedRoute);
1✔
772
      }
773
      return routeInterceptor.interceptCall(call, headers, next);
1✔
774
    }
775
  }
776

777
  /**
778
   * The HttpConnectionManager level configuration.
779
   */
780
  @AutoValue
781
  abstract static class ServerRoutingConfig {
1✔
782
    @VisibleForTesting
783
    static final ServerRoutingConfig FAILING_ROUTING_CONFIG = ServerRoutingConfig.create(
1✔
784
        ImmutableList.<VirtualHost>of(), ImmutableMap.<Route, ServerInterceptor>of());
1✔
785

786
    abstract ImmutableList<VirtualHost> virtualHosts();
787

788
    // Prebuilt per route server interceptors from http filter configs.
789
    abstract ImmutableMap<Route, ServerInterceptor> interceptors();
790

791
    /**
792
     * Server routing configuration.
793
     * */
794
    public static ServerRoutingConfig create(
795
        ImmutableList<VirtualHost> virtualHosts,
796
        ImmutableMap<Route, ServerInterceptor> interceptors) {
797
      checkNotNull(virtualHosts, "virtualHosts");
1✔
798
      checkNotNull(interceptors, "interceptors");
1✔
799
      return new AutoValue_XdsServerWrapper_ServerRoutingConfig(virtualHosts, interceptors);
1✔
800
    }
801
  }
802
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc