• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19855

09 Jun 2025 02:13PM UTC coverage: 88.609% (+0.006%) from 88.603%
#19855

push

github

ejona86
xds: cancelled=true on watch close in XdsDepManager

1fd29bc80 replaced cancelWatcher() with watcher.close(). But setting
cancelled was missing. Because the config update checks for shutdown,
the cancelled flag no longer avoids exceptions. But it seems best to
continue avoiding any processing after close to avoid surprises.

34700 of 39161 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.89
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.collect.ImmutableList;
25
import io.grpc.NameResolver;
26
import io.grpc.Status;
27
import io.grpc.StatusOr;
28
import io.grpc.SynchronizationContext;
29
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
30
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
31
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
32
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
33
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
34
import io.grpc.xds.client.XdsClient;
35
import io.grpc.xds.client.XdsClient.ResourceWatcher;
36
import io.grpc.xds.client.XdsResourceType;
37
import java.io.Closeable;
38
import java.io.IOException;
39
import java.util.Collections;
40
import java.util.HashMap;
41
import java.util.HashSet;
42
import java.util.LinkedHashSet;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Objects;
46
import java.util.Set;
47
import java.util.concurrent.ScheduledExecutorService;
48
import javax.annotation.Nullable;
49

50
/**
51
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
52
 * maintains the watchers for the xds resources and when an update is received, it either requests
53
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
54
 * applies to a single data plane authority.
55
 */
56
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
57
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
58
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
59
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
60
  private final String listenerName;
61
  private final XdsClient xdsClient;
62
  private final XdsConfigWatcher xdsConfigWatcher;
63
  private final SynchronizationContext syncContext;
64
  private final String dataPlaneAuthority;
65

66
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
67
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
68
  private final Set<ClusterSubscription> subscriptions = new HashSet<>();
1✔
69

70
  /**
   * Creates the manager and schedules registration of the initial listener watch.
   *
   * <p>Every argument must be non-null. {@code nameResolverArgs} and {@code scheduler} are
   * validated here but not otherwise used by this constructor.
   */
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
                       SynchronizationContext syncContext, String dataPlaneAuthority,
                       String listenerName, NameResolver.Args nameResolverArgs,
                       ScheduledExecutorService scheduler) {
    // The validation order is preserved: the first null argument decides which NPE is raised.
    this.listenerName = checkNotNull(listenerName, "listenerName");
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
    checkNotNull(nameResolverArgs, "nameResolverArgs");
    checkNotNull(scheduler, "scheduler");

    // Start the ball rolling: the LDS watcher is created and registered inside the sync context.
    syncContext.execute(() -> {
      LdsWatcher rootWatcher = new LdsWatcher(listenerName);
      addWatcher(rootWatcher);
    });
  }
1✔
85

86
  public static String toContextStr(String typeName, String resourceName) {
87
    return typeName + " resource " + resourceName;
1✔
88
  }
89

90
  @Override
91
  public Closeable subscribeToCluster(String clusterName) {
92
    checkNotNull(clusterName, "clusterName");
1✔
93
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
94

95
    syncContext.execute(() -> {
1✔
96
      if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
97
        subscription.closed = true;
1✔
98
        return; // shutdown() called
1✔
99
      }
100
      subscriptions.add(subscription);
1✔
101
      addClusterWatcher(clusterName);
1✔
102
    });
1✔
103

104
    return subscription;
1✔
105
  }
106

107
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
108
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
109
    XdsResourceType<T> type = watcher.type;
1✔
110
    String resourceName = watcher.resourceName;
1✔
111

112
    getWatchers(type).put(resourceName, watcher);
1✔
113
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
114
  }
1✔
115

116
  public void shutdown() {
117
    syncContext.execute(() -> {
1✔
118
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
119
        shutdownWatchersForType(watchers);
1✔
120
      }
1✔
121
      resourceWatchers.clear();
1✔
122
      subscriptions.clear();
1✔
123
    });
1✔
124
  }
1✔
125

126
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
127
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
128
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
129
          watcherEntry.getValue());
1✔
130
      watcherEntry.getValue().cancelled = true;
1✔
131
    }
1✔
132
  }
1✔
133

134
  private void releaseSubscription(ClusterSubscription subscription) {
135
    checkNotNull(subscription, "subscription");
1✔
136
    syncContext.execute(() -> {
1✔
137
      if (subscription.closed) {
1✔
138
        return;
1✔
139
      }
140
      subscription.closed = true;
1✔
141
      if (!subscriptions.remove(subscription)) {
1✔
142
        return; // shutdown() called
×
143
      }
144
      maybePublishConfig();
1✔
145
    });
1✔
146
  }
1✔
147

148
  /**
149
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
150
   * the watchers.
151
   */
152
  private void maybePublishConfig() {
153
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
154
    if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
1✔
155
      return; // shutdown() called
×
156
    }
157
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
158
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
159
        .anyMatch(XdsWatcherBase::missingResult);
1✔
160
    if (waitingOnResource) {
1✔
161
      return;
1✔
162
    }
163

164
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
165
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
166
      return;
1✔
167
    }
168
    assert newUpdate.hasValue()
1✔
169
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
170
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
171
    lastUpdate = newUpdate;
1✔
172
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
173
  }
1✔
174

175
  @VisibleForTesting
176
  StatusOr<XdsConfig> buildUpdate() {
177
    // Create a config and discard any watchers not accessed
178
    WatcherTracer tracer = new WatcherTracer(resourceWatchers);
1✔
179
    StatusOr<XdsConfig> config = buildUpdate(
1✔
180
        tracer, listenerName, dataPlaneAuthority, subscriptions);
181
    tracer.closeUnusedWatchers();
1✔
182
    return config;
1✔
183
  }
184

185
  private static StatusOr<XdsConfig> buildUpdate(
186
      WatcherTracer tracer,
187
      String listenerName,
188
      String dataPlaneAuthority,
189
      Set<ClusterSubscription> subscriptions) {
190
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
191

192
    // Iterate watchers and build the XdsConfig
193

194
    XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
1✔
195
        = tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
1✔
196
    if (ldsWatcher == null) {
1✔
197
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
198
          "Bug: No listener watcher found for " + listenerName));
199
    }
200
    if (!ldsWatcher.getData().hasValue()) {
1✔
201
      return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
202
    }
203
    XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
204
    builder.setListener(ldsUpdate);
1✔
205

206
    RdsUpdateSupplier routeSource = ((LdsWatcher) ldsWatcher).getRouteSource(tracer);
1✔
207
    if (routeSource == null) {
1✔
208
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
×
209
          "Bug: No route source found for listener " + dataPlaneAuthority));
210
    }
211
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
212
    if (!statusOrRdsUpdate.hasValue()) {
1✔
213
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
214
    }
215
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
216
    builder.setRoute(rdsUpdate);
1✔
217

218
    VirtualHost activeVirtualHost =
1✔
219
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
220
    if (activeVirtualHost == null) {
1✔
221
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
222
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
223
    }
224
    builder.setVirtualHost(activeVirtualHost);
1✔
225

226
    Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters = new HashMap<>();
1✔
227
    LinkedHashSet<String> ancestors = new LinkedHashSet<>();
1✔
228
    for (String cluster : getClusterNamesFromVirtualHost(activeVirtualHost)) {
1✔
229
      addConfigForCluster(clusters, cluster, ancestors, tracer);
1✔
230
    }
1✔
231
    for (ClusterSubscription subscription : subscriptions) {
1✔
232
      addConfigForCluster(clusters, subscription.getClusterName(), ancestors, tracer);
1✔
233
    }
1✔
234
    for (Map.Entry<String, StatusOr<XdsConfig.XdsClusterConfig>> me : clusters.entrySet()) {
1✔
235
      builder.addCluster(me.getKey(), me.getValue());
1✔
236
    }
1✔
237

238
    return StatusOr.fromValue(builder.build());
1✔
239
  }
240

241
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
242
      XdsResourceType<T> resourceType) {
243
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
244
    if (typeWatchers == null) {
1✔
245
      typeWatchers = new TypeWatchers<T>(resourceType);
1✔
246
      resourceWatchers.put(resourceType, typeWatchers);
1✔
247
    }
248
    assert typeWatchers.resourceType == resourceType;
1✔
249
    @SuppressWarnings("unchecked")
250
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
251
    return tTypeWatchers.watchers;
1✔
252
  }
253

254
  private static void addConfigForCluster(
255
      Map<String, StatusOr<XdsConfig.XdsClusterConfig>> clusters,
256
      String clusterName,
257
      @SuppressWarnings("NonApiType") // Need order-preserving set for errors
258
      LinkedHashSet<String> ancestors,
259
      WatcherTracer tracer) {
260
    if (clusters.containsKey(clusterName)) {
1✔
261
      return;
1✔
262
    }
263
    if (ancestors.contains(clusterName)) {
1✔
264
      clusters.put(clusterName, StatusOr.fromStatus(
1✔
265
          Status.INTERNAL.withDescription(
1✔
266
              "Aggregate cluster cycle detected: " + ancestors)));
267
      return;
1✔
268
    }
269
    if (ancestors.size() > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
270
      clusters.put(clusterName, StatusOr.fromStatus(
×
271
          Status.INTERNAL.withDescription("Recursion limit reached: " + ancestors)));
×
272
      return;
×
273
    }
274

275
    CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
1✔
276
    StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
277
    if (!cdsWatcherDataOr.hasValue()) {
1✔
278
      clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
279
      return;
1✔
280
    }
281

282
    XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
283
    XdsConfig.XdsClusterConfig.ClusterChild child;
284
    switch (cdsUpdate.clusterType()) {
1✔
285
      case AGGREGATE:
286
        // Re-inserting a present element into a LinkedHashSet does not reorder the entries, so it
287
        // preserves the priority across all aggregate clusters
288
        LinkedHashSet<String> leafNames = new LinkedHashSet<String>();
1✔
289
        ancestors.add(clusterName);
1✔
290
        for (String childCluster : cdsUpdate.prioritizedClusterNames()) {
1✔
291
          addConfigForCluster(clusters, childCluster, ancestors, tracer);
1✔
292
          StatusOr<XdsConfig.XdsClusterConfig> config = clusters.get(childCluster);
1✔
293
          if (!config.hasValue()) {
1✔
294
            clusters.put(clusterName, StatusOr.fromStatus(Status.INTERNAL.withDescription(
1✔
295
                "Unable to get leaves for " + clusterName + ": "
296
                + config.getStatus().getDescription())));
1✔
297
            return;
1✔
298
          }
299
          XdsConfig.XdsClusterConfig.ClusterChild children = config.getValue().getChildren();
1✔
300
          if (children instanceof AggregateConfig) {
1✔
301
            leafNames.addAll(((AggregateConfig) children).getLeafNames());
1✔
302
          } else {
303
            leafNames.add(childCluster);
1✔
304
          }
305
        }
1✔
306
        ancestors.remove(clusterName);
1✔
307

308
        child = new AggregateConfig(ImmutableList.copyOf(leafNames));
1✔
309
        break;
1✔
310
      case EDS:
311
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
312
            tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
1✔
313
        if (edsWatcher != null) {
1✔
314
          child = new EndpointConfig(edsWatcher.getData());
1✔
315
        } else {
316
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
317
              "EDS resource not found for cluster " + clusterName)));
318
        }
319
        break;
×
320
      case LOGICAL_DNS:
321
        // TODO get the resolved endpoint configuration
322
        child = new EndpointConfig(StatusOr.fromStatus(
1✔
323
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
324
        break;
1✔
325
      default:
326
        throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
327
    }
328
    clusters.put(clusterName, StatusOr.fromValue(
1✔
329
        new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
330
  }
1✔
331

332
  private void addRdsWatcher(String resourceName) {
333
    if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
1✔
334
      return;
×
335
    }
336

337
    addWatcher(new RdsWatcher(resourceName));
1✔
338
  }
1✔
339

340
  private void addEdsWatcher(String edsServiceName) {
341
    if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
1✔
342
      return;
1✔
343
    }
344

345
    addWatcher(new EdsWatcher(edsServiceName));
1✔
346
  }
1✔
347

348
  private void addClusterWatcher(String clusterName) {
349
    if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
1✔
350
      return;
1✔
351
    }
352

353
    addWatcher(new CdsWatcher(clusterName));
1✔
354
  }
1✔
355

356
  private void updateRoutes(List<VirtualHost> virtualHosts) {
357
    VirtualHost virtualHost =
1✔
358
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
359
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
360
    newClusters.forEach((cluster) -> addClusterWatcher(cluster));
1✔
361
  }
1✔
362

363
  private String nodeInfo() {
364
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
365
  }
366

367
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
368
    if (virtualHost == null) {
1✔
369
      return Collections.emptySet();
1✔
370
    }
371

372
    // Get all cluster names to which requests can be routed through the virtual host.
373
    Set<String> clusters = new HashSet<>();
1✔
374
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
375
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
376
      if (action == null) {
1✔
377
        continue;
1✔
378
      }
379
      if (action.cluster() != null) {
1✔
380
        clusters.add(action.cluster());
1✔
381
      } else if (action.weightedClusters() != null) {
1✔
382
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
383
          clusters.add(weighedCluster.name());
1✔
384
        }
1✔
385
      }
386
    }
1✔
387

388
    return clusters;
1✔
389
  }
390

391
  private static class TypeWatchers<T extends ResourceUpdate> {
392
    // Key is resource name
393
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
394
    final XdsResourceType<T> resourceType;
395

396
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
397
      this.resourceType = resourceType;
1✔
398
    }
1✔
399
  }
400

401
  public interface XdsConfigWatcher {
402
    /**
403
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
404
     * INTERNAL.
405
     */
406
    void onUpdate(StatusOr<XdsConfig> config);
407
  }
408

409
  private final class ClusterSubscription implements Closeable {
410
    private final String clusterName;
411
    boolean closed; // Accessed from syncContext
412

413
    public ClusterSubscription(String clusterName) {
1✔
414
      this.clusterName = checkNotNull(clusterName, "clusterName");
1✔
415
    }
1✔
416

417
    String getClusterName() {
418
      return clusterName;
1✔
419
    }
420

421
    @Override
422
    public void close() throws IOException {
423
      releaseSubscription(this);
1✔
424
    }
1✔
425
  }
426

427
  /** State for tracing garbage collector. */
428
  private static final class WatcherTracer {
1✔
429
    private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
430
    private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
431

432
    public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
1✔
433
      this.resourceWatchers = resourceWatchers;
1✔
434

435
      this.usedWatchers = new HashMap<>();
1✔
436
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
437
        usedWatchers.put(type, newTypeWatchers(type));
1✔
438
      }
1✔
439
    }
1✔
440

441
    private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
442
        XdsResourceType<T> type) {
443
      return new TypeWatchers<T>(type);
1✔
444
    }
445

446
    public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
447
        XdsResourceType<T> resourceType, String name) {
448
      TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
449
      if (typeWatchers == null) {
1✔
450
        return null;
×
451
      }
452
      assert typeWatchers.resourceType == resourceType;
1✔
453
      @SuppressWarnings("unchecked")
454
      TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
455
      XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
1✔
456
      if (watcher == null) {
1✔
457
        return null;
×
458
      }
459
      @SuppressWarnings("unchecked")
460
      TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
1✔
461
      usedTypeWatchers.watchers.put(name, watcher);
1✔
462
      return watcher;
1✔
463
    }
464

465
    /** Shut down unused watchers. */
466
    public void closeUnusedWatchers() {
467
      boolean changed = false; // Help out the GC by preferring old objects
1✔
468
      for (XdsResourceType<?> type : resourceWatchers.keySet()) {
1✔
469
        TypeWatchers<?> orig = resourceWatchers.get(type);
1✔
470
        TypeWatchers<?> used = usedWatchers.get(type);
1✔
471
        for (String name : orig.watchers.keySet()) {
1✔
472
          if (used.watchers.containsKey(name)) {
1✔
473
            continue;
1✔
474
          }
475
          orig.watchers.get(name).close();
1✔
476
          changed = true;
1✔
477
        }
1✔
478
      }
1✔
479
      if (changed) {
1✔
480
        resourceWatchers.putAll(usedWatchers);
1✔
481
      }
482
    }
1✔
483
  }
484

485
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
486
      implements ResourceWatcher<T> {
487
    private final XdsResourceType<T> type;
488
    private final String resourceName;
489
    boolean cancelled;
490

491
    @Nullable
492
    private StatusOr<T> data;
493

494

495
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
496
      this.type = checkNotNull(type, "type");
1✔
497
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
498
    }
1✔
499

500
    @Override
501
    public void onError(Status error) {
502
      checkNotNull(error, "error");
1✔
503
      if (cancelled) {
1✔
504
        return;
×
505
      }
506
      // Don't update configuration on error, if we've already received configuration
507
      if (!hasDataValue()) {
1✔
508
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
509
            String.format("Error retrieving %s: %s: %s",
1✔
510
              toContextString(), error.getCode(), error.getDescription())));
1✔
511
        maybePublishConfig();
1✔
512
      }
513
    }
1✔
514

515
    @Override
516
    public void onResourceDoesNotExist(String resourceName) {
517
      if (cancelled) {
1✔
518
        return;
×
519
      }
520

521
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
522
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
523
          toContextString() + " does not exist" + nodeInfo()));
1✔
524
      maybePublishConfig();
1✔
525
    }
1✔
526

527
    public void close() {
528
      cancelled = true;
1✔
529
      xdsClient.cancelXdsResourceWatch(type, resourceName, this);
1✔
530
    }
1✔
531

532
    boolean missingResult() {
533
      return data == null;
1✔
534
    }
535

536
    @Nullable
537
    StatusOr<T> getData() {
538
      return data;
1✔
539
    }
540

541
    boolean hasDataValue() {
542
      return data != null && data.hasValue();
1✔
543
    }
544

545
    String resourceName() {
546
      return resourceName;
1✔
547
    }
548

549
    protected void setData(T data) {
550
      checkNotNull(data, "data");
1✔
551
      this.data = StatusOr.fromValue(data);
1✔
552
    }
1✔
553

554
    protected void setDataAsStatus(Status status) {
555
      checkNotNull(status, "status");
1✔
556
      this.data = StatusOr.fromStatus(status);
1✔
557
    }
1✔
558

559
    public String toContextString() {
560
      return toContextStr(type.typeName(), resourceName);
1✔
561
    }
562
  }
563

564
  private interface RdsUpdateSupplier {
565
    StatusOr<RdsUpdate> getRdsUpdate();
566
  }
567

568
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
569
      implements RdsUpdateSupplier {
570
    String rdsName;
571

572
    private LdsWatcher(String resourceName) {
1✔
573
      super(XdsListenerResource.getInstance(), resourceName);
1✔
574
    }
1✔
575

576
    @Override
577
    public void onChanged(XdsListenerResource.LdsUpdate update) {
578
      checkNotNull(update, "update");
1✔
579
      if (cancelled) {
1✔
580
        return;
1✔
581
      }
582

583
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
584
      List<VirtualHost> virtualHosts;
585
      String rdsName;
586
      if (httpConnectionManager == null) {
1✔
587
        // TCP listener. Unsupported config
588
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
589
        rdsName = null;
1✔
590
      } else {
591
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
592
        rdsName = httpConnectionManager.rdsName();
1✔
593
      }
594

595
      if (virtualHosts != null) {
1✔
596
        // No RDS watcher since we are getting RDS updates via LDS
597
        updateRoutes(virtualHosts);
1✔
598
        this.rdsName = null;
1✔
599
      } else {
600
        this.rdsName = rdsName;
1✔
601
        addRdsWatcher(rdsName);
1✔
602
      }
603

604
      setData(update);
1✔
605
      maybePublishConfig();
1✔
606
    }
1✔
607

608
    @Override
609
    public void onResourceDoesNotExist(String resourceName) {
610
      if (cancelled) {
1✔
611
        return;
×
612
      }
613

614
      checkArgument(resourceName().equals(resourceName), "Resource name does not match");
1✔
615
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
616
          toContextString() + " does not exist" + nodeInfo()));
1✔
617
      rdsName = null;
1✔
618
      maybePublishConfig();
1✔
619
    }
1✔
620

621
    private RdsWatcher getRdsWatcher(WatcherTracer tracer) {
622
      if (rdsName == null) {
1✔
623
        return null;
×
624
      }
625
      return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
1✔
626
    }
627

628
    public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
629
      if (!hasDataValue()) {
1✔
630
        return this;
×
631
      }
632
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
633
      if (hcm == null) {
1✔
634
        return this;
1✔
635
      }
636
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
637
      if (virtualHosts != null) {
1✔
638
        return this;
1✔
639
      }
640
      RdsWatcher rdsWatcher = getRdsWatcher(tracer);
1✔
641
      assert rdsWatcher != null;
1✔
642
      return rdsWatcher;
1✔
643
    }
644

645
    @Override
646
    public StatusOr<RdsUpdate> getRdsUpdate() {
647
      if (missingResult()) {
1✔
648
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
649
      }
650
      if (!getData().hasValue()) {
1✔
651
        return StatusOr.fromStatus(getData().getStatus());
×
652
      }
653
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
654
      if (hcm == null) {
1✔
655
        return StatusOr.fromStatus(
1✔
656
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
657
      }
658
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
659
      if (virtualHosts == null) {
1✔
660
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
661
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
662
        // bug
663
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
664
      }
665
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
666
    }
667
  }
668

669
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
670

671
    public RdsWatcher(String resourceName) {
1✔
672
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
673
    }
1✔
674

675
    @Override
676
    public void onChanged(RdsUpdate update) {
677
      checkNotNull(update, "update");
1✔
678
      if (cancelled) {
1✔
679
        return;
1✔
680
      }
681
      setData(update);
1✔
682
      updateRoutes(update.virtualHosts);
1✔
683
      maybePublishConfig();
1✔
684
    }
1✔
685

686
    @Override
687
    public StatusOr<RdsUpdate> getRdsUpdate() {
688
      if (missingResult()) {
1✔
689
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
690
      }
691
      return getData();
1✔
692
    }
693
  }
694

695
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
696
    CdsWatcher(String resourceName) {
1✔
697
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
698
    }
1✔
699

700
    @Override
701
    public void onChanged(XdsClusterResource.CdsUpdate update) {
702
      checkNotNull(update, "update");
1✔
703
      if (cancelled) {
1✔
704
        return;
1✔
705
      }
706
      switch (update.clusterType()) {
1✔
707
        case EDS:
708
          setData(update);
1✔
709
          addEdsWatcher(getEdsServiceName());
1✔
710
          break;
1✔
711
        case LOGICAL_DNS:
712
          setData(update);
1✔
713
          // no eds needed
714
          break;
1✔
715
        case AGGREGATE:
716
          setData(update);
1✔
717
          update.prioritizedClusterNames()
1✔
718
              .forEach(name -> addClusterWatcher(name));
1✔
719
          break;
1✔
720
        default:
721
          Status error = Status.UNAVAILABLE.withDescription(
×
722
              "unknown cluster type in " + resourceName() + " " + update.clusterType());
×
723
          setDataAsStatus(error);
×
724
      }
725
      maybePublishConfig();
1✔
726
    }
1✔
727

728
    public String getEdsServiceName() {
729
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
730
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
731
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
732
      if (edsServiceName == null) {
1✔
733
        edsServiceName = cdsUpdate.clusterName();
×
734
      }
735
      return edsServiceName;
1✔
736
    }
737
  }
738

739
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
740
    private EdsWatcher(String resourceName) {
1✔
741
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
742
    }
1✔
743

744
    @Override
745
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
746
      if (cancelled) {
1✔
747
        return;
1✔
748
      }
749
      setData(checkNotNull(update, "update"));
1✔
750
      maybePublishConfig();
1✔
751
    }
1✔
752
  }
753
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc