• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18904

17 Nov 2023 11:10PM UTC coverage: 88.226% (+0.02%) from 88.211%
#18904

push

github

web-flow
netty: Add option to limit RST_STREAM rate

The behavior purposefully mirrors that of Netty's
AbstractHttp2ConnectionHandlerBuilder.decoderEnforceMaxRstFramesPerWindow().
That API is not available to our code as we extend the
Http2ConnectionHandler, but we want our API to be able to delegate to
Netty's in the future if that ever becomes possible.

30370 of 34423 relevant lines covered (88.23%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.84
/../xds/src/main/java/io/grpc/xds/XdsClientImpl.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;
22
import static io.grpc.xds.XdsResourceType.ParsedResource;
23
import static io.grpc.xds.XdsResourceType.ValidatedResourceUpdate;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Joiner;
27
import com.google.common.base.Stopwatch;
28
import com.google.common.base.Supplier;
29
import com.google.common.collect.ImmutableMap;
30
import com.google.common.collect.ImmutableSet;
31
import com.google.common.util.concurrent.ListenableFuture;
32
import com.google.common.util.concurrent.SettableFuture;
33
import com.google.protobuf.Any;
34
import io.grpc.ChannelCredentials;
35
import io.grpc.Context;
36
import io.grpc.Grpc;
37
import io.grpc.InternalLogId;
38
import io.grpc.LoadBalancerRegistry;
39
import io.grpc.ManagedChannel;
40
import io.grpc.Status;
41
import io.grpc.SynchronizationContext;
42
import io.grpc.SynchronizationContext.ScheduledHandle;
43
import io.grpc.internal.BackoffPolicy;
44
import io.grpc.internal.TimeProvider;
45
import io.grpc.xds.Bootstrapper.AuthorityInfo;
46
import io.grpc.xds.Bootstrapper.ServerInfo;
47
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
48
import io.grpc.xds.LoadStatsManager2.ClusterLocalityStats;
49
import io.grpc.xds.XdsClient.ResourceStore;
50
import io.grpc.xds.XdsClient.TimerLaunch;
51
import io.grpc.xds.XdsClient.XdsResponseHandler;
52
import io.grpc.xds.XdsLogger.XdsLogLevel;
53
import java.net.URI;
54
import java.util.Collection;
55
import java.util.Collections;
56
import java.util.HashMap;
57
import java.util.HashSet;
58
import java.util.List;
59
import java.util.Map;
60
import java.util.Objects;
61
import java.util.Set;
62
import java.util.concurrent.ScheduledExecutorService;
63
import java.util.concurrent.TimeUnit;
64
import java.util.logging.Level;
65
import java.util.logging.Logger;
66
import javax.annotation.Nullable;
67

68
/**
69
 * XdsClient implementation.
70
 */
71
final class XdsClientImpl extends XdsClient
72
    implements XdsResponseHandler, ResourceStore, TimerLaunch {
73

74
  private static boolean LOG_XDS_NODE_ID = Boolean.parseBoolean(
1✔
75
      System.getenv("GRPC_LOG_XDS_NODE_ID"));
1✔
76
  private static final Logger classLogger = Logger.getLogger(XdsClientImpl.class.getName());
1✔
77

78
  // Longest time to wait, since the subscription to some resource, for concluding its absence.
79
  @VisibleForTesting
80
  static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
81
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
82
      new Thread.UncaughtExceptionHandler() {
1✔
83
        @Override
84
        public void uncaughtException(Thread t, Throwable e) {
85
          logger.log(
×
86
              XdsLogLevel.ERROR,
87
              "Uncaught exception in XdsClient SynchronizationContext. Panic!",
88
              e);
89
          // TODO(chengyuanzhang): better error handling.
90
          throw new AssertionError(e);
×
91
        }
92
      });
93
  private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
1✔
94
  private final LoadBalancerRegistry loadBalancerRegistry
1✔
95
      = LoadBalancerRegistry.getDefaultRegistry();
1✔
96
  private final Map<ServerInfo, ControlPlaneClient> serverChannelMap = new HashMap<>();
1✔
97
  private final Map<XdsResourceType<? extends ResourceUpdate>,
1✔
98
      Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
99
      resourceSubscribers = new HashMap<>();
100
  private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
1✔
101
  private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap = new HashMap<>();
1✔
102
  private final Map<ServerInfo, LoadReportClient> serverLrsClientMap = new HashMap<>();
1✔
103
  private final XdsChannelFactory xdsChannelFactory;
104
  private final Bootstrapper.BootstrapInfo bootstrapInfo;
105
  private final Context context;
106
  private final ScheduledExecutorService timeService;
107
  private final BackoffPolicy.Provider backoffPolicyProvider;
108
  private final Supplier<Stopwatch> stopwatchSupplier;
109
  private final TimeProvider timeProvider;
110
  private final TlsContextManager tlsContextManager;
111
  private final InternalLogId logId;
112
  private final XdsLogger logger;
113
  private volatile boolean isShutdown;
114

115
  /**
   * Creates an xDS client for the given bootstrap configuration.
   *
   * <p>No control plane channel is created here; channels are created on demand by
   * {@code maybeCreateXdsChannelWithLrs} when a resource is first subscribed.
   *
   * @param tlsContextManager must not be null
   */
  XdsClientImpl(
      XdsChannelFactory xdsChannelFactory,
      Bootstrapper.BootstrapInfo bootstrapInfo,
      Context context,
      ScheduledExecutorService timeService,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      TimeProvider timeProvider,
      TlsContextManager tlsContextManager) {
    // Collaborators are stored as given; only the TLS context manager is null-checked.
    this.xdsChannelFactory = xdsChannelFactory;
    this.bootstrapInfo = bootstrapInfo;
    this.context = context;
    this.timeService = timeService;
    this.backoffPolicyProvider = backoffPolicyProvider;
    this.stopwatchSupplier = stopwatchSupplier;
    this.timeProvider = timeProvider;
    this.tlsContextManager = checkNotNull(tlsContextManager, "tlsContextManager");

    // Per-instance logging identity.
    this.logId = InternalLogId.allocate("xds-client", null);
    this.logger = XdsLogger.withLogId(this.logId);
    this.logger.log(XdsLogLevel.INFO, "Created");

    // Node ID logging is opt-in via the GRPC_LOG_XDS_NODE_ID environment variable.
    if (!LOG_XDS_NODE_ID) {
      return;
    }
    classLogger.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
  }
1✔
139

140
  private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) {
141
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
142
    if (serverChannelMap.containsKey(serverInfo)) {
1✔
143
      return;
1✔
144
    }
145
    ControlPlaneClient xdsChannel = new ControlPlaneClient(
1✔
146
        xdsChannelFactory,
147
        serverInfo,
148
        bootstrapInfo.node(),
1✔
149
        this,
150
        this,
151
        context,
152
        timeService,
153
        syncContext,
154
        backoffPolicyProvider,
155
        stopwatchSupplier,
156
        this);
157
    LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
1✔
158
    loadStatsManagerMap.put(serverInfo, loadStatsManager);
1✔
159
    LoadReportClient lrsClient = new LoadReportClient(
1✔
160
        loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext,
1✔
161
        timeService, backoffPolicyProvider, stopwatchSupplier);
162
    serverChannelMap.put(serverInfo, xdsChannel);
1✔
163
    serverLrsClientMap.put(serverInfo, lrsClient);
1✔
164
  }
1✔
165

166
  @Override
167
  public void handleResourceResponse(
168
      XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
169
      List<Any> resources, String nonce) {
170
    checkNotNull(xdsResourceType, "xdsResourceType");
1✔
171
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
172
    Set<String> toParseResourceNames = null;
1✔
173
    if (!(xdsResourceType == XdsListenerResource.getInstance()
1✔
174
        || xdsResourceType == XdsRouteConfigureResource.getInstance())
1✔
175
        && resourceSubscribers.containsKey(xdsResourceType)) {
1✔
176
      toParseResourceNames = resourceSubscribers.get(xdsResourceType).keySet();
1✔
177
    }
178
    XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
1✔
179
        bootstrapInfo, filterRegistry, loadBalancerRegistry, tlsContextManager,
180
        toParseResourceNames);
181
    handleResourceUpdate(args, resources, xdsResourceType);
1✔
182
  }
1✔
183

184
  @Override
185
  public void handleStreamClosed(Status error) {
186
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
187
    cleanUpResourceTimers();
1✔
188
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
189
        resourceSubscribers.values()) {
1✔
190
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
191
        if (!subscriber.hasResult()) {
1✔
192
          subscriber.onError(error);
1✔
193
        }
194
      }
1✔
195
    }
1✔
196
  }
1✔
197

198
  @Override
199
  public void handleStreamRestarted(ServerInfo serverInfo) {
200
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
201
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
202
        resourceSubscribers.values()) {
1✔
203
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
204
        if (subscriber.serverInfo.equals(serverInfo)) {
1✔
205
          subscriber.restartTimer();
1✔
206
        }
207
      }
1✔
208
    }
1✔
209
  }
1✔
210

211
  @Override
212
  void shutdown() {
213
    syncContext.execute(
1✔
214
        new Runnable() {
1✔
215
          @Override
216
          public void run() {
217
            if (isShutdown) {
1✔
218
              return;
×
219
            }
220
            isShutdown = true;
1✔
221
            for (ControlPlaneClient xdsChannel : serverChannelMap.values()) {
1✔
222
              xdsChannel.shutdown();
1✔
223
            }
1✔
224
            for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
1✔
225
              lrsClient.stopLoadReporting();
1✔
226
            }
1✔
227
            cleanUpResourceTimers();
1✔
228
          }
1✔
229
        });
230
  }
1✔
231

232
  @Override
233
  boolean isShutDown() {
234
    return isShutdown;
1✔
235
  }
236

237
  @Override
238
  public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
239
    return Collections.unmodifiableMap(subscribedResourceTypeUrls);
1✔
240
  }
241

242
  @Nullable
243
  @Override
244
  public Collection<String> getSubscribedResources(ServerInfo serverInfo,
245
                                                   XdsResourceType<? extends ResourceUpdate> type) {
246
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
1✔
247
        resourceSubscribers.getOrDefault(type, Collections.emptyMap());
1✔
248
    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
1✔
249
    for (String key : resources.keySet()) {
1✔
250
      if (resources.get(key).serverInfo.equals(serverInfo)) {
1✔
251
        builder.add(key);
1✔
252
      }
253
    }
1✔
254
    Collection<String> retVal = builder.build();
1✔
255
    return retVal.isEmpty() ? null : retVal;
1✔
256
  }
257

258
  // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
259
  // ResourceTypes that do not have subscribers does not show up in the snapshot keys.
260
  @Override
261
  ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
262
      getSubscribedResourcesMetadataSnapshot() {
263
    final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
264
        SettableFuture.create();
1✔
265
    syncContext.execute(new Runnable() {
1✔
266
      @Override
267
      public void run() {
268
        // A map from a "resource type" to a map ("resource name": "resource metadata")
269
        ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
270
            ImmutableMap.builder();
1✔
271
        for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
1✔
272
          ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
1✔
273
          for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
274
              : resourceSubscribers.get(resourceType).entrySet()) {
1✔
275
            metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
1✔
276
          }
1✔
277
          metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
1✔
278
        }
1✔
279
        future.set(metadataSnapshot.buildOrThrow());
1✔
280
      }
1✔
281
    });
282
    return future;
1✔
283
  }
284

285
  @Override
286
  TlsContextManager getTlsContextManager() {
287
    return tlsContextManager;
×
288
  }
289

290
  @Override
291
  <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
292
                                                   ResourceWatcher<T> watcher) {
293
    syncContext.execute(new Runnable() {
1✔
294
      @Override
295
      @SuppressWarnings("unchecked")
296
      public void run() {
297
        if (!resourceSubscribers.containsKey(type)) {
1✔
298
          resourceSubscribers.put(type, new HashMap<>());
1✔
299
          subscribedResourceTypeUrls.put(type.typeUrl(), type);
1✔
300
        }
301
        ResourceSubscriber<T> subscriber =
1✔
302
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
1✔
303
        if (subscriber == null) {
1✔
304
          logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
1✔
305
          subscriber = new ResourceSubscriber<>(type, resourceName);
1✔
306
          resourceSubscribers.get(type).put(resourceName, subscriber);
1✔
307
          if (subscriber.xdsChannel != null) {
1✔
308
            subscriber.xdsChannel.adjustResourceSubscription(type);
1✔
309
          }
310
        }
311
        subscriber.addWatcher(watcher);
1✔
312
      }
1✔
313
    });
314
  }
1✔
315

316
  @Override
317
  <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
318
                                                         String resourceName,
319
                                                         ResourceWatcher<T> watcher) {
320
    syncContext.execute(new Runnable() {
1✔
321
      @Override
322
      @SuppressWarnings("unchecked")
323
      public void run() {
324
        ResourceSubscriber<T> subscriber =
1✔
325
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);;
1✔
326
        subscriber.removeWatcher(watcher);
1✔
327
        if (!subscriber.isWatched()) {
1✔
328
          subscriber.cancelResourceWatch();
1✔
329
          resourceSubscribers.get(type).remove(resourceName);
1✔
330
          if (subscriber.xdsChannel != null) {
1✔
331
            subscriber.xdsChannel.adjustResourceSubscription(type);
1✔
332
          }
333
          if (resourceSubscribers.get(type).isEmpty()) {
1✔
334
            resourceSubscribers.remove(type);
1✔
335
            subscribedResourceTypeUrls.remove(type.typeUrl());
1✔
336

337
          }
338
        }
339
      }
1✔
340
    });
341
  }
1✔
342

343
  @Override
344
  ClusterDropStats addClusterDropStats(
345
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
346
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
347
    ClusterDropStats dropCounter =
1✔
348
        loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
1✔
349
    syncContext.execute(new Runnable() {
1✔
350
      @Override
351
      public void run() {
352
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
353
      }
1✔
354
    });
355
    return dropCounter;
1✔
356
  }
357

358
  @Override
359
  ClusterLocalityStats addClusterLocalityStats(
360
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
361
      Locality locality) {
362
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
363
    ClusterLocalityStats loadCounter =
1✔
364
        loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
1✔
365
    syncContext.execute(new Runnable() {
1✔
366
      @Override
367
      public void run() {
368
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
369
      }
1✔
370
    });
371
    return loadCounter;
1✔
372
  }
373

374
  @Override
375
  Bootstrapper.BootstrapInfo getBootstrapInfo() {
376
    return bootstrapInfo;
1✔
377
  }
378

379
  @VisibleForTesting
380
  @Override
381
  Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
382
    return ImmutableMap.copyOf(serverLrsClientMap);
1✔
383
  }
384

385
  @Override
386
  public String toString() {
387
    return logId.toString();
×
388
  }
389

390
  @Override
391
  public void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
392
    if (isShutDown()) {
1✔
393
      return;
×
394
    }
395

396
    syncContext.execute(new Runnable() {
1✔
397
      @Override
398
      public void run() {
399
        if (isShutDown()) {
1✔
400
          return;
×
401
        }
402

403
        for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
404
          for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
405
            if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) {
1✔
406
              subscriber.restartTimer();
1✔
407
            }
408
          }
1✔
409
        }
1✔
410
      }
1✔
411
    });
412
  }
1✔
413

414
  private void cleanUpResourceTimers() {
415
    for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
416
      for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
417
        subscriber.stopTimer();
1✔
418
      }
1✔
419
    }
1✔
420
  }
1✔
421

422
  @SuppressWarnings("unchecked")
423
  private <T extends ResourceUpdate> void handleResourceUpdate(XdsResourceType.Args args,
424
                                                               List<Any> resources,
425
                                                               XdsResourceType<T> xdsResourceType) {
426
    ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
1✔
427
    logger.log(XdsLogger.XdsLogLevel.INFO,
1✔
428
        "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
429
         xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
1✔
430
    Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
1✔
431
    Set<String> invalidResources = result.invalidResources;
1✔
432
    List<String> errors = result.errors;
1✔
433
    String errorDetail = null;
1✔
434
    if (errors.isEmpty()) {
1✔
435
      checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
1✔
436
      serverChannelMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo,
1✔
437
          args.nonce);
438
    } else {
439
      errorDetail = Joiner.on('\n').join(errors);
1✔
440
      logger.log(XdsLogLevel.WARNING,
1✔
441
          "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
442
          xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
1✔
443
      serverChannelMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail);
1✔
444
    }
445

446
    long updateTime = timeProvider.currentTimeNanos();
1✔
447
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
1✔
448
        resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
1✔
449
    for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
1✔
450
      String resourceName = entry.getKey();
1✔
451
      ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
1✔
452

453
      if (parsedResources.containsKey(resourceName)) {
1✔
454
        // Happy path: the resource updated successfully. Notify the watchers of the update.
455
        subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime);
1✔
456
        continue;
1✔
457
      }
458

459
      if (invalidResources.contains(resourceName)) {
1✔
460
        // The resource update is invalid. Capture the error without notifying the watchers.
461
        subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
1✔
462
      }
463

464
      // Nothing else to do for incremental ADS resources.
465
      if (!xdsResourceType.isFullStateOfTheWorld()) {
1✔
466
        continue;
1✔
467
      }
468

469
      // Handle State of the World ADS: invalid resources.
470
      if (invalidResources.contains(resourceName)) {
1✔
471
        // The resource is missing. Reuse the cached resource if possible.
472
        if (subscriber.data == null) {
1✔
473
          // No cached data. Notify the watchers of an invalid update.
474
          subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
1✔
475
        }
476
        continue;
477
      }
478

479
      // For State of the World services, notify watchers when their watched resource is missing
480
      // from the ADS update. Note that we can only do this if the resource update is coming from
481
      // the same xDS server that the ResourceSubscriber is subscribed to.
482
      if (subscriber.serverInfo.equals(args.serverInfo)) {
1✔
483
        subscriber.onAbsent();
1✔
484
      }
485
    }
1✔
486
  }
1✔
487

488
  /**
489
   * Tracks a single subscribed resource.
490
   */
491
  private final class ResourceSubscriber<T extends ResourceUpdate> {
492
    @Nullable private final ServerInfo serverInfo;
493
    @Nullable private final ControlPlaneClient xdsChannel;
494
    private final XdsResourceType<T> type;
495
    private final String resource;
496
    private final Set<ResourceWatcher<T>> watchers = new HashSet<>();
1✔
497
    @Nullable private T data;
498
    private boolean absent;
499
    // Tracks whether the deletion has been ignored per bootstrap server feature.
500
    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
501
    private boolean resourceDeletionIgnored;
502
    @Nullable private ScheduledHandle respTimer;
503
    @Nullable private ResourceMetadata metadata;
504
    @Nullable private String errorDescription;
505

506
    ResourceSubscriber(XdsResourceType<T> type, String resource) {
1✔
507
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
508
      this.type = type;
1✔
509
      this.resource = resource;
1✔
510
      this.serverInfo = getServerInfo(resource);
1✔
511
      if (serverInfo == null) {
1✔
512
        this.errorDescription = "Wrong configuration: xds server does not exist for resource "
1✔
513
            + resource;
514
        this.xdsChannel = null;
1✔
515
        return;
1✔
516
      }
517
      // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
518
      // is created but not yet requested because the client is in backoff.
519
      this.metadata = ResourceMetadata.newResourceMetadataUnknown();
1✔
520

521
      ControlPlaneClient xdsChannelTemp = null;
1✔
522
      try {
523
        maybeCreateXdsChannelWithLrs(serverInfo);
1✔
524
        xdsChannelTemp = serverChannelMap.get(serverInfo);
1✔
525
        if (xdsChannelTemp.isInBackoff()) {
1✔
526
          return;
1✔
527
        }
528
      } catch (IllegalArgumentException e) {
1✔
529
        xdsChannelTemp = null;
1✔
530
        this.errorDescription = "Bad configuration:  " + e.getMessage();
1✔
531
        return;
1✔
532
      } finally {
533
        this.xdsChannel = xdsChannelTemp;
1✔
534
      }
535

536
      restartTimer();
1✔
537
    }
1✔
538

539
    @Nullable
540
    private ServerInfo getServerInfo(String resource) {
541
      if (BootstrapperImpl.enableFederation && resource.startsWith(XDSTP_SCHEME)) {
1✔
542
        URI uri = URI.create(resource);
1✔
543
        String authority = uri.getAuthority();
1✔
544
        if (authority == null) {
1✔
545
          authority = "";
1✔
546
        }
547
        AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
1✔
548
        if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
1✔
549
          return null;
1✔
550
        }
551
        return authorityInfo.xdsServers().get(0);
1✔
552
      }
553
      return bootstrapInfo.servers().get(0); // use first server
1✔
554
    }
555

556
    void addWatcher(ResourceWatcher<T> watcher) {
557
      checkArgument(!watchers.contains(watcher), "watcher %s already registered", watcher);
1✔
558
      watchers.add(watcher);
1✔
559
      if (errorDescription != null) {
1✔
560
        watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
1✔
561
        return;
1✔
562
      }
563
      if (data != null) {
1✔
564
        notifyWatcher(watcher, data);
1✔
565
      } else if (absent) {
1✔
566
        watcher.onResourceDoesNotExist(resource);
1✔
567
      }
568
    }
1✔
569

570
    void removeWatcher(ResourceWatcher<T>  watcher) {
571
      checkArgument(watchers.contains(watcher), "watcher %s not registered", watcher);
1✔
572
      watchers.remove(watcher);
1✔
573
    }
1✔
574

575
    void restartTimer() {
576
      if (data != null || absent) {  // resource already resolved
1✔
577
        return;
1✔
578
      }
579
      if (!xdsChannel.isReady()) { // When channel becomes ready, it will trigger a restartTimer
1✔
580
        return;
1✔
581
      }
582

583
      class ResourceNotFound implements Runnable {
1✔
584
        @Override
585
        public void run() {
586
          logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
1✔
587
              type, resource);
1✔
588
          respTimer = null;
1✔
589
          onAbsent();
1✔
590
        }
1✔
591

592
        @Override
593
        public String toString() {
594
          return type + this.getClass().getSimpleName();
1✔
595
        }
596
      }
597

598
      // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
599
      metadata = ResourceMetadata.newResourceMetadataRequested();
1✔
600

601
      respTimer = syncContext.schedule(
1✔
602
          new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
603
          timeService);
1✔
604
    }
1✔
605

606
    void stopTimer() {
607
      if (respTimer != null && respTimer.isPending()) {
1✔
608
        respTimer.cancel();
1✔
609
        respTimer = null;
1✔
610
      }
611
    }
1✔
612

613
    void cancelResourceWatch() {
614
      if (isWatched()) {
1✔
615
        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
×
616
      }
617
      stopTimer();
1✔
618
      String message = "Unsubscribing {0} resource {1} from server {2}";
1✔
619
      XdsLogLevel logLevel = XdsLogLevel.INFO;
1✔
620
      if (resourceDeletionIgnored) {
1✔
621
        message += " for which we previously ignored a deletion";
×
622
        logLevel = XdsLogLevel.FORCE_INFO;
×
623
      }
624
      logger.log(logLevel, message, type, resource,
1✔
625
          serverInfo != null ? serverInfo.target() : "unknown");
1✔
626
    }
1✔
627

628
    boolean isWatched() {
629
      return !watchers.isEmpty();
1✔
630
    }
631

632
    boolean hasResult() {
633
      return data != null || absent;
1✔
634
    }
635

636
    void onData(ParsedResource<T> parsedResource, String version, long updateTime) {
637
      if (respTimer != null && respTimer.isPending()) {
1✔
638
        respTimer.cancel();
1✔
639
        respTimer = null;
1✔
640
      }
641
      this.metadata = ResourceMetadata
1✔
642
          .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
1✔
643
      ResourceUpdate oldData = this.data;
1✔
644
      this.data = parsedResource.getResourceUpdate();
1✔
645
      absent = false;
1✔
646
      if (resourceDeletionIgnored) {
1✔
647
        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
1✔
648
                + "of resource for which we previously ignored a deletion: type {1} name {2}",
649
            serverInfo != null ? serverInfo.target() : "unknown", type, resource);
1✔
650
        resourceDeletionIgnored = false;
1✔
651
      }
652
      if (!Objects.equals(oldData, data)) {
1✔
653
        for (ResourceWatcher<T> watcher : watchers) {
1✔
654
          notifyWatcher(watcher, data);
1✔
655
        }
1✔
656
      }
657
    }
1✔
658

659
    void onAbsent() {
660
      if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
1✔
661
        return;
1✔
662
      }
663

664
      // Ignore deletion of State of the World resources when this feature is on,
665
      // and the resource is reusable.
666
      boolean ignoreResourceDeletionEnabled =
1✔
667
          serverInfo != null && serverInfo.ignoreResourceDeletion();
1✔
668
      if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
1✔
669
        if (!resourceDeletionIgnored) {
1✔
670
          logger.log(XdsLogLevel.FORCE_WARNING,
1✔
671
              "xds server {0}: ignoring deletion for resource type {1} name {2}}",
672
              serverInfo.target(), type, resource);
1✔
673
          resourceDeletionIgnored = true;
1✔
674
        }
675
        return;
1✔
676
      }
677

678
      logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
1✔
679
      if (!absent) {
1✔
680
        data = null;
1✔
681
        absent = true;
1✔
682
        metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
683
        for (ResourceWatcher<T> watcher : watchers) {
1✔
684
          watcher.onResourceDoesNotExist(resource);
1✔
685
        }
1✔
686
      }
687
    }
1✔
688

689
    void onError(Status error) {
690
      if (respTimer != null && respTimer.isPending()) {
1✔
691
        respTimer.cancel();
1✔
692
        respTimer = null;
1✔
693
      }
694

695
      // Include node ID in xds failures to allow cross-referencing with control plane logs
696
      // when debugging.
697
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
698
      Status errorAugmented = Status.fromCode(error.getCode())
1✔
699
          .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
1✔
700
          .withCause(error.getCause());
1✔
701

702
      for (ResourceWatcher<T> watcher : watchers) {
1✔
703
        watcher.onError(errorAugmented);
1✔
704
      }
1✔
705
    }
1✔
706

707
    void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
708
      metadata = ResourceMetadata
1✔
709
          .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
1✔
710
    }
1✔
711

712
    private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
713
      watcher.onChanged(update);
1✔
714
    }
1✔
715
  }
716

717
  static final class ResourceInvalidException extends Exception {
718
    private static final long serialVersionUID = 0L;
719

720
    ResourceInvalidException(String message) {
721
      super(message, null, false, false);
1✔
722
    }
1✔
723

724
    ResourceInvalidException(String message, Throwable cause) {
725
      super(cause != null ? message + ": " + cause.getMessage() : message, cause, false, false);
1✔
726
    }
1✔
727
  }
728

729
  abstract static class XdsChannelFactory {
1✔
730
    static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() {
1✔
731
      @Override
732
      ManagedChannel create(ServerInfo serverInfo) {
733
        String target = serverInfo.target();
1✔
734
        ChannelCredentials channelCredentials = serverInfo.channelCredentials();
1✔
735
        return Grpc.newChannelBuilder(target, channelCredentials)
1✔
736
            .keepAliveTime(5, TimeUnit.MINUTES)
1✔
737
            .build();
1✔
738
      }
739
    };
740

741
    abstract ManagedChannel create(ServerInfo serverInfo);
742
  }
743
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc