• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19858

11 Jun 2025 07:19PM UTC coverage: 88.606% (-0.007%) from 88.613%
#19858

push

github

web-flow
google-java-format a line that was too long (#12147)

34613 of 39064 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.88
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.ImmutableList;
26
import io.grpc.NameResolver;
27
import io.grpc.Status;
28
import io.grpc.StatusOr;
29
import io.grpc.SynchronizationContext;
30
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
31
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
32
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
34
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
35
import io.grpc.xds.client.XdsClient;
36
import io.grpc.xds.client.XdsClient.ResourceWatcher;
37
import io.grpc.xds.client.XdsResourceType;
38
import java.util.Collections;
39
import java.util.HashMap;
40
import java.util.HashSet;
41
import java.util.LinkedHashSet;
42
import java.util.List;
43
import java.util.Map;
44
import java.util.Objects;
45
import java.util.Set;
46
import java.util.concurrent.ScheduledExecutorService;
47
import javax.annotation.Nullable;
48

49
/**
50
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
51
 * maintains the watchers for the xds resources and when an update is received, it either requests
52
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
53
 * applies to a single data plane authority.
54
 */
55
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
56
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
57
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
58
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
59
  private final String listenerName;
60
  private final XdsClient xdsClient;
61
  private final SynchronizationContext syncContext;
62
  private final String dataPlaneAuthority;
63
  private XdsConfigWatcher xdsConfigWatcher;
64

65
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
66
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
67
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
68

69
  XdsDependencyManager(XdsClient xdsClient,
70
                       SynchronizationContext syncContext, String dataPlaneAuthority,
71
                       String listenerName, NameResolver.Args nameResolverArgs,
72
                       ScheduledExecutorService scheduler) {
1✔
73
    this.listenerName = checkNotNull(listenerName, "listenerName");
1✔
74
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
75
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
76
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
77
    checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
78
    checkNotNull(scheduler, "scheduler");
1✔
79
  }
1✔
80

81
  public static String toContextStr(String typeName, String resourceName) {
82
    return typeName + " resource " + resourceName;
1✔
83
  }
84

85
  public void start(XdsConfigWatcher xdsConfigWatcher) {
86
    checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
1✔
87
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
88
    // start the ball rolling
89
    syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
1✔
90
  }
1✔
91

92
  @Override
93
  public XdsConfig.Subscription subscribeToCluster(String clusterName) {
94
    checkState(this.xdsConfigWatcher != null, "dep manager must first be started");
1✔
95
    checkNotNull(clusterName, "clusterName");
1✔
96
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
97

98
    syncContext.execute(() -> {
1✔
99
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
100
        subscription.closed = true;
1✔
101
        return; // shutdown() called
1✔
102
      }
103
      subscriptions.add(subscription);
1✔
104
      addClusterWatcher(clusterName);
1✔
105
    });
1✔
106

107
    return subscription;
1✔
108
  }
109

110
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
111
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
112
    XdsResourceType<T> type = watcher.type;
1✔
113
    String resourceName = watcher.resourceName;
1✔
114

115
    getWatchers(type).put(resourceName, watcher);
1✔
116
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
117
  }
1✔
118

119
  public void shutdown() {
120
    syncContext.execute(() -> {
1✔
121
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
122
        shutdownWatchersForType(watchers);
1✔
123
      }
1✔
124
      resourceWatchers.clear();
1✔
125
      subscriptions.clear();
1✔
126
    });
1✔
127
  }
1✔
128

129
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
130
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
131
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
132
          watcherEntry.getValue());
1✔
133
      watcherEntry.getValue().cancelled = true;
1✔
134
    }
1✔
135
  }
1✔
136

137
  private void releaseSubscription(ClusterSubscription subscription) {
138
    checkNotNull(subscription, "subscription");
1✔
139
    syncContext.execute(() -> {
1✔
140
      if (subscription.closed) {
1✔
141
        return;
1✔
142
      }
143
      subscription.closed = true;
1✔
144
      if (!subscriptions.remove(subscription)) {
1✔
145
        return; // shutdown() called
×
146
      }
147
      maybePublishConfig();
1✔
148
    });
1✔
149
  }
1✔
150

151
  /**
152
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
153
   * the watchers.
154
   */
155
  private void maybePublishConfig() {
156
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
157
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
158
      return; // shutdown() called
×
159
    }
160
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
161
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
162
        .anyMatch(XdsWatcherBase::missingResult);
1✔
163
    if (waitingOnResource) {
1✔
164
      return;
1✔
165
    }
166

167
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
168
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
169
      return;
1✔
170
    }
171
    assert newUpdate.hasValue()
1✔
172
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
173
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
174
    lastUpdate = newUpdate;
1✔
175
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
176
  }
1✔
177

178
  @VisibleForTesting
179
  StatusOr<XdsConfig> buildUpdate() {
180
    // Create a config and discard any watchers not accessed
181
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
182
    StatusOr<XdsConfig> config = buildUpdate(
1✔
183
        tracer, listenerName, dataPlaneAuthority, subscriptions);
184
    tracer.closeUnusedWatchers();
1✔
185
    return config;
1✔
186
  }
187

188
  private static StatusOr<XdsConfig> buildUpdate(
189
      WatcherTracer tracer,
190
      String listenerName,
191
      String dataPlaneAuthority,
192
      Set<ClusterSubscription> subscriptions) {
193
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
194

195
    // Iterate watchers and build the XdsConfig
196

197
    XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
198
        = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
1✔
199
    if (ldsWatcher == null) {
1✔
200
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
201
          "Bug: No listener watcher found for " + listenerName));
202
    }
203
    if (!ldsWatcher.getData().hasValue()) {
1✔
204
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
205
    }
206
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
207
    builder.setListener(ldsUpdate);
1✔
208

209
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
210
    if (routeSource == null) {
1✔
211
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
212
          "Bug: No route source found for listener " + dataPlaneAuthority));
213
    }
214
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
215
    if (!statusOrRdsUpdate.hasValue()) {
1✔
216
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
217
    }
218
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
219
    builder.setRoute(rdsUpdate);
1✔
220

221
    VirtualHost activeVirtualHost =
1✔
222
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
223
    if (activeVirtualHost == null) {
1✔
224
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
225
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
226
    }
227
    builder.setVirtualHost(activeVirtualHost);
1✔
228

229
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
230
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
231
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
232
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
233
    }
1✔
234
    for (ClusterSubscription subscription : subscriptions) {
1✔
235
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
236
    }
1✔
237
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
238
      builder.addCluster(me.getKey(), me.getValue());
1✔
239
    }
1✔
240

241
    return StatusOr.fromValue(builder.build());
1✔
242
  }
243

244
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
245
      XdsResourceType<T> resourceType) {
246
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
247
    if (typeWatchers == null) {
1✔
248
      typeWatchers = new TypeWatchers<T>(resourceType);
1✔
249
      resourceWatchers.put(resourceType, typeWatchers);
1✔
250
    }
251
    assert typeWatchers.resourceType == resourceType;
1✔
252
    @SuppressWarnings("unchecked")
253
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
254
    return tTypeWatchers.watchers;
1✔
255
  }
256

257
  private static void addConfigForCluster(
258
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
259
      String clusterName,
260
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
261
      LinkedHashSet<String> ancestors,
262
      WatcherTracer tracer) {
263
    if (clusters.containsKey(clusterName)) {
1✔
264
      return;
1✔
265
    }
266
    if (ancestors.contains(clusterName)) {
1✔
267
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
268
          Status.INTERNAL.withDescription(
1✔
269
              "Aggregate cluster cycle detected: " + ancestors)));
270
      return;
1✔
271
    }
272
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
273
      clusters.put(clusterName, StatusOr.fromStatus(
×
274
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
275
      return;
×
276
    }
277

278
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
1✔
279
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
280
    if (!cdsWatcherDataOr.hasValue()) {
1✔
281
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
282
      return;
1✔
283
    }
284

285
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
286
    XdsConfig.XdsClusterConfig.ClusterChild child;
287
    switch (cdsUpdate.clusterType()) {
1✔
288
      case AGGREGATE:
289
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
290
        // preserves the priority across all aggregate clusters
291
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
292
        ancestors.add(clusterName);
1✔
293
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
294
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
295
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
296
          if (!config.hasValue()) {
1✔
297
            // gRFC A37 says: If any of a CDS policy's watchers reports that the resource does not
298
            // exist the policy should report that it is in TRANSIENT_FAILURE. If any of the
299
            // watchers reports a transient ADS stream error, the policy should report that it is in
300
            // TRANSIENT_FAILURE if it has never passed a config to its child.
301
            //
302
            // But there's currently disagreement about whether that is actually what we want, and
303
            // that was not originally implemented in gRPC Java. So we're keeping Java's old
304
            // behavior for now and only failing the "leaves" (which is a bit arbitrary for a
305
            // cycle).
306
            leafNames.add(childCluster);
1✔
307
            continue;
1✔
308
          }
309
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
310
          if (children instanceof AggregateConfig) {
1✔
311
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
312
          } else {
313
            leafNames.add(childCluster);
1✔
314
          }
315
        }
1✔
316
        ancestors.remove(clusterName);
1✔
317

318
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
319
        break;
1✔
320
      case EDS:
321
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
322
            tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
1✔
323
        if (edsWatcher != null) {
1✔
324
          child = new EndpointConfig(edsWatcher.getData());
1✔
325
        } else {
326
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
327
              "EDS resource not found for cluster " + clusterName)));
328
        }
329
        break;
×
330
      case LOGICAL_DNS:
331
        // TODO get the resolved endpoint configuration
332
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
333
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
334
        break;
1✔
335
      default:
336
        throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
337
    }
338
    if (clusters.containsKey(clusterName)) {
1✔
339
      // If a cycle is detected, we'll have detected it while recursing, so now there will be a key
340
      // present. We don't want to overwrite it with a non-error value.
341
      return;
1✔
342
    }
343
    clusters.put(clusterName, StatusOr.fromValue(
1✔
344
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
345
  }
1✔
346

347
  private void addRdsWatcher(String resourceName) {
348
    if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
1✔
349
      return;
×
350
    }
351

352
    addWatcher(new RdsWatcher(resourceName));
1✔
353
  }
1✔
354

355
  private void addEdsWatcher(String edsServiceName) {
356
    if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
1✔
357
      return;
1✔
358
    }
359

360
    addWatcher(new EdsWatcher(edsServiceName));
1✔
361
  }
1✔
362

363
  private void addClusterWatcher(String clusterName) {
364
    if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
1✔
365
      return;
1✔
366
    }
367

368
    addWatcher(new CdsWatcher(clusterName));
1✔
369
  }
1✔
370

371
  private void updateRoutes(List<VirtualHost> virtualHosts) {
372
    VirtualHost virtualHost =
1✔
373
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
374
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
375
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
376
  }
1✔
377

378
  private String nodeInfo() {
379
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
380
  }
381

382
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
383
    if (virtualHost == null) {
1✔
384
      return Collections.emptySet();
1✔
385
    }
386

387
    // Get all cluster names to which requests can be routed through the virtual host.
388
    Set<String> clusters = new HashSet<>();
1✔
389
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
390
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
391
      if (action == null) {
1✔
392
        continue;
1✔
393
      }
394
      if (action.cluster() != null) {
1✔
395
        clusters.add(action.cluster());
1✔
396
      } else if (action.weightedClusters() != null) {
1✔
397
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
398
          clusters.add(weighedCluster.name());
1✔
399
        }
1✔
400
      }
401
    }
1✔
402

403
    return clusters;
1✔
404
  }
405

406
  private static class TypeWatchers<T extends ResourceUpdate> {
407
    // Key is resource name
408
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
409
    final XdsResourceType<T> resourceType;
410

411
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
412
      this.resourceType = resourceType;
1✔
413
    }
1✔
414
  }
415

416
  public interface XdsConfigWatcher {
417
    /**
418
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
419
     * INTERNAL.
420
     */
421
    void onUpdate(StatusOr<XdsConfig> config);
422
  }
423

424
  private final class ClusterSubscription implements XdsConfig.Subscription {
425
    private final String clusterName;
426
    boolean closed; // Accessed from syncContext
427

428
    public ClusterSubscription(String clusterName) {
1✔
429
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
430
    }
1✔
431

432
    String getClusterName() {
433
      return clusterName;
1✔
434
    }
435

436
    @Override
437
    public void close() {
438
      releaseSubscription(this);
1✔
439
    }
1✔
440
  }
441

442
  /** State for tracing garbage collector. */
443
  private static final class WatcherTracer {
1✔
444
    private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
445
    private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
446

447
    public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
1✔
448
      this.resourceWatchers = resourceWatchers;
1✔
449

450
      this.usedWatchers = new HashMap<>();
1✔
451
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
452
        usedWatchers.put(type, newTypeWatchers(type));
1✔
453
      }
1✔
454
    }
1✔
455

456
    private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
457
        XdsResourceType<T> type) {
458
      return new TypeWatchers<T>(type);
1✔
459
    }
460

461
    public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
462
        XdsResourceType<T> resourceType, String name) {
463
      TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
464
      if (typeWatchers == null) {
1✔
465
        return null;
×
466
      }
467
      assert typeWatchers.resourceType == resourceType;
1✔
468
      @SuppressWarnings("unchecked")
469
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
470
      XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
1✔
471
      if (watcher == null) {
1✔
472
        return null;
×
473
      }
474
      @SuppressWarnings("unchecked")
475
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
1✔
476
      usedTypeWatchers.watchers.put(name, watcher);
1✔
477
      return watcher;
1✔
478
    }
479

480
    /** Shut down unused watchers. */
481
    public void closeUnusedWatchers() {
482
      boolean changed = false; // Help out the GC by preferring old objects
1✔
483
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
484
        TypeWatchers<?> orig = resourceWatchers.get(type);
1✔
485
        TypeWatchers<?> used = usedWatchers.get(type);
1✔
486
        for (String name : orig.watchers.keySet()) {
1✔
487
          if (used.watchers.containsKey(name)) {
1✔
488
            continue;
1✔
489
          }
490
          orig.watchers.get(name).close();
1✔
491
          changed = true;
1✔
492
        }
1✔
493
      }
1✔
494
      if (changed) {
1✔
495
        resourceWatchers.putAll(usedWatchers);
1✔
496
      }
497
    }
1✔
498
  }
499

500
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
501
      implements ResourceWatcher<T> {
502
    private final XdsResourceType<T> type;
503
    private final String resourceName;
504
    boolean cancelled;
505

506
    @Nullable
507
    private StatusOr<T> data;
508

509

510
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
511
      this.type = checkNotNull(type, "type");
1✔
512
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
513
    }
1✔
514

515
    @Override
516
    public void onError(Status error) {
517
      checkNotNull(error, "error");
1✔
518
      if (cancelled) {
1✔
519
        return;
×
520
      }
521
      // Don't update configuration on error, if we've already received configuration
522
      if (!hasDataValue()) {
1✔
523
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
524
            String.format("Error retrieving %s: %s: %s",
1✔
525
              toContextString(), error.getCode(), error.getDescription())));
1✔
526
        maybePublishConfig();
1✔
527
      }
528
    }
1✔
529

530
    @Override
531
    public void onResourceDoesNotExist(String resourceName) {
532
      if (cancelled) {
1✔
533
        return;
×
534
      }
535

536
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
537
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
538
          toContextString() + " does not exist" + nodeInfo()));
1✔
539
      maybePublishConfig();
1✔
540
    }
1✔
541

542
    public void close() {
543
      cancelled = true;
1✔
544
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
545
    }
1✔
546

547
    boolean missingResult() {
548
      return data == null;
1✔
549
    }
550

551
    @Nullable
552
    StatusOr<T> getData() {
553
      return data;
1✔
554
    }
555

556
    boolean hasDataValue() {
557
      return data != null && data.hasValue();
1✔
558
    }
559

560
    String resourceName() {
561
      return resourceName;
×
562
    }
563

564
    protected void setData(T data) {
565
      checkNotNull(data, "data");
1✔
566
      this.data = StatusOr.fromValue(data);
1✔
567
    }
1✔
568

569
    protected void setDataAsStatus(Status status) {
570
      checkNotNull(status, "status");
1✔
571
      this.data = StatusOr.fromStatus(status);
1✔
572
    }
1✔
573

574
    public String toContextString() {
575
      return toContextStr(type.typeName(), resourceName);
1✔
576
    }
577
  }
578

579
  private interface RdsUpdateSupplier {
580
    StatusOr<RdsUpdate> getRdsUpdate();
581
  }
582

583
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
584
      implements RdsUpdateSupplier {
585

586
    private LdsWatcher(String resourceName) {
1✔
587
      super(XdsListenerResource.getInstance(), resourceName);
1✔
588
    }
1✔
589

590
    @Override
591
    public void onChanged(XdsListenerResource.LdsUpdate update) {
592
      checkNotNull(update, "update");
1✔
593
      if (cancelled) {
1✔
594
        return;
1✔
595
      }
596

597
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
598
      List<VirtualHost> virtualHosts;
599
      if (httpConnectionManager == null) {
1✔
600
        // TCP listener. Unsupported config
601
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
602
      } else {
603
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
604
      }
605
      if (virtualHosts != null) {
1✔
606
        updateRoutes(virtualHosts);
1✔
607
      }
608

609
      String rdsName = getRdsName(update);
1✔
610
      if (rdsName != null) {
1✔
611
        addRdsWatcher(rdsName);
1✔
612
      }
613

614
      setData(update);
1✔
615
      maybePublishConfig();
1✔
616
    }
1✔
617

618
    private String getRdsName(XdsListenerResource.LdsUpdate update) {
619
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
620
      if (httpConnectionManager == null) {
1✔
621
        // TCP listener. Unsupported config
622
        return null;
1✔
623
      }
624
      return httpConnectionManager.rdsName();
1✔
625
    }
626

627
    private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTracer tracer) {
628
      String rdsName = getRdsName(update);
1✔
629
      if (rdsName == null) {
1✔
630
        return null;
×
631
      }
632
      return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
1✔
633
    }
634

635
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
636
      if (!hasDataValue()) {
1✔
637
        return this;
×
638
      }
639
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
640
      if (hcm == null) {
1✔
641
        return this;
1✔
642
      }
643
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
644
      if (virtualHosts != null) {
1✔
645
        return this;
1✔
646
      }
647
      RdsWatcher rdsWatcher = getRdsWatcher(getData().getValue(), tracer);
1✔
648
      assert rdsWatcher != null;
1✔
649
      return rdsWatcher;
1✔
650
    }
651

652
    @Override
653
    public StatusOr<RdsUpdate> getRdsUpdate() {
654
      if (missingResult()) {
1✔
655
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
656
      }
657
      if (!getData().hasValue()) {
1✔
658
        return StatusOr.fromStatus(getData().getStatus());
×
659
      }
660
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
661
      if (hcm == null) {
1✔
662
        return StatusOr.fromStatus(
1✔
663
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
664
      }
665
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
666
      if (virtualHosts == null) {
1✔
667
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
668
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
669
        // bug
670
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
671
      }
672
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
673
    }
674
  }
675

676
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
677

678
    public RdsWatcher(String resourceName) {
1✔
679
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
680
    }
1✔
681

682
    @Override
683
    public void onChanged(RdsUpdate update) {
684
      checkNotNull(update, "update");
1✔
685
      if (cancelled) {
1✔
686
        return;
1✔
687
      }
688
      setData(update);
1✔
689
      updateRoutes(update.virtualHosts);
1✔
690
      maybePublishConfig();
1✔
691
    }
1✔
692

693
    @Override
694
    public StatusOr<RdsUpdate> getRdsUpdate() {
695
      if (missingResult()) {
1✔
696
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
697
      }
698
      return getData();
1✔
699
    }
700
  }
701

702
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
703
    CdsWatcher(String resourceName) {
1✔
704
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
705
    }
1✔
706

707
    @Override
708
    public void onChanged(XdsClusterResource.CdsUpdate update) {
709
      checkNotNull(update, "update");
1✔
710
      if (cancelled) {
1✔
711
        return;
1✔
712
      }
713
      switch (update.clusterType()) {
1✔
714
        case EDS:
715
          setData(update);
1✔
716
          addEdsWatcher(getEdsServiceName());
1✔
717
          break;
1✔
718
        case LOGICAL_DNS:
719
          setData(update);
1✔
720
          // no eds needed
721
          break;
1✔
722
        case AGGREGATE:
723
          setData(update);
1✔
724
          update.prioritizedClusterNames()
1✔
725
              .forEach(name -> addClusterWatcher(name));
1✔
726
          break;
1✔
727
        default:
728
          Status error = Status.UNAVAILABLE.withDescription(
×
729
              "unknown cluster type in " + resourceName() + " " + update.clusterType());
×
730
          setDataAsStatus(error);
×
731
      }
732
      maybePublishConfig();
1✔
733
    }
1✔
734

735
    public String getEdsServiceName() {
736
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
737
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
738
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
739
      if (edsServiceName == null) {
1✔
740
        edsServiceName = cdsUpdate.clusterName();
×
741
      }
742
      return edsServiceName;
1✔
743
    }
744
  }
745

746
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
747
    private EdsWatcher(String resourceName) {
1✔
748
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
749
    }
1✔
750

751
    @Override
752
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
753
      if (cancelled) {
1✔
754
        return;
1✔
755
      }
756
      setData(checkNotNull(update, "update"));
1✔
757
      maybePublishConfig();
1✔
758
    }
1✔
759
  }
760
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc