• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19502

09 Oct 2024 12:28AM UTC coverage: 84.683% (+0.02%) from 84.659%
#19502

push

github

web-flow
xds: Update error handling for ADS stream close and failure scenarios (#11596)

When an ADS stream is closed with a non-OK status after a response has been received, the status is converted to OK. This makes the failure behavior consistent with gRFC A57.

33785 of 39896 relevant lines covered (84.68%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.87
/../xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
23
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Joiner;
27
import com.google.common.base.Stopwatch;
28
import com.google.common.base.Supplier;
29
import com.google.common.collect.ImmutableMap;
30
import com.google.common.collect.ImmutableSet;
31
import com.google.common.util.concurrent.ListenableFuture;
32
import com.google.common.util.concurrent.SettableFuture;
33
import com.google.protobuf.Any;
34
import io.grpc.Internal;
35
import io.grpc.InternalLogId;
36
import io.grpc.Status;
37
import io.grpc.SynchronizationContext;
38
import io.grpc.SynchronizationContext.ScheduledHandle;
39
import io.grpc.internal.BackoffPolicy;
40
import io.grpc.internal.TimeProvider;
41
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
42
import io.grpc.xds.client.Bootstrapper.ServerInfo;
43
import io.grpc.xds.client.XdsClient.ResourceStore;
44
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
45
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
46
import java.net.URI;
47
import java.util.Collection;
48
import java.util.Collections;
49
import java.util.HashMap;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Objects;
53
import java.util.Set;
54
import java.util.concurrent.Executor;
55
import java.util.concurrent.ScheduledExecutorService;
56
import java.util.concurrent.TimeUnit;
57
import javax.annotation.Nullable;
58

59
/**
60
 * XdsClient implementation.
61
 */
62
@Internal
63
public final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore {
64

65
  // Longest time to wait, since the subscription to some resource, for concluding its absence.
66
  @VisibleForTesting
67
  public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
68

69
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
70
      new Thread.UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          logger.log(
×
74
              XdsLogLevel.ERROR,
75
              "Uncaught exception in XdsClient SynchronizationContext. Panic!",
76
              e);
77
          // TODO(chengyuanzhang): better error handling.
78
          throw new AssertionError(e);
×
79
        }
80
      });
81

82
  private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap =
1✔
83
      new HashMap<>();
84
  final Map<ServerInfo, LoadReportClient> serverLrsClientMap =
1✔
85
      new HashMap<>();
86

87
  private final Map<ServerInfo, ControlPlaneClient> serverCpClientMap = new HashMap<>();
1✔
88
  private final Map<XdsResourceType<? extends ResourceUpdate>,
1✔
89
      Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
90
      resourceSubscribers = new HashMap<>();
91
  private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
1✔
92
  private final XdsTransportFactory xdsTransportFactory;
93
  private final Bootstrapper.BootstrapInfo bootstrapInfo;
94
  private final ScheduledExecutorService timeService;
95
  private final BackoffPolicy.Provider backoffPolicyProvider;
96
  private final Supplier<Stopwatch> stopwatchSupplier;
97
  private final TimeProvider timeProvider;
98
  private final Object securityConfig;
99
  private final InternalLogId logId;
100
  private final XdsLogger logger;
101
  private volatile boolean isShutdown;
102
  private final MessagePrettyPrinter messagePrinter;
103

104
  /**
   * Creates the client, stores its collaborators, allocates a log id and logs the creation.
   *
   * <p>No argument is validated here; each one is stored as given.
   */
  public XdsClientImpl(
      XdsTransportFactory xdsTransportFactory,
      Bootstrapper.BootstrapInfo bootstrapInfo,
      ScheduledExecutorService timeService,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      TimeProvider timeProvider,
      MessagePrettyPrinter messagePrinter,
      Object securityConfig) {
    // Logging identity.
    this.logId = InternalLogId.allocate("xds-client", null);
    this.logger = XdsLogger.withLogId(this.logId);

    // Transport and configuration.
    this.xdsTransportFactory = xdsTransportFactory;
    this.bootstrapInfo = bootstrapInfo;
    this.securityConfig = securityConfig;
    this.messagePrinter = messagePrinter;

    // Timing and retry collaborators.
    this.timeService = timeService;
    this.timeProvider = timeProvider;
    this.stopwatchSupplier = stopwatchSupplier;
    this.backoffPolicyProvider = backoffPolicyProvider;

    this.logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
125

126
  @Override
127
  public void handleResourceResponse(
128
      XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
129
      List<Any> resources, String nonce, ProcessingTracker processingTracker) {
130
    checkNotNull(xdsResourceType, "xdsResourceType");
1✔
131
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
132
    Set<String> toParseResourceNames =
133
        xdsResourceType.shouldRetrieveResourceKeysForArgs()
1✔
134
            ? getResourceKeys(xdsResourceType)
1✔
135
            : null;
1✔
136
    XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
1✔
137
        bootstrapInfo, securityConfig, toParseResourceNames);
138
    handleResourceUpdate(args, resources, xdsResourceType, processingTracker);
1✔
139
  }
1✔
140

141
  @Override
142
  public void handleStreamClosed(Status error) {
143
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
144
    cleanUpResourceTimers();
1✔
145
    if (!error.isOk()) {
1✔
146
      for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
147
          resourceSubscribers.values()) {
1✔
148
        for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
149
          if (!subscriber.hasResult()) {
1✔
150
            subscriber.onError(error, null);
1✔
151
          }
152
        }
1✔
153
      }
1✔
154
    }
155
  }
1✔
156

157
  @Override
158
  public void handleStreamRestarted(ServerInfo serverInfo) {
159
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
160
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
161
        resourceSubscribers.values()) {
1✔
162
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
163
        if (subscriber.serverInfo.equals(serverInfo)) {
1✔
164
          subscriber.restartTimer();
1✔
165
        }
166
      }
1✔
167
    }
1✔
168
  }
1✔
169

170
  @Override
171
  public void shutdown() {
172
    syncContext.execute(
1✔
173
        new Runnable() {
1✔
174
          @Override
175
          public void run() {
176
            if (isShutdown) {
1✔
177
              return;
×
178
            }
179
            isShutdown = true;
1✔
180
            for (ControlPlaneClient xdsChannel : serverCpClientMap.values()) {
1✔
181
              xdsChannel.shutdown();
1✔
182
            }
1✔
183
            for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
1✔
184
              lrsClient.stopLoadReporting();
1✔
185
            }
1✔
186
            cleanUpResourceTimers();
1✔
187
          }
1✔
188
        });
189
  }
1✔
190

191
  @Override
192
  public boolean isShutDown() {
193
    return isShutdown;
1✔
194
  }
195

196
  @Override
197
  public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
198
    return Collections.unmodifiableMap(subscribedResourceTypeUrls);
1✔
199
  }
200

201
  @Nullable
202
  @Override
203
  public Collection<String> getSubscribedResources(ServerInfo serverInfo,
204
                                                   XdsResourceType<? extends ResourceUpdate> type) {
205
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
1✔
206
        resourceSubscribers.getOrDefault(type, Collections.emptyMap());
1✔
207
    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
1✔
208
    for (String key : resources.keySet()) {
1✔
209
      if (resources.get(key).serverInfo.equals(serverInfo)) {
1✔
210
        builder.add(key);
1✔
211
      }
212
    }
1✔
213
    Collection<String> retVal = builder.build();
1✔
214
    return retVal.isEmpty() ? null : retVal;
1✔
215
  }
216

217
  // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
218
  // ResourceTypes that do not have subscribers does not show up in the snapshot keys.
219
  @Override
220
  public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
221
      getSubscribedResourcesMetadataSnapshot() {
222
    final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
223
        SettableFuture.create();
1✔
224
    syncContext.execute(new Runnable() {
1✔
225
      @Override
226
      public void run() {
227
        // A map from a "resource type" to a map ("resource name": "resource metadata")
228
        ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
229
            ImmutableMap.builder();
1✔
230
        for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
1✔
231
          ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
1✔
232
          for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
233
              : resourceSubscribers.get(resourceType).entrySet()) {
1✔
234
            metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
1✔
235
          }
1✔
236
          metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
1✔
237
        }
1✔
238
        future.set(metadataSnapshot.buildOrThrow());
1✔
239
      }
1✔
240
    });
241
    return future;
1✔
242
  }
243

244
  @Override
245
  public Object getSecurityConfig() {
246
    return securityConfig;
×
247
  }
248

249
  @Override
250
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
251
      String resourceName,
252
      ResourceWatcher<T> watcher,
253
      Executor watcherExecutor) {
254
    syncContext.execute(new Runnable() {
1✔
255
      @Override
256
      @SuppressWarnings("unchecked")
257
      public void run() {
258
        if (!resourceSubscribers.containsKey(type)) {
1✔
259
          resourceSubscribers.put(type, new HashMap<>());
1✔
260
          subscribedResourceTypeUrls.put(type.typeUrl(), type);
1✔
261
        }
262
        ResourceSubscriber<T> subscriber =
1✔
263
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
264
        if (subscriber == null) {
1✔
265
          logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
1✔
266
          subscriber = new ResourceSubscriber<>(type, resourceName);
1✔
267
          resourceSubscribers.get(type).put(resourceName, subscriber);
1✔
268
          if (subscriber.controlPlaneClient != null) {
1✔
269
            subscriber.controlPlaneClient.adjustResourceSubscription(type);
1✔
270
          }
271
        }
272
        subscriber.addWatcher(watcher, watcherExecutor);
1✔
273
      }
1✔
274
    });
275
  }
1✔
276

277
  @Override
278
  public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
279
      String resourceName,
280
      ResourceWatcher<T> watcher) {
281
    syncContext.execute(new Runnable() {
1✔
282
      @Override
283
      @SuppressWarnings("unchecked")
284
      public void run() {
285
        ResourceSubscriber<T> subscriber =
1✔
286
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
287
        subscriber.removeWatcher(watcher);
1✔
288
        if (!subscriber.isWatched()) {
1✔
289
          subscriber.cancelResourceWatch();
1✔
290
          resourceSubscribers.get(type).remove(resourceName);
1✔
291
          if (subscriber.controlPlaneClient != null) {
1✔
292
            subscriber.controlPlaneClient.adjustResourceSubscription(type);
1✔
293
          }
294
          if (resourceSubscribers.get(type).isEmpty()) {
1✔
295
            resourceSubscribers.remove(type);
1✔
296
            subscribedResourceTypeUrls.remove(type.typeUrl());
1✔
297
          }
298
        }
299
      }
1✔
300
    });
301
  }
1✔
302

303
  @Override
304
  public LoadStatsManager2.ClusterDropStats addClusterDropStats(
305
      final ServerInfo serverInfo, String clusterName,
306
      @Nullable String edsServiceName) {
307
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
308
    LoadStatsManager2.ClusterDropStats dropCounter =
1✔
309
        loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
1✔
310
    syncContext.execute(new Runnable() {
1✔
311
      @Override
312
      public void run() {
313
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
314
      }
1✔
315
    });
316
    return dropCounter;
1✔
317
  }
318

319
  @Override
320
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
321
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
322
      Locality locality) {
323
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
324
    LoadStatsManager2.ClusterLocalityStats loadCounter =
1✔
325
        loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
1✔
326
    syncContext.execute(new Runnable() {
1✔
327
      @Override
328
      public void run() {
329
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
330
      }
1✔
331
    });
332
    return loadCounter;
1✔
333
  }
334

335

336
  @Override
337
  public Bootstrapper.BootstrapInfo getBootstrapInfo() {
338
    return bootstrapInfo;
1✔
339
  }
340

341
  @Override
342
  public String toString() {
343
    return logId.toString();
×
344
  }
345

346
  @Override
347
  protected void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
348
    if (isShutDown()) {
1✔
349
      return;
×
350
    }
351

352
    syncContext.execute(new Runnable() {
1✔
353
      @Override
354
      public void run() {
355
        if (isShutDown()) {
1✔
356
          return;
×
357
        }
358

359
        for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
360
          for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
361
            if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) {
1✔
362
              subscriber.restartTimer();
1✔
363
            }
364
          }
1✔
365
        }
1✔
366
      }
1✔
367
    });
368
  }
1✔
369

370
  private Set<String> getResourceKeys(XdsResourceType<?> xdsResourceType) {
371
    if (!resourceSubscribers.containsKey(xdsResourceType)) {
1✔
372
      return null;
×
373
    }
374

375
    return resourceSubscribers.get(xdsResourceType).keySet();
1✔
376
  }
377

378
  private void cleanUpResourceTimers() {
379
    for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
380
      for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
381
        subscriber.stopTimer();
1✔
382
      }
1✔
383
    }
1✔
384
  }
1✔
385

386
  public ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) {
387
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
388
    if (serverCpClientMap.containsKey(serverInfo)) {
1✔
389
      return serverCpClientMap.get(serverInfo);
1✔
390
    }
391

392
    XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo);
1✔
393
    ControlPlaneClient controlPlaneClient = new ControlPlaneClient(
1✔
394
        xdsTransport,
395
        serverInfo,
396
        bootstrapInfo.node(),
1✔
397
        this,
398
        this,
399
        timeService,
400
        syncContext,
401
        backoffPolicyProvider,
402
        stopwatchSupplier,
403
        this,
404
        messagePrinter);
405
    serverCpClientMap.put(serverInfo, controlPlaneClient);
1✔
406

407
    LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
1✔
408
    loadStatsManagerMap.put(serverInfo, loadStatsManager);
1✔
409
    LoadReportClient lrsClient = new LoadReportClient(
1✔
410
        loadStatsManager, xdsTransport, bootstrapInfo.node(),
1✔
411
        syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
412
    serverLrsClientMap.put(serverInfo, lrsClient);
1✔
413

414
    return controlPlaneClient;
1✔
415
  }
416

417
  @VisibleForTesting
418
  @Override
419
  public Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
420
    return ImmutableMap.copyOf(serverLrsClientMap);
1✔
421
  }
422

423
  @Nullable
424
  private ServerInfo getServerInfo(String resource) {
425
    if (resource.startsWith(XDSTP_SCHEME)) {
1✔
426
      URI uri = URI.create(resource);
1✔
427
      String authority = uri.getAuthority();
1✔
428
      if (authority == null) {
1✔
429
        authority = "";
1✔
430
      }
431
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
1✔
432
      if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
1✔
433
        return null;
1✔
434
      }
435
      return authorityInfo.xdsServers().get(0);
1✔
436
    } else {
437
      return bootstrapInfo.servers().get(0); // use first server
1✔
438
    }
439
  }
440

441
  @SuppressWarnings("unchecked")
442
  private <T extends ResourceUpdate> void handleResourceUpdate(
443
      XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType,
444
      ProcessingTracker processingTracker) {
445
    ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
1✔
446
    logger.log(XdsLogger.XdsLogLevel.INFO,
1✔
447
        "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
448
         xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
1✔
449
    Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
1✔
450
    Set<String> invalidResources = result.invalidResources;
1✔
451
    List<String> errors = result.errors;
1✔
452
    String errorDetail = null;
1✔
453
    if (errors.isEmpty()) {
1✔
454
      checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
1✔
455
      serverCpClientMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo,
1✔
456
          args.nonce);
457
    } else {
458
      errorDetail = Joiner.on('\n').join(errors);
1✔
459
      logger.log(XdsLogLevel.WARNING,
1✔
460
          "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
461
          xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
1✔
462
      serverCpClientMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail);
1✔
463
    }
464

465
    long updateTime = timeProvider.currentTimeNanos();
1✔
466
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
1✔
467
        resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
1✔
468
    for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
1✔
469
      String resourceName = entry.getKey();
1✔
470
      ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
1✔
471
      if (parsedResources.containsKey(resourceName)) {
1✔
472
        // Happy path: the resource updated successfully. Notify the watchers of the update.
473
        subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime,
1✔
474
            processingTracker);
475
        continue;
1✔
476
      }
477

478
      if (invalidResources.contains(resourceName)) {
1✔
479
        // The resource update is invalid. Capture the error without notifying the watchers.
480
        subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
1✔
481
      }
482

483
      // Nothing else to do for incremental ADS resources.
484
      if (!xdsResourceType.isFullStateOfTheWorld()) {
1✔
485
        continue;
1✔
486
      }
487

488
      // Handle State of the World ADS: invalid resources.
489
      if (invalidResources.contains(resourceName)) {
1✔
490
        // The resource is missing. Reuse the cached resource if possible.
491
        if (subscriber.data == null) {
1✔
492
          // No cached data. Notify the watchers of an invalid update.
493
          subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
1✔
494
        }
495
        continue;
496
      }
497

498
      // For State of the World services, notify watchers when their watched resource is missing
499
      // from the ADS update. Note that we can only do this if the resource update is coming from
500
      // the same xDS server that the ResourceSubscriber is subscribed to.
501
      if (subscriber.serverInfo.equals(args.serverInfo)) {
1✔
502
        subscriber.onAbsent(processingTracker);
1✔
503
      }
504
    }
1✔
505
  }
1✔
506

507
  /**
508
   * Tracks a single subscribed resource.
509
   */
510
  private final class ResourceSubscriber<T extends ResourceUpdate> {
511
    @Nullable private final ServerInfo serverInfo;
512
    @Nullable private final ControlPlaneClient controlPlaneClient;
513
    private final XdsResourceType<T> type;
514
    private final String resource;
515
    private final Map<ResourceWatcher<T>, Executor> watchers = new HashMap<>();
1✔
516
    @Nullable private T data;
517
    private boolean absent;
518
    // Tracks whether the deletion has been ignored per bootstrap server feature.
519
    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
520
    private boolean resourceDeletionIgnored;
521
    @Nullable private ScheduledHandle respTimer;
522
    @Nullable private ResourceMetadata metadata;
523
    @Nullable private String errorDescription;
524

525
    ResourceSubscriber(XdsResourceType<T> type, String resource) {
1✔
526
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
527
      this.type = type;
1✔
528
      this.resource = resource;
1✔
529
      this.serverInfo = getServerInfo(resource);
1✔
530
      if (serverInfo == null) {
1✔
531
        this.errorDescription = "Wrong configuration: xds server does not exist for resource "
1✔
532
            + resource;
533
        this.controlPlaneClient = null;
1✔
534
        return;
1✔
535
      }
536
      // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
537
      // is created but not yet requested because the client is in backoff.
538
      this.metadata = ResourceMetadata.newResourceMetadataUnknown();
1✔
539

540
      ControlPlaneClient controlPlaneClient = null;
1✔
541
      try {
542
        controlPlaneClient = getOrCreateControlPlaneClient(serverInfo);
1✔
543
        if (controlPlaneClient.isInBackoff()) {
1✔
544
          return;
1✔
545
        }
546
      } catch (IllegalArgumentException e) {
1✔
547
        controlPlaneClient = null;
1✔
548
        this.errorDescription = "Bad configuration:  " + e.getMessage();
1✔
549
        return;
1✔
550
      } finally {
551
        this.controlPlaneClient = controlPlaneClient;
1✔
552
      }
553

554
      restartTimer();
1✔
555
    }
1✔
556

557
    void addWatcher(ResourceWatcher<T> watcher, Executor watcherExecutor) {
558
      checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher);
1✔
559
      watchers.put(watcher, watcherExecutor);
1✔
560
      T savedData = data;
1✔
561
      boolean savedAbsent = absent;
1✔
562
      watcherExecutor.execute(() -> {
1✔
563
        if (errorDescription != null) {
1✔
564
          watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
1✔
565
          return;
1✔
566
        }
567
        if (savedData != null) {
1✔
568
          notifyWatcher(watcher, savedData);
1✔
569
        } else if (savedAbsent) {
1✔
570
          watcher.onResourceDoesNotExist(resource);
1✔
571
        }
572
      });
1✔
573
    }
1✔
574

575
    void removeWatcher(ResourceWatcher<T>  watcher) {
576
      checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher);
1✔
577
      watchers.remove(watcher);
1✔
578
    }
1✔
579

580
    void restartTimer() {
581
      if (data != null || absent) {  // resource already resolved
1✔
582
        return;
1✔
583
      }
584
      if (!controlPlaneClient.isReady()) { // When client becomes ready, it triggers a restartTimer
1✔
585
        return;
1✔
586
      }
587

588
      class ResourceNotFound implements Runnable {
1✔
589
        @Override
590
        public void run() {
591
          logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
1✔
592
              type, resource);
1✔
593
          respTimer = null;
1✔
594
          onAbsent(null);
1✔
595
        }
1✔
596

597
        @Override
598
        public String toString() {
599
          return type + this.getClass().getSimpleName();
1✔
600
        }
601
      }
602

603
      // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
604
      metadata = ResourceMetadata.newResourceMetadataRequested();
1✔
605

606
      respTimer = syncContext.schedule(
1✔
607
          new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
608
          timeService);
1✔
609
    }
1✔
610

611
    void stopTimer() {
612
      if (respTimer != null && respTimer.isPending()) {
1✔
613
        respTimer.cancel();
1✔
614
        respTimer = null;
1✔
615
      }
616
    }
1✔
617

618
    void cancelResourceWatch() {
619
      if (isWatched()) {
1✔
620
        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
×
621
      }
622
      stopTimer();
1✔
623
      String message = "Unsubscribing {0} resource {1} from server {2}";
1✔
624
      XdsLogLevel logLevel = XdsLogLevel.INFO;
1✔
625
      if (resourceDeletionIgnored) {
1✔
626
        message += " for which we previously ignored a deletion";
×
627
        logLevel = XdsLogLevel.FORCE_INFO;
×
628
      }
629
      logger.log(logLevel, message, type, resource,
1✔
630
          serverInfo != null ? serverInfo.target() : "unknown");
1✔
631
    }
1✔
632

633
    boolean isWatched() {
634
      return !watchers.isEmpty();
1✔
635
    }
636

637
    boolean hasResult() {
638
      return data != null || absent;
1✔
639
    }
640

641
    void onData(ParsedResource<T> parsedResource, String version, long updateTime,
642
                ProcessingTracker processingTracker) {
643
      if (respTimer != null && respTimer.isPending()) {
1✔
644
        respTimer.cancel();
1✔
645
        respTimer = null;
1✔
646
      }
647
      this.metadata = ResourceMetadata
1✔
648
          .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
1✔
649
      ResourceUpdate oldData = this.data;
1✔
650
      this.data = parsedResource.getResourceUpdate();
1✔
651
      absent = false;
1✔
652
      if (resourceDeletionIgnored) {
1✔
653
        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
1✔
654
                + "of resource for which we previously ignored a deletion: type {1} name {2}",
655
            serverInfo != null ? serverInfo.target() : "unknown", type, resource);
1✔
656
        resourceDeletionIgnored = false;
1✔
657
      }
658
      if (!Objects.equals(oldData, data)) {
1✔
659
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
660
          processingTracker.startTask();
1✔
661
          watchers.get(watcher).execute(() -> {
1✔
662
            try {
663
              notifyWatcher(watcher, data);
1✔
664
            } finally {
665
              processingTracker.onComplete();
1✔
666
            }
667
          });
1✔
668
        }
1✔
669
      }
670
    }
1✔
671

672
    void onAbsent(@Nullable ProcessingTracker processingTracker) {
673
      if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
1✔
674
        return;
1✔
675
      }
676

677
      // Ignore deletion of State of the World resources when this feature is on,
678
      // and the resource is reusable.
679
      boolean ignoreResourceDeletionEnabled =
1✔
680
          serverInfo != null && serverInfo.ignoreResourceDeletion();
1✔
681
      if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
1✔
682
        if (!resourceDeletionIgnored) {
1✔
683
          logger.log(XdsLogLevel.FORCE_WARNING,
1✔
684
              "xds server {0}: ignoring deletion for resource type {1} name {2}}",
685
              serverInfo.target(), type, resource);
1✔
686
          resourceDeletionIgnored = true;
1✔
687
        }
688
        return;
1✔
689
      }
690

691
      logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
1✔
692
      if (!absent) {
1✔
693
        data = null;
1✔
694
        absent = true;
1✔
695
        metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
696
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
697
          if (processingTracker != null) {
1✔
698
            processingTracker.startTask();
1✔
699
          }
700
          watchers.get(watcher).execute(() -> {
1✔
701
            try {
702
              watcher.onResourceDoesNotExist(resource);
1✔
703
            } finally {
704
              if (processingTracker != null) {
1✔
705
                processingTracker.onComplete();
1✔
706
              }
707
            }
708
          });
1✔
709
        }
1✔
710
      }
711
    }
1✔
712

713
    void onError(Status error, @Nullable ProcessingTracker tracker) {
714
      if (respTimer != null && respTimer.isPending()) {
1✔
715
        respTimer.cancel();
1✔
716
        respTimer = null;
1✔
717
      }
718

719
      // Include node ID in xds failures to allow cross-referencing with control plane logs
720
      // when debugging.
721
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
722
      Status errorAugmented = Status.fromCode(error.getCode())
1✔
723
          .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
1✔
724
          .withCause(error.getCause());
1✔
725

726
      for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
727
        if (tracker != null) {
1✔
728
          tracker.startTask();
1✔
729
        }
730
        watchers.get(watcher).execute(() -> {
1✔
731
          try {
732
            watcher.onError(errorAugmented);
1✔
733
          } finally {
734
            if (tracker != null) {
1✔
735
              tracker.onComplete();
1✔
736
            }
737
          }
738
        });
1✔
739
      }
1✔
740
    }
1✔
741

742
    void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
743
      metadata = ResourceMetadata
1✔
744
          .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
1✔
745
    }
1✔
746

747
    private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
748
      watcher.onChanged(update);
1✔
749
    }
1✔
750
  }
751

752
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc