• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19470

24 Sep 2024 11:18PM UTC coverage: 84.544% (-0.003%) from 84.547%
#19470

push

github

web-flow
xds: Check for validity of xdsClient in ClusterImplLbHelper (#11553)

* Added null check for xdsClient in onSubChannelState. This avoids NPE
for xdsClient when LB is shutdown and onSubChannelState is called later
as part of listener callback. As shutdown is racy and eventually consistent,
this check would avoid calculating locality after LB is shutdown.

33619 of 39765 relevant lines covered (84.54%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.86
/../xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
23
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Joiner;
27
import com.google.common.base.Stopwatch;
28
import com.google.common.base.Supplier;
29
import com.google.common.collect.ImmutableMap;
30
import com.google.common.collect.ImmutableSet;
31
import com.google.common.util.concurrent.ListenableFuture;
32
import com.google.common.util.concurrent.SettableFuture;
33
import com.google.protobuf.Any;
34
import io.grpc.Internal;
35
import io.grpc.InternalLogId;
36
import io.grpc.Status;
37
import io.grpc.SynchronizationContext;
38
import io.grpc.SynchronizationContext.ScheduledHandle;
39
import io.grpc.internal.BackoffPolicy;
40
import io.grpc.internal.TimeProvider;
41
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
42
import io.grpc.xds.client.Bootstrapper.ServerInfo;
43
import io.grpc.xds.client.XdsClient.ResourceStore;
44
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
45
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
46
import java.net.URI;
47
import java.util.Collection;
48
import java.util.Collections;
49
import java.util.HashMap;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Objects;
53
import java.util.Set;
54
import java.util.concurrent.Executor;
55
import java.util.concurrent.ScheduledExecutorService;
56
import java.util.concurrent.TimeUnit;
57
import javax.annotation.Nullable;
58

59
/**
60
 * XdsClient implementation.
61
 */
62
@Internal
63
public final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore {
64

65
  // Longest time to wait, since the subscription to some resource, for concluding its absence.
66
  @VisibleForTesting
67
  public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
68

69
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
70
      new Thread.UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          logger.log(
×
74
              XdsLogLevel.ERROR,
75
              "Uncaught exception in XdsClient SynchronizationContext. Panic!",
76
              e);
77
          // TODO(chengyuanzhang): better error handling.
78
          throw new AssertionError(e);
×
79
        }
80
      });
81

82
  private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap =
1✔
83
      new HashMap<>();
84
  final Map<ServerInfo, LoadReportClient> serverLrsClientMap =
1✔
85
      new HashMap<>();
86

87
  private final Map<ServerInfo, ControlPlaneClient> serverCpClientMap = new HashMap<>();
1✔
88
  private final Map<XdsResourceType<? extends ResourceUpdate>,
1✔
89
      Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
90
      resourceSubscribers = new HashMap<>();
91
  private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
1✔
92
  private final XdsTransportFactory xdsTransportFactory;
93
  private final Bootstrapper.BootstrapInfo bootstrapInfo;
94
  private final ScheduledExecutorService timeService;
95
  private final BackoffPolicy.Provider backoffPolicyProvider;
96
  private final Supplier<Stopwatch> stopwatchSupplier;
97
  private final TimeProvider timeProvider;
98
  private final Object securityConfig;
99
  private final InternalLogId logId;
100
  private final XdsLogger logger;
101
  private volatile boolean isShutdown;
102
  private final MessagePrettyPrinter messagePrinter;
103

104
  public XdsClientImpl(
105
      XdsTransportFactory xdsTransportFactory,
106
      Bootstrapper.BootstrapInfo bootstrapInfo,
107
      ScheduledExecutorService timeService,
108
      BackoffPolicy.Provider backoffPolicyProvider,
109
      Supplier<Stopwatch> stopwatchSupplier,
110
      TimeProvider timeProvider,
111
      MessagePrettyPrinter messagePrinter,
112
      Object securityConfig) {
1✔
113
    this.xdsTransportFactory = xdsTransportFactory;
1✔
114
    this.bootstrapInfo = bootstrapInfo;
1✔
115
    this.timeService = timeService;
1✔
116
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
117
    this.stopwatchSupplier = stopwatchSupplier;
1✔
118
    this.timeProvider = timeProvider;
1✔
119
    this.messagePrinter = messagePrinter;
1✔
120
    this.securityConfig = securityConfig;
1✔
121
    logId = InternalLogId.allocate("xds-client", null);
1✔
122
    logger = XdsLogger.withLogId(logId);
1✔
123
    logger.log(XdsLogLevel.INFO, "Created");
1✔
124
  }
1✔
125

126
  @Override
127
  public void handleResourceResponse(
128
      XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
129
      List<Any> resources, String nonce, ProcessingTracker processingTracker) {
130
    checkNotNull(xdsResourceType, "xdsResourceType");
1✔
131
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
132
    Set<String> toParseResourceNames =
133
        xdsResourceType.shouldRetrieveResourceKeysForArgs()
1✔
134
            ? getResourceKeys(xdsResourceType)
1✔
135
            : null;
1✔
136
    XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
1✔
137
        bootstrapInfo, securityConfig, toParseResourceNames);
138
    handleResourceUpdate(args, resources, xdsResourceType, processingTracker);
1✔
139
  }
1✔
140

141
  @Override
142
  public void handleStreamClosed(Status error) {
143
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
144
    cleanUpResourceTimers();
1✔
145
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
146
        resourceSubscribers.values()) {
1✔
147
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
148
        if (!subscriber.hasResult()) {
1✔
149
          subscriber.onError(error, null);
1✔
150
        }
151
      }
1✔
152
    }
1✔
153
  }
1✔
154

155
  @Override
156
  public void handleStreamRestarted(ServerInfo serverInfo) {
157
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
158
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
159
        resourceSubscribers.values()) {
1✔
160
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
161
        if (subscriber.serverInfo.equals(serverInfo)) {
1✔
162
          subscriber.restartTimer();
1✔
163
        }
164
      }
1✔
165
    }
1✔
166
  }
1✔
167

168
  @Override
169
  public void shutdown() {
170
    syncContext.execute(
1✔
171
        new Runnable() {
1✔
172
          @Override
173
          public void run() {
174
            if (isShutdown) {
1✔
175
              return;
×
176
            }
177
            isShutdown = true;
1✔
178
            for (ControlPlaneClient xdsChannel : serverCpClientMap.values()) {
1✔
179
              xdsChannel.shutdown();
1✔
180
            }
1✔
181
            for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
1✔
182
              lrsClient.stopLoadReporting();
1✔
183
            }
1✔
184
            cleanUpResourceTimers();
1✔
185
          }
1✔
186
        });
187
  }
1✔
188

189
  @Override
190
  public boolean isShutDown() {
191
    return isShutdown;
1✔
192
  }
193

194
  @Override
195
  public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
196
    return Collections.unmodifiableMap(subscribedResourceTypeUrls);
1✔
197
  }
198

199
  @Nullable
200
  @Override
201
  public Collection<String> getSubscribedResources(ServerInfo serverInfo,
202
                                                   XdsResourceType<? extends ResourceUpdate> type) {
203
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
1✔
204
        resourceSubscribers.getOrDefault(type, Collections.emptyMap());
1✔
205
    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
1✔
206
    for (String key : resources.keySet()) {
1✔
207
      if (resources.get(key).serverInfo.equals(serverInfo)) {
1✔
208
        builder.add(key);
1✔
209
      }
210
    }
1✔
211
    Collection<String> retVal = builder.build();
1✔
212
    return retVal.isEmpty() ? null : retVal;
1✔
213
  }
214

215
  // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
216
  // ResourceTypes that do not have subscribers does not show up in the snapshot keys.
217
  @Override
218
  public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
219
      getSubscribedResourcesMetadataSnapshot() {
220
    final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
221
        SettableFuture.create();
1✔
222
    syncContext.execute(new Runnable() {
1✔
223
      @Override
224
      public void run() {
225
        // A map from a "resource type" to a map ("resource name": "resource metadata")
226
        ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
227
            ImmutableMap.builder();
1✔
228
        for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
1✔
229
          ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
1✔
230
          for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
231
              : resourceSubscribers.get(resourceType).entrySet()) {
1✔
232
            metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
1✔
233
          }
1✔
234
          metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
1✔
235
        }
1✔
236
        future.set(metadataSnapshot.buildOrThrow());
1✔
237
      }
1✔
238
    });
239
    return future;
1✔
240
  }
241

242
  @Override
243
  public Object getSecurityConfig() {
244
    return securityConfig;
×
245
  }
246

247
  @Override
248
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
249
      String resourceName,
250
      ResourceWatcher<T> watcher,
251
      Executor watcherExecutor) {
252
    syncContext.execute(new Runnable() {
1✔
253
      @Override
254
      @SuppressWarnings("unchecked")
255
      public void run() {
256
        if (!resourceSubscribers.containsKey(type)) {
1✔
257
          resourceSubscribers.put(type, new HashMap<>());
1✔
258
          subscribedResourceTypeUrls.put(type.typeUrl(), type);
1✔
259
        }
260
        ResourceSubscriber<T> subscriber =
1✔
261
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
262
        if (subscriber == null) {
1✔
263
          logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
1✔
264
          subscriber = new ResourceSubscriber<>(type, resourceName);
1✔
265
          resourceSubscribers.get(type).put(resourceName, subscriber);
1✔
266
          if (subscriber.controlPlaneClient != null) {
1✔
267
            subscriber.controlPlaneClient.adjustResourceSubscription(type);
1✔
268
          }
269
        }
270
        subscriber.addWatcher(watcher, watcherExecutor);
1✔
271
      }
1✔
272
    });
273
  }
1✔
274

275
  @Override
276
  public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
277
      String resourceName,
278
      ResourceWatcher<T> watcher) {
279
    syncContext.execute(new Runnable() {
1✔
280
      @Override
281
      @SuppressWarnings("unchecked")
282
      public void run() {
283
        ResourceSubscriber<T> subscriber =
1✔
284
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
285
        subscriber.removeWatcher(watcher);
1✔
286
        if (!subscriber.isWatched()) {
1✔
287
          subscriber.cancelResourceWatch();
1✔
288
          resourceSubscribers.get(type).remove(resourceName);
1✔
289
          if (subscriber.controlPlaneClient != null) {
1✔
290
            subscriber.controlPlaneClient.adjustResourceSubscription(type);
1✔
291
          }
292
          if (resourceSubscribers.get(type).isEmpty()) {
1✔
293
            resourceSubscribers.remove(type);
1✔
294
            subscribedResourceTypeUrls.remove(type.typeUrl());
1✔
295
          }
296
        }
297
      }
1✔
298
    });
299
  }
1✔
300

301
  @Override
302
  public LoadStatsManager2.ClusterDropStats addClusterDropStats(
303
      final ServerInfo serverInfo, String clusterName,
304
      @Nullable String edsServiceName) {
305
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
306
    LoadStatsManager2.ClusterDropStats dropCounter =
1✔
307
        loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
1✔
308
    syncContext.execute(new Runnable() {
1✔
309
      @Override
310
      public void run() {
311
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
312
      }
1✔
313
    });
314
    return dropCounter;
1✔
315
  }
316

317
  @Override
318
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
319
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
320
      Locality locality) {
321
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
322
    LoadStatsManager2.ClusterLocalityStats loadCounter =
1✔
323
        loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
1✔
324
    syncContext.execute(new Runnable() {
1✔
325
      @Override
326
      public void run() {
327
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
328
      }
1✔
329
    });
330
    return loadCounter;
1✔
331
  }
332

333

334
  @Override
335
  public Bootstrapper.BootstrapInfo getBootstrapInfo() {
336
    return bootstrapInfo;
1✔
337
  }
338

339
  @Override
340
  public String toString() {
341
    return logId.toString();
×
342
  }
343

344
  @Override
345
  protected void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
346
    if (isShutDown()) {
1✔
347
      return;
×
348
    }
349

350
    syncContext.execute(new Runnable() {
1✔
351
      @Override
352
      public void run() {
353
        if (isShutDown()) {
1✔
354
          return;
×
355
        }
356

357
        for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
358
          for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
359
            if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) {
1✔
360
              subscriber.restartTimer();
1✔
361
            }
362
          }
1✔
363
        }
1✔
364
      }
1✔
365
    });
366
  }
1✔
367

368
  private Set<String> getResourceKeys(XdsResourceType<?> xdsResourceType) {
369
    if (!resourceSubscribers.containsKey(xdsResourceType)) {
1✔
370
      return null;
×
371
    }
372

373
    return resourceSubscribers.get(xdsResourceType).keySet();
1✔
374
  }
375

376
  private void cleanUpResourceTimers() {
377
    for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
378
      for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
379
        subscriber.stopTimer();
1✔
380
      }
1✔
381
    }
1✔
382
  }
1✔
383

384
  public ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) {
385
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
386
    if (serverCpClientMap.containsKey(serverInfo)) {
1✔
387
      return serverCpClientMap.get(serverInfo);
1✔
388
    }
389

390
    XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo);
1✔
391
    ControlPlaneClient controlPlaneClient = new ControlPlaneClient(
1✔
392
        xdsTransport,
393
        serverInfo,
394
        bootstrapInfo.node(),
1✔
395
        this,
396
        this,
397
        timeService,
398
        syncContext,
399
        backoffPolicyProvider,
400
        stopwatchSupplier,
401
        this,
402
        messagePrinter);
403
    serverCpClientMap.put(serverInfo, controlPlaneClient);
1✔
404

405
    LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
1✔
406
    loadStatsManagerMap.put(serverInfo, loadStatsManager);
1✔
407
    LoadReportClient lrsClient = new LoadReportClient(
1✔
408
        loadStatsManager, xdsTransport, bootstrapInfo.node(),
1✔
409
        syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
410
    serverLrsClientMap.put(serverInfo, lrsClient);
1✔
411

412
    return controlPlaneClient;
1✔
413
  }
414

415
  @VisibleForTesting
416
  @Override
417
  public Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
418
    return ImmutableMap.copyOf(serverLrsClientMap);
1✔
419
  }
420

421
  @Nullable
422
  private ServerInfo getServerInfo(String resource) {
423
    if (resource.startsWith(XDSTP_SCHEME)) {
1✔
424
      URI uri = URI.create(resource);
1✔
425
      String authority = uri.getAuthority();
1✔
426
      if (authority == null) {
1✔
427
        authority = "";
1✔
428
      }
429
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
1✔
430
      if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
1✔
431
        return null;
1✔
432
      }
433
      return authorityInfo.xdsServers().get(0);
1✔
434
    } else {
435
      return bootstrapInfo.servers().get(0); // use first server
1✔
436
    }
437
  }
438

439
  @SuppressWarnings("unchecked")
440
  private <T extends ResourceUpdate> void handleResourceUpdate(
441
      XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType,
442
      ProcessingTracker processingTracker) {
443
    ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
1✔
444
    logger.log(XdsLogger.XdsLogLevel.INFO,
1✔
445
        "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
446
         xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
1✔
447
    Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
1✔
448
    Set<String> invalidResources = result.invalidResources;
1✔
449
    List<String> errors = result.errors;
1✔
450
    String errorDetail = null;
1✔
451
    if (errors.isEmpty()) {
1✔
452
      checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
1✔
453
      serverCpClientMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo,
1✔
454
          args.nonce);
455
    } else {
456
      errorDetail = Joiner.on('\n').join(errors);
1✔
457
      logger.log(XdsLogLevel.WARNING,
1✔
458
          "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
459
          xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
1✔
460
      serverCpClientMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail);
1✔
461
    }
462

463
    long updateTime = timeProvider.currentTimeNanos();
1✔
464
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
1✔
465
        resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
1✔
466
    for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
1✔
467
      String resourceName = entry.getKey();
1✔
468
      ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
1✔
469
      if (parsedResources.containsKey(resourceName)) {
1✔
470
        // Happy path: the resource updated successfully. Notify the watchers of the update.
471
        subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime,
1✔
472
            processingTracker);
473
        continue;
1✔
474
      }
475

476
      if (invalidResources.contains(resourceName)) {
1✔
477
        // The resource update is invalid. Capture the error without notifying the watchers.
478
        subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
1✔
479
      }
480

481
      // Nothing else to do for incremental ADS resources.
482
      if (!xdsResourceType.isFullStateOfTheWorld()) {
1✔
483
        continue;
1✔
484
      }
485

486
      // Handle State of the World ADS: invalid resources.
487
      if (invalidResources.contains(resourceName)) {
1✔
488
        // The resource is missing. Reuse the cached resource if possible.
489
        if (subscriber.data == null) {
1✔
490
          // No cached data. Notify the watchers of an invalid update.
491
          subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
1✔
492
        }
493
        continue;
494
      }
495

496
      // For State of the World services, notify watchers when their watched resource is missing
497
      // from the ADS update. Note that we can only do this if the resource update is coming from
498
      // the same xDS server that the ResourceSubscriber is subscribed to.
499
      if (subscriber.serverInfo.equals(args.serverInfo)) {
1✔
500
        subscriber.onAbsent(processingTracker);
1✔
501
      }
502
    }
1✔
503
  }
1✔
504

505
  /**
506
   * Tracks a single subscribed resource.
507
   */
508
  private final class ResourceSubscriber<T extends ResourceUpdate> {
509
    @Nullable private final ServerInfo serverInfo;
510
    @Nullable private final ControlPlaneClient controlPlaneClient;
511
    private final XdsResourceType<T> type;
512
    private final String resource;
513
    private final Map<ResourceWatcher<T>, Executor> watchers = new HashMap<>();
1✔
514
    @Nullable private T data;
515
    private boolean absent;
516
    // Tracks whether the deletion has been ignored per bootstrap server feature.
517
    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
518
    private boolean resourceDeletionIgnored;
519
    @Nullable private ScheduledHandle respTimer;
520
    @Nullable private ResourceMetadata metadata;
521
    @Nullable private String errorDescription;
522

523
    ResourceSubscriber(XdsResourceType<T> type, String resource) {
1✔
524
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
525
      this.type = type;
1✔
526
      this.resource = resource;
1✔
527
      this.serverInfo = getServerInfo(resource);
1✔
528
      if (serverInfo == null) {
1✔
529
        this.errorDescription = "Wrong configuration: xds server does not exist for resource "
1✔
530
            + resource;
531
        this.controlPlaneClient = null;
1✔
532
        return;
1✔
533
      }
534
      // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
535
      // is created but not yet requested because the client is in backoff.
536
      this.metadata = ResourceMetadata.newResourceMetadataUnknown();
1✔
537

538
      ControlPlaneClient controlPlaneClient = null;
1✔
539
      try {
540
        controlPlaneClient = getOrCreateControlPlaneClient(serverInfo);
1✔
541
        if (controlPlaneClient.isInBackoff()) {
1✔
542
          return;
1✔
543
        }
544
      } catch (IllegalArgumentException e) {
1✔
545
        controlPlaneClient = null;
1✔
546
        this.errorDescription = "Bad configuration:  " + e.getMessage();
1✔
547
        return;
1✔
548
      } finally {
549
        this.controlPlaneClient = controlPlaneClient;
1✔
550
      }
551

552
      restartTimer();
1✔
553
    }
1✔
554

555
    void addWatcher(ResourceWatcher<T> watcher, Executor watcherExecutor) {
556
      checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher);
1✔
557
      watchers.put(watcher, watcherExecutor);
1✔
558
      T savedData = data;
1✔
559
      boolean savedAbsent = absent;
1✔
560
      watcherExecutor.execute(() -> {
1✔
561
        if (errorDescription != null) {
1✔
562
          watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
1✔
563
          return;
1✔
564
        }
565
        if (savedData != null) {
1✔
566
          notifyWatcher(watcher, savedData);
1✔
567
        } else if (savedAbsent) {
1✔
568
          watcher.onResourceDoesNotExist(resource);
1✔
569
        }
570
      });
1✔
571
    }
1✔
572

573
    void removeWatcher(ResourceWatcher<T>  watcher) {
574
      checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher);
1✔
575
      watchers.remove(watcher);
1✔
576
    }
1✔
577

578
    void restartTimer() {
579
      if (data != null || absent) {  // resource already resolved
1✔
580
        return;
1✔
581
      }
582
      if (!controlPlaneClient.isReady()) { // When client becomes ready, it triggers a restartTimer
1✔
583
        return;
1✔
584
      }
585

586
      class ResourceNotFound implements Runnable {
1✔
587
        @Override
588
        public void run() {
589
          logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
1✔
590
              type, resource);
1✔
591
          respTimer = null;
1✔
592
          onAbsent(null);
1✔
593
        }
1✔
594

595
        @Override
596
        public String toString() {
597
          return type + this.getClass().getSimpleName();
1✔
598
        }
599
      }
600

601
      // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
602
      metadata = ResourceMetadata.newResourceMetadataRequested();
1✔
603

604
      respTimer = syncContext.schedule(
1✔
605
          new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
606
          timeService);
1✔
607
    }
1✔
608

609
    void stopTimer() {
610
      if (respTimer != null && respTimer.isPending()) {
1✔
611
        respTimer.cancel();
1✔
612
        respTimer = null;
1✔
613
      }
614
    }
1✔
615

616
    void cancelResourceWatch() {
617
      if (isWatched()) {
1✔
618
        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
×
619
      }
620
      stopTimer();
1✔
621
      String message = "Unsubscribing {0} resource {1} from server {2}";
1✔
622
      XdsLogLevel logLevel = XdsLogLevel.INFO;
1✔
623
      if (resourceDeletionIgnored) {
1✔
624
        message += " for which we previously ignored a deletion";
×
625
        logLevel = XdsLogLevel.FORCE_INFO;
×
626
      }
627
      logger.log(logLevel, message, type, resource,
1✔
628
          serverInfo != null ? serverInfo.target() : "unknown");
1✔
629
    }
1✔
630

631
    boolean isWatched() {
632
      return !watchers.isEmpty();
1✔
633
    }
634

635
    boolean hasResult() {
636
      return data != null || absent;
1✔
637
    }
638

639
    void onData(ParsedResource<T> parsedResource, String version, long updateTime,
640
                ProcessingTracker processingTracker) {
641
      if (respTimer != null && respTimer.isPending()) {
1✔
642
        respTimer.cancel();
1✔
643
        respTimer = null;
1✔
644
      }
645
      this.metadata = ResourceMetadata
1✔
646
          .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
1✔
647
      ResourceUpdate oldData = this.data;
1✔
648
      this.data = parsedResource.getResourceUpdate();
1✔
649
      absent = false;
1✔
650
      if (resourceDeletionIgnored) {
1✔
651
        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
1✔
652
                + "of resource for which we previously ignored a deletion: type {1} name {2}",
653
            serverInfo != null ? serverInfo.target() : "unknown", type, resource);
1✔
654
        resourceDeletionIgnored = false;
1✔
655
      }
656
      if (!Objects.equals(oldData, data)) {
1✔
657
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
658
          processingTracker.startTask();
1✔
659
          watchers.get(watcher).execute(() -> {
1✔
660
            try {
661
              notifyWatcher(watcher, data);
1✔
662
            } finally {
663
              processingTracker.onComplete();
1✔
664
            }
665
          });
1✔
666
        }
1✔
667
      }
668
    }
1✔
669

670
    void onAbsent(@Nullable ProcessingTracker processingTracker) {
671
      if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
1✔
672
        return;
1✔
673
      }
674

675
      // Ignore deletion of State of the World resources when this feature is on,
676
      // and the resource is reusable.
677
      boolean ignoreResourceDeletionEnabled =
1✔
678
          serverInfo != null && serverInfo.ignoreResourceDeletion();
1✔
679
      if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
1✔
680
        if (!resourceDeletionIgnored) {
1✔
681
          logger.log(XdsLogLevel.FORCE_WARNING,
1✔
682
              "xds server {0}: ignoring deletion for resource type {1} name {2}}",
683
              serverInfo.target(), type, resource);
1✔
684
          resourceDeletionIgnored = true;
1✔
685
        }
686
        return;
1✔
687
      }
688

689
      logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
1✔
690
      if (!absent) {
1✔
691
        data = null;
1✔
692
        absent = true;
1✔
693
        metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
694
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
695
          if (processingTracker != null) {
1✔
696
            processingTracker.startTask();
1✔
697
          }
698
          watchers.get(watcher).execute(() -> {
1✔
699
            try {
700
              watcher.onResourceDoesNotExist(resource);
1✔
701
            } finally {
702
              if (processingTracker != null) {
1✔
703
                processingTracker.onComplete();
1✔
704
              }
705
            }
706
          });
1✔
707
        }
1✔
708
      }
709
    }
1✔
710

711
    void onError(Status error, @Nullable ProcessingTracker tracker) {
712
      if (respTimer != null && respTimer.isPending()) {
1✔
713
        respTimer.cancel();
1✔
714
        respTimer = null;
1✔
715
      }
716

717
      // Include node ID in xds failures to allow cross-referencing with control plane logs
718
      // when debugging.
719
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
720
      Status errorAugmented = Status.fromCode(error.getCode())
1✔
721
          .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
1✔
722
          .withCause(error.getCause());
1✔
723

724
      for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
725
        if (tracker != null) {
1✔
726
          tracker.startTask();
1✔
727
        }
728
        watchers.get(watcher).execute(() -> {
1✔
729
          try {
730
            watcher.onError(errorAugmented);
1✔
731
          } finally {
732
            if (tracker != null) {
1✔
733
              tracker.onComplete();
1✔
734
            }
735
          }
736
        });
1✔
737
      }
1✔
738
    }
1✔
739

740
    void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
741
      metadata = ResourceMetadata
1✔
742
          .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
1✔
743
    }
1✔
744

745
    private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
746
      watcher.onChanged(update);
1✔
747
    }
1✔
748
  }
749

750
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc