• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20105

05 Dec 2025 11:11AM UTC coverage: 88.639% (-0.008%) from 88.647%
#20105

push

github

web-flow
okhttp: Fix race condition overwriting MAX_CONCURRENT_STREAMS (#12548)

### What this PR does

This PR fixes a race condition in `OkHttpClientTransport` where
`MAX_CONCURRENT_STREAMS` sent by the server could be incorrectly
overwritten by the client's default initialization.

The fix simply reorders the initialization to happen **before** starting
the reader thread, ensuring that any updates from the server are
preserved.

### Note on Testing

I attempted to add a deterministic reproduction test, but reliably
simulating this specific race condition proved difficult without
intrusive changes.
I request reviewers to primarily verify the logical correctness of the
reordering. I am open to collaborating with the team to develop a
suitable test case if required.


### Future Work

This PR covers **Step 1** (Fixing the race condition) of the plan
discussed in #11985.
I plan to follow up with **Step 2** (Adding assertions to verify no
pending streams exist) in a separate PR.

Part of #11985

35173 of 39681 relevant lines covered (88.64%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.65
/../xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
22
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import com.google.common.base.Joiner;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.ImmutableMap;
30
import com.google.common.util.concurrent.ListenableFuture;
31
import com.google.common.util.concurrent.SettableFuture;
32
import com.google.protobuf.Any;
33
import io.grpc.Internal;
34
import io.grpc.InternalLogId;
35
import io.grpc.Status;
36
import io.grpc.StatusOr;
37
import io.grpc.SynchronizationContext;
38
import io.grpc.SynchronizationContext.ScheduledHandle;
39
import io.grpc.internal.BackoffPolicy;
40
import io.grpc.internal.TimeProvider;
41
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
42
import io.grpc.xds.client.Bootstrapper.ServerInfo;
43
import io.grpc.xds.client.XdsClient.ResourceStore;
44
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
45
import java.io.IOException;
46
import java.util.ArrayList;
47
import java.util.Collection;
48
import java.util.Collections;
49
import java.util.HashMap;
50
import java.util.HashSet;
51
import java.util.List;
52
import java.util.Map;
53
import java.util.Objects;
54
import java.util.Set;
55
import java.util.concurrent.Executor;
56
import java.util.concurrent.Future;
57
import java.util.concurrent.ScheduledExecutorService;
58
import java.util.concurrent.TimeUnit;
59
import java.util.stream.Collectors;
60
import javax.annotation.Nullable;
61

62
/**
63
 * XdsClient implementation.
64
 */
65
@Internal
66
public final class XdsClientImpl extends XdsClient implements ResourceStore {
67

68
  // Longest time to wait, since the subscription to some resource, for concluding its absence.
69
  @VisibleForTesting
70
  public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
71
  public static final int EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC = 30;
72

73
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
74
      new Thread.UncaughtExceptionHandler() {
1✔
75
        @Override
76
        public void uncaughtException(Thread t, Throwable e) {
77
          logger.log(
×
78
              XdsLogLevel.ERROR,
79
              "Uncaught exception in XdsClient SynchronizationContext. Panic!",
80
              e);
81
          // TODO: better error handling.
82
          throw new AssertionError(e);
×
83
        }
84
      });
85

86
  private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
1✔
87
  final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
1✔
88
  /** Map of authority to its activated control plane client (affected by xds fallback).
89
   * The last entry in the list for each value is the "active" CPC for the matching key */
90
  private final Map<String, List<ControlPlaneClient>> activatedCpClients = new HashMap<>();
1✔
91
  private final Map<ServerInfo, ControlPlaneClient> serverCpClientMap = new HashMap<>();
1✔
92

93
  /** Maps resource type to the corresponding map of subscribers (keyed by resource name). */
94
  private final Map<XdsResourceType<? extends ResourceUpdate>,
1✔
95
      Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
96
      resourceSubscribers = new HashMap<>();
97
  /** Maps typeUrl to the corresponding XdsResourceType. */
98
  private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
1✔
99

100
  private final XdsTransportFactory xdsTransportFactory;
101
  private final Bootstrapper.BootstrapInfo bootstrapInfo;
102
  private final ScheduledExecutorService timeService;
103
  private final BackoffPolicy.Provider backoffPolicyProvider;
104
  private final Supplier<Stopwatch> stopwatchSupplier;
105
  private final TimeProvider timeProvider;
106
  private final Object securityConfig;
107
  private final InternalLogId logId;
108
  private final XdsLogger logger;
109
  private volatile boolean isShutdown;
110
  private final MessagePrettyPrinter messagePrinter;
111
  private final XdsClientMetricReporter metricReporter;
112

113
  /**
   * Creates an xDS client from the supplied collaborators.
   *
   * <p>The constructor only stores its arguments, allocates a log id and logs creation; control
   * plane clients are created later, on demand.
   */
  public XdsClientImpl(
      XdsTransportFactory xdsTransportFactory,
      Bootstrapper.BootstrapInfo bootstrapInfo,
      ScheduledExecutorService timeService,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      TimeProvider timeProvider,
      MessagePrettyPrinter messagePrinter,
      Object securityConfig,
      XdsClientMetricReporter metricReporter) {
    // Configuration.
    this.bootstrapInfo = bootstrapInfo;
    this.securityConfig = securityConfig;

    // Transport and scheduling.
    this.xdsTransportFactory = xdsTransportFactory;
    this.timeService = timeService;
    this.backoffPolicyProvider = backoffPolicyProvider;

    // Time sources.
    this.stopwatchSupplier = stopwatchSupplier;
    this.timeProvider = timeProvider;

    // Diagnostics.
    this.messagePrinter = messagePrinter;
    this.metricReporter = metricReporter;
    this.logId = InternalLogId.allocate("xds-client", null);
    this.logger = XdsLogger.withLogId(this.logId);
    this.logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
136

137
  @Override
138
  public void shutdown() {
139
    syncContext.execute(
1✔
140
        new Runnable() {
1✔
141
          @Override
142
          public void run() {
143
            if (isShutdown) {
1✔
144
              return;
×
145
            }
146
            isShutdown = true;
1✔
147
            for (ControlPlaneClient xdsChannel : serverCpClientMap.values()) {
1✔
148
              xdsChannel.shutdown();
1✔
149
            }
1✔
150
            for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
1✔
151
              lrsClient.stopLoadReporting();
1✔
152
            }
1✔
153
            cleanUpResourceTimers(null);
1✔
154
            activatedCpClients.clear();
1✔
155
          }
1✔
156
        });
157
  }
1✔
158

159
  @Override
160
  public boolean isShutDown() {
161
    return isShutdown;
1✔
162
  }
163

164
  @Override
165
  public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
166
    return Collections.unmodifiableMap(subscribedResourceTypeUrls);
1✔
167
  }
168

169
  private ControlPlaneClient getActiveCpc(String authority) {
170
    List<ControlPlaneClient> controlPlaneClients = activatedCpClients.get(authority);
1✔
171
    if (controlPlaneClients == null || controlPlaneClients.isEmpty()) {
1✔
172
      return null;
1✔
173
    }
174

175
    return controlPlaneClients.get(controlPlaneClients.size() - 1);
1✔
176
  }
177

178
  @Nullable
179
  @Override
180
  public Collection<String> getSubscribedResources(
181
      ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type) {
182
    ControlPlaneClient targetCpc = serverCpClientMap.get(serverInfo);
1✔
183
    if (targetCpc == null) {
1✔
184
      return null;
×
185
    }
186

187
    // This should include all of the authorities that targetCpc or a fallback from it is serving
188
    List<String> authorities = activatedCpClients.entrySet().stream()
1✔
189
        .filter(entry -> entry.getValue().contains(targetCpc))
1✔
190
        .map(Map.Entry::getKey)
1✔
191
        .collect(Collectors.toList());
1✔
192

193
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
1✔
194
        resourceSubscribers.getOrDefault(type, Collections.emptyMap());
1✔
195

196
    Collection<String> retVal = resources.entrySet().stream()
1✔
197
        .filter(entry -> authorities.contains(entry.getValue().authority))
1✔
198
        .map(Map.Entry::getKey)
1✔
199
        .collect(Collectors.toList());
1✔
200

201
    return retVal.isEmpty() ? null : retVal;
1✔
202
  }
203

204
  @Override
205
  public void startMissingResourceTimers(Collection<String> resourceNames,
206
                                         XdsResourceType<?> resourceType) {
207
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap =
1✔
208
        resourceSubscribers.get(resourceType);
1✔
209

210
    for (String resourceName : resourceNames) {
1✔
211
      ResourceSubscriber<?> subscriber = subscriberMap.get(resourceName);
1✔
212
      if (subscriber.respTimer == null && !subscriber.hasResult()) {
1✔
213
        subscriber.restartTimer();
1✔
214
      }
215
    }
1✔
216
  }
1✔
217

218
  // As XdsClient APIs become resource agnostic, subscribed resource types are dynamic.
219
  // ResourceTypes that do not have subscribers do not show up in the snapshot keys.
220
  @Override
221
  public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
222
      getSubscribedResourcesMetadataSnapshot() {
223
    final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
224
        SettableFuture.create();
1✔
225
    syncContext.execute(new Runnable() {
1✔
226
      @Override
227
      public void run() {
228
        // A map from a "resource type" to a map ("resource name": "resource metadata")
229
        ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
230
            ImmutableMap.builder();
1✔
231
        for (XdsResourceType<?> resourceType : resourceSubscribers.keySet()) {
1✔
232
          ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
1✔
233
          for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
234
              : resourceSubscribers.get(resourceType).entrySet()) {
1✔
235
            metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
1✔
236
          }
1✔
237
          metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
1✔
238
        }
1✔
239
        future.set(metadataSnapshot.buildOrThrow());
1✔
240
      }
1✔
241
    });
242
    return future;
1✔
243
  }
244

245
  @Override
246
  public Object getSecurityConfig() {
247
    return securityConfig;
×
248
  }
249

250
  @Override
251
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
252
                                                          String resourceName,
253
                                                          ResourceWatcher<T> watcher,
254
                                                          Executor watcherExecutor) {
255
    syncContext.execute(new Runnable() {
1✔
256
      @Override
257
      @SuppressWarnings("unchecked")
258
      public void run() {
259
        if (!resourceSubscribers.containsKey(type)) {
1✔
260
          resourceSubscribers.put(type, new HashMap<>());
1✔
261
          subscribedResourceTypeUrls.put(type.typeUrl(), type);
1✔
262
        }
263
        ResourceSubscriber<T> subscriber =
1✔
264
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
265

266
        if (subscriber == null) {
1✔
267
          logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
1✔
268
          subscriber = new ResourceSubscriber<>(type, resourceName);
1✔
269
          resourceSubscribers.get(type).put(resourceName, subscriber);
1✔
270

271
          if (subscriber.errorDescription == null) {
1✔
272
            CpcWithFallbackState cpcToUse = manageControlPlaneClient(subscriber);
1✔
273
            if (cpcToUse.cpc != null) {
1✔
274
              cpcToUse.cpc.adjustResourceSubscription(type);
1✔
275
            }
276
          }
277
        }
278

279
        subscriber.addWatcher(watcher, watcherExecutor);
1✔
280
      }
1✔
281
    });
282
  }
1✔
283

284
  /**
285
   * Gets a ControlPlaneClient for the subscriber's authority, creating one if necessary.
286
   * If there already was an active CPC for this authority, and it is different from the one
287
   * identified, then do fallback to the identified one (cpcToUse).
288
   *
289
   * @return identified CPC or {@code null} (if there are no valid ServerInfos associated with the
290
   *     subscriber's authority or CPC's for all are in backoff), and whether did a fallback.
291
   */
292
  @VisibleForTesting
293
  private <T extends ResourceUpdate> CpcWithFallbackState manageControlPlaneClient(
294
      ResourceSubscriber<T> subscriber) {
295

296
    ControlPlaneClient cpcToUse;
297
    boolean didFallback = false;
1✔
298
    try {
299
      cpcToUse = getOrCreateControlPlaneClient(subscriber.authority);
1✔
300
    } catch (IllegalArgumentException e) {
×
301
      if (subscriber.errorDescription == null) {
×
302
        subscriber.errorDescription = "Bad configuration:  " + e.getMessage();
×
303
      }
304

305
      subscriber.onError(
×
306
          Status.INVALID_ARGUMENT.withDescription(subscriber.errorDescription), null);
×
307
      return new CpcWithFallbackState(null, false);
×
308
    } catch (IOException e) {
1✔
309
      logger.log(XdsLogLevel.DEBUG,
1✔
310
          "Could not create a control plane client for authority {0}: {1}",
311
          subscriber.authority, e.getMessage());
1✔
312
      return new CpcWithFallbackState(null, false);
1✔
313
    }
1✔
314

315
    ControlPlaneClient activeCpClient = getActiveCpc(subscriber.authority);
1✔
316
    if (cpcToUse != activeCpClient) {
1✔
317
      addCpcToAuthority(subscriber.authority, cpcToUse); // makes it active
1✔
318
      if (activeCpClient != null) {
1✔
319
        didFallback = cpcToUse != null && !cpcToUse.isInError();
1✔
320
        if (didFallback) {
1✔
321
          logger.log(XdsLogLevel.INFO, "Falling back to XDS server {0}",
1✔
322
              cpcToUse.getServerInfo().target());
1✔
323
        } else {
324
          logger.log(XdsLogLevel.WARNING, "No working fallback XDS Servers found from {0}",
×
325
              activeCpClient.getServerInfo().target());
×
326
        }
327
      }
328
    }
329

330
    return new CpcWithFallbackState(cpcToUse, didFallback);
1✔
331
  }
332

333
  private void addCpcToAuthority(String authority, ControlPlaneClient cpcToUse) {
334
    List<ControlPlaneClient> controlPlaneClients =
1✔
335
        activatedCpClients.computeIfAbsent(authority, k -> new ArrayList<>());
1✔
336

337
    if (controlPlaneClients.contains(cpcToUse)) {
1✔
338
      return;
×
339
    }
340

341
    // if there are any missing CPCs between the last one and cpcToUse, add them + add cpcToUse
342
    ImmutableList<ServerInfo> serverInfos = getServerInfos(authority);
1✔
343
    for (int i = controlPlaneClients.size(); i < serverInfos.size(); i++) {
1✔
344
      ServerInfo serverInfo = serverInfos.get(i);
1✔
345
      ControlPlaneClient cpc = serverCpClientMap.get(serverInfo);
1✔
346
      controlPlaneClients.add(cpc);
1✔
347
      logger.log(XdsLogLevel.DEBUG, "Adding control plane client {0} to authority {1}",
1✔
348
          cpc, authority);
349
      cpcToUse.sendDiscoveryRequests();
1✔
350
      if (cpc == cpcToUse) {
1✔
351
        break;
1✔
352
      }
353
    }
354
  }
1✔
355

356
  @Override
357
  public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
358
                                                                String resourceName,
359
                                                                ResourceWatcher<T> watcher) {
360
    syncContext.execute(new Runnable() {
1✔
361
      @Override
362
      @SuppressWarnings("unchecked")
363
      public void run() {
364
        ResourceSubscriber<T> subscriber =
1✔
365
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
366
        if (subscriber == null) {
1✔
367
          logger.log(XdsLogLevel.WARNING, "double cancel of resource watch for {0}:{1}",
1✔
368
              type.typeName(), resourceName);
1✔
369
          return;
1✔
370
        }
371
        subscriber.removeWatcher(watcher);
1✔
372
        if (!subscriber.isWatched()) {
1✔
373
          subscriber.cancelResourceWatch();
1✔
374
          resourceSubscribers.get(type).remove(resourceName);
1✔
375

376
          List<ControlPlaneClient> controlPlaneClients =
1✔
377
              activatedCpClients.get(subscriber.authority);
1✔
378
          if (controlPlaneClients != null) {
1✔
379
            controlPlaneClients.forEach((cpc) -> {
1✔
380
              cpc.adjustResourceSubscription(type);
1✔
381
            });
1✔
382
          }
383

384
          if (resourceSubscribers.get(type).isEmpty()) {
1✔
385
            resourceSubscribers.remove(type);
1✔
386
            subscribedResourceTypeUrls.remove(type.typeUrl());
1✔
387
          }
388
        }
389
      }
1✔
390
    });
391
  }
1✔
392

393
  @Override
394
  public LoadStatsManager2.ClusterDropStats addClusterDropStats(
395
      final ServerInfo serverInfo, String clusterName,
396
      @Nullable String edsServiceName) {
397
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
398
    LoadStatsManager2.ClusterDropStats dropCounter =
1✔
399
        loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
1✔
400
    syncContext.execute(new Runnable() {
1✔
401
      @Override
402
      public void run() {
403
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
404
      }
1✔
405
    });
406
    return dropCounter;
1✔
407
  }
408

409
  @Override
410
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
411
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
412
      Locality locality) {
413
    return addClusterLocalityStats(serverInfo, clusterName, edsServiceName, locality, null);
1✔
414
  }
415

416
  @Override
417
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
418
      final ServerInfo serverInfo,
419
      String clusterName,
420
      @Nullable String edsServiceName,
421
      Locality locality,
422
      @Nullable BackendMetricPropagation backendMetricPropagation) {
423
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
424

425
    LoadStatsManager2.ClusterLocalityStats loadCounter =
1✔
426
        loadStatsManager.getClusterLocalityStats(
1✔
427
            clusterName, edsServiceName, locality, backendMetricPropagation);
428

429
    syncContext.execute(new Runnable() {
1✔
430
      @Override
431
      public void run() {
432
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
433
      }
1✔
434
    });
435
    return loadCounter;
1✔
436
  }
437

438

439
  @Override
440
  public Bootstrapper.BootstrapInfo getBootstrapInfo() {
441
    return bootstrapInfo;
1✔
442
  }
443

444
  @Override
445
  public String toString() {
446
    return logId.toString();
×
447
  }
448

449
  private Set<String> getResourceKeys(XdsResourceType<?> xdsResourceType) {
450
    if (!resourceSubscribers.containsKey(xdsResourceType)) {
1✔
451
      return null;
×
452
    }
453

454
    return resourceSubscribers.get(xdsResourceType).keySet();
1✔
455
  }
456

457
  // cpcForThisStream is null when doing shutdown
458
  private void cleanUpResourceTimers(ControlPlaneClient cpcForThisStream) {
459
    Collection<String> authoritiesForCpc = getActiveAuthorities(cpcForThisStream);
1✔
460
    String target = cpcForThisStream == null ? "null" : cpcForThisStream.getServerInfo().target();
1✔
461
    logger.log(XdsLogLevel.DEBUG, "Cleaning up resource timers for CPC {0}, authorities {1}",
1✔
462
        target, authoritiesForCpc);
463

464
    for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
465
      for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
466
        if (cpcForThisStream == null || authoritiesForCpc.contains(subscriber.authority)) {
1✔
467
          subscriber.stopTimer();
1✔
468
        }
469
      }
1✔
470
    }
1✔
471
  }
1✔
472

473
  private ControlPlaneClient getOrCreateControlPlaneClient(String authority) throws IOException {
474
    // Optimize for the common case of a working ads stream already exists for the authority
475
    ControlPlaneClient activeCpc = getActiveCpc(authority);
1✔
476
    if (activeCpc != null && !activeCpc.isInError()) {
1✔
477
      return activeCpc;
1✔
478
    }
479

480
    ImmutableList<ServerInfo> serverInfos = getServerInfos(authority);
1✔
481
    if (serverInfos == null) {
1✔
482
      throw new IllegalArgumentException("No xds servers found for authority " + authority);
×
483
    }
484

485
    for (ServerInfo serverInfo : serverInfos) {
1✔
486
      ControlPlaneClient cpc = getOrCreateControlPlaneClient(serverInfo);
1✔
487
      if (cpc.isInError()) {
1✔
488
        continue;
1✔
489
      }
490
      return cpc;
1✔
491
    }
492

493
    // Everything existed and is in backoff so throw
494
    throw new IOException("All xds transports for authority " + authority + " are in backoff");
1✔
495
  }
496

497
  private ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) {
498
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
499
    if (serverCpClientMap.containsKey(serverInfo)) {
1✔
500
      return serverCpClientMap.get(serverInfo);
1✔
501
    }
502

503
    logger.log(XdsLogLevel.DEBUG, "Creating control plane client for {0}", serverInfo.target());
1✔
504
    XdsTransportFactory.XdsTransport xdsTransport;
505
    try {
506
      xdsTransport = xdsTransportFactory.create(serverInfo);
1✔
507
    } catch (Exception e) {
1✔
508
      String msg = String.format("Failed to create xds transport for %s: %s",
1✔
509
          serverInfo.target(), e.getMessage());
1✔
510
      logger.log(XdsLogLevel.WARNING, msg);
1✔
511
      xdsTransport =
1✔
512
          new ControlPlaneClient.FailingXdsTransport(Status.UNAVAILABLE.withDescription(msg));
1✔
513
    }
1✔
514

515
    ControlPlaneClient controlPlaneClient = new ControlPlaneClient(
1✔
516
        xdsTransport,
517
        serverInfo,
518
        bootstrapInfo.node(),
1✔
519
        new ResponseHandler(serverInfo),
520
        this,
521
        timeService,
522
        syncContext,
523
        backoffPolicyProvider,
524
        stopwatchSupplier,
525
        messagePrinter
526
    );
527

528
    serverCpClientMap.put(serverInfo, controlPlaneClient);
1✔
529

530
    LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
1✔
531
    loadStatsManagerMap.put(serverInfo, loadStatsManager);
1✔
532
    LoadReportClient lrsClient = new LoadReportClient(
1✔
533
        loadStatsManager, xdsTransport, bootstrapInfo.node(),
1✔
534
        syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
535
    serverLrsClientMap.put(serverInfo, lrsClient);
1✔
536

537
    return controlPlaneClient;
1✔
538
  }
539

540
  @VisibleForTesting
541
  @Override
542
  public Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
543
    return ImmutableMap.copyOf(serverLrsClientMap);
1✔
544
  }
545

546
  @Nullable
547
  private ImmutableList<ServerInfo> getServerInfos(String authority) {
548
    if (authority != null) {
1✔
549
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
1✔
550
      if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
1✔
551
        return null;
1✔
552
      }
553
      return authorityInfo.xdsServers();
1✔
554
    } else {
555
      return bootstrapInfo.servers();
1✔
556
    }
557
  }
558

559
  @SuppressWarnings("unchecked")
560
  private <T extends ResourceUpdate> void handleResourceUpdate(
561
      XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType,
562
      boolean isFirstResponse, ProcessingTracker processingTracker) {
563
    ControlPlaneClient controlPlaneClient = serverCpClientMap.get(args.serverInfo);
1✔
564

565
    if (isFirstResponse) {
1✔
566
      shutdownLowerPriorityCpcs(controlPlaneClient);
1✔
567
    }
568

569
    ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
1✔
570
    logger.log(XdsLogger.XdsLogLevel.INFO,
1✔
571
        "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
572
        xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
1✔
573
    Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
1✔
574
    Set<String> invalidResources = result.invalidResources;
1✔
575
    metricReporter.reportResourceUpdates(Long.valueOf(parsedResources.size()),
1✔
576
        Long.valueOf(invalidResources.size()),
1✔
577
        args.getServerInfo().target(), xdsResourceType.typeUrl());
1✔
578

579
    List<String> errors = result.errors;
1✔
580
    String errorDetail = null;
1✔
581
    if (errors.isEmpty()) {
1✔
582
      checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
1✔
583
      controlPlaneClient.ackResponse(xdsResourceType, args.versionInfo, args.nonce);
1✔
584
    } else {
585
      errorDetail = Joiner.on('\n').join(errors);
1✔
586
      logger.log(XdsLogLevel.WARNING,
1✔
587
          "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
588
          xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
1✔
589
      controlPlaneClient.nackResponse(xdsResourceType, args.nonce, errorDetail);
1✔
590
    }
591

592
    long updateTime = timeProvider.currentTimeNanos();
1✔
593
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
1✔
594
        resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
1✔
595
    for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
1✔
596
      String resourceName = entry.getKey();
1✔
597
      ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
1✔
598
      if (parsedResources.containsKey(resourceName)) {
1✔
599
        // Happy path: the resource updated successfully. Notify the watchers of the update.
600
        subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime,
1✔
601
            processingTracker);
602
        continue;
1✔
603
      }
604

605
      if (invalidResources.contains(resourceName)) {
1✔
606
        // The resource update is invalid (NACK). Handle as a data error.
607
        subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
1✔
608
        
609
        // Handle data errors (NACKs) based on fail_on_data_errors server feature.
610
        // When xdsDataErrorHandlingEnabled is true and fail_on_data_errors is present,
611
        // delete cached data so onError will call onResourceChanged instead of onAmbientError.
612
        // When xdsDataErrorHandlingEnabled is false, use old behavior (always keep cached data).
613
        if (BootstrapperImpl.xdsDataErrorHandlingEnabled && subscriber.data != null
1✔
614
            && args.serverInfo.failOnDataErrors()) {
1✔
615
          subscriber.data = null;
1✔
616
        }
617
        // Call onError, which will decide whether to call onResourceChanged or onAmbientError
618
        // based on whether data exists after the above deletion.
619
        subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
1✔
620
        continue;
1✔
621
      }
622

623
      // Nothing else to do for incremental ADS resources.
624
      if (!xdsResourceType.isFullStateOfTheWorld()) {
1✔
625
        continue;
1✔
626
      }
627

628
      // For State of the World services, notify watchers when their watched resource is missing
629
      // from the ADS update. Note that we can only do this if the resource update is coming from
630
      // the same xDS server that the ResourceSubscriber is subscribed to.
631
      if (getActiveCpc(subscriber.authority) == controlPlaneClient) {
1✔
632
        subscriber.onAbsent(processingTracker, args.serverInfo);
1✔
633
      }
634
    }
1✔
635
  }
1✔
636

637
  @Override
638
  public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
639
    SettableFuture<Void> future = SettableFuture.create();
1✔
640
    syncContext.execute(() -> {
1✔
641
      serverCpClientMap.forEach((serverInfo, controlPlaneClient) ->
1✔
642
          callback.reportServerConnectionGauge(
1✔
643
              !controlPlaneClient.isInError(), serverInfo.target()));
1✔
644
      future.set(null);
1✔
645
    });
1✔
646
    return future;
1✔
647
  }
648

649
  private void shutdownLowerPriorityCpcs(ControlPlaneClient activatedCpc) {
650
    // For each authority, remove any control plane clients, with lower priority than the activated
651
    // one, from activatedCpClients storing them all in cpcsToShutdown.
652
    Set<ControlPlaneClient> cpcsToShutdown = new HashSet<>();
1✔
653
    for ( List<ControlPlaneClient> cpcsForAuth : activatedCpClients.values()) {
1✔
654
      if (cpcsForAuth == null) {
1✔
655
        continue;
×
656
      }
657
      int index = cpcsForAuth.indexOf(activatedCpc);
1✔
658
      if (index > -1) {
1✔
659
        cpcsToShutdown.addAll(cpcsForAuth.subList(index + 1, cpcsForAuth.size()));
1✔
660
        cpcsForAuth.subList(index + 1, cpcsForAuth.size()).clear(); // remove lower priority cpcs
1✔
661
      }
662
    }
1✔
663

664
    // Shutdown any lower priority control plane clients identified above that aren't still being
665
    // used by another authority.  If they are still being used let the XDS server know that we
666
    // no longer are interested in subscriptions for authorities we are no longer responsible for.
667
    for (ControlPlaneClient cpc : cpcsToShutdown) {
1✔
668
      if (activatedCpClients.values().stream().noneMatch(list -> list.contains(cpc))) {
1✔
669
        cpc.shutdown();
1✔
670
        serverCpClientMap.remove(cpc.getServerInfo());
1✔
671
      } else {
672
        cpc.sendDiscoveryRequests();
×
673
      }
674
    }
1✔
675
  }
1✔
676

677

678
  /** Tracks a single subscribed resource. */
679
  private final class ResourceSubscriber<T extends ResourceUpdate> {
680
    @Nullable
681
    private final String authority;
682
    private final XdsResourceType<T> type;
683
    private final String resource;
684
    private final Map<ResourceWatcher<T>, Executor> watchers = new HashMap<>();
1✔
685
    @Nullable
686
    private T data;
687
    private boolean absent;
688
    // Tracks whether the deletion has been ignored per bootstrap server feature.
689
    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
690
    private boolean resourceDeletionIgnored;
691
    @Nullable
692
    private ScheduledHandle respTimer;
693
    @Nullable
694
    private ResourceMetadata metadata;
695
    @Nullable
696
    private String errorDescription;
697
    @Nullable
698
    private Status lastError;
699

700
    ResourceSubscriber(XdsResourceType<T> type, String resource) {
1✔
701
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
702
      this.type = type;
1✔
703
      this.resource = resource;
1✔
704
      this.authority = getAuthorityFromResourceName(resource);
1✔
705
      if (getServerInfos(authority) == null) {
1✔
706
        this.errorDescription = "Wrong configuration: xds server does not exist for resource "
1✔
707
            + resource;
708
        return;
1✔
709
      }
710

711
      // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
712
      // is created but not yet requested because the client is in backoff.
713
      this.metadata = ResourceMetadata.newResourceMetadataUnknown();
1✔
714
    }
1✔
715

716
    @Override
717
    public String toString() {
718
      return "ResourceSubscriber{"
×
719
          + "resource='" + resource + '\''
720
          + ", authority='" + authority + '\''
721
          + ", type=" + type
722
          + ", watchers=" + watchers.size()
×
723
          + ", data=" + data
724
          + ", absent=" + absent
725
          + ", resourceDeletionIgnored=" + resourceDeletionIgnored
726
          + ", errorDescription='" + errorDescription + '\''
727
          + '}';
728
    }
729

730
    void addWatcher(ResourceWatcher<T> watcher, Executor watcherExecutor) {
731
      checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher);
1✔
732
      watchers.put(watcher, watcherExecutor);
1✔
733
      T savedData = data;
1✔
734
      boolean savedAbsent = absent;
1✔
735
      Status savedError = lastError;
1✔
736
      watcherExecutor.execute(() -> {
1✔
737
        if (errorDescription != null) {
1✔
738
          watcher.onResourceChanged(StatusOr.fromStatus(
1✔
739
              Status.INVALID_ARGUMENT.withDescription(errorDescription)));
1✔
740
          return;
1✔
741
        }
742
        if (savedData != null) {
1✔
743
          watcher.onResourceChanged(StatusOr.fromValue(savedData));
1✔
744
          if (savedError != null) {
1✔
745
            watcher.onAmbientError(savedError);
1✔
746
          }
747
        } else if (savedError != null) {
1✔
748
          watcher.onResourceChanged(StatusOr.fromStatus(savedError));
1✔
749
        } else if (savedAbsent) {
1✔
750
          watcher.onResourceChanged(StatusOr.fromStatus(
1✔
751
              Status.NOT_FOUND.withDescription("Resource " + resource + " does not exist")));
1✔
752
        }
753
      });
1✔
754
    }
1✔
755

756
    void removeWatcher(ResourceWatcher<T> watcher) {
757
      checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher);
1✔
758
      watchers.remove(watcher);
1✔
759
    }
1✔
760

761
    void restartTimer() {
762
      if (data != null || absent) {  // resource already resolved
1✔
763
        return;
×
764
      }
765
      ControlPlaneClient activeCpc = getActiveCpc(authority);
1✔
766
      if (activeCpc == null || !activeCpc.isReady()) {
1✔
767
        // When client becomes ready, it triggers a restartTimer for all relevant subscribers.
768
        return;
1✔
769
      }
770
      ServerInfo serverInfo = activeCpc.getServerInfo();
1✔
771
      int timeoutSec = serverInfo.resourceTimerIsTransientError()
1✔
772
          ? EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC : INITIAL_RESOURCE_FETCH_TIMEOUT_SEC;
1✔
773

774
      class ResourceNotFound implements Runnable {
1✔
775
        @Override
776
        public void run() {
777
          logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
1✔
778
              type, resource);
1✔
779
          onAbsent(null, activeCpc.getServerInfo());
1✔
780
          respTimer = null;
1✔
781
        }
1✔
782

783
        @Override
784
        public String toString() {
785
          return type + this.getClass().getSimpleName();
1✔
786
        }
787
      }
788

789
      // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
790
      metadata = ResourceMetadata.newResourceMetadataRequested();
1✔
791

792
      if (respTimer != null) {
1✔
793
        respTimer.cancel();
×
794
      }
795
      respTimer = syncContext.schedule(
1✔
796
          new ResourceNotFound(), timeoutSec, TimeUnit.SECONDS, timeService);
1✔
797
    }
1✔
798

799
    void stopTimer() {
800
      if (respTimer != null && respTimer.isPending()) {
1✔
801
        respTimer.cancel();
1✔
802
        respTimer = null;
1✔
803
      }
804
    }
1✔
805

806
    void cancelResourceWatch() {
807
      if (isWatched()) {
1✔
808
        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
×
809
      }
810
      stopTimer();
1✔
811
      String message = "Unsubscribing {0} resource {1} from server {2}";
1✔
812
      XdsLogLevel logLevel = XdsLogLevel.INFO;
1✔
813
      if (resourceDeletionIgnored) {
1✔
814
        message += " for which we previously ignored a deletion";
×
815
        logLevel = XdsLogLevel.FORCE_INFO;
×
816
      }
817
      logger.log(logLevel, message, type, resource, getTarget());
1✔
818
    }
1✔
819

820
    boolean isWatched() {
821
      return !watchers.isEmpty();
1✔
822
    }
823

824
    boolean hasResult() {
825
      return data != null || absent;
1✔
826
    }
827

828
    void onData(ParsedResource<T> parsedResource, String version, long updateTime,
829
                ProcessingTracker processingTracker) {
830
      if (respTimer != null && respTimer.isPending()) {
1✔
831
        respTimer.cancel();
1✔
832
        respTimer = null;
1✔
833
      }
834
      ResourceUpdate oldData = this.data;
1✔
835
      this.data = parsedResource.getResourceUpdate();
1✔
836
      this.metadata = ResourceMetadata.newResourceMetadataAcked(
1✔
837
          parsedResource.getRawResource(), version, updateTime);
1✔
838
      absent = false;
1✔
839
      lastError = null;
1✔
840
      if (resourceDeletionIgnored) {
1✔
841
        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
1✔
842
                + "of resource for which we previously ignored a deletion: type {1} name {2}",
843
            getTarget(), type, resource);
1✔
844
        resourceDeletionIgnored = false;
1✔
845
      }
846
      if (!Objects.equals(oldData, data)) {
1✔
847
        StatusOr<T> update = StatusOr.fromValue(data);
1✔
848
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
849
          processingTracker.startTask();
1✔
850
          watchers.get(watcher).execute(() -> {
1✔
851
            try {
852
              watcher.onResourceChanged(update);
1✔
853
            } finally {
854
              processingTracker.onComplete();
1✔
855
            }
856
          });
1✔
857
        }
1✔
858
      }
859
    }
1✔
860

861
    private String getTarget() {
862
      ControlPlaneClient activeCpc = getActiveCpc(authority);
1✔
863
      return (activeCpc != null)
1✔
864
             ? activeCpc.getServerInfo().target()
1✔
865
             : "unknown";
1✔
866
    }
867

868
    void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverInfo) {
869
      if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
1✔
870
        return;
1✔
871
      }
872

873
      // Handle data errors (resource deletions) based on fail_on_data_errors server feature.
874
      // When xdsDataErrorHandlingEnabled is true and fail_on_data_errors is not present,
875
      // we treat deletions as ambient errors and keep using the cached resource.
876
      // When fail_on_data_errors is present, we delete the cached resource and fail.
877
      // When xdsDataErrorHandlingEnabled is false, use the old behavior (ignore_resource_deletion).
878
      boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion();
1✔
879
      boolean failOnDataErrors = serverInfo.failOnDataErrors();
1✔
880
      boolean xdsDataErrorHandlingEnabled = BootstrapperImpl.xdsDataErrorHandlingEnabled;
1✔
881

882
      if (type.isFullStateOfTheWorld() && data != null) {
1✔
883
        // New behavior (per gRFC A88): Default is to treat deletions as ambient errors
884
        if (xdsDataErrorHandlingEnabled && !failOnDataErrors) {
1✔
885
          if (!resourceDeletionIgnored) {
1✔
886
            logger.log(XdsLogLevel.FORCE_WARNING,
1✔
887
                "xds server {0}: ignoring deletion for resource type {1} name {2}}",
888
                serverInfo.target(), type, resource);
1✔
889
            resourceDeletionIgnored = true;
1✔
890
          }
891
          Status deletionStatus = Status.NOT_FOUND.withDescription(
1✔
892
              "Resource " + resource + " deleted from server");
893
          onAmbientError(deletionStatus, processingTracker);
1✔
894
          return;
1✔
895
        }
896
        // Old behavior: Use ignore_resource_deletion server feature
897
        if (!xdsDataErrorHandlingEnabled && ignoreResourceDeletionEnabled) {
1✔
898
          if (!resourceDeletionIgnored) {
1✔
899
            logger.log(XdsLogLevel.FORCE_WARNING,
1✔
900
                "xds server {0}: ignoring deletion for resource type {1} name {2}}",
901
                serverInfo.target(), type, resource);
1✔
902
            resourceDeletionIgnored = true;
1✔
903
          }
904
          Status deletionStatus = Status.NOT_FOUND.withDescription(
1✔
905
              "Resource " + resource + " deleted from server");
906
          onAmbientError(deletionStatus, processingTracker);
1✔
907
          return;
1✔
908
        }
909
      }
910

911
      logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
1✔
912
      if (!absent) {
1✔
913
        data = null;
1✔
914
        absent = true;
1✔
915
        lastError = null;
1✔
916

917
        Status status;
918
        if (respTimer == null) {
1✔
919
          status = Status.NOT_FOUND.withDescription("Resource " + resource + " does not exist");
1✔
920
          metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
921
        } else {
922
          status = serverInfo.resourceTimerIsTransientError()
1✔
923
              ? Status.UNAVAILABLE.withDescription(
1✔
924
                  "Timed out waiting for resource " + resource + " from xDS server")
925
              : Status.NOT_FOUND.withDescription(
1✔
926
                  "Timed out waiting for resource " + resource + " from xDS server");
927
          metadata = serverInfo.resourceTimerIsTransientError()
1✔
928
              ? ResourceMetadata.newResourceMetadataTimeout()
1✔
929
              : ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
930
        }
931

932
        StatusOr<T> update = StatusOr.fromStatus(status);
1✔
933
        for (Map.Entry<ResourceWatcher<T>, Executor> entry : watchers.entrySet()) {
1✔
934
          if (processingTracker != null) {
1✔
935
            processingTracker.startTask();
1✔
936
          }
937
          entry.getValue().execute(() -> {
1✔
938
            try {
939
              entry.getKey().onResourceChanged(update);
1✔
940
            } finally {
941
              if (processingTracker != null) {
1✔
942
                processingTracker.onComplete();
1✔
943
              }
944
            }
945
          });
1✔
946
        }
1✔
947
      }
948
    }
1✔
949

950
    void onError(Status error, @Nullable ProcessingTracker tracker) {
951
      if (respTimer != null && respTimer.isPending()) {
1✔
952
        respTimer.cancel();
1✔
953
        respTimer = null;
1✔
954
      }
955

956
      // Include node ID in xds failures to allow cross-referencing with control plane logs
957
      // when debugging.
958
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
959
      Status errorAugmented = Status.fromCode(error.getCode())
1✔
960
          .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
1✔
961
          .withCause(error.getCause());
1✔
962
      this.lastError = errorAugmented;
1✔
963

964
      if (data != null) {
1✔
965
        // We have cached data, so this is an ambient error.
966
        onAmbientError(errorAugmented, tracker);
1✔
967
      } else {
968
        // No data, this is a definitive resource error.
969
        StatusOr<T> update = StatusOr.fromStatus(errorAugmented);
1✔
970
        for (Map.Entry<ResourceWatcher<T>, Executor> entry : watchers.entrySet()) {
1✔
971
          if (tracker != null) {
1✔
972
            tracker.startTask();
1✔
973
          }
974
          entry.getValue().execute(() -> {
1✔
975
            try {
976
              entry.getKey().onResourceChanged(update);
1✔
977
            } finally {
978
              if (tracker != null) {
1✔
979
                tracker.onComplete();
1✔
980
              }
981
            }
982
          });
1✔
983
        }
1✔
984
      }
985
    }
1✔
986

987
    private void onAmbientError(Status error, @Nullable ProcessingTracker tracker) {
988
      for (Map.Entry<ResourceWatcher<T>, Executor> entry : watchers.entrySet()) {
1✔
989
        if (tracker != null) {
1✔
990
          tracker.startTask();
1✔
991
        }
992
        entry.getValue().execute(() -> {
1✔
993
          try {
994
            entry.getKey().onAmbientError(error);
1✔
995
          } finally {
996
            if (tracker != null) {
1✔
997
              tracker.onComplete();
1✔
998
            }
999
          }
1000
        });
1✔
1001
      }
1✔
1002
    }
1✔
1003

1004
    void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
1005
      metadata = ResourceMetadata
1✔
1006
          .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails,
1✔
1007
              data != null);
1008
    }
1✔
1009
  }
1010

1011
  private class ResponseHandler implements XdsResponseHandler {
1012
    final ServerInfo serverInfo;
1013

1014
    ResponseHandler(ServerInfo serverInfo) {
1✔
1015
      this.serverInfo = serverInfo;
1✔
1016
    }
1✔
1017

1018
    @Override
1019
    public void handleResourceResponse(
1020
        XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
1021
        List<Any> resources, String nonce, boolean isFirstResponse,
1022
        ProcessingTracker processingTracker) {
1023
      checkNotNull(xdsResourceType, "xdsResourceType");
1✔
1024
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1025
      Set<String> toParseResourceNames =
1026
          xdsResourceType.shouldRetrieveResourceKeysForArgs()
1✔
1027
          ? getResourceKeys(xdsResourceType)
1✔
1028
          : null;
1✔
1029
      XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
1✔
1030
          bootstrapInfo, securityConfig, toParseResourceNames);
1✔
1031
      handleResourceUpdate(args, resources, xdsResourceType, isFirstResponse, processingTracker);
1✔
1032
    }
1✔
1033

1034
    @Override
1035
    public void handleStreamClosed(Status status, boolean shouldTryFallback) {
1036
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
1037

1038
      ControlPlaneClient cpcClosed = serverCpClientMap.get(serverInfo);
1✔
1039
      if (cpcClosed == null) {
1✔
1040
        logger.log(XdsLogLevel.DEBUG,
×
1041
            "Couldn't find closing CPC for {0}, so skipping cleanup and reporting", serverInfo);
1042
        return;
×
1043
      }
1044

1045
      cleanUpResourceTimers(cpcClosed);
1✔
1046

1047
      if (status.isOk()) {
1✔
1048
        return; // Not considered an error
1✔
1049
      }
1050

1051
      metricReporter.reportServerFailure(1L, serverInfo.target());
1✔
1052

1053
      Collection<String> authoritiesForClosedCpc = getActiveAuthorities(cpcClosed);
1✔
1054
      for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
1055
          resourceSubscribers.values()) {
1✔
1056
        for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
1057
          if (!authoritiesForClosedCpc.contains(subscriber.authority)) {
1✔
1058
            continue;
1✔
1059
          }
1060
          // If subscriber already has data, this is an ambient error.
1061
          if (subscriber.hasResult()) {
1✔
1062
            subscriber.onError(status, null);
1✔
1063
            continue;
1✔
1064
          }
1065

1066
          // try to fallback to lower priority control plane client
1067
          if (shouldTryFallback && manageControlPlaneClient(subscriber).didFallback) {
1✔
1068
            authoritiesForClosedCpc.remove(subscriber.authority);
1✔
1069
            if (authoritiesForClosedCpc.isEmpty()) {
1✔
1070
              return; // optimization: no need to continue once all authorities have done fallback
1✔
1071
            }
1072
            continue; // since we did fallback, don't consider it an error
1073
          }
1074

1075
          subscriber.onError(status, null);
1✔
1076
        }
1✔
1077
      }
1✔
1078
    }
1✔
1079
  }
1080

1081
  private static class CpcWithFallbackState {
1082
    ControlPlaneClient cpc;
1083
    boolean didFallback;
1084

1085
    private CpcWithFallbackState(ControlPlaneClient cpc, boolean didFallback) {
1✔
1086
      this.cpc = cpc;
1✔
1087
      this.didFallback = didFallback;
1✔
1088
    }
1✔
1089
  }
1090

1091
  private Collection<String> getActiveAuthorities(ControlPlaneClient cpc) {
1092
    List<String> asList = activatedCpClients.entrySet().stream()
1✔
1093
        .filter(entry -> !entry.getValue().isEmpty()
1✔
1094
            && cpc == entry.getValue().get(entry.getValue().size() - 1))
1✔
1095
        .map(Map.Entry::getKey)
1✔
1096
        .collect(Collectors.toList());
1✔
1097

1098
    // Since this is usually used for contains, use a set when the list is large
1099
    return (asList.size() < 100) ? asList : new HashSet<>(asList);
1✔
1100
  }
1101

1102
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc