• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19740

18 Mar 2025 09:05PM UTC coverage: 88.588% (+0.05%) from 88.536%
#19740

push

github

web-flow
xds: Use XdsDependencyManager for XdsNameResolver

Contributes to the gRFC A74 effort.
https://github.com/grpc/proposal/blob/master/A74-xds-config-tears.md

The alternative to using Mockito's ArgumentMatcher is to use Hamcrest.
However, Hamcrest did not impress me. ArgumentMatcher is trivial if you
don't care about the error message.

This fixes a pre-existing issue where ConfigSelector.releaseCluster
could revert the LB config back to using cluster manager after releasing
all RPCs using a cluster have committed.

Co-authored-by: Larry Safran <lsafran@google.com>

34583 of 39038 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.79
/../xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
22
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.collect.Sets;
26
import io.grpc.InternalLogId;
27
import io.grpc.NameResolver;
28
import io.grpc.Status;
29
import io.grpc.StatusOr;
30
import io.grpc.SynchronizationContext;
31
import io.grpc.xds.VirtualHost.Route.RouteAction.ClusterWeight;
32
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
33
import io.grpc.xds.XdsConfig.XdsClusterConfig.AggregateConfig;
34
import io.grpc.xds.XdsConfig.XdsClusterConfig.EndpointConfig;
35
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
36
import io.grpc.xds.client.XdsClient;
37
import io.grpc.xds.client.XdsClient.ResourceWatcher;
38
import io.grpc.xds.client.XdsLogger;
39
import io.grpc.xds.client.XdsResourceType;
40
import java.io.Closeable;
41
import java.io.IOException;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.HashSet;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Objects;
48
import java.util.Set;
49
import java.util.concurrent.ScheduledExecutorService;
50
import java.util.stream.Collectors;
51
import javax.annotation.Nullable;
52

53
/**
54
 * This class acts as a layer of indirection between the XdsClient and the NameResolver. It
55
 * maintains the watchers for the xds resources and when an update is received, it either requests
56
 * referenced resources or updates the XdsConfig and notifies the XdsConfigWatcher.  Each instance
57
 * applies to a single data plane authority.
58
 */
59
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
60
  public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
1✔
61
  public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
1✔
62
  private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Matches C++
63
  private final XdsClient xdsClient;
64
  private final XdsConfigWatcher xdsConfigWatcher;
65
  private final SynchronizationContext syncContext;
66
  private final String dataPlaneAuthority;
67

68
  private final InternalLogId logId;
69
  private final XdsLogger logger;
70
  private StatusOr<XdsConfig> lastUpdate = null;
1✔
71
  private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
1✔
72

73
  XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
74
                       SynchronizationContext syncContext, String dataPlaneAuthority,
75
                       String listenerName, NameResolver.Args nameResolverArgs,
76
                       ScheduledExecutorService scheduler) {
1✔
77
    logId = InternalLogId.allocate("xds-dependency-manager", listenerName);
1✔
78
    logger = XdsLogger.withLogId(logId);
1✔
79
    this.xdsClient = checkNotNull(xdsClient, "xdsClient");
1✔
80
    this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
1✔
81
    this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
82
    this.dataPlaneAuthority = checkNotNull(dataPlaneAuthority, "dataPlaneAuthority");
1✔
83
    checkNotNull(nameResolverArgs, "nameResolverArgs");
1✔
84
    checkNotNull(scheduler, "scheduler");
1✔
85

86
    // start the ball rolling
87
    syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
1✔
88
  }
1✔
89

90
  public static String toContextStr(String typeName, String resourceName) {
91
    return typeName + " resource " + resourceName;
1✔
92
  }
93

94
  @Override
95
  public Closeable subscribeToCluster(String clusterName) {
96

97
    checkNotNull(clusterName, "clusterName");
1✔
98
    ClusterSubscription subscription = new ClusterSubscription(clusterName);
1✔
99

100
    syncContext.execute(() -> {
1✔
101
      addClusterWatcher(clusterName, subscription, 1);
1✔
102
      maybePublishConfig();
1✔
103
    });
1✔
104

105
    return subscription;
1✔
106
  }
107

108
  private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
109
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
110
    XdsResourceType<T> type = watcher.type;
1✔
111
    String resourceName = watcher.resourceName;
1✔
112

113
    @SuppressWarnings("unchecked")
114
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
115
    if (typeWatchers == null) {
1✔
116
      typeWatchers = new TypeWatchers<>(type);
1✔
117
      resourceWatchers.put(type, typeWatchers);
1✔
118
    }
119

120
    typeWatchers.add(resourceName, watcher);
1✔
121
    xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
1✔
122
  }
1✔
123

124
  private void cancelCdsWatcher(CdsWatcher watcher, Object parentContext) {
125
    if (watcher == null) {
1✔
126
      return;
×
127
    }
128
    watcher.parentContexts.remove(parentContext);
1✔
129
    if (watcher.parentContexts.isEmpty()) {
1✔
130
      cancelWatcher(watcher);
1✔
131
    }
132
  }
1✔
133

134
  private void cancelEdsWatcher(EdsWatcher watcher, CdsWatcher parentContext) {
135
    if (watcher == null) {
1✔
136
      return;
×
137
    }
138
    watcher.parentContexts.remove(parentContext);
1✔
139
    if (watcher.parentContexts.isEmpty()) {
1✔
140
      cancelWatcher(watcher);
1✔
141
    }
142
  }
1✔
143

144
  private <T extends ResourceUpdate> void cancelWatcher(XdsWatcherBase<T> watcher) {
145
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
146

147
    if (watcher == null) {
1✔
148
      return;
×
149
    }
150

151
    if (watcher instanceof CdsWatcher || watcher instanceof EdsWatcher) {
1✔
152
      throwIfParentContextsNotEmpty(watcher);
1✔
153
    }
154

155
    watcher.cancelled = true;
1✔
156
    XdsResourceType<T> type = watcher.type;
1✔
157
    String resourceName = watcher.resourceName;
1✔
158

159
    @SuppressWarnings("unchecked")
160
    TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
1✔
161
    if (typeWatchers == null) {
1✔
162
      logger.log(DEBUG, "Trying to cancel watcher {0}, but type not watched", watcher);
×
163
      return;
×
164
    }
165

166
    typeWatchers.watchers.remove(resourceName);
1✔
167
    xdsClient.cancelXdsResourceWatch(type, resourceName, watcher);
1✔
168

169
  }
1✔
170

171
  private static void throwIfParentContextsNotEmpty(XdsWatcherBase<?> watcher) {
172
    if (watcher instanceof CdsWatcher) {
1✔
173
      CdsWatcher cdsWatcher = (CdsWatcher) watcher;
1✔
174
      if (!cdsWatcher.parentContexts.isEmpty()) {
1✔
175
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
176
            cdsWatcher.resourceName(), cdsWatcher.parentContexts.keySet());
×
177
        throw new IllegalStateException(msg);
×
178
      }
179
    } else if (watcher instanceof EdsWatcher) {
1✔
180
      EdsWatcher edsWatcher = (EdsWatcher) watcher;
1✔
181
      if (!edsWatcher.parentContexts.isEmpty()) {
1✔
182
        String msg = String.format("CdsWatcher %s has parent contexts %s",
×
183
            edsWatcher.resourceName(), edsWatcher.parentContexts);
×
184
        throw new IllegalStateException(msg);
×
185
      }
186
    }
187
  }
1✔
188

189
  public void shutdown() {
190
    syncContext.execute(() -> {
1✔
191
      for (TypeWatchers<?> watchers : resourceWatchers.values()) {
1✔
192
        shutdownWatchersForType(watchers);
1✔
193
      }
1✔
194
      resourceWatchers.clear();
1✔
195
    });
1✔
196
  }
1✔
197

198
  private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
199
    for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
1✔
200
      xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
1✔
201
          watcherEntry.getValue());
1✔
202
    }
1✔
203
  }
1✔
204

205
  private void releaseSubscription(ClusterSubscription subscription) {
206
    checkNotNull(subscription, "subscription");
1✔
207
    String clusterName = subscription.getClusterName();
1✔
208
    syncContext.execute(() -> {
1✔
209
      XdsWatcherBase<?> cdsWatcher =
1✔
210
          resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
211
      if (cdsWatcher == null) {
1✔
212
        return; // already released while waiting for the syncContext
×
213
      }
214
      cancelClusterWatcherTree((CdsWatcher) cdsWatcher, subscription);
1✔
215
      maybePublishConfig();
1✔
216
    });
1✔
217
  }
1✔
218

219
  private void cancelClusterWatcherTree(CdsWatcher root, Object parentContext) {
220
    checkNotNull(root, "root");
1✔
221

222
    cancelCdsWatcher(root, parentContext);
1✔
223

224
    if (!root.hasDataValue() || !root.parentContexts.isEmpty()) {
1✔
225
      return;
1✔
226
    }
227

228
    XdsClusterResource.CdsUpdate cdsUpdate = root.getData().getValue();
1✔
229
    switch (cdsUpdate.clusterType()) {
1✔
230
      case EDS:
231
        String edsServiceName = root.getEdsServiceName();
1✔
232
        EdsWatcher edsWatcher =
1✔
233
            (EdsWatcher) resourceWatchers.get(ENDPOINT_RESOURCE).watchers.get(edsServiceName);
1✔
234
        cancelEdsWatcher(edsWatcher, root);
1✔
235
        break;
1✔
236
      case AGGREGATE:
237
        for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
238
          CdsWatcher clusterWatcher =
1✔
239
              (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(cluster);
1✔
240
          if (clusterWatcher != null) {
1✔
241
            cancelClusterWatcherTree(clusterWatcher, root);
1✔
242
          }
243
        }
1✔
244
        break;
1✔
245
      case LOGICAL_DNS:
246
        // no eds needed, so everything happens in cancelCdsWatcher()
247
        break;
×
248
      default:
249
        throw new AssertionError("Unknown cluster type: " + cdsUpdate.clusterType());
×
250
    }
251
  }
1✔
252

253
  /**
254
   * Check if all resources have results, and if so, generate a new XdsConfig and send it to all
255
   * the watchers.
256
   */
257
  private void maybePublishConfig() {
258
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
259
    boolean waitingOnResource = resourceWatchers.values().stream()
1✔
260
        .flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
1✔
261
        .anyMatch(XdsWatcherBase::missingResult);
1✔
262
    if (waitingOnResource) {
1✔
263
      return;
1✔
264
    }
265

266
    StatusOr<XdsConfig> newUpdate = buildUpdate();
1✔
267
    if (Objects.equals(newUpdate, lastUpdate)) {
1✔
268
      return;
1✔
269
    }
270
    assert newUpdate.hasValue()
1✔
271
        || (newUpdate.getStatus().getCode() == Status.Code.UNAVAILABLE
1✔
272
            || newUpdate.getStatus().getCode() == Status.Code.INTERNAL);
×
273
    lastUpdate = newUpdate;
1✔
274
    xdsConfigWatcher.onUpdate(lastUpdate);
1✔
275
  }
1✔
276

277
  @VisibleForTesting
278
  StatusOr<XdsConfig> buildUpdate() {
279
    XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
1✔
280

281
    // Iterate watchers and build the XdsConfig
282

283
    // Will only be 1 listener and 1 route resource
284
    RdsUpdateSupplier routeSource = null;
1✔
285
    for (XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher :
286
        getWatchers(XdsListenerResource.getInstance()).values()) {
1✔
287
      if (!ldsWatcher.getData().hasValue()) {
1✔
288
        return StatusOr.fromStatus(ldsWatcher.getData().getStatus());
1✔
289
      }
290
      XdsListenerResource.LdsUpdate ldsUpdate = ldsWatcher.getData().getValue();
1✔
291
      builder.setListener(ldsUpdate);
1✔
292
      routeSource = ((LdsWatcher) ldsWatcher).getRouteSource();
1✔
293
    }
1✔
294

295
    StatusOr<RdsUpdate> statusOrRdsUpdate = routeSource.getRdsUpdate();
1✔
296
    if (!statusOrRdsUpdate.hasValue()) {
1✔
297
      return StatusOr.fromStatus(statusOrRdsUpdate.getStatus());
1✔
298
    }
299
    RdsUpdate rdsUpdate = statusOrRdsUpdate.getValue();
1✔
300
    builder.setRoute(rdsUpdate);
1✔
301

302
    VirtualHost activeVirtualHost =
1✔
303
        RoutingUtils.findVirtualHostForHostName(rdsUpdate.virtualHosts, dataPlaneAuthority);
1✔
304
    if (activeVirtualHost == null) {
1✔
305
      String error = "Failed to find virtual host matching hostname: " + dataPlaneAuthority;
1✔
306
      return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(error));
1✔
307
    }
308
    builder.setVirtualHost(activeVirtualHost);
1✔
309

310
    Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers =
1✔
311
        getWatchers(ENDPOINT_RESOURCE);
1✔
312
    Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers =
1✔
313
        getWatchers(CLUSTER_RESOURCE);
1✔
314

315
    // Only care about aggregates from LDS/RDS or subscriptions and the leaf clusters
316
    List<String> topLevelClusters =
1✔
317
        cdsWatchers.values().stream()
1✔
318
            .filter(XdsDependencyManager::isTopLevelCluster)
1✔
319
            .map(XdsWatcherBase<?>::resourceName)
1✔
320
            .distinct()
1✔
321
            .collect(Collectors.toList());
1✔
322

323
    // Flatten multi-level aggregates into lists of leaf clusters
324
    Set<String> leafNames =
1✔
325
        addTopLevelClustersToBuilder(builder, edsWatchers, cdsWatchers, topLevelClusters);
1✔
326

327
    addLeavesToBuilder(builder, edsWatchers, leafNames);
1✔
328

329
    return StatusOr.fromValue(builder.build());
1✔
330
  }
331

332
  private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
333
      XdsResourceType<T> resourceType) {
334
    TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
1✔
335
    if (typeWatchers == null) {
1✔
336
      return Collections.emptyMap();
1✔
337
    }
338
    assert typeWatchers.resourceType == resourceType;
1✔
339
    @SuppressWarnings("unchecked")
340
    TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
1✔
341
    return tTypeWatchers.watchers;
1✔
342
  }
343

344
  private void addLeavesToBuilder(
345
      XdsConfig.XdsConfigBuilder builder,
346
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
347
      Set<String> leafNames) {
348
    for (String clusterName : leafNames) {
1✔
349
      CdsWatcher cdsWatcher = getCluster(clusterName);
1✔
350
      StatusOr<XdsClusterResource.CdsUpdate> cdsUpdateOr = cdsWatcher.getData();
1✔
351

352
      if (!cdsUpdateOr.hasValue()) {
1✔
353
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdateOr.getStatus()));
1✔
354
        continue;
1✔
355
      }
356

357
      XdsClusterResource.CdsUpdate cdsUpdate = cdsUpdateOr.getValue();
1✔
358
      if (cdsUpdate.clusterType() == ClusterType.EDS) {
1✔
359
        XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
360
            edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
361
        EndpointConfig child;
362
        if (edsWatcher != null) {
1✔
363
          child = new EndpointConfig(edsWatcher.getData());
1✔
364
        } else {
365
          child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
366
              "EDS resource not found for cluster " + clusterName)));
367
        }
368
        builder.addCluster(clusterName, StatusOr.fromValue(
1✔
369
            new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
370
      } else if (cdsUpdate.clusterType() == ClusterType.LOGICAL_DNS) {
1✔
371
        builder.addCluster(clusterName, StatusOr.fromStatus(
×
372
            Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
×
373
      }
374
    }
1✔
375
  }
1✔
376

377
  // Adds the top-level clusters to the builder and returns the leaf cluster names
378
  private Set<String> addTopLevelClustersToBuilder(
379
      XdsConfig.XdsConfigBuilder builder,
380
      Map<String, XdsWatcherBase<XdsEndpointResource.EdsUpdate>> edsWatchers,
381
      Map<String, XdsWatcherBase<XdsClusterResource.CdsUpdate>> cdsWatchers,
382
      List<String> topLevelClusters) {
383

384
    Set<String> leafClusterNames = new HashSet<>();
1✔
385
    for (String clusterName : topLevelClusters) {
1✔
386
      CdsWatcher cdsWatcher = (CdsWatcher) cdsWatchers.get(clusterName);
1✔
387
      StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
1✔
388
      if (!cdsWatcher.hasDataValue()) {
1✔
389
        builder.addCluster(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
1✔
390
        continue;
1✔
391
      }
392

393
      XdsClusterResource.CdsUpdate cdsUpdate = cdsWatcherDataOr.getValue();
1✔
394
      XdsConfig.XdsClusterConfig.ClusterChild child;
395
      switch (cdsUpdate.clusterType()) {
1✔
396
        case AGGREGATE:
397
          Set<String> leafNames = new HashSet<>();
1✔
398
          addLeafNames(leafNames, cdsUpdate);
1✔
399
          child = new AggregateConfig(leafNames);
1✔
400
          leafClusterNames.addAll(leafNames);
1✔
401
          break;
1✔
402
        case EDS:
403
          XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
1✔
404
              edsWatchers.get(cdsWatcher.getEdsServiceName());
1✔
405
          if (edsWatcher != null) {
1✔
406
            child = new EndpointConfig(edsWatcher.getData());
1✔
407
          } else {
408
            child = new EndpointConfig(StatusOr.fromStatus(Status.INTERNAL.withDescription(
×
409
                "EDS resource not found for cluster " + clusterName)));
410
          }
411
          break;
×
412
        case LOGICAL_DNS:
413
          // TODO get the resolved endpoint configuration
414
          child = new EndpointConfig(StatusOr.fromStatus(
1✔
415
              Status.INTERNAL.withDescription("Logical DNS in dependency manager unsupported")));
1✔
416
          break;
1✔
417
        default:
418
          throw new IllegalStateException("Unexpected value: " + cdsUpdate.clusterType());
×
419
      }
420
      builder.addCluster(clusterName, StatusOr.fromValue(
1✔
421
          new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate, child)));
422
    }
1✔
423

424
    return leafClusterNames;
1✔
425
  }
426

427
  private void addLeafNames(Set<String> leafNames, XdsClusterResource.CdsUpdate cdsUpdate) {
428
    for (String cluster : cdsUpdate.prioritizedClusterNames()) {
1✔
429
      if (leafNames.contains(cluster)) {
1✔
430
        continue;
1✔
431
      }
432
      StatusOr<XdsClusterResource.CdsUpdate> data = getCluster(cluster).getData();
1✔
433
      if (data == null || !data.hasValue() || data.getValue() == null) {
1✔
434
        leafNames.add(cluster);
1✔
435
        continue;
1✔
436
      }
437
      if (data.getValue().clusterType() == ClusterType.AGGREGATE) {
1✔
438
        addLeafNames(leafNames, data.getValue());
1✔
439
      } else {
440
        leafNames.add(cluster);
1✔
441
      }
442
    }
1✔
443
  }
1✔
444

445
  private static boolean isTopLevelCluster(
446
      XdsWatcherBase<XdsClusterResource.CdsUpdate> cdsWatcher) {
447
    return ((CdsWatcher)cdsWatcher).parentContexts.values().stream()
1✔
448
        .anyMatch(depth -> depth == 1);
1✔
449
  }
450

451
  @Override
452
  public String toString() {
453
    return logId.toString();
×
454
  }
455

456
  // Returns true if the watcher was added, false if it already exists
457
  private boolean addEdsWatcher(String edsServiceName, CdsWatcher parentContext) {
458
    TypeWatchers<?> typeWatchers = resourceWatchers.get(XdsEndpointResource.getInstance());
1✔
459
    if (typeWatchers == null || !typeWatchers.watchers.containsKey(edsServiceName)) {
1✔
460
      addWatcher(new EdsWatcher(edsServiceName, parentContext));
1✔
461
      return true;
1✔
462
    }
463

464
    EdsWatcher watcher = (EdsWatcher) typeWatchers.watchers.get(edsServiceName);
1✔
465
    watcher.addParentContext(parentContext); // Is a set, so don't need to check for existence
1✔
466
    return false;
1✔
467
  }
468

469
  private void addClusterWatcher(String clusterName, Object parentContext, int depth) {
470
    TypeWatchers<?> clusterWatchers = resourceWatchers.get(CLUSTER_RESOURCE);
1✔
471
    if (clusterWatchers != null) {
1✔
472
      CdsWatcher watcher = (CdsWatcher) clusterWatchers.watchers.get(clusterName);
1✔
473
      if (watcher != null) {
1✔
474
        watcher.parentContexts.put(parentContext, depth);
1✔
475
        return;
1✔
476
      }
477
    }
478

479
    addWatcher(new CdsWatcher(clusterName, parentContext, depth));
1✔
480
  }
1✔
481

482
  private void updateRoutes(List<VirtualHost> virtualHosts, Object newParentContext,
483
                            List<VirtualHost> oldVirtualHosts, boolean sameParentContext) {
484
    VirtualHost oldVirtualHost =
1✔
485
        RoutingUtils.findVirtualHostForHostName(oldVirtualHosts, dataPlaneAuthority);
1✔
486
    VirtualHost virtualHost =
1✔
487
        RoutingUtils.findVirtualHostForHostName(virtualHosts, dataPlaneAuthority);
1✔
488

489
    Set<String> newClusters = getClusterNamesFromVirtualHost(virtualHost);
1✔
490
    Set<String> oldClusters = getClusterNamesFromVirtualHost(oldVirtualHost);
1✔
491

492
    if (sameParentContext) {
1✔
493
      // Calculate diffs.
494
      Set<String> addedClusters = Sets.difference(newClusters, oldClusters);
1✔
495
      Set<String> deletedClusters = Sets.difference(oldClusters, newClusters);
1✔
496

497
      deletedClusters.forEach(watcher ->
1✔
498
          cancelClusterWatcherTree(getCluster(watcher), newParentContext));
1✔
499
      addedClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
500
    } else {
1✔
501
      newClusters.forEach((cluster) -> addClusterWatcher(cluster, newParentContext, 1));
1✔
502
    }
503
  }
1✔
504

505
  private String nodeInfo() {
506
    return " nodeID: " + xdsClient.getBootstrapInfo().node().getId();
1✔
507
  }
508

509
  private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHost) {
510
    if (virtualHost == null) {
1✔
511
      return Collections.emptySet();
1✔
512
    }
513

514
    // Get all cluster names to which requests can be routed through the virtual host.
515
    Set<String> clusters = new HashSet<>();
1✔
516
    for (VirtualHost.Route route : virtualHost.routes()) {
1✔
517
      VirtualHost.Route.RouteAction action = route.routeAction();
1✔
518
      if (action == null) {
1✔
519
        continue;
1✔
520
      }
521
      if (action.cluster() != null) {
1✔
522
        clusters.add(action.cluster());
1✔
523
      } else if (action.weightedClusters() != null) {
1✔
524
        for (ClusterWeight weighedCluster : action.weightedClusters()) {
1✔
525
          clusters.add(weighedCluster.name());
1✔
526
        }
1✔
527
      }
528
    }
1✔
529

530
    return clusters;
1✔
531
  }
532

533
  private CdsWatcher getCluster(String clusterName) {
534
    return (CdsWatcher) resourceWatchers.get(CLUSTER_RESOURCE).watchers.get(clusterName);
1✔
535
  }
536

537
  private static class TypeWatchers<T extends ResourceUpdate> {
538
    // Key is resource name
539
    final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
1✔
540
    final XdsResourceType<T> resourceType;
541

542
    TypeWatchers(XdsResourceType<T> resourceType) {
1✔
543
      this.resourceType = resourceType;
1✔
544
    }
1✔
545

546
    public void add(String resourceName, XdsWatcherBase<T> watcher) {
547
      watchers.put(resourceName, watcher);
1✔
548
    }
1✔
549
  }
550

551
  public interface XdsConfigWatcher {
552
    /**
553
     * An updated XdsConfig or RPC-safe Status. The status code will be either UNAVAILABLE or
554
     * INTERNAL.
555
     */
556
    void onUpdate(StatusOr<XdsConfig> config);
557
  }
558

559
  private class ClusterSubscription implements Closeable {
560
    String clusterName;
561

562
    public ClusterSubscription(String clusterName) {
1✔
563
      this.clusterName = clusterName;
1✔
564
    }
1✔
565

566
    public String getClusterName() {
567
      return clusterName;
1✔
568
    }
569

570
    @Override
571
    public void close() throws IOException {
572
      releaseSubscription(this);
1✔
573
    }
1✔
574
  }
575

576
  private abstract class XdsWatcherBase<T extends ResourceUpdate>
577
      implements ResourceWatcher<T> {
578
    private final XdsResourceType<T> type;
579
    private final String resourceName;
580
    boolean cancelled;
581

582
    @Nullable
583
    private StatusOr<T> data;
584

585

586
    private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
1✔
587
      this.type = checkNotNull(type, "type");
1✔
588
      this.resourceName = checkNotNull(resourceName, "resourceName");
1✔
589
    }
1✔
590

591
    @Override
592
    public void onError(Status error) {
593
      checkNotNull(error, "error");
1✔
594
      // Don't update configuration on error, if we've already received configuration
595
      if (!hasDataValue()) {
1✔
596
        setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
597
            String.format("Error retrieving %s: %s: %s",
1✔
598
              toContextString(), error.getCode(), error.getDescription())));
1✔
599
        maybePublishConfig();
1✔
600
      }
601
    }
1✔
602

603
    @Override
604
    public void onResourceDoesNotExist(String resourceName) {
605
      if (cancelled) {
1✔
606
        return;
1✔
607
      }
608

609
      checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
1✔
610
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
611
          toContextString() + " does not exist" + nodeInfo()));
1✔
612
      maybePublishConfig();
1✔
613
    }
1✔
614

615
    boolean missingResult() {
616
      return data == null;
1✔
617
    }
618

619
    @Nullable
620
    StatusOr<T> getData() {
621
      return data;
1✔
622
    }
623

624
    boolean hasDataValue() {
625
      return data != null && data.hasValue();
1✔
626
    }
627

628
    String resourceName() {
629
      return resourceName;
1✔
630
    }
631

632
    protected void setData(T data) {
633
      checkNotNull(data, "data");
1✔
634
      this.data = StatusOr.fromValue(data);
1✔
635
    }
1✔
636

637
    protected void setDataAsStatus(Status status) {
638
      checkNotNull(status, "status");
1✔
639
      this.data = StatusOr.fromStatus(status);
1✔
640
    }
1✔
641

642
    public String toContextString() {
643
      return toContextStr(type.typeName(), resourceName);
1✔
644
    }
645
  }
646

647
  private interface RdsUpdateSupplier {
648
    StatusOr<RdsUpdate> getRdsUpdate();
649
  }
650

651
  private class LdsWatcher extends XdsWatcherBase<XdsListenerResource.LdsUpdate>
1✔
652
      implements RdsUpdateSupplier {
653
    String rdsName;
654

655
    private LdsWatcher(String resourceName) {
1✔
656
      super(XdsListenerResource.getInstance(), resourceName);
1✔
657
    }
1✔
658

659
    @Override
660
    public void onChanged(XdsListenerResource.LdsUpdate update) {
661
      checkNotNull(update, "update");
1✔
662

663
      HttpConnectionManager httpConnectionManager = update.httpConnectionManager();
1✔
664
      List<VirtualHost> virtualHosts;
665
      String rdsName;
666
      if (httpConnectionManager == null) {
1✔
667
        // TCP listener. Unsupported config
668
        virtualHosts = Collections.emptyList(); // Not null, to not delegate to RDS
1✔
669
        rdsName = null;
1✔
670
      } else {
671
        virtualHosts = httpConnectionManager.virtualHosts();
1✔
672
        rdsName = httpConnectionManager.rdsName();
1✔
673
      }
674
      StatusOr<RdsUpdate> activeRdsUpdate = getRouteSource().getRdsUpdate();
1✔
675
      List<VirtualHost> activeVirtualHosts = activeRdsUpdate.hasValue()
1✔
676
          ? activeRdsUpdate.getValue().virtualHosts
1✔
677
          : Collections.emptyList();
1✔
678

679
      boolean changedRdsName = !Objects.equals(rdsName, this.rdsName);
1✔
680
      if (changedRdsName) {
1✔
681
        cleanUpRdsWatcher();
1✔
682
      }
683

684
      if (virtualHosts != null) {
1✔
685
        // No RDS watcher since we are getting RDS updates via LDS
686
        updateRoutes(virtualHosts, this, activeVirtualHosts, this.rdsName == null);
1✔
687
        this.rdsName = null;
1✔
688
      } else if (changedRdsName) {
1✔
689
        this.rdsName = rdsName;
1✔
690
        addWatcher(new RdsWatcher(rdsName));
1✔
691
        logger.log(XdsLogger.XdsLogLevel.INFO, "Start watching RDS resource {0}", rdsName);
1✔
692
      }
693

694
      setData(update);
1✔
695
      maybePublishConfig();
1✔
696
    }
1✔
697

698
    @Override
699
    public void onResourceDoesNotExist(String resourceName) {
700
      if (cancelled) {
1✔
701
        return;
×
702
      }
703

704
      checkArgument(resourceName().equals(resourceName), "Resource name does not match");
1✔
705
      setDataAsStatus(Status.UNAVAILABLE.withDescription(
1✔
706
          toContextString() + " does not exist" + nodeInfo()));
1✔
707
      cleanUpRdsWatcher();
1✔
708
      rdsName = null;
1✔
709
      maybePublishConfig();
1✔
710
    }
1✔
711

712
    private void cleanUpRdsWatcher() {
713
      RdsWatcher oldRdsWatcher = getRdsWatcher();
1✔
714
      if (oldRdsWatcher != null) {
1✔
715
        cancelWatcher(oldRdsWatcher);
1✔
716
        logger.log(XdsLogger.XdsLogLevel.DEBUG, "Stop watching RDS resource {0}", rdsName);
1✔
717

718
        // Cleanup clusters (as appropriate) that had the old rds watcher as a parent
719
        if (!oldRdsWatcher.hasDataValue() || resourceWatchers.get(CLUSTER_RESOURCE) == null) {
1✔
720
          return;
×
721
        }
722
        for (XdsWatcherBase<?> watcher :
723
            resourceWatchers.get(CLUSTER_RESOURCE).watchers.values()) {
1✔
724
          cancelCdsWatcher((CdsWatcher) watcher, oldRdsWatcher);
1✔
725
        }
1✔
726
      }
727
    }
1✔
728

729
    private RdsWatcher getRdsWatcher() {
730
      if (rdsName == null) {
1✔
731
        return null;
1✔
732
      }
733
      TypeWatchers<?> watchers = resourceWatchers.get(XdsRouteConfigureResource.getInstance());
1✔
734
      if (watchers == null) {
1✔
735
        return null;
×
736
      }
737
      return (RdsWatcher) watchers.watchers.get(rdsName);
1✔
738
    }
739

740
    public RdsUpdateSupplier getRouteSource() {
741
      if (!hasDataValue()) {
1✔
742
        return this;
1✔
743
      }
744
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
745
      if (hcm == null) {
1✔
746
        return this;
1✔
747
      }
748
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
749
      if (virtualHosts != null) {
1✔
750
        return this;
1✔
751
      }
752
      RdsWatcher rdsWatcher = getRdsWatcher();
1✔
753
      assert rdsWatcher != null;
1✔
754
      return rdsWatcher;
1✔
755
    }
756

757
    @Override
758
    public StatusOr<RdsUpdate> getRdsUpdate() {
759
      if (missingResult()) {
1✔
760
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
1✔
761
      }
762
      if (!getData().hasValue()) {
1✔
763
        return StatusOr.fromStatus(getData().getStatus());
1✔
764
      }
765
      HttpConnectionManager hcm = getData().getValue().httpConnectionManager();
1✔
766
      if (hcm == null) {
1✔
767
        return StatusOr.fromStatus(
1✔
768
            Status.UNAVAILABLE.withDescription("Not an API listener" + nodeInfo()));
1✔
769
      }
770
      List<VirtualHost> virtualHosts = hcm.virtualHosts();
1✔
771
      if (virtualHosts == null) {
1✔
772
        // Code shouldn't trigger this case, as it should be calling RdsWatcher instead. This would
773
        // be easily implemented with getRdsWatcher().getRdsUpdate(), but getting here is likely a
774
        // bug
775
        return StatusOr.fromStatus(Status.INTERNAL.withDescription("Routes are in RDS, not LDS"));
×
776
      }
777
      return StatusOr.fromValue(new RdsUpdate(virtualHosts));
1✔
778
    }
779
  }
780

781
  private class RdsWatcher extends XdsWatcherBase<RdsUpdate> implements RdsUpdateSupplier {
782

783
    public RdsWatcher(String resourceName) {
1✔
784
      super(XdsRouteConfigureResource.getInstance(), checkNotNull(resourceName, "resourceName"));
1✔
785
    }
1✔
786

787
    @Override
788
    public void onChanged(RdsUpdate update) {
789
      checkNotNull(update, "update");
1✔
790
      List<VirtualHost> oldVirtualHosts = hasDataValue()
1✔
791
          ? getData().getValue().virtualHosts
1✔
792
          : Collections.emptyList();
1✔
793
      setData(update);
1✔
794
      updateRoutes(update.virtualHosts, this, oldVirtualHosts, true);
1✔
795
      maybePublishConfig();
1✔
796
    }
1✔
797

798
    @Override
799
    public StatusOr<RdsUpdate> getRdsUpdate() {
800
      if (missingResult()) {
1✔
801
        return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription("Not yet loaded"));
×
802
      }
803
      return getData();
1✔
804
    }
805
  }
806

807
  private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
1✔
808
    Map<Object, Integer> parentContexts = new HashMap<>();
1✔
809

810
    CdsWatcher(String resourceName, Object parentContext, int depth) {
1✔
811
      super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
812
      this.parentContexts.put(checkNotNull(parentContext, "parentContext"), depth);
1✔
813
    }
1✔
814

815
    @Override
816
    public void onChanged(XdsClusterResource.CdsUpdate update) {
817
      checkNotNull(update, "update");
1✔
818
      switch (update.clusterType()) {
1✔
819
        case EDS:
820
          setData(update);
1✔
821
          if (!addEdsWatcher(getEdsServiceName(), this))  {
1✔
822
            maybePublishConfig();
1✔
823
          }
824
          break;
825
        case LOGICAL_DNS:
826
          setData(update);
1✔
827
          maybePublishConfig();
1✔
828
          // no eds needed
829
          break;
1✔
830
        case AGGREGATE:
831
          Object parentContext = this;
1✔
832
          int depth = parentContexts.values().stream().max(Integer::compare).orElse(0) + 1;
1✔
833
          if (depth > MAX_CLUSTER_RECURSION_DEPTH) {
1✔
834
            logger.log(XdsLogger.XdsLogLevel.WARNING,
×
835
                "Cluster recursion depth limit exceeded for cluster {0}", resourceName());
×
836
            Status error = Status.UNAVAILABLE.withDescription(
×
837
                "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
838
            setDataAsStatus(error);
×
839
          }
840
          if (hasDataValue()) {
1✔
841
            Set<String> oldNames = getData().getValue().clusterType() == ClusterType.AGGREGATE
1✔
842
                ? new HashSet<>(getData().getValue().prioritizedClusterNames())
1✔
843
                : Collections.emptySet();
1✔
844
            Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());
1✔
845

846
            Set<String> deletedClusters = Sets.difference(oldNames, newNames);
1✔
847
            deletedClusters.forEach((cluster)
1✔
848
                -> cancelClusterWatcherTree(getCluster(cluster), parentContext));
1✔
849

850
            if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
851
              setData(update);
1✔
852
              Set<String> addedClusters = Sets.difference(newNames, oldNames);
1✔
853
              addedClusters.forEach((cluster) -> addClusterWatcher(cluster, parentContext, depth));
1✔
854

855
              if (addedClusters.isEmpty()) {
1✔
856
                maybePublishConfig();
×
857
              }
858
            } else { // data was set to error status above
1✔
859
              maybePublishConfig();
×
860
            }
861

862
          } else if (depth <= MAX_CLUSTER_RECURSION_DEPTH) {
1✔
863
            setData(update);
1✔
864
            update.prioritizedClusterNames()
1✔
865
                .forEach(name -> addClusterWatcher(name, parentContext, depth));
1✔
866
            maybePublishConfig();
1✔
867
          }
868
          break;
869
        default:
870
          Status error = Status.UNAVAILABLE.withDescription(
×
871
              "aggregate cluster graph exceeds max depth at " + resourceName() + nodeInfo());
×
872
          setDataAsStatus(error);
×
873
          maybePublishConfig();
×
874
      }
875
    }
1✔
876

877
    public String getEdsServiceName() {
878
      XdsClusterResource.CdsUpdate cdsUpdate = getData().getValue();
1✔
879
      assert cdsUpdate.clusterType() == ClusterType.EDS;
1✔
880
      String edsServiceName = cdsUpdate.edsServiceName();
1✔
881
      if (edsServiceName == null) {
1✔
882
        edsServiceName = cdsUpdate.clusterName();
×
883
      }
884
      return edsServiceName;
1✔
885
    }
886
  }
887

888
  private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
889
    private final Set<CdsWatcher> parentContexts = new HashSet<>();
1✔
890

891
    private EdsWatcher(String resourceName, CdsWatcher parentContext) {
1✔
892
      super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
1✔
893
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
894
    }
1✔
895

896
    @Override
897
    public void onChanged(XdsEndpointResource.EdsUpdate update) {
898
      setData(checkNotNull(update, "update"));
1✔
899
      maybePublishConfig();
1✔
900
    }
1✔
901

902
    void addParentContext(CdsWatcher parentContext) {
903
      parentContexts.add(checkNotNull(parentContext, "parentContext"));
1✔
904
    }
1✔
905
  }
906
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc