• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18923

28 Nov 2023 12:26AM CUT coverage: 88.258% (-0.005%) from 88.263%
#18923

push

github

ejona86
Bump version to 1.59.2-SNAPSHOT

30320 of 34354 relevant lines covered (88.26%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.84
/../xds/src/main/java/io/grpc/xds/XdsClientImpl.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
22
import static io.grpc.xds.XdsResourceType.ParsedResource;
23
import static io.grpc.xds.XdsResourceType.ValidatedResourceUpdate;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Joiner;
27
import com.google.common.base.Stopwatch;
28
import com.google.common.base.Supplier;
29
import com.google.common.collect.ImmutableMap;
30
import com.google.common.collect.ImmutableSet;
31
import com.google.common.util.concurrent.ListenableFuture;
32
import com.google.common.util.concurrent.SettableFuture;
33
import com.google.protobuf.Any;
34
import io.grpc.ChannelCredentials;
35
import io.grpc.Context;
36
import io.grpc.Grpc;
37
import io.grpc.InternalLogId;
38
import io.grpc.LoadBalancerRegistry;
39
import io.grpc.ManagedChannel;
40
import io.grpc.Status;
41
import io.grpc.SynchronizationContext;
42
import io.grpc.SynchronizationContext.ScheduledHandle;
43
import io.grpc.internal.BackoffPolicy;
44
import io.grpc.internal.TimeProvider;
45
import io.grpc.xds.Bootstrapper.AuthorityInfo;
46
import io.grpc.xds.Bootstrapper.ServerInfo;
47
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
48
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
49
import io.grpc.xds.XdsClient.ResourceStore;
50
import io.grpc.xds.XdsClient.TimerLaunch;
51
import io.grpc.xds.XdsClient.XdsResponseHandler;
52
import io.grpc.xds.XdsLogger.XdsLogLevel;
53
import java.net.URI;
54
import java.util.Collection;
55
import java.util.Collections;
56
import java.util.HashMap;
57
import java.util.HashSet;
58
import java.util.List;
59
import java.util.Map;
60
import java.util.Objects;
61
import java.util.Set;
62
import java.util.concurrent.ScheduledExecutorService;
63
import java.util.concurrent.TimeUnit;
64
import java.util.logging.Level;
65
import java.util.logging.Logger;
66
import javax.annotation.Nullable;
67

68
/**
69
 * XdsClient implementation.
70
 */
71
final class XdsClientImpl extends XdsClient
72
    implements XdsResponseHandler, ResourceStore, TimerLaunch {
73

74
  private static boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
1✔
75
      System.getenv("GRPC_LOG_XDS_NODE_ID"));
1✔
76
  private static final Logger classLogger = Logger.getLogger(XdsClientImpl.class.getName());
1✔
77

78
  // Longest time to wait, since the subscription to some resource, for concluding its absence.
79
  @VisibleForTesting
80
  static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
81
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
82
      new Thread.UncaughtExceptionHandler() {
1✔
83
        @Override
84
        public void uncaughtException(Thread t, Throwable e) {
85
          logger.log(
×
86
              XdsLogLevel.ERROR,
87
              "Uncaught exception in XdsClient SynchronizationContext. Panic!",
88
              e);
89
          // TODO(chengyuanzhang): better error handling.
90
          throw new AssertionError(e);
×
91
        }
92
      });
93
  private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
1✔
94
  private final LoadBalancerRegistry loadBalancerRegistry
1✔
95
      = LoadBalancerRegistry.getDefaultRegistry();
1✔
96
  private final Map<ServerInfo, ControlPlaneClient> serverChannelMap = new HashMap<>();
1✔
97
  private final Map<XdsResourceType<? extends ResourceUpdate>,
1✔
98
      Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
99
      resourceSubscribers = new HashMap<>();
100
  private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
1✔
101
  private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
1✔
102
  private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
1✔
103
  private final XdsChannelFactory xdsChannelFactory;
104
  private final Bootstrapper.BootstrapInfo bootstrapInfo;
105
  private final Context context;
106
  private final ScheduledExecutorService timeService;
107
  private final BackoffPolicy.Provider backoffPolicyProvider;
108
  private final Supplier<Stopwatch> stopwatchSupplier;
109
  private final TimeProvider timeProvider;
110
  private final TlsContextManager tlsContextManager;
111
  private final InternalLogId logId;
112
  private final XdsLogger logger;
113
  private volatile boolean isShutdown;
114

115
  /**
   * Creates an xDS client backed by the given collaborators.
   *
   * <p>Only {@code tlsContextManager} is null-checked here; the remaining arguments are stored
   * as given. A log ID and logger are allocated for this instance, and the bootstrap node ID is
   * additionally logged at INFO on the class logger when {@code LOG_XDS_NODE_ID} is set.
   */
  XdsClientImpl(
      XdsChannelFactory xdsChannelFactory,
      Bootstrapper.BootstrapInfo bootstrapInfo,
      Context context,
      ScheduledExecutorService timeService,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      TimeProvider timeProvider,
      TlsContextManager tlsContextManager) {
    // Validate first so a bad argument fails before any logging state is allocated.
    this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager");

    // Channel creation and bootstrap configuration.
    this.xdsChannelFactory = xdsChannelFactory;
    this.bootstrapInfo = bootstrapInfo;
    this.context = context;

    // Timing and retry collaborators.
    this.timeService = timeService;
    this.backoffPolicyProvider = backoffPolicyProvider;
    this.stopwatchSupplier = stopwatchSupplier;
    this.timeProvider = timeProvider;

    // Per-instance logging.
    this.logId = InternalLogId.allocate("xds-client", null);
    this.logger = XdsLogger.withLogId(logId);
    logger.log(XdsLogLevel.INFO, "Created");
    if (LOG_XDS_NODE_ID) {
      classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
    }
  }
1✔
139

140
  private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
141
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
142
    if (serverChannelMap.containsKey(serverInfo)) {
1✔
143
      return;
1✔
144
    }
145
    ControlPlaneClient xdsChannel = new ControlPlaneClient(
1✔
146
        xdsChannelFactory,
147
        serverInfo,
148
        bootstrapInfo.node(),
1✔
149
        this,
150
        this,
151
        context,
152
        timeService,
153
        syncContext,
154
        backoffPolicyProvider,
155
        stopwatchSupplier,
156
        this);
157
    LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
1✔
158
    loadStatsManagerMap.put(serverInfo, loadStatsManager);
1✔
159
    LoadReportClient lrsClient = new LoadReportClient(
1✔
160
        loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
1✔
161
        timeService, backoffPolicyProvider, stopwatchSupplier);
162
    serverChannelMap.put(serverInfo, xdsChannel);
1✔
163
    serverLrsClientMap.put(serverInfo, lrsClient);
1✔
164
  }
1✔
165

166
  @Override
167
  public void handleResourceResponse(
168
      XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
169
      List<Any> resources, String nonce) {
170
    checkNotNull(xdsResourceType, "xdsResourceType");
1✔
171
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
172
    Set<String> toParseResourceNames = null;
1✔
173
    if (!(xdsResourceType == XdsListenerResource.getInstance()
1✔
174
        || xdsResourceType == XdsRouteConfigureResource.getInstance())
1✔
175
        && resourceSubscribers.containsKey(xdsResourceType)) {
1✔
176
      toParseResourceNames = resourceSubscribers.get(xdsResourceType).keySet();
1✔
177
    }
178
    XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
1✔
179
        bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager,
180
        toParseResourceNames);
181
    handleResourceUpdate(args, resources, xdsResourceType);
1✔
182
  }
1✔
183

184
  @Override
185
  public void handleStreamClosed(Status error) {
186
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
187
    cleanUpResourceTimers();
1✔
188
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
189
        resourceSubscribers.values()) {
1✔
190
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
191
        if (!subscriber.hasResult()) {
1✔
192
          subscriber.onError(error);
1✔
193
        }
194
      }
1✔
195
    }
1✔
196
  }
1✔
197

198
  @Override
199
  public void handleStreamRestarted(ServerInfo serverInfo) {
200
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
201
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
202
        resourceSubscribers.values()) {
1✔
203
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
204
        if (subscriber.serverInfo.equals(serverInfo)) {
1✔
205
          subscriber.restartTimer();
1✔
206
        }
207
      }
1✔
208
    }
1✔
209
  }
1✔
210

211
  @Override
212
  void shutdown() {
213
    syncContext.execute(
1✔
214
        new Runnable() {
1✔
215
          @Override
216
          public void run() {
217
            if (isShutdown) {
1✔
218
              return;
×
219
            }
220
            isShutdown = true;
1✔
221
            for (ControlPlaneClient xdsChannel : serverChannelMap.values()) {
1✔
222
              xdsChannel.shutdown();
1✔
223
            }
1✔
224
            for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
1✔
225
              lrsClient.stopLoadReporting();
1✔
226
            }
1✔
227
            cleanUpResourceTimers();
1✔
228
          }
1✔
229
        });
230
  }
1✔
231

232
  @Override
233
  boolean isShutDown() {
234
    return isShutdown;
1✔
235
  }
236

237
  @Override
238
  public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
239
    return Collections.unmodifiableMap(subscribedResourceTypeUrls);
1✔
240
  }
241

242
  @Nullable
243
  @Override
244
  public Collection<String> getSubscribedResources(ServerInfo serverInfo,
245
                                                   XdsResourceType<? extends ResourceUpdate> type) {
246
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
1✔
247
        resourceSubscribers.getOrDefault(type, Collections.emptyMap());
1✔
248
    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
1✔
249
    for (String key : resources.keySet()) {
1✔
250
      if (resources.get(key).serverInfo.equals(serverInfo)) {
1✔
251
        builder.add(key);
1✔
252
      }
253
    }
1✔
254
    Collection<String> retVal = builder.build();
1✔
255
    return retVal.isEmpty() ? null : retVal;
1✔
256
  }
257

258
  // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
259
  // ResourceTypes that do not have subscribers does not show up in the snapshot keys.
260
  @Override
261
  ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
262
      getSubscribedResourcesMetadataSnapshot() {
263
    final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
264
        SettableFuture.create();
1✔
265
    syncContext.execute(new Runnable() {
1✔
266
      @Override
267
      public void run() {
268
        // A map from a "resource type" to a map ("resource name": "resource metadata")
269
        ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
270
            ImmutableMap.builder();
1✔
271
        for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
1✔
272
          ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
1✔
273
          for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
274
              : resourceSubscribers.get(resourceType).entrySet()) {
1✔
275
            metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
1✔
276
          }
1✔
277
          metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
1✔
278
        }
1✔
279
        future.set(metadataSnapshot.buildOrThrow());
1✔
280
      }
1✔
281
    });
282
    return future;
1✔
283
  }
284

285
  @Override
286
  TlsContextManager getTlsContextManager() {
287
    return tlsContextManager;
×
288
  }
289

290
  @Override
291
  <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
292
                                                   ResourceWatcher<T> watcher) {
293
    syncContext.execute(new Runnable() {
1✔
294
      @Override
295
      @SuppressWarnings("unchecked")
296
      public void run() {
297
        if (!resourceSubscribers.containsKey(type)) {
1✔
298
          resourceSubscribers.put(type, new HashMap<>());
1✔
299
          subscribedResourceTypeUrls.put(type.typeUrl(), type);
1✔
300
        }
301
        ResourceSubscriber<T> subscriber =
1✔
302
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
1✔
303
        if (subscriber == null) {
1✔
304
          logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
1✔
305
          subscriber = new ResourceSubscriber<>(type, resourceName);
1✔
306
          resourceSubscribers.get(type).put(resourceName, subscriber);
1✔
307
          if (subscriber.xdsChannel != null) {
1✔
308
            subscriber.xdsChannel.adjustResourceSubscription(type);
1✔
309
          }
310
        }
311
        subscriber.addWatcher(watcher);
1✔
312
      }
1✔
313
    });
314
  }
1✔
315

316
  @Override
317
  <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
318
                                                         String resourceName,
319
                                                         ResourceWatcher<T> watcher) {
320
    syncContext.execute(new Runnable() {
1✔
321
      @Override
322
      @SuppressWarnings("unchecked")
323
      public void run() {
324
        ResourceSubscriber<T> subscriber =
1✔
325
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
1✔
326
        subscriber.removeWatcher(watcher);
1✔
327
        if (!subscriber.isWatched()) {
1✔
328
          subscriber.cancelResourceWatch();
1✔
329
          resourceSubscribers.get(type).remove(resourceName);
1✔
330
          if (subscriber.xdsChannel != null) {
1✔
331
            subscriber.xdsChannel.adjustResourceSubscription(type);
1✔
332
          }
333
          if (resourceSubscribers.get(type).isEmpty()) {
1✔
334
            resourceSubscribers.remove(type);
1✔
335
            subscribedResourceTypeUrls.remove(type.typeUrl());
1✔
336

337
          }
338
        }
339
      }
1✔
340
    });
341
  }
1✔
342

343
  @Override
344
  ClusterDropStats addClusterDropStats(
345
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
346
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
347
    ClusterDropStats dropCounter =
1✔
348
        loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
1✔
349
    syncContext.execute(new Runnable() {
1✔
350
      @Override
351
      public void run() {
352
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
353
      }
1✔
354
    });
355
    return dropCounter;
1✔
356
  }
357

358
  @Override
359
  ClusterLocalityStats addClusterLocalityStats(
360
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
361
      Locality locality) {
362
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
363
    ClusterLocalityStats loadCounter =
1✔
364
        loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
1✔
365
    syncContext.execute(new Runnable() {
1✔
366
      @Override
367
      public void run() {
368
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
369
      }
1✔
370
    });
371
    return loadCounter;
1✔
372
  }
373

374
  @Override
375
  Bootstrapper.BootstrapInfo getBootstrapInfo() {
376
    return bootstrapInfo;
1✔
377
  }
378

379
  @VisibleForTesting
380
  @Override
381
  Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
382
    return ImmutableMap.copyOf(serverLrsClientMap);
1✔
383
  }
384

385
  @Override
386
  public String toString() {
387
    return logId.toString();
×
388
  }
389

390
  @Override
391
  public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
392
    if (isShutDown()) {
1✔
393
      return;
×
394
    }
395

396
    syncContext.execute(new Runnable() {
1✔
397
      @Override
398
      public void run() {
399
        if (isShutDown()) {
1✔
400
          return;
×
401
        }
402

403
        for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
404
          for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
405
            if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) {
1✔
406
              subscriber.restartTimer();
1✔
407
            }
408
          }
1✔
409
        }
1✔
410
      }
1✔
411
    });
412
  }
1✔
413

414
  private void cleanUpResourceTimers() {
415
    for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
416
      for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
417
        subscriber.stopTimer();
1✔
418
      }
1✔
419
    }
1✔
420
  }
1✔
421

422
  @SuppressWarnings("unchecked")
423
  private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Args args,
424
                                                               List<Any> resources,
425
                                                               XdsResourceType<T> xdsResourceType) {
426
    ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
1✔
427
    logger.log(XdsLogger.XdsLogLevel.INFO,
1✔
428
        "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
429
         xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
1✔
430
    Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
1✔
431
    Set<String> invalidResources = result.invalidResources;
1✔
432
    List<String> errors = result.errors;
1✔
433
    String errorDetail = null;
1✔
434
    if (errors.isEmpty()) {
1✔
435
      checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
1✔
436
      serverChannelMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo,
1✔
437
          args.nonce);
438
    } else {
439
      errorDetail = Joiner.on('\n').join(errors);
1✔
440
      logger.log(XdsLogLevel.WARNING,
1✔
441
          "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
442
          xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
1✔
443
      serverChannelMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail);
1✔
444
    }
445

446
    long updateTime = timeProvider.currentTimeNanos();
1✔
447
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
1✔
448
        resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
1✔
449
    for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
1✔
450
      String resourceName = entry.getKey();
1✔
451
      ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
1✔
452

453
      if (parsedResources.containsKey(resourceName)) {
1✔
454
        // Happy path: the resource updated successfully. Notify the watchers of the update.
455
        subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
1✔
456
        continue;
1✔
457
      }
458

459
      if (invalidResources.contains(resourceName)) {
1✔
460
        // The resource update is invalid. Capture the error without notifying the watchers.
461
        subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
1✔
462
      }
463

464
      // Nothing else to do for incremental ADS resources.
465
      if (!xdsResourceType.isFullStateOfTheWorld()) {
1✔
466
        continue;
1✔
467
      }
468

469
      // Handle State of the World ADS: invalid resources.
470
      if (invalidResources.contains(resourceName)) {
1✔
471
        // The resource is missing. Reuse the cached resource if possible.
472
        if (subscriber.data == null) {
1✔
473
          // No cached data. Notify the watchers of an invalid update.
474
          subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
1✔
475
        }
476
        continue;
477
      }
478

479
      // For State of the World services, notify watchers when their watched resource is missing
480
      // from the ADS update. Note that we can only do this if the resource update is coming from
481
      // the same xDS server that the ResourceSubscriber is subscribed to.
482
      if (subscriber.serverInfo.equals(args.serverInfo)) {
1✔
483
        subscriber.onAbsent();
1✔
484
      }
485
    }
1✔
486
  }
1✔
487

488
  /**
489
   * Tracks a single subscribed resource.
490
   */
491
  private final class ResourceSubscriber<T extends ResourceUpdate> {
492
    @Nullable private final ServerInfo serverInfo;
493
    @Nullable private final ControlPlaneClient xdsChannel;
494
    private final XdsResourceType<T> type;
495
    private final String resource;
496
    private final Set<ResourceWatcher<T>> watchers = new HashSet<>();
1✔
497
    @Nullable private T data;
498
    private boolean absent;
499
    // Tracks whether the deletion has been ignored per bootstrap server feature.
500
    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
501
    private boolean resourceDeletionIgnored;
502
    @Nullable private ScheduledHandle respTimer;
503
    @Nullable private ResourceMetadata metadata;
504
    @Nullable private String errorDescription;
505

506
    /**
     * Creates the subscriber for {@code resource} of {@code type}. Must be constructed from
     * within {@code syncContext}.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>No xDS server resolves for the resource: {@code errorDescription} is set and the
     *       subscriber has no channel.</li>
     *   <li>Channel creation throws {@link IllegalArgumentException}: {@code errorDescription}
     *       is set and the subscriber has no channel.</li>
     *   <li>The channel is in backoff: the channel is kept but no timer is started.</li>
     *   <li>Otherwise the channel is kept and {@code restartTimer()} is invoked.</li>
     * </ul>
     */
    ResourceSubscriber(XdsResourceType<T> type, String resource) {
      syncContext.throwIfNotInThisSynchronizationContext();
      this.type = type;
      this.resource = resource;
      // Initialize metadata in UNKNOWN state up front. This covers the case when the resource
      // subscriber is created but not yet requested because the client is in backoff, and also
      // keeps metadata non-null on the early-return error paths below, since metadata is stored
      // into null-hostile immutable maps when a snapshot is taken.
      this.metadata = ResourceMetadata.newResourceMetadataUnknown();
      this.serverInfo = getServerInfo(resource);
      if (serverInfo == null) {
        this.errorDescription = "Wrong configuration: xds server does not exist for resource "
            + resource;
        this.xdsChannel = null;
        return;
      }

      ControlPlaneClient xdsChannelTemp = null;
      try {
        maybeCreateXdsChannelWithLrs(serverInfo);
        xdsChannelTemp = serverChannelMap.get(serverInfo);
        if (xdsChannelTemp.isInBackoff()) {
          return;
        }
      } catch (IllegalArgumentException e) {
        xdsChannelTemp = null;
        this.errorDescription = "Bad configuration:  " + e.getMessage();
        return;
      } finally {
        // The final field is assigned exactly once here, whichever path leaves the try block.
        this.xdsChannel = xdsChannelTemp;
      }

      restartTimer();
    }
1✔
538

539
    @Nullable
540
    private ServerInfo getServerInfo(String resource) {
541
      if (BootstrapperImpl.enableFederation && resource.startsWith(XDSTP_SCHEME)) {
1✔
542
        URI uri = URI.create(resource);
1✔
543
        String authority = uri.getAuthority();
1✔
544
        if (authority == null) {
1✔
545
          authority = "";
1✔
546
        }
547
        AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
1✔
548
        if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
1✔
549
          return null;
1✔
550
        }
551
        return authorityInfo.xdsServers().get(0);
1✔
552
      }
553
      return bootstrapInfo.servers().get(0); // use first server
1✔
554
    }
555

556
    void addWatcher(ResourceWatcher<T> watcher) {
557
      checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
1✔
558
      watchers.add(watcher);
1✔
559
      if (errorDescription != null) {
1✔
560
        watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
1✔
561
        return;
1✔
562
      }
563
      if (data != null) {
1✔
564
        notifyWatcher(watcher, data);
1✔
565
      } else if (absent) {
1✔
566
        watcher.onResourceDoesNotExist(resource);
1✔
567
      }
568
    }
1✔
569

570
    void removeWatcher(ResourceWatcher<T>  watcher) {
571
      checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher);
1✔
572
      watchers.remove(watcher);
1✔
573
    }
1✔
574

575
    void restartTimer() {
576
      if (data != null || absent) {  // resource already resolved
1✔
577
        return;
1✔
578
      }
579
      if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer
1✔
580
        return;
1✔
581
      }
582

583
      class ResourceNotFound implements Runnable {
1✔
584
        @Override
585
        public void run() {
586
          logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
1✔
587
              type, resource);
1✔
588
          respTimer = null;
1✔
589
          onAbsent();
1✔
590
        }
1✔
591

592
        @Override
593
        public String toString() {
594
          return type + this.getClass().getSimpleName();
1✔
595
        }
596
      }
597

598
      // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
599
      metadata = ResourceMetadata.newResourceMetadataRequested();
1✔
600

601
      respTimer = syncContext.schedule(
1✔
602
          new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
603
          timeService);
1✔
604
    }
1✔
605

606
    void stopTimer() {
607
      if (respTimer != null && respTimer.isPending()) {
1✔
608
        respTimer.cancel();
1✔
609
        respTimer = null;
1✔
610
      }
611
    }
1✔
612

613
    void cancelResourceWatch() {
614
      if (isWatched()) {
1✔
615
        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
×
616
      }
617
      stopTimer();
1✔
618
      String message = "Unsubscribing {0} resource {1} from server {2}";
1✔
619
      XdsLogLevel logLevel = XdsLogLevel.INFO;
1✔
620
      if (resourceDeletionIgnored) {
1✔
621
        message += " for which we previously ignored a deletion";
×
622
        logLevel = XdsLogLevel.FORCE_INFO;
×
623
      }
624
      logger.log(logLevel, message, type, resource,
1✔
625
          serverInfo != null ? serverInfo.target() : "unknown");
1✔
626
    }
1✔
627

628
    boolean isWatched() {
629
      return !watchers.isEmpty();
1✔
630
    }
631

632
    boolean hasResult() {
633
      return data != null || absent;
1✔
634
    }
635

636
    void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
637
      if (respTimer != null && respTimer.isPending()) {
1✔
638
        respTimer.cancel();
1✔
639
        respTimer = null;
1✔
640
      }
641
      this.metadata = ResourceMetadata
1✔
642
          .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
1✔
643
      ResourceUpdate oldData = this.data;
1✔
644
      this.data = parsedResource.getResourceUpdate();
1✔
645
      absent = false;
1✔
646
      if (resourceDeletionIgnored) {
1✔
647
        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
1✔
648
                + "of resource for which we previously ignored a deletion: type {1} name {2}",
649
            serverInfo != null ? serverInfo.target() : "unknown", type, resource);
1✔
650
        resourceDeletionIgnored = false;
1✔
651
      }
652
      if (!Objects.equals(oldData, data)) {
1✔
653
        for (ResourceWatcher<T> watcher : watchers) {
1✔
654
          notifyWatcher(watcher, data);
1✔
655
        }
1✔
656
      }
657
    }
1✔
658

659
    void onAbsent() {
660
      if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
1✔
661
        return;
1✔
662
      }
663

664
      // Ignore deletion of State of the World resources when this feature is on,
665
      // and the resource is reusable.
666
      boolean ignoreResourceDeletionEnabled =
1✔
667
          serverInfo != null && serverInfo.ignoreResourceDeletion();
1✔
668
      if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
1✔
669
        if (!resourceDeletionIgnored) {
1✔
670
          logger.log(XdsLogLevel.FORCE_WARNING,
1✔
671
              "xds server {0}: ignoring deletion for resource type {1} name {2}}",
672
              serverInfo.target(), type, resource);
1✔
673
          resourceDeletionIgnored = true;
1✔
674
        }
675
        return;
1✔
676
      }
677

678
      logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
1✔
679
      if (!absent) {
1✔
680
        data = null;
1✔
681
        absent = true;
1✔
682
        metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
683
        for (ResourceWatcher<T> watcher : watchers) {
1✔
684
          watcher.onResourceDoesNotExist(resource);
1✔
685
        }
1✔
686
      }
687
    }
1✔
688

689
    void onError(Status error) {
690
      if (respTimer != null && respTimer.isPending()) {
1✔
691
        respTimer.cancel();
1✔
692
        respTimer = null;
1✔
693
      }
694

695
      // Include node ID in xds failures to allow cross-referencing with control plane logs
696
      // when debugging.
697
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
698
      Status errorAugmented = Status.fromCode(error.getCode())
1✔
699
          .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
1✔
700
          .withCause(error.getCause());
1✔
701

702
      for (ResourceWatcher<T> watcher : watchers) {
1✔
703
        watcher.onError(errorAugmented);
1✔
704
      }
1✔
705
    }
1✔
706

707
    void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
708
      metadata = ResourceMetadata
1✔
709
          .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
1✔
710
    }
1✔
711

712
    private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
713
      watcher.onChanged(update);
1✔
714
    }
1✔
715
  }
716

717
  static final class ResourceInvalidException extends Exception {
718
    private static final long serialVersionUID = 0L;
719

720
    ResourceInvalidException(String message) {
721
      super(message, null, false, false);
1✔
722
    }
1✔
723

724
    ResourceInvalidException(String message, Throwable cause) {
725
      super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
1✔
726
    }
1✔
727
  }
728

729
  abstract static class XdsChannelFactory {
1✔
730
    static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() {
1✔
731
      @Override
732
      ManagedChannel create(ServerInfo serverInfo) {
733
        String target = serverInfo.target();
1✔
734
        ChannelCredentials channelCredentials = serverInfo.channelCredentials();
1✔
735
        return Grpc.newChannelBuilder(target, channelCredentials)
1✔
736
            .keepAliveTime(5, TimeUnit.MINUTES)
1✔
737
            .build();
1✔
738
      }
739
    };
740

741
    abstract ManagedChannel create(ServerInfo serverInfo);
742
  }
743
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc