• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20015

13 Oct 2025 07:57AM UTC coverage: 88.57% (+0.02%) from 88.552%
#20015

push

github

web-flow
xds: ORCA to LRS propagation changes (#12203)

Implements gRFC A85 (https://github.com/grpc/proposal/pull/454).

34925 of 39432 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.23
/../xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
22
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.base.Joiner;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.ImmutableMap;
30
import com.google.common.util.concurrent.ListenableFuture;
31
import com.google.common.util.concurrent.SettableFuture;
32
import com.google.protobuf.Any;
33
import io.grpc.Internal;
34
import io.grpc.InternalLogId;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext;
37
import io.grpc.SynchronizationContext.ScheduledHandle;
38
import io.grpc.internal.BackoffPolicy;
39
import io.grpc.internal.TimeProvider;
40
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
41
import io.grpc.xds.client.Bootstrapper.ServerInfo;
42
import io.grpc.xds.client.XdsClient.ResourceStore;
43
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
44
import java.io.IOException;
45
import java.util.ArrayList;
46
import java.util.Collection;
47
import java.util.Collections;
48
import java.util.HashMap;
49
import java.util.HashSet;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Objects;
53
import java.util.Set;
54
import java.util.concurrent.Executor;
55
import java.util.concurrent.Future;
56
import java.util.concurrent.ScheduledExecutorService;
57
import java.util.concurrent.TimeUnit;
58
import java.util.stream.Collectors;
59
import javax.annotation.Nullable;
60

61
/**
62
 * XdsClient implementation.
63
 */
64
@Internal
65
public final class XdsClientImpl extends XdsClient implements ResourceStore {
66

67
  // Longest time to wait, since the subscription to some resource, for concluding its absence.
68
  @VisibleForTesting
69
  public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
70
  public static final int EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC = 30;
71

72
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
73
      new Thread.UncaughtExceptionHandler() {
1✔
74
        @Override
75
        public void uncaughtException(Thread t, Throwable e) {
76
          logger.log(
×
77
              XdsLogLevel.ERROR,
78
              "Uncaught exception in XdsClient SynchronizationContext. Panic!",
79
              e);
80
          // TODO: better error handling.
81
          throw new AssertionError(e);
×
82
        }
83
      });
84

85
  private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
1✔
86
  final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
1✔
87
  /** Map of authority to its activated control plane client (affected by xds fallback).
88
   * The last entry in the list for each value is the "active" CPC for the matching key */
89
  private final Map<String, List<ControlPlaneClient>> activatedCpClients = new HashMap<>();
1✔
90
  private final Map<ServerInfo, ControlPlaneClient> serverCpClientMap = new HashMap<>();
1✔
91

92
  /** Maps resource type to the corresponding map of subscribers (keyed by resource name). */
93
  private final Map<XdsResourceType<? extends ResourceUpdate>,
1✔
94
      Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
95
      resourceSubscribers = new HashMap<>();
96
  /** Maps typeUrl to the corresponding XdsResourceType. */
97
  private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
1✔
98

99
  private final XdsTransportFactory xdsTransportFactory;
100
  private final Bootstrapper.BootstrapInfo bootstrapInfo;
101
  private final ScheduledExecutorService timeService;
102
  private final BackoffPolicy.Provider backoffPolicyProvider;
103
  private final Supplier<Stopwatch> stopwatchSupplier;
104
  private final TimeProvider timeProvider;
105
  private final Object securityConfig;
106
  private final InternalLogId logId;
107
  private final XdsLogger logger;
108
  private volatile boolean isShutdown;
109
  private final MessagePrettyPrinter messagePrinter;
110
  private final XdsClientMetricReporter metricReporter;
111

112
  public XdsClientImpl(
113
      XdsTransportFactory xdsTransportFactory,
114
      Bootstrapper.BootstrapInfo bootstrapInfo,
115
      ScheduledExecutorService timeService,
116
      BackoffPolicy.Provider backoffPolicyProvider,
117
      Supplier<Stopwatch> stopwatchSupplier,
118
      TimeProvider timeProvider,
119
      MessagePrettyPrinter messagePrinter,
120
      Object securityConfig,
121
      XdsClientMetricReporter metricReporter) {
1✔
122
    this.xdsTransportFactory = xdsTransportFactory;
1✔
123
    this.bootstrapInfo = bootstrapInfo;
1✔
124
    this.timeService = timeService;
1✔
125
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
126
    this.stopwatchSupplier = stopwatchSupplier;
1✔
127
    this.timeProvider = timeProvider;
1✔
128
    this.messagePrinter = messagePrinter;
1✔
129
    this.securityConfig = securityConfig;
1✔
130
    this.metricReporter = metricReporter;
1✔
131
    logId = InternalLogId.allocate("xds-client", null);
1✔
132
    logger = XdsLogger.withLogId(logId);
1✔
133
    logger.log(XdsLogLevel.INFO, "Created");
1✔
134
  }
1✔
135

136
  @Override
137
  public void shutdown() {
138
    syncContext.execute(
1✔
139
        new Runnable() {
1✔
140
          @Override
141
          public void run() {
142
            if (isShutdown) {
1✔
143
              return;
×
144
            }
145
            isShutdown = true;
1✔
146
            for (ControlPlaneClient xdsChannel : serverCpClientMap.values()) {
1✔
147
              xdsChannel.shutdown();
1✔
148
            }
1✔
149
            for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
1✔
150
              lrsClient.stopLoadReporting();
1✔
151
            }
1✔
152
            cleanUpResourceTimers(null);
1✔
153
            activatedCpClients.clear();
1✔
154
          }
1✔
155
        });
156
  }
1✔
157

158
  @Override
159
  public boolean isShutDown() {
160
    return isShutdown;
1✔
161
  }
162

163
  @Override
164
  public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
165
    return Collections.unmodifiableMap(subscribedResourceTypeUrls);
1✔
166
  }
167

168
  private ControlPlaneClient getActiveCpc(String authority) {
169
    List<ControlPlaneClient> controlPlaneClients = activatedCpClients.get(authority);
1✔
170
    if (controlPlaneClients == null || controlPlaneClients.isEmpty()) {
1✔
171
      return null;
1✔
172
    }
173

174
    return controlPlaneClients.get(controlPlaneClients.size() - 1);
1✔
175
  }
176

177
  @Nullable
178
  @Override
179
  public Collection<String> getSubscribedResources(
180
      ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type) {
181
    ControlPlaneClient targetCpc = serverCpClientMap.get(serverInfo);
1✔
182
    if (targetCpc == null) {
1✔
183
      return null;
×
184
    }
185

186
    // This should include all of the authorities that targetCpc or a fallback from it is serving
187
    List<String> authorities = activatedCpClients.entrySet().stream()
1✔
188
        .filter(entry -> entry.getValue().contains(targetCpc))
1✔
189
        .map(Map.Entry::getKey)
1✔
190
        .collect(Collectors.toList());
1✔
191

192
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
1✔
193
        resourceSubscribers.getOrDefault(type, Collections.emptyMap());
1✔
194

195
    Collection<String> retVal = resources.entrySet().stream()
1✔
196
        .filter(entry -> authorities.contains(entry.getValue().authority))
1✔
197
        .map(Map.Entry::getKey)
1✔
198
        .collect(Collectors.toList());
1✔
199

200
    return retVal.isEmpty() ? null : retVal;
1✔
201
  }
202

203
  @Override
204
  public void startMissingResourceTimers(Collection<String> resourceNames,
205
                                         XdsResourceType<?> resourceType) {
206
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap =
1✔
207
        resourceSubscribers.get(resourceType);
1✔
208

209
    for (String resourceName : resourceNames) {
1✔
210
      ResourceSubscriber<?> subscriber = subscriberMap.get(resourceName);
1✔
211
      if (subscriber.respTimer == null && !subscriber.hasResult()) {
1✔
212
        subscriber.restartTimer();
1✔
213
      }
214
    }
1✔
215
  }
1✔
216

217
  // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
218
  // ResourceTypes that do not have subscribers does not show up in the snapshot keys.
219
  @Override
220
  public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
221
      getSubscribedResourcesMetadataSnapshot() {
222
    final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
223
        SettableFuture.create();
1✔
224
    syncContext.execute(new Runnable() {
1✔
225
      @Override
226
      public void run() {
227
        // A map from a "resource type" to a map ("resource name": "resource metadata")
228
        ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
229
            ImmutableMap.builder();
1✔
230
        for (XdsResourceType<?> resourceType : resourceSubscribers.keySet()) {
1✔
231
          ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
1✔
232
          for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
233
              : resourceSubscribers.get(resourceType).entrySet()) {
1✔
234
            metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
1✔
235
          }
1✔
236
          metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
1✔
237
        }
1✔
238
        future.set(metadataSnapshot.buildOrThrow());
1✔
239
      }
1✔
240
    });
241
    return future;
1✔
242
  }
243

244
  @Override
245
  public Object getSecurityConfig() {
246
    return securityConfig;
×
247
  }
248

249
  @Override
250
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
251
                                                          String resourceName,
252
                                                          ResourceWatcher<T> watcher,
253
                                                          Executor watcherExecutor) {
254
    syncContext.execute(new Runnable() {
1✔
255
      @Override
256
      @SuppressWarnings("unchecked")
257
      public void run() {
258
        if (!resourceSubscribers.containsKey(type)) {
1✔
259
          resourceSubscribers.put(type, new HashMap<>());
1✔
260
          subscribedResourceTypeUrls.put(type.typeUrl(), type);
1✔
261
        }
262
        ResourceSubscriber<T> subscriber =
1✔
263
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
264

265
        if (subscriber == null) {
1✔
266
          logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
1✔
267
          subscriber = new ResourceSubscriber<>(type, resourceName);
1✔
268
          resourceSubscribers.get(type).put(resourceName, subscriber);
1✔
269

270
          if (subscriber.errorDescription == null) {
1✔
271
            CpcWithFallbackState cpcToUse = manageControlPlaneClient(subscriber);
1✔
272
            if (cpcToUse.cpc != null) {
1✔
273
              cpcToUse.cpc.adjustResourceSubscription(type);
1✔
274
            }
275
          }
276
        }
277

278
        subscriber.addWatcher(watcher, watcherExecutor);
1✔
279
      }
1✔
280
    });
281
  }
1✔
282

283
  /**
284
   * Gets a ControlPlaneClient for the subscriber's authority, creating one if necessary.
285
   * If there already was an active CPC for this authority, and it is different from the one
286
   * identified, then do fallback to the identified one (cpcToUse).
287
   *
288
   * @return identified CPC or {@code null} (if there are no valid ServerInfos associated with the
289
   *     subscriber's authority or CPC's for all are in backoff), and whether did a fallback.
290
   */
291
  @VisibleForTesting
292
  private <T extends ResourceUpdate> CpcWithFallbackState manageControlPlaneClient(
293
      ResourceSubscriber<T> subscriber) {
294

295
    ControlPlaneClient cpcToUse;
296
    boolean didFallback = false;
1✔
297
    try {
298
      cpcToUse = getOrCreateControlPlaneClient(subscriber.authority);
1✔
299
    } catch (IllegalArgumentException e) {
×
300
      if (subscriber.errorDescription == null) {
×
301
        subscriber.errorDescription = "Bad configuration:  " + e.getMessage();
×
302
      }
303

304
      subscriber.onError(
×
305
          Status.INVALID_ARGUMENT.withDescription(subscriber.errorDescription), null);
×
306
      return new CpcWithFallbackState(null, false);
×
307
    } catch (IOException e) {
1✔
308
      logger.log(XdsLogLevel.DEBUG,
1✔
309
          "Could not create a control plane client for authority {0}: {1}",
310
          subscriber.authority, e.getMessage());
1✔
311
      return new CpcWithFallbackState(null, false);
1✔
312
    }
1✔
313

314
    ControlPlaneClient activeCpClient = getActiveCpc(subscriber.authority);
1✔
315
    if (cpcToUse != activeCpClient) {
1✔
316
      addCpcToAuthority(subscriber.authority, cpcToUse); // makes it active
1✔
317
      if (activeCpClient != null) {
1✔
318
        didFallback = cpcToUse != null && !cpcToUse.isInError();
1✔
319
        if (didFallback) {
1✔
320
          logger.log(XdsLogLevel.INFO, "Falling back to XDS server {0}",
1✔
321
              cpcToUse.getServerInfo().target());
1✔
322
        } else {
323
          logger.log(XdsLogLevel.WARNING, "No working fallback XDS Servers found from {0}",
×
324
              activeCpClient.getServerInfo().target());
×
325
        }
326
      }
327
    }
328

329
    return new CpcWithFallbackState(cpcToUse, didFallback);
1✔
330
  }
331

332
  private void addCpcToAuthority(String authority, ControlPlaneClient cpcToUse) {
333
    List<ControlPlaneClient> controlPlaneClients =
1✔
334
        activatedCpClients.computeIfAbsent(authority, k -> new ArrayList<>());
1✔
335

336
    if (controlPlaneClients.contains(cpcToUse)) {
1✔
337
      return;
×
338
    }
339

340
    // if there are any missing CPCs between the last one and cpcToUse, add them + add cpcToUse
341
    ImmutableList<ServerInfo> serverInfos = getServerInfos(authority);
1✔
342
    for (int i = controlPlaneClients.size(); i < serverInfos.size(); i++) {
1✔
343
      ServerInfo serverInfo = serverInfos.get(i);
1✔
344
      ControlPlaneClient cpc = serverCpClientMap.get(serverInfo);
1✔
345
      controlPlaneClients.add(cpc);
1✔
346
      logger.log(XdsLogLevel.DEBUG, "Adding control plane client {0} to authority {1}",
1✔
347
          cpc, authority);
348
      cpcToUse.sendDiscoveryRequests();
1✔
349
      if (cpc == cpcToUse) {
1✔
350
        break;
1✔
351
      }
352
    }
353
  }
1✔
354

355
  @Override
356
  public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
357
                                                                String resourceName,
358
                                                                ResourceWatcher<T> watcher) {
359
    syncContext.execute(new Runnable() {
1✔
360
      @Override
361
      @SuppressWarnings("unchecked")
362
      public void run() {
363
        ResourceSubscriber<T> subscriber =
1✔
364
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
365
        if (subscriber == null) {
1✔
366
          logger.log(XdsLogLevel.WARNING, "double cancel of resource watch for {0}:{1}",
1✔
367
              type.typeName(), resourceName);
1✔
368
          return;
1✔
369
        }
370
        subscriber.removeWatcher(watcher);
1✔
371
        if (!subscriber.isWatched()) {
1✔
372
          subscriber.cancelResourceWatch();
1✔
373
          resourceSubscribers.get(type).remove(resourceName);
1✔
374

375
          List<ControlPlaneClient> controlPlaneClients =
1✔
376
              activatedCpClients.get(subscriber.authority);
1✔
377
          if (controlPlaneClients != null) {
1✔
378
            controlPlaneClients.forEach((cpc) -> {
1✔
379
              cpc.adjustResourceSubscription(type);
1✔
380
            });
1✔
381
          }
382

383
          if (resourceSubscribers.get(type).isEmpty()) {
1✔
384
            resourceSubscribers.remove(type);
1✔
385
            subscribedResourceTypeUrls.remove(type.typeUrl());
1✔
386
          }
387
        }
388
      }
1✔
389
    });
390
  }
1✔
391

392
  @Override
393
  public LoadStatsManager2.ClusterDropStats addClusterDropStats(
394
      final ServerInfo serverInfo, String clusterName,
395
      @Nullable String edsServiceName) {
396
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
397
    LoadStatsManager2.ClusterDropStats dropCounter =
1✔
398
        loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
1✔
399
    syncContext.execute(new Runnable() {
1✔
400
      @Override
401
      public void run() {
402
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
403
      }
1✔
404
    });
405
    return dropCounter;
1✔
406
  }
407

408
  @Override
409
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
410
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
411
      Locality locality) {
412
    return addClusterLocalityStats(serverInfo, clusterName, edsServiceName, locality, null);
1✔
413
  }
414

415
  @Override
416
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
417
      final ServerInfo serverInfo,
418
      String clusterName,
419
      @Nullable String edsServiceName,
420
      Locality locality,
421
      @Nullable BackendMetricPropagation backendMetricPropagation) {
422
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
423

424
    LoadStatsManager2.ClusterLocalityStats loadCounter =
1✔
425
        loadStatsManager.getClusterLocalityStats(
1✔
426
            clusterName, edsServiceName, locality, backendMetricPropagation);
427

428
    syncContext.execute(new Runnable() {
1✔
429
      @Override
430
      public void run() {
431
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
432
      }
1✔
433
    });
434
    return loadCounter;
1✔
435
  }
436

437

438
  @Override
439
  public Bootstrapper.BootstrapInfo getBootstrapInfo() {
440
    return bootstrapInfo;
1✔
441
  }
442

443
  @Override
444
  public String toString() {
445
    return logId.toString();
×
446
  }
447

448
  private Set<String> getResourceKeys(XdsResourceType<?> xdsResourceType) {
449
    if (!resourceSubscribers.containsKey(xdsResourceType)) {
1✔
450
      return null;
×
451
    }
452

453
    return resourceSubscribers.get(xdsResourceType).keySet();
1✔
454
  }
455

456
  // cpcForThisStream is null when doing shutdown
457
  private void cleanUpResourceTimers(ControlPlaneClient cpcForThisStream) {
458
    Collection<String> authoritiesForCpc = getActiveAuthorities(cpcForThisStream);
1✔
459
    String target = cpcForThisStream == null ? "null" : cpcForThisStream.getServerInfo().target();
1✔
460
    logger.log(XdsLogLevel.DEBUG, "Cleaning up resource timers for CPC {0}, authorities {1}",
1✔
461
        target, authoritiesForCpc);
462

463
    for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
464
      for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
465
        if (cpcForThisStream == null || authoritiesForCpc.contains(subscriber.authority)) {
1✔
466
          subscriber.stopTimer();
1✔
467
        }
468
      }
1✔
469
    }
1✔
470
  }
1✔
471

472
  private ControlPlaneClient getOrCreateControlPlaneClient(String authority) throws IOException {
473
    // Optimize for the common case of a working ads stream already exists for the authority
474
    ControlPlaneClient activeCpc = getActiveCpc(authority);
1✔
475
    if (activeCpc != null && !activeCpc.isInError()) {
1✔
476
      return activeCpc;
1✔
477
    }
478

479
    ImmutableList<ServerInfo> serverInfos = getServerInfos(authority);
1✔
480
    if (serverInfos == null) {
1✔
481
      throw new IllegalArgumentException("No xds servers found for authority " + authority);
×
482
    }
483

484
    for (ServerInfo serverInfo : serverInfos) {
1✔
485
      ControlPlaneClient cpc = getOrCreateControlPlaneClient(serverInfo);
1✔
486
      if (cpc.isInError()) {
1✔
487
        continue;
1✔
488
      }
489
      return cpc;
1✔
490
    }
491

492
    // Everything existed and is in backoff so throw
493
    throw new IOException("All xds transports for authority " + authority + " are in backoff");
1✔
494
  }
495

496
  private ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) {
497
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
498
    if (serverCpClientMap.containsKey(serverInfo)) {
1✔
499
      return serverCpClientMap.get(serverInfo);
1✔
500
    }
501

502
    logger.log(XdsLogLevel.DEBUG, "Creating control plane client for {0}", serverInfo.target());
1✔
503
    XdsTransportFactory.XdsTransport xdsTransport;
504
    try {
505
      xdsTransport = xdsTransportFactory.create(serverInfo);
1✔
506
    } catch (Exception e) {
1✔
507
      String msg = String.format("Failed to create xds transport for %s: %s",
1✔
508
          serverInfo.target(), e.getMessage());
1✔
509
      logger.log(XdsLogLevel.WARNING, msg);
1✔
510
      xdsTransport =
1✔
511
          new ControlPlaneClient.FailingXdsTransport(Status.UNAVAILABLE.withDescription(msg));
1✔
512
    }
1✔
513

514
    ControlPlaneClient controlPlaneClient = new ControlPlaneClient(
1✔
515
        xdsTransport,
516
        serverInfo,
517
        bootstrapInfo.node(),
1✔
518
        new ResponseHandler(serverInfo),
519
        this,
520
        timeService,
521
        syncContext,
522
        backoffPolicyProvider,
523
        stopwatchSupplier,
524
        messagePrinter
525
    );
526

527
    serverCpClientMap.put(serverInfo, controlPlaneClient);
1✔
528

529
    LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
1✔
530
    loadStatsManagerMap.put(serverInfo, loadStatsManager);
1✔
531
    LoadReportClient lrsClient = new LoadReportClient(
1✔
532
        loadStatsManager, xdsTransport, bootstrapInfo.node(),
1✔
533
        syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
534
    serverLrsClientMap.put(serverInfo, lrsClient);
1✔
535

536
    return controlPlaneClient;
1✔
537
  }
538

539
  @VisibleForTesting
540
  @Override
541
  public Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
542
    return ImmutableMap.copyOf(serverLrsClientMap);
1✔
543
  }
544

545
  @Nullable
546
  private ImmutableList<ServerInfo> getServerInfos(String authority) {
547
    if (authority != null) {
1✔
548
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
1✔
549
      if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
1✔
550
        return null;
1✔
551
      }
552
      return authorityInfo.xdsServers();
1✔
553
    } else {
554
      return bootstrapInfo.servers();
1✔
555
    }
556
  }
557

558
  @SuppressWarnings("unchecked")
559
  private <T extends ResourceUpdate> void handleResourceUpdate(
560
      XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType,
561
      boolean isFirstResponse, ProcessingTracker processingTracker) {
562
    ControlPlaneClient controlPlaneClient = serverCpClientMap.get(args.serverInfo);
1✔
563

564
    if (isFirstResponse) {
1✔
565
      shutdownLowerPriorityCpcs(controlPlaneClient);
1✔
566
    }
567

568
    ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
1✔
569
    logger.log(XdsLogger.XdsLogLevel.INFO,
1✔
570
        "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
571
        xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
1✔
572
    Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
1✔
573
    Set<String> invalidResources = result.invalidResources;
1✔
574
    metricReporter.reportResourceUpdates(Long.valueOf(parsedResources.size()),
1✔
575
        Long.valueOf(invalidResources.size()),
1✔
576
        args.getServerInfo().target(), xdsResourceType.typeUrl());
1✔
577

578
    List<String> errors = result.errors;
1✔
579
    String errorDetail = null;
1✔
580
    if (errors.isEmpty()) {
1✔
581
      checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
1✔
582
      controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, args.nonce);
1✔
583
    } else {
584
      errorDetail = Joiner.on('\n').join(errors);
1✔
585
      logger.log(XdsLogLevel.WARNING,
1✔
586
          "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
587
          xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
1✔
588
      controlPlaneClient.nackResponse(xdsResourceType, args.nonce, errorDetail);
1✔
589
    }
590

591
    long updateTime = timeProvider.currentTimeNanos();
1✔
592
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
1✔
593
        resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
1✔
594
    for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
1✔
595
      String resourceName = entry.getKey();
1✔
596
      ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
1✔
597
      if (parsedResources.containsKey(resourceName)) {
1✔
598
        // Happy path: the resource updated successfully. Notify the watchers of the update.
599
        subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime,
1✔
600
            processingTracker);
601
        continue;
1✔
602
      }
603

604
      if (invalidResources.contains(resourceName)) {
1✔
605
        // The resource update is invalid. Capture the error without notifying the watchers.
606
        subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
1✔
607
      }
608

609
      if (invalidResources.contains(resourceName)) {
1✔
610
        // The resource is missing. Reuse the cached resource if possible.
611
        if (subscriber.data == null) {
1✔
612
          // No cached data. Notify the watchers of an invalid update.
613
          subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
1✔
614
        }
615
        continue;
616
      }
617

618
      // Nothing else to do for incremental ADS resources.
619
      if (!xdsResourceType.isFullStateOfTheWorld()) {
1✔
620
        continue;
1✔
621
      }
622

623
      // For State of the World services, notify watchers when their watched resource is missing
624
      // from the ADS update. Note that we can only do this if the resource update is coming from
625
      // the same xDS server that the ResourceSubscriber is subscribed to.
626
      if (getActiveCpc(subscriber.authority) == controlPlaneClient) {
1✔
627
        subscriber.onAbsent(processingTracker, args.serverInfo);
1✔
628
      }
629
    }
1✔
630
  }
1✔
631

632
  @Override
633
  public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
634
    SettableFuture<Void> future = SettableFuture.create();
1✔
635
    syncContext.execute(() -> {
1✔
636
      serverCpClientMap.forEach((serverInfo, controlPlaneClient) ->
1✔
637
          callback.reportServerConnectionGauge(
1✔
638
              !controlPlaneClient.isInError(), serverInfo.target()));
1✔
639
      future.set(null);
1✔
640
    });
1✔
641
    return future;
1✔
642
  }
643

644
  private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) {
645
    // For each authority, remove any control plane clients, with lower priority than the activated
646
    // one, from activatedCpClients storing them all in cpcsToShutdown.
647
    Set<ControlPlaneClient> cpcsToShutdown = new HashSet<>();
1✔
648
    for ( List<ControlPlaneClient> cpcsForAuth : activatedCpClients.values()) {
1✔
649
      if (cpcsForAuth == null) {
1✔
650
        continue;
×
651
      }
652
      int index = cpcsForAuth.indexOf(activatedCpc);
1✔
653
      if (index > -1) {
1✔
654
        cpcsToShutdown.addAll(cpcsForAuth.subList(index + 1, cpcsForAuth.size()));
1✔
655
        cpcsForAuth.subList(index + 1, cpcsForAuth.size()).clear(); // remove lower priority cpcs
1✔
656
      }
657
    }
1✔
658

659
    // Shutdown any lower priority control plane clients identified above that aren't still being
660
    // used by another authority.  If they are still being used let the XDS server know that we
661
    // no longer are interested in subscriptions for authorities we are no longer responsible for.
662
    for (ControlPlaneClient cpc : cpcsToShutdown) {
1✔
663
      if (activatedCpClients.values().stream().noneMatch(list -> list.contains(cpc))) {
1✔
664
        cpc.shutdown();
1✔
665
        serverCpClientMap.remove(cpc.getServerInfo());
1✔
666
      } else {
667
        cpc.sendDiscoveryRequests();
×
668
      }
669
    }
1✔
670
  }
1✔
671

672

673
  /** Tracks a single subscribed resource. */
674
  private final class ResourceSubscriber<T extends ResourceUpdate> {
675
    @Nullable
676
    private final String authority;
677
    private final XdsResourceType<T> type;
678
    private final String resource;
679
    private final Map<ResourceWatcher<T>, Executor> watchers = new HashMap<>();
1✔
680
    @Nullable
681
    private T data;
682
    private boolean absent;
683
    // Tracks whether the deletion has been ignored per bootstrap server feature.
684
    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
685
    private boolean resourceDeletionIgnored;
686
    @Nullable
687
    private ScheduledHandle respTimer;
688
    @Nullable
689
    private ResourceMetadata metadata;
690
    @Nullable
691
    private String errorDescription;
692
    @Nullable
693
    private Status lastError;
694

695
    ResourceSubscriber(XdsResourceType<T> type, String resource) {
1✔
696
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
697
      this.type = type;
1✔
698
      this.resource = resource;
1✔
699
      this.authority = getAuthorityFromResourceName(resource);
1✔
700
      if (getServerInfos(authority) == null) {
1✔
701
        this.errorDescription = "Wrong configuration: xds server does not exist for resource "
1✔
702
            + resource;
703
        return;
1✔
704
      }
705

706
      // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
707
      // is created but not yet requested because the client is in backoff.
708
      this.metadata = ResourceMetadata.newResourceMetadataUnknown();
1✔
709
    }
1✔
710

711
    @Override
712
    public String toString() {
713
      return "ResourceSubscriber{"
×
714
          + "resource='" + resource + '\''
715
          + ", authority='" + authority + '\''
716
          + ", type=" + type
717
          + ", watchers=" + watchers.size()
×
718
          + ", data=" + data
719
          + ", absent=" + absent
720
          + ", resourceDeletionIgnored=" + resourceDeletionIgnored
721
          + ", errorDescription='" + errorDescription + '\''
722
          + '}';
723
    }
724

725
    void addWatcher(ResourceWatcher<T> watcher, Executor watcherExecutor) {
726
      checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher);
1✔
727
      watchers.put(watcher, watcherExecutor);
1✔
728
      T savedData = data;
1✔
729
      boolean savedAbsent = absent;
1✔
730
      Status savedError = lastError;
1✔
731
      watcherExecutor.execute(() -> {
1✔
732
        if (errorDescription != null) {
1✔
733
          watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
1✔
734
          return;
1✔
735
        }
736
        if (savedError != null) {
1✔
737
          watcher.onError(savedError);
1✔
738
          return;
1✔
739
        }
740
        if (savedData != null) {
1✔
741
          notifyWatcher(watcher, savedData);
1✔
742
        } else if (savedAbsent) {
1✔
743
          watcher.onResourceDoesNotExist(resource);
1✔
744
        }
745
      });
1✔
746
    }
1✔
747

748
    void removeWatcher(ResourceWatcher<T> watcher) {
749
      checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher);
1✔
750
      watchers.remove(watcher);
1✔
751
    }
1✔
752

753
    void restartTimer() {
754
      if (data != null || absent) {  // resource already resolved
1✔
755
        return;
×
756
      }
757
      ControlPlaneClient activeCpc = getActiveCpc(authority);
1✔
758
      if (activeCpc == null || !activeCpc.isReady()) {
1✔
759
        // When client becomes ready, it triggers a restartTimer for all relevant subscribers.
760
        return;
1✔
761
      }
762
      ServerInfo serverInfo = activeCpc.getServerInfo();
1✔
763
      int timeoutSec = serverInfo.resourceTimerIsTransientError()
1✔
764
          ? EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC : INITIAL_RESOURCE_FETCH_TIMEOUT_SEC;
1✔
765

766
      class ResourceNotFound implements Runnable {
1✔
767
        @Override
768
        public void run() {
769
          logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
1✔
770
              type, resource);
1✔
771
          respTimer = null;
1✔
772
          onAbsent(null, activeCpc.getServerInfo());
1✔
773
        }
1✔
774

775
        @Override
776
        public String toString() {
777
          return type + this.getClass().getSimpleName();
1✔
778
        }
779
      }
780

781
      // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
782
      metadata = ResourceMetadata.newResourceMetadataRequested();
1✔
783

784
      if (respTimer != null) {
1✔
785
        respTimer.cancel();
×
786
      }
787
      respTimer = syncContext.schedule(
1✔
788
          new ResourceNotFound(), timeoutSec, TimeUnit.SECONDS, timeService);
1✔
789
    }
1✔
790

791
    void stopTimer() {
792
      if (respTimer != null && respTimer.isPending()) {
1✔
793
        respTimer.cancel();
1✔
794
        respTimer = null;
1✔
795
      }
796
    }
1✔
797

798
    void cancelResourceWatch() {
799
      if (isWatched()) {
1✔
800
        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
×
801
      }
802
      stopTimer();
1✔
803
      String message = "Unsubscribing {0} resource {1} from server {2}";
1✔
804
      XdsLogLevel logLevel = XdsLogLevel.INFO;
1✔
805
      if (resourceDeletionIgnored) {
1✔
806
        message += " for which we previously ignored a deletion";
×
807
        logLevel = XdsLogLevel.FORCE_INFO;
×
808
      }
809
      logger.log(logLevel, message, type, resource, getTarget());
1✔
810
    }
1✔
811

812
    boolean isWatched() {
813
      return !watchers.isEmpty();
1✔
814
    }
815

816
    boolean hasResult() {
817
      return data != null || absent;
1✔
818
    }
819

820
    void onData(ParsedResource<T> parsedResource, String version, long updateTime,
821
                ProcessingTracker processingTracker) {
822
      if (respTimer != null && respTimer.isPending()) {
1✔
823
        respTimer.cancel();
1✔
824
        respTimer = null;
1✔
825
      }
826
      ResourceUpdate oldData = this.data;
1✔
827
      this.data = parsedResource.getResourceUpdate();
1✔
828
      this.metadata = ResourceMetadata
1✔
829
          .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
1✔
830
      absent = false;
1✔
831
      lastError = null;
1✔
832
      if (resourceDeletionIgnored) {
1✔
833
        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
1✔
834
                + "of resource for which we previously ignored a deletion: type {1} name {2}",
835
            getTarget(), type, resource);
1✔
836
        resourceDeletionIgnored = false;
1✔
837
      }
838
      if (!Objects.equals(oldData, data)) {
1✔
839
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
840
          processingTracker.startTask();
1✔
841
          watchers.get(watcher).execute(() -> {
1✔
842
            try {
843
              notifyWatcher(watcher, data);
1✔
844
            } finally {
845
              processingTracker.onComplete();
1✔
846
            }
847
          });
1✔
848
        }
1✔
849
      }
850
    }
1✔
851

852
    private String getTarget() {
853
      ControlPlaneClient activeCpc = getActiveCpc(authority);
1✔
854
      return (activeCpc != null)
1✔
855
             ? activeCpc.getServerInfo().target()
1✔
856
             : "unknown";
1✔
857
    }
858

859
    void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverInfo) {
860
      if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
1✔
861
        return;
1✔
862
      }
863

864
      // Ignore deletion of State of the World resources when this feature is on,
865
      // and the resource is reusable.
866
      boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion();
1✔
867
      if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
1✔
868
        if (!resourceDeletionIgnored) {
1✔
869
          logger.log(XdsLogLevel.FORCE_WARNING,
1✔
870
              "xds server {0}: ignoring deletion for resource type {1} name {2}}",
871
              serverInfo.target(), type, resource);
1✔
872
          resourceDeletionIgnored = true;
1✔
873
        }
874
        return;
1✔
875
      }
876

877
      logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
1✔
878
      if (!absent) {
1✔
879
        data = null;
1✔
880
        absent = true;
1✔
881
        lastError = null;
1✔
882
        metadata = serverInfo.resourceTimerIsTransientError()
1✔
883
            ? ResourceMetadata.newResourceMetadataTimeout()
1✔
884
            : ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
885
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
886
          if (processingTracker != null) {
1✔
887
            processingTracker.startTask();
1✔
888
          }
889
          watchers.get(watcher).execute(() -> {
1✔
890
            try {
891
              if (serverInfo.resourceTimerIsTransientError()) {
1✔
892
                watcher.onError(Status.UNAVAILABLE.withDescription(
1✔
893
                    "Timed out waiting for resource " + resource + " from xDS server"));
894
              } else {
895
                watcher.onResourceDoesNotExist(resource);
1✔
896
              }
897
            } finally {
898
              if (processingTracker != null) {
1✔
899
                processingTracker.onComplete();
1✔
900
              }
901
            }
902
          });
1✔
903
        }
1✔
904
      }
905
    }
1✔
906

907
    void onError(Status error, @Nullable ProcessingTracker tracker) {
908
      if (respTimer != null && respTimer.isPending()) {
1✔
909
        respTimer.cancel();
1✔
910
        respTimer = null;
1✔
911
      }
912

913
      // Include node ID in xds failures to allow cross-referencing with control plane logs
914
      // when debugging.
915
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
916
      Status errorAugmented = Status.fromCode(error.getCode())
1✔
917
          .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
1✔
918
          .withCause(error.getCause());
1✔
919
      this.lastError = errorAugmented;
1✔
920

921
      for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
922
        if (tracker != null) {
1✔
923
          tracker.startTask();
1✔
924
        }
925
        watchers.get(watcher).execute(() -> {
1✔
926
          try {
927
            watcher.onError(errorAugmented);
1✔
928
          } finally {
929
            if (tracker != null) {
1✔
930
              tracker.onComplete();
1✔
931
            }
932
          }
933
        });
1✔
934
      }
1✔
935
    }
1✔
936

937
    void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
938
      metadata = ResourceMetadata
1✔
939
          .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails,
1✔
940
              data != null);
941
    }
1✔
942

943
    private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
944
      watcher.onChanged(update);
1✔
945
    }
1✔
946
  }
947

948
  private class ResponseHandler implements XdsResponseHandler {
949
    final ServerInfo serverInfo;
950

951
    ResponseHandler(ServerInfo serverInfo) {
1✔
952
      this.serverInfo = serverInfo;
1✔
953
    }
1✔
954

955
    @Override
956
    public void handleResourceResponse(
957
        XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
958
        List<Any> resources, String nonce, boolean isFirstResponse,
959
        ProcessingTracker processingTracker) {
960
      checkNotNull(xdsResourceType, "xdsResourceType");
1✔
961
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
962
      Set<String> toParseResourceNames =
963
          xdsResourceType.shouldRetrieveResourceKeysForArgs()
1✔
964
          ? getResourceKeys(xdsResourceType)
1✔
965
          : null;
1✔
966
      XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
1✔
967
          bootstrapInfo, securityConfig, toParseResourceNames);
1✔
968
      handleResourceUpdate(args, resources, xdsResourceType, isFirstResponse, processingTracker);
1✔
969
    }
1✔
970

971
    @Override
972
    public void handleStreamClosed(Status status, boolean shouldTryFallback) {
973
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
974

975
      ControlPlaneClient cpcClosed = serverCpClientMap.get(serverInfo);
1✔
976
      if (cpcClosed == null) {
1✔
977
        logger.log(XdsLogLevel.DEBUG,
×
978
            "Couldn't find closing CPC for {0}, so skipping cleanup and reporting", serverInfo);
979
        return;
×
980
      }
981

982
      cleanUpResourceTimers(cpcClosed);
1✔
983

984
      if (status.isOk()) {
1✔
985
        return; // Not considered an error
1✔
986
      }
987

988
      metricReporter.reportServerFailure(1L, serverInfo.target());
1✔
989

990
      Collection<String> authoritiesForClosedCpc = getActiveAuthorities(cpcClosed);
1✔
991
      for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
992
          resourceSubscribers.values()) {
1✔
993
        for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
994
          if (subscriber.hasResult() || !authoritiesForClosedCpc.contains(subscriber.authority)) {
1✔
995
            continue;
1✔
996
          }
997

998
          // try to fallback to lower priority control plane client
999
          if (shouldTryFallback && manageControlPlaneClient(subscriber).didFallback) {
1✔
1000
            authoritiesForClosedCpc.remove(subscriber.authority);
1✔
1001
            if (authoritiesForClosedCpc.isEmpty()) {
1✔
1002
              return; // optimization: no need to continue once all authorities have done fallback
1✔
1003
            }
1004
            continue; // since we did fallback, don't consider it an error
1005
          }
1006

1007
          subscriber.onError(status, null);
1✔
1008
        }
1✔
1009
      }
1✔
1010
    }
1✔
1011

1012
  }
1013

1014
  private static class CpcWithFallbackState {
1015
    ControlPlaneClient cpc;
1016
    boolean didFallback;
1017

1018
    private CpcWithFallbackState(ControlPlaneClient cpc, boolean didFallback) {
1✔
1019
      this.cpc = cpc;
1✔
1020
      this.didFallback = didFallback;
1✔
1021
    }
1✔
1022
  }
1023

1024
  private Collection<String> getActiveAuthorities(ControlPlaneClient cpc) {
1025
    List<String> asList = activatedCpClients.entrySet().stream()
1✔
1026
        .filter(entry -> !entry.getValue().isEmpty()
1✔
1027
            && cpc == entry.getValue().get(entry.getValue().size() - 1))
1✔
1028
        .map(Map.Entry::getKey)
1✔
1029
        .collect(Collectors.toList());
1✔
1030

1031
    // Since this is usually used for contains, use a set when the list is large
1032
    return (asList.size() < 100) ? asList : new HashSet<>(asList);
1✔
1033
  }
1034

1035
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc