• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19443

30 Aug 2024 07:17PM UTC coverage: 84.523% (+0.03%) from 84.497%
#19443

push

github

web-flow
add OpenTelemetryTracingModule (#11477)

33516 of 39653 relevant lines covered (84.52%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.86
/../xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
22
import static io.grpc.xds.client.XdsResourceType.ParsedResource;
23
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Joiner;
27
import com.google.common.base.Stopwatch;
28
import com.google.common.base.Supplier;
29
import com.google.common.collect.ImmutableMap;
30
import com.google.common.collect.ImmutableSet;
31
import com.google.common.util.concurrent.ListenableFuture;
32
import com.google.common.util.concurrent.SettableFuture;
33
import com.google.protobuf.Any;
34
import io.grpc.Internal;
35
import io.grpc.InternalLogId;
36
import io.grpc.Status;
37
import io.grpc.SynchronizationContext;
38
import io.grpc.SynchronizationContext.ScheduledHandle;
39
import io.grpc.internal.BackoffPolicy;
40
import io.grpc.internal.TimeProvider;
41
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
42
import io.grpc.xds.client.Bootstrapper.ServerInfo;
43
import io.grpc.xds.client.XdsClient.ResourceStore;
44
import io.grpc.xds.client.XdsClient.XdsResponseHandler;
45
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
46
import java.net.URI;
47
import java.util.Collection;
48
import java.util.Collections;
49
import java.util.HashMap;
50
import java.util.List;
51
import java.util.Map;
52
import java.util.Objects;
53
import java.util.Set;
54
import java.util.concurrent.Executor;
55
import java.util.concurrent.ScheduledExecutorService;
56
import java.util.concurrent.TimeUnit;
57
import javax.annotation.Nullable;
58

59
/**
60
 * XdsClient implementation.
61
 */
62
@Internal
63
public final class XdsClientImpl extends XdsClient implements XdsResponseHandler, ResourceStore {
64

65
  // Longest time to wait, since the subscription to some resource, for concluding its absence.
66
  @VisibleForTesting
67
  public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15;
68

69
  private final SynchronizationContext syncContext = new SynchronizationContext(
1✔
70
      new Thread.UncaughtExceptionHandler() {
1✔
71
        @Override
72
        public void uncaughtException(Thread t, Throwable e) {
73
          logger.log(
×
74
              XdsLogLevel.ERROR,
75
              "Uncaught exception in XdsClient SynchronizationContext. Panic!",
76
              e);
77
          // TODO(chengyuanzhang): better error handling.
78
          throw new AssertionError(e);
×
79
        }
80
      });
81

82
  private final Map<ServerInfo, LoadStatsManager2> loadStatsManagerMap =
1✔
83
      new HashMap<>();
84
  final Map<ServerInfo, LoadReportClient> serverLrsClientMap =
1✔
85
      new HashMap<>();
86

87
  private final Map<ServerInfo, ControlPlaneClient> serverCpClientMap = new HashMap<>();
1✔
88
  private final Map<XdsResourceType<? extends ResourceUpdate>,
1✔
89
      Map<String, ResourceSubscriber<? extends ResourceUpdate>>>
90
      resourceSubscribers = new HashMap<>();
91
  private final Map<String, XdsResourceType<?>> subscribedResourceTypeUrls = new HashMap<>();
1✔
92
  private final XdsTransportFactory xdsTransportFactory;
93
  private final Bootstrapper.BootstrapInfo bootstrapInfo;
94
  private final ScheduledExecutorService timeService;
95
  private final BackoffPolicy.Provider backoffPolicyProvider;
96
  private final Supplier<Stopwatch> stopwatchSupplier;
97
  private final TimeProvider timeProvider;
98
  private final Object securityConfig;
99
  private final InternalLogId logId;
100
  private final XdsLogger logger;
101
  private volatile boolean isShutdown;
102
  private final MessagePrettyPrinter messagePrinter;
103

104
  /**
   * Creates an xDS client bound to the given bootstrap configuration.
   *
   * <p>The constructor only stores its collaborators and allocates a log id; no control plane
   * connection is created here.
   *
   * @param xdsTransportFactory factory used to create a transport per xDS server
   * @param bootstrapInfo bootstrap configuration (node, servers, authorities)
   * @param timeService executor used for scheduling timers
   * @param backoffPolicyProvider backoff policy source handed to the per-server clients
   * @param stopwatchSupplier stopwatch source handed to the per-server clients
   * @param timeProvider clock used to timestamp resource updates
   * @param messagePrinter printer handed to the control plane clients
   * @param securityConfig opaque security configuration passed through to resource parsing
   */
  public XdsClientImpl(
      XdsTransportFactory xdsTransportFactory,
      Bootstrapper.BootstrapInfo bootstrapInfo,
      ScheduledExecutorService timeService,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      TimeProvider timeProvider,
      MessagePrettyPrinter messagePrinter,
      Object securityConfig) {
    // Logging identity.
    this.logId = InternalLogId.allocate("xds-client", null);
    this.logger = XdsLogger.withLogId(this.logId);

    // Configuration.
    this.bootstrapInfo = bootstrapInfo;
    this.securityConfig = securityConfig;

    // Collaborators.
    this.xdsTransportFactory = xdsTransportFactory;
    this.timeService = timeService;
    this.backoffPolicyProvider = backoffPolicyProvider;
    this.stopwatchSupplier = stopwatchSupplier;
    this.timeProvider = timeProvider;
    this.messagePrinter = messagePrinter;

    this.logger.log(XdsLogLevel.INFO, "Created");
  }
1✔
125

126
  @Override
127
  public void handleResourceResponse(
128
      XdsResourceType<?> xdsResourceType, ServerInfo serverInfo, String versionInfo,
129
      List<Any> resources, String nonce, ProcessingTracker processingTracker) {
130
    checkNotNull(xdsResourceType, "xdsResourceType");
1✔
131
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
132
    Set<String> toParseResourceNames =
133
        xdsResourceType.shouldRetrieveResourceKeysForArgs()
1✔
134
            ? getResourceKeys(xdsResourceType)
1✔
135
            : null;
1✔
136
    XdsResourceType.Args args = new XdsResourceType.Args(serverInfo, versionInfo, nonce,
1✔
137
        bootstrapInfo, securityConfig, toParseResourceNames);
138
    handleResourceUpdate(args, resources, xdsResourceType, processingTracker);
1✔
139
  }
1✔
140

141
  @Override
142
  public void handleStreamClosed(Status error) {
143
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
144
    cleanUpResourceTimers();
1✔
145
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
146
        resourceSubscribers.values()) {
1✔
147
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
148
        if (!subscriber.hasResult()) {
1✔
149
          subscriber.onError(error, null);
1✔
150
        }
151
      }
1✔
152
    }
1✔
153
  }
1✔
154

155
  @Override
156
  public void handleStreamRestarted(ServerInfo serverInfo) {
157
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
158
    for (Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscriberMap :
159
        resourceSubscribers.values()) {
1✔
160
      for (ResourceSubscriber<? extends ResourceUpdate> subscriber : subscriberMap.values()) {
1✔
161
        if (subscriber.serverInfo.equals(serverInfo)) {
1✔
162
          subscriber.restartTimer();
1✔
163
        }
164
      }
1✔
165
    }
1✔
166
  }
1✔
167

168
  @Override
169
  public void shutdown() {
170
    syncContext.execute(
1✔
171
        new Runnable() {
1✔
172
          @Override
173
          public void run() {
174
            if (isShutdown) {
1✔
175
              return;
×
176
            }
177
            isShutdown = true;
1✔
178
            for (ControlPlaneClient xdsChannel : serverCpClientMap.values()) {
1✔
179
              xdsChannel.shutdown();
1✔
180
            }
1✔
181
            for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
1✔
182
              lrsClient.stopLoadReporting();
1✔
183
            }
1✔
184
            cleanUpResourceTimers();
1✔
185
          }
1✔
186
        });
187
  }
1✔
188

189
  @Override
190
  public boolean isShutDown() {
191
    return isShutdown;
1✔
192
  }
193

194
  @Override
195
  public Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl() {
196
    return Collections.unmodifiableMap(subscribedResourceTypeUrls);
1✔
197
  }
198

199
  @Nullable
200
  @Override
201
  public Collection<String> getSubscribedResources(ServerInfo serverInfo,
202
                                                   XdsResourceType<? extends ResourceUpdate> type) {
203
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> resources =
1✔
204
        resourceSubscribers.getOrDefault(type, Collections.emptyMap());
1✔
205
    ImmutableSet.Builder<String> builder = ImmutableSet.builder();
1✔
206
    for (String key : resources.keySet()) {
1✔
207
      if (resources.get(key).serverInfo.equals(serverInfo)) {
1✔
208
        builder.add(key);
1✔
209
      }
210
    }
1✔
211
    Collection<String> retVal = builder.build();
1✔
212
    return retVal.isEmpty() ? null : retVal;
1✔
213
  }
214

215
  // As XdsClient APIs becomes resource agnostic, subscribed resource types are dynamic.
216
  // ResourceTypes that do not have subscribers does not show up in the snapshot keys.
217
  @Override
218
  public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
219
      getSubscribedResourcesMetadataSnapshot() {
220
    final SettableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future =
221
        SettableFuture.create();
1✔
222
    syncContext.execute(new Runnable() {
1✔
223
      @Override
224
      public void run() {
225
        // A map from a "resource type" to a map ("resource name": "resource metadata")
226
        ImmutableMap.Builder<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataSnapshot =
227
            ImmutableMap.builder();
1✔
228
        for (XdsResourceType<?> resourceType: resourceSubscribers.keySet()) {
1✔
229
          ImmutableMap.Builder<String, ResourceMetadata> metadataMap = ImmutableMap.builder();
1✔
230
          for (Map.Entry<String, ResourceSubscriber<? extends ResourceUpdate>> resourceEntry
231
              : resourceSubscribers.get(resourceType).entrySet()) {
1✔
232
            metadataMap.put(resourceEntry.getKey(), resourceEntry.getValue().metadata);
1✔
233
          }
1✔
234
          metadataSnapshot.put(resourceType, metadataMap.buildOrThrow());
1✔
235
        }
1✔
236
        future.set(metadataSnapshot.buildOrThrow());
1✔
237
      }
1✔
238
    });
239
    return future;
1✔
240
  }
241

242
  @Override
243
  public Object getSecurityConfig() {
244
    return securityConfig;
×
245
  }
246

247
  @Override
248
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
249
      String resourceName,
250
      ResourceWatcher<T> watcher,
251
      Executor watcherExecutor) {
252
    syncContext.execute(new Runnable() {
1✔
253
      @Override
254
      @SuppressWarnings("unchecked")
255
      public void run() {
256
        if (!resourceSubscribers.containsKey(type)) {
1✔
257
          resourceSubscribers.put(type, new HashMap<>());
1✔
258
          subscribedResourceTypeUrls.put(type.typeUrl(), type);
1✔
259
        }
260
        ResourceSubscriber<T> subscriber =
1✔
261
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
262
        if (subscriber == null) {
1✔
263
          logger.log(XdsLogLevel.INFO, "Subscribe {0} resource {1}", type, resourceName);
1✔
264
          subscriber = new ResourceSubscriber<>(type, resourceName);
1✔
265
          resourceSubscribers.get(type).put(resourceName, subscriber);
1✔
266
          if (subscriber.controlPlaneClient != null) {
1✔
267
            subscriber.controlPlaneClient.adjustResourceSubscription(type);
1✔
268
          }
269
        }
270
        subscriber.addWatcher(watcher, watcherExecutor);
1✔
271
      }
1✔
272
    });
273
  }
1✔
274

275
  @Override
276
  public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
277
      String resourceName,
278
      ResourceWatcher<T> watcher) {
279
    syncContext.execute(new Runnable() {
1✔
280
      @Override
281
      @SuppressWarnings("unchecked")
282
      public void run() {
283
        ResourceSubscriber<T> subscriber =
1✔
284
            (ResourceSubscriber<T>) resourceSubscribers.get(type).get(resourceName);
1✔
285
        subscriber.removeWatcher(watcher);
1✔
286
        if (!subscriber.isWatched()) {
1✔
287
          subscriber.cancelResourceWatch();
1✔
288
          resourceSubscribers.get(type).remove(resourceName);
1✔
289
          if (subscriber.controlPlaneClient != null) {
1✔
290
            subscriber.controlPlaneClient.adjustResourceSubscription(type);
1✔
291
          }
292
          if (resourceSubscribers.get(type).isEmpty()) {
1✔
293
            resourceSubscribers.remove(type);
1✔
294
            subscribedResourceTypeUrls.remove(type.typeUrl());
1✔
295
          }
296
        }
297
      }
1✔
298
    });
299
  }
1✔
300

301
  @Override
302
  public LoadStatsManager2.ClusterDropStats addClusterDropStats(
303
      final ServerInfo serverInfo, String clusterName,
304
      @Nullable String edsServiceName) {
305
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
306
    LoadStatsManager2.ClusterDropStats dropCounter =
1✔
307
        loadStatsManager.getClusterDropStats(clusterName, edsServiceName);
1✔
308
    syncContext.execute(new Runnable() {
1✔
309
      @Override
310
      public void run() {
311
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
312
      }
1✔
313
    });
314
    return dropCounter;
1✔
315
  }
316

317
  @Override
318
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
319
      final ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
320
      Locality locality) {
321
    LoadStatsManager2 loadStatsManager = loadStatsManagerMap.get(serverInfo);
1✔
322
    LoadStatsManager2.ClusterLocalityStats loadCounter =
1✔
323
        loadStatsManager.getClusterLocalityStats(clusterName, edsServiceName, locality);
1✔
324
    syncContext.execute(new Runnable() {
1✔
325
      @Override
326
      public void run() {
327
        serverLrsClientMap.get(serverInfo).startLoadReporting();
1✔
328
      }
1✔
329
    });
330
    return loadCounter;
1✔
331
  }
332

333

334
  @Override
335
  public Bootstrapper.BootstrapInfo getBootstrapInfo() {
336
    return bootstrapInfo;
1✔
337
  }
338

339
  @Override
340
  public String toString() {
341
    return logId.toString();
×
342
  }
343

344
  @Override
345
  protected void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
346
    if (isShutDown()) {
1✔
347
      return;
×
348
    }
349

350
    syncContext.execute(new Runnable() {
1✔
351
      @Override
352
      public void run() {
353
        if (isShutDown()) {
1✔
354
          return;
×
355
        }
356

357
        for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
358
          for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
359
            if (subscriber.serverInfo.equals(serverInfo) && subscriber.respTimer == null) {
1✔
360
              subscriber.restartTimer();
1✔
361
            }
362
          }
1✔
363
        }
1✔
364
      }
1✔
365
    });
366
  }
1✔
367

368
  private Set<String> getResourceKeys(XdsResourceType<?> xdsResourceType) {
369
    if (!resourceSubscribers.containsKey(xdsResourceType)) {
1✔
370
      return null;
×
371
    }
372

373
    return resourceSubscribers.get(xdsResourceType).keySet();
1✔
374
  }
375

376
  private void cleanUpResourceTimers() {
377
    for (Map<String, ResourceSubscriber<?>> subscriberMap : resourceSubscribers.values()) {
1✔
378
      for (ResourceSubscriber<?> subscriber : subscriberMap.values()) {
1✔
379
        subscriber.stopTimer();
1✔
380
      }
1✔
381
    }
1✔
382
  }
1✔
383

384
  public ControlPlaneClient getOrCreateControlPlaneClient(ServerInfo serverInfo) {
385
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
386
    if (serverCpClientMap.containsKey(serverInfo)) {
1✔
387
      return serverCpClientMap.get(serverInfo);
1✔
388
    }
389

390
    XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo);
1✔
391
    ControlPlaneClient controlPlaneClient = new ControlPlaneClient(
1✔
392
        xdsTransport,
393
        serverInfo,
394
        bootstrapInfo.node(),
1✔
395
        this,
396
        this,
397
        timeService,
398
        syncContext,
399
        backoffPolicyProvider,
400
        stopwatchSupplier,
401
        this,
402
        messagePrinter);
403
    serverCpClientMap.put(serverInfo, controlPlaneClient);
1✔
404

405
    LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier);
1✔
406
    loadStatsManagerMap.put(serverInfo, loadStatsManager);
1✔
407
    LoadReportClient lrsClient = new LoadReportClient(
1✔
408
        loadStatsManager, xdsTransport, bootstrapInfo.node(),
1✔
409
        syncContext, timeService, backoffPolicyProvider, stopwatchSupplier);
410
    serverLrsClientMap.put(serverInfo, lrsClient);
1✔
411

412
    return controlPlaneClient;
1✔
413
  }
414

415
  @VisibleForTesting
416
  @Override
417
  public Map<ServerInfo, LoadReportClient> getServerLrsClientMap() {
418
    return ImmutableMap.copyOf(serverLrsClientMap);
1✔
419
  }
420

421
  @Nullable
422
  private ServerInfo getServerInfo(String resource) {
423
    if (resource.startsWith(XDSTP_SCHEME)) {
1✔
424
      URI uri = URI.create(resource);
1✔
425
      String authority = uri.getAuthority();
1✔
426
      if (authority == null) {
1✔
427
        authority = "";
1✔
428
      }
429
      AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
1✔
430
      if (authorityInfo == null || authorityInfo.xdsServers().isEmpty()) {
1✔
431
        return null;
1✔
432
      }
433
      return authorityInfo.xdsServers().get(0);
1✔
434
    } else {
435
      return bootstrapInfo.servers().get(0); // use first server
1✔
436
    }
437
  }
438

439
  @SuppressWarnings("unchecked")
440
  private <T extends ResourceUpdate> void handleResourceUpdate(
441
      XdsResourceType.Args args, List<Any> resources, XdsResourceType<T> xdsResourceType,
442
      ProcessingTracker processingTracker) {
443
    ValidatedResourceUpdate<T> result = xdsResourceType.parse(args, resources);
1✔
444
    logger.log(XdsLogger.XdsLogLevel.INFO,
1✔
445
        "Received {0} Response version {1} nonce {2}. Parsed resources: {3}",
446
         xdsResourceType.typeName(), args.versionInfo, args.nonce, result.unpackedResources);
1✔
447
    Map<String, ParsedResource<T>> parsedResources = result.parsedResources;
1✔
448
    Set<String> invalidResources = result.invalidResources;
1✔
449
    List<String> errors = result.errors;
1✔
450
    String errorDetail = null;
1✔
451
    if (errors.isEmpty()) {
1✔
452
      checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
1✔
453
      serverCpClientMap.get(args.serverInfo).ackResponse(xdsResourceType, args.versionInfo,
1✔
454
          args.nonce);
455
    } else {
456
      errorDetail = Joiner.on('\n').join(errors);
1✔
457
      logger.log(XdsLogLevel.WARNING,
1✔
458
          "Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
459
          xdsResourceType.typeName(), args.versionInfo, args.nonce, errorDetail);
1✔
460
      serverCpClientMap.get(args.serverInfo).nackResponse(xdsResourceType, args.nonce, errorDetail);
1✔
461
    }
462

463
    long updateTime = timeProvider.currentTimeNanos();
1✔
464
    Map<String, ResourceSubscriber<? extends ResourceUpdate>> subscribedResources =
1✔
465
        resourceSubscribers.getOrDefault(xdsResourceType, Collections.emptyMap());
1✔
466
    for (Map.Entry<String, ResourceSubscriber<?>> entry : subscribedResources.entrySet()) {
1✔
467
      String resourceName = entry.getKey();
1✔
468
      ResourceSubscriber<T> subscriber = (ResourceSubscriber<T>) entry.getValue();
1✔
469
      if (parsedResources.containsKey(resourceName)) {
1✔
470
        // Happy path: the resource updated successfully. Notify the watchers of the update.
471
        subscriber.onData(parsedResources.get(resourceName), args.versionInfo, updateTime,
1✔
472
            processingTracker);
473
        continue;
1✔
474
      }
475

476
      if (invalidResources.contains(resourceName)) {
1✔
477
        // The resource update is invalid. Capture the error without notifying the watchers.
478
        subscriber.onRejected(args.versionInfo, updateTime, errorDetail);
1✔
479
      }
480

481
      // Nothing else to do for incremental ADS resources.
482
      if (!xdsResourceType.isFullStateOfTheWorld()) {
1✔
483
        continue;
1✔
484
      }
485

486
      // Handle State of the World ADS: invalid resources.
487
      if (invalidResources.contains(resourceName)) {
1✔
488
        // The resource is missing. Reuse the cached resource if possible.
489
        if (subscriber.data == null) {
1✔
490
          // No cached data. Notify the watchers of an invalid update.
491
          subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail), processingTracker);
1✔
492
        }
493
        continue;
494
      }
495

496
      // For State of the World services, notify watchers when their watched resource is missing
497
      // from the ADS update. Note that we can only do this if the resource update is coming from
498
      // the same xDS server that the ResourceSubscriber is subscribed to.
499
      if (subscriber.serverInfo.equals(args.serverInfo)) {
1✔
500
        subscriber.onAbsent(processingTracker);
1✔
501
      }
502
    }
1✔
503
  }
1✔
504

505
  /**
506
   * Tracks a single subscribed resource.
507
   */
508
  private final class ResourceSubscriber<T extends ResourceUpdate> {
509
    @Nullable private final ServerInfo serverInfo;
510
    @Nullable private final ControlPlaneClient controlPlaneClient;
511
    private final XdsResourceType<T> type;
512
    private final String resource;
513
    private final Map<ResourceWatcher<T>, Executor> watchers = new HashMap<>();
1✔
514
    @Nullable private T data;
515
    private boolean absent;
516
    // Tracks whether the deletion has been ignored per bootstrap server feature.
517
    // See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
518
    private boolean resourceDeletionIgnored;
519
    @Nullable private ScheduledHandle respTimer;
520
    @Nullable private ResourceMetadata metadata;
521
    @Nullable private String errorDescription;
522

523
    ResourceSubscriber(XdsResourceType<T> type, String resource) {
1✔
524
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
525
      this.type = type;
1✔
526
      this.resource = resource;
1✔
527
      this.serverInfo = getServerInfo(resource);
1✔
528
      if (serverInfo == null) {
1✔
529
        this.errorDescription = "Wrong configuration: xds server does not exist for resource "
1✔
530
            + resource;
531
        this.controlPlaneClient = null;
1✔
532
        return;
1✔
533
      }
534
      // Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
535
      // is created but not yet requested because the client is in backoff.
536
      this.metadata = ResourceMetadata.newResourceMetadataUnknown();
1✔
537

538
      ControlPlaneClient controlPlaneClient = null;
1✔
539
      try {
540
        controlPlaneClient = getOrCreateControlPlaneClient(serverInfo);
1✔
541
        if (controlPlaneClient.isInBackoff()) {
1✔
542
          return;
1✔
543
        }
544
      } catch (IllegalArgumentException e) {
1✔
545
        controlPlaneClient = null;
1✔
546
        this.errorDescription = "Bad configuration:  " + e.getMessage();
1✔
547
        return;
1✔
548
      } finally {
549
        this.controlPlaneClient = controlPlaneClient;
1✔
550
      }
551

552
      restartTimer();
1✔
553
    }
1✔
554

555
    void addWatcher(ResourceWatcher<T> watcher, Executor watcherExecutor) {
556
      checkArgument(!watchers.containsKey(watcher), "watcher %s already registered", watcher);
1✔
557
      watchers.put(watcher, watcherExecutor);
1✔
558
      T savedData = data;
1✔
559
      boolean savedAbsent = absent;
1✔
560
      watcherExecutor.execute(() -> {
1✔
561
        if (errorDescription != null) {
1✔
562
          watcher.onError(Status.INVALID_ARGUMENT.withDescription(errorDescription));
1✔
563
          return;
1✔
564
        }
565
        if (savedData != null) {
1✔
566
          notifyWatcher(watcher, savedData);
1✔
567
        } else if (savedAbsent) {
1✔
568
          watcher.onResourceDoesNotExist(resource);
1✔
569
        }
570
      });
1✔
571
    }
1✔
572

573
    void removeWatcher(ResourceWatcher<T>  watcher) {
574
      checkArgument(watchers.containsKey(watcher), "watcher %s not registered", watcher);
1✔
575
      watchers.remove(watcher);
1✔
576
    }
1✔
577

578
    void restartTimer() {
579
      if (data != null || absent) {  // resource already resolved
1✔
580
        return;
1✔
581
      }
582
      if (!controlPlaneClient.isReady()) { // When client becomes ready, it triggers a restartTimer
1✔
583
        return;
1✔
584
      }
585

586
      class ResourceNotFound implements Runnable {
1✔
587
        @Override
588
        public void run() {
589
          logger.log(XdsLogLevel.INFO, "{0} resource {1} initial fetch timeout",
1✔
590
              type, resource);
1✔
591
          respTimer = null;
1✔
592
          onAbsent(null);
1✔
593
        }
1✔
594

595
        @Override
596
        public String toString() {
597
          return type + this.getClass().getSimpleName();
1✔
598
        }
599
      }
600

601
      // Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
602
      metadata = ResourceMetadata.newResourceMetadataRequested();
1✔
603

604
      respTimer = syncContext.schedule(
1✔
605
          new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
606
          timeService);
1✔
607
    }
1✔
608

609
    void stopTimer() {
610
      if (respTimer != null && respTimer.isPending()) {
1✔
611
        respTimer.cancel();
1✔
612
        respTimer = null;
1✔
613
      }
614
    }
1✔
615

616
    void cancelResourceWatch() {
617
      if (isWatched()) {
1✔
618
        throw new IllegalStateException("Can't cancel resource watch with active watchers present");
×
619
      }
620
      stopTimer();
1✔
621
      String message = "Unsubscribing {0} resource {1} from server {2}";
1✔
622
      XdsLogLevel logLevel = XdsLogLevel.INFO;
1✔
623
      if (resourceDeletionIgnored) {
1✔
624
        message += " for which we previously ignored a deletion";
×
625
        logLevel = XdsLogLevel.FORCE_INFO;
×
626
      }
627
      logger.log(logLevel, message, type, resource,
1✔
628
          serverInfo != null ? serverInfo.target() : "unknown");
1✔
629
    }
1✔
630

631
    boolean isWatched() {
632
      return !watchers.isEmpty();
1✔
633
    }
634

635
    boolean hasResult() {
636
      return data != null || absent;
1✔
637
    }
638

639
    void onData(ParsedResource<T> parsedResource, String version, long updateTime,
640
                ProcessingTracker processingTracker) {
641
      if (respTimer != null && respTimer.isPending()) {
1✔
642
        respTimer.cancel();
1✔
643
        respTimer = null;
1✔
644
      }
645
      this.metadata = ResourceMetadata
1✔
646
          .newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
1✔
647
      ResourceUpdate oldData = this.data;
1✔
648
      this.data = parsedResource.getResourceUpdate();
1✔
649
      absent = false;
1✔
650
      if (resourceDeletionIgnored) {
1✔
651
        logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
1✔
652
                + "of resource for which we previously ignored a deletion: type {1} name {2}",
653
            serverInfo != null ? serverInfo.target() : "unknown", type, resource);
1✔
654
        resourceDeletionIgnored = false;
1✔
655
      }
656
      if (!Objects.equals(oldData, data)) {
1✔
657
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
658
          processingTracker.startTask();
1✔
659
          watchers.get(watcher).execute(() -> {
1✔
660
            try {
661
              notifyWatcher(watcher, data);
1✔
662
            } finally {
663
              processingTracker.onComplete();
1✔
664
            }
665
          });
1✔
666
        }
1✔
667
      }
668
    }
1✔
669

670
    void onAbsent(@Nullable ProcessingTracker processingTracker) {
671
      if (respTimer != null && respTimer.isPending()) {  // too early to conclude absence
1✔
672
        return;
1✔
673
      }
674

675
      // Ignore deletion of State of the World resources when this feature is on,
676
      // and the resource is reusable.
677
      boolean ignoreResourceDeletionEnabled =
1✔
678
          serverInfo != null && serverInfo.ignoreResourceDeletion();
1✔
679
      if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) {
1✔
680
        if (!resourceDeletionIgnored) {
1✔
681
          logger.log(XdsLogLevel.FORCE_WARNING,
1✔
682
              "xds server {0}: ignoring deletion for resource type {1} name {2}}",
683
              serverInfo.target(), type, resource);
1✔
684
          resourceDeletionIgnored = true;
1✔
685
        }
686
        return;
1✔
687
      }
688

689
      logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
1✔
690
      if (!absent) {
1✔
691
        data = null;
1✔
692
        absent = true;
1✔
693
        metadata = ResourceMetadata.newResourceMetadataDoesNotExist();
1✔
694
        for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
695
          if (processingTracker != null) {
1✔
696
            processingTracker.startTask();
1✔
697
          }
698
          watchers.get(watcher).execute(() -> {
1✔
699
            try {
700
              watcher.onResourceDoesNotExist(resource);
1✔
701
            } finally {
702
              if (processingTracker != null) {
1✔
703
                processingTracker.onComplete();
1✔
704
              }
705
            }
706
          });
1✔
707
        }
1✔
708
      }
709
    }
1✔
710

711
    void onError(Status error, @Nullable ProcessingTracker tracker) {
712
      if (respTimer != null && respTimer.isPending()) {
1✔
713
        respTimer.cancel();
1✔
714
        respTimer = null;
1✔
715
      }
716

717
      // Include node ID in xds failures to allow cross-referencing with control plane logs
718
      // when debugging.
719
      String description = error.getDescription() == null ? "" : error.getDescription() + " ";
1✔
720
      Status errorAugmented = Status.fromCode(error.getCode())
1✔
721
          .withDescription(description + "nodeID: " + bootstrapInfo.node().getId())
1✔
722
          .withCause(error.getCause());
1✔
723

724
      for (ResourceWatcher<T> watcher : watchers.keySet()) {
1✔
725
        if (tracker != null) {
1✔
726
          tracker.startTask();
1✔
727
        }
728
        watchers.get(watcher).execute(() -> {
1✔
729
          try {
730
            watcher.onError(errorAugmented);
1✔
731
          } finally {
732
            if (tracker != null) {
1✔
733
              tracker.onComplete();
1✔
734
            }
735
          }
736
        });
1✔
737
      }
1✔
738
    }
1✔
739

740
    void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
741
      metadata = ResourceMetadata
1✔
742
          .newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
1✔
743
    }
1✔
744

745
    private void notifyWatcher(ResourceWatcher<T> watcher, T update) {
746
      watcher.onChanged(update);
1✔
747
    }
1✔
748
  }
749

750
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc