• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20015

13 Oct 2025 07:57AM UTC coverage: 88.57% (+0.02%) from 88.552%
#20015

push

github

web-flow
xds: ORCA to LRS propagation changes (#12203)

Implements gRFC A85 (https://github.com/grpc/proposal/pull/454).

34925 of 39432 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.11
/../xds/src/main/java/io/grpc/xds/client/XdsClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
21

22
import com.google.common.base.Joiner;
23
import com.google.common.base.Splitter;
24
import com.google.common.net.UrlEscapers;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.MoreExecutors;
27
import com.google.protobuf.Any;
28
import io.grpc.ExperimentalApi;
29
import io.grpc.Status;
30
import io.grpc.xds.client.Bootstrapper.ServerInfo;
31
import java.net.URI;
32
import java.net.URISyntaxException;
33
import java.util.ArrayList;
34
import java.util.Collection;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.Map;
38
import java.util.concurrent.Executor;
39
import java.util.concurrent.Future;
40
import java.util.concurrent.atomic.AtomicInteger;
41
import javax.annotation.Nullable;
42

43
/**
44
 * An {@link XdsClient} instance encapsulates all of the logic for communicating with the xDS
45
 * server. It may create multiple RPC streams (or a single ADS stream) for a series of xDS
46
 * protocols (e.g., LDS, RDS, VHDS, CDS and EDS) over a single channel. Watch-based interfaces
47
 * are provided for each set of data needed by gRPC.
48
 */
49
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
50
public abstract class XdsClient {
1✔
51

52
  public static boolean isResourceNameValid(String resourceName, String typeUrl) {
53
    checkNotNull(resourceName, "resourceName");
1✔
54
    if (!resourceName.startsWith(XDSTP_SCHEME)) {
1✔
55
      return true;
1✔
56
    }
57
    URI uri;
58
    try {
59
      uri = new URI(resourceName);
1✔
60
    } catch (URISyntaxException e) {
×
61
      return false;
×
62
    }
1✔
63
    String path = uri.getPath();
1✔
64
    // path must be in the form of /{resource type}/{id/*}
65
    Splitter slashSplitter = Splitter.on('/').omitEmptyStrings();
1✔
66
    if (path == null) {
1✔
67
      return false;
×
68
    }
69
    List<String> pathSegs = slashSplitter.splitToList(path);
1✔
70
    if (pathSegs.size() < 2) {
1✔
71
      return false;
1✔
72
    }
73
    String type = pathSegs.get(0);
1✔
74
    if (!type.equals(slashSplitter.splitToList(typeUrl).get(1))) {
1✔
75
      return false;
1✔
76
    }
77
    return true;
1✔
78
  }
79

80
  /*
81
   * Convert the XDSTP resource name to its canonical version.
82
   */
83
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
84
  public static String canonifyResourceName(String resourceName) {
85
    checkNotNull(resourceName, "resourceName");
1✔
86
    if (!resourceName.startsWith(XDSTP_SCHEME)) {
1✔
87
      return resourceName;
1✔
88
    }
89
    URI uri = URI.create(resourceName);
1✔
90
    String rawQuery = uri.getRawQuery();
1✔
91
    Splitter ampSplitter = Splitter.on('&').omitEmptyStrings();
1✔
92
    if (rawQuery == null) {
1✔
93
      return resourceName;
1✔
94
    }
95
    List<String> queries = ampSplitter.splitToList(rawQuery);
1✔
96
    if (queries.size() < 2) {
1✔
97
      return resourceName;
1✔
98
    }
99
    List<String> canonicalContextParams = new ArrayList<>(queries.size());
1✔
100
    for (String query : queries) {
1✔
101
      canonicalContextParams.add(query);
1✔
102
    }
1✔
103
    Collections.sort(canonicalContextParams);
1✔
104
    String canonifiedQuery = Joiner.on('&').join(canonicalContextParams);
1✔
105
    return resourceName.replace(rawQuery, canonifiedQuery);
1✔
106
  }
107

108
  /*
109
   * Percent encode the input using the url path segment escaper.
110
   */
111
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
112
  public static String percentEncodePath(String input) {
113
    Iterable<String> pathSegs = Splitter.on('/').split(input);
1✔
114
    List<String> encodedSegs = new ArrayList<>();
1✔
115
    for (String pathSeg : pathSegs) {
1✔
116
      encodedSegs.add(UrlEscapers.urlPathSegmentEscaper().escape(pathSeg));
1✔
117
    }
1✔
118
    return Joiner.on('/').join(encodedSegs);
1✔
119
  }
120

121
  /**
122
   * Returns the authority from the resource name.
123
   */
124
  public static String getAuthorityFromResourceName(String resourceNames) {
125
    String authority;
126
    if (resourceNames.startsWith(XDSTP_SCHEME)) {
1✔
127
      URI uri = URI.create(resourceNames);
1✔
128
      authority = uri.getAuthority();
1✔
129
      if (authority == null) {
1✔
130
        authority = "";
1✔
131
      }
132
    } else {
1✔
133
      authority = null;
1✔
134
    }
135
    return authority;
1✔
136
  }
137

138
  public interface ResourceUpdate {}
139

140
  /**
141
   * Watcher interface for a single requested xDS resource.
142
   */
143
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
144
  public interface ResourceWatcher<T extends ResourceUpdate> {
145

146
    /**
147
     * Called when the resource discovery RPC encounters some transient error.
148
     *
149
     * <p>Note that we expect that the implementer to:
150
     * - Comply with the guarantee to not generate certain statuses by the library:
151
     *   https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
152
     *   propagated to the channel, override it with {@link io.grpc.Status.Code#UNAVAILABLE}.
153
     * - Keep {@link Status} description in one form or another, as it contains valuable debugging
154
     *   information.
155
     */
156
    void onError(Status error);
157

158
    /**
159
     * Called when the requested resource is not available.
160
     *
161
     * @param resourceName name of the resource requested in discovery request.
162
     */
163
    void onResourceDoesNotExist(String resourceName);
164

165
    void onChanged(T update);
166
  }
167

168
  /**
169
   * The metadata of the xDS resource; used by the xDS config dump.
170
   */
171
  public static final class ResourceMetadata {
172
    private final String version;
173
    private final ResourceMetadataStatus status;
174
    private final long updateTimeNanos;
175
    private final boolean cached;
176
    @Nullable private final Any rawResource;
177
    @Nullable private final UpdateFailureState errorState;
178

179
    private ResourceMetadata(
180
        ResourceMetadataStatus status, String version, long updateTimeNanos, boolean cached,
181
        @Nullable Any rawResource, @Nullable UpdateFailureState errorState) {
1✔
182
      this.status = checkNotNull(status, "status");
1✔
183
      this.version = checkNotNull(version, "version");
1✔
184
      this.updateTimeNanos = updateTimeNanos;
1✔
185
      this.cached = cached;
1✔
186
      this.rawResource = rawResource;
1✔
187
      this.errorState = errorState;
1✔
188
    }
1✔
189

190
    public static ResourceMetadata newResourceMetadataUnknown() {
191
      return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, false,null, null);
1✔
192
    }
193

194
    public static ResourceMetadata newResourceMetadataRequested() {
195
      return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, false, null, null);
1✔
196
    }
197

198
    public static ResourceMetadata newResourceMetadataDoesNotExist() {
199
      return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
1✔
200
    }
201

202
    public static ResourceMetadata newResourceMetadataTimeout() {
203
      return new ResourceMetadata(ResourceMetadataStatus.TIMEOUT, "", 0, false, null, null);
1✔
204
    }
205

206
    public static ResourceMetadata newResourceMetadataAcked(
207
        Any rawResource, String version, long updateTimeNanos) {
208
      checkNotNull(rawResource, "rawResource");
1✔
209
      return new ResourceMetadata(
1✔
210
          ResourceMetadataStatus.ACKED, version, updateTimeNanos, true, rawResource, null);
211
    }
212

213
    public static ResourceMetadata newResourceMetadataNacked(
214
        ResourceMetadata metadata, String failedVersion, long failedUpdateTime,
215
        String failedDetails, boolean cached) {
216
      checkNotNull(metadata, "metadata");
1✔
217
      return new ResourceMetadata(ResourceMetadataStatus.NACKED,
1✔
218
          metadata.getVersion(), metadata.getUpdateTimeNanos(), cached, metadata.getRawResource(),
1✔
219
          new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails));
220
    }
221

222
    /** The last successfully updated version of the resource. */
223
    public String getVersion() {
224
      return version;
1✔
225
    }
226

227
    /** The client status of this resource. */
228
    public ResourceMetadataStatus getStatus() {
229
      return status;
1✔
230
    }
231

232
    /** The timestamp when the resource was last successfully updated. */
233
    public long getUpdateTimeNanos() {
234
      return updateTimeNanos;
1✔
235
    }
236

237
    /** Returns whether the resource was cached. */
238
    public boolean isCached() {
239
      return cached;
1✔
240
    }
241

242
    /** The last successfully updated xDS resource as it was returned by the server. */
243
    @Nullable
244
    public Any getRawResource() {
245
      return rawResource;
1✔
246
    }
247

248
    /** The metadata capturing the error details of the last rejected update of the resource. */
249
    @Nullable
250
    public UpdateFailureState getErrorState() {
251
      return errorState;
1✔
252
    }
253

254
    /**
255
     * Resource status from the view of a xDS client, which tells the synchronization
256
     * status between the xDS client and the xDS server.
257
     *
258
     * <p>This is a native representation of xDS ConfigDump ClientResourceStatus, see
259
     * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto">
260
     * config_dump.proto</a>
261
     */
262
    public enum ResourceMetadataStatus {
1✔
263
      UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED, TIMEOUT
1✔
264
    }
265

266
    /**
267
     * Captures error metadata of failed resource updates.
268
     *
269
     * <p>This is a native representation of xDS ConfigDump UpdateFailureState, see
270
     * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto">
271
     * config_dump.proto</a>
272
     */
273
    public static final class UpdateFailureState {
274
      private final String failedVersion;
275
      private final long failedUpdateTimeNanos;
276
      private final String failedDetails;
277

278
      private UpdateFailureState(
279
          String failedVersion, long failedUpdateTimeNanos, String failedDetails) {
1✔
280
        this.failedVersion = checkNotNull(failedVersion, "failedVersion");
1✔
281
        this.failedUpdateTimeNanos = failedUpdateTimeNanos;
1✔
282
        this.failedDetails = checkNotNull(failedDetails, "failedDetails");
1✔
283
      }
1✔
284

285
      /** The rejected version string of the last failed update attempt. */
286
      public String getFailedVersion() {
287
        return failedVersion;
1✔
288
      }
289

290
      /** Details about the last failed update attempt. */
291
      public long getFailedUpdateTimeNanos() {
292
        return failedUpdateTimeNanos;
1✔
293
      }
294

295
      /** Timestamp of the last failed update attempt. */
296
      public String getFailedDetails() {
297
        return failedDetails;
1✔
298
      }
299
    }
300
  }
301

302
  /**
303
   * Shutdown this {@link XdsClient} and release resources.
304
   */
305
  public void shutdown() {
306
    throw new UnsupportedOperationException();
×
307
  }
308

309
  /**
310
   * Returns {@code true} if {@link #shutdown()} has been called.
311
   */
312
  public boolean isShutDown() {
313
    throw new UnsupportedOperationException();
×
314
  }
315

316
  /**
317
   * Returns the config used to bootstrap this XdsClient {@link Bootstrapper.BootstrapInfo}.
318
   */
319
  public Bootstrapper.BootstrapInfo getBootstrapInfo() {
320
    throw new UnsupportedOperationException();
×
321
  }
322

323
  /**
324
   * Returns the implementation specific security configuration used in this XdsClient.
325
   */
326
  public Object getSecurityConfig() {
327
    throw new UnsupportedOperationException();
×
328
  }
329

330
  /**
331
   * Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
332
   * they are at the moment of the call.
333
   *
334
   * <p>The snapshot is a map from the "resource type" to
335
   * a map ("resource name": "resource metadata").
336
   */
337
  // Must be synchronized.
338
  public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
339
      getSubscribedResourcesMetadataSnapshot() {
340
    throw new UnsupportedOperationException();
×
341
  }
342

343
  /**
344
   * Registers a data watcher for the given Xds resource.
345
   */
346
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
347
      String resourceName,
348
      ResourceWatcher<T> watcher,
349
      Executor executor) {
350
    throw new UnsupportedOperationException();
×
351
  }
352

353
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
354
      String resourceName,
355
      ResourceWatcher<T> watcher) {
356
    watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
1✔
357
  }
1✔
358

359
  /**
360
   * Unregisters the given resource watcher.
361
   */
362
  public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
363
      String resourceName,
364
      ResourceWatcher<T> watcher) {
365
    throw new UnsupportedOperationException();
×
366
  }
367

368
  /**
369
   * Adds drop stats for the specified cluster with edsServiceName by using the returned object
370
   * to record dropped requests. Drop stats recorded with the returned object will be reported
371
   * to the load reporting server. The returned object is reference counted and the caller should
372
   * use {@link LoadStatsManager2.ClusterDropStats#release} to release its <i>hard</i> reference
373
   * when it is safe to stop reporting dropped RPCs for the specified cluster in the future.
374
   */
375
  public LoadStatsManager2.ClusterDropStats addClusterDropStats(
376
      Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
377
    throw new UnsupportedOperationException();
×
378
  }
379

380
  /**
381
   * Adds load stats for the specified locality (in the specified cluster with edsServiceName) by
382
   * using the returned object to record RPCs. Load stats recorded with the returned object will
383
   * be reported to the load reporting server. The returned object is reference counted and the
384
   * caller should use {@link LoadStatsManager2.ClusterLocalityStats#release} to release its
385
   * <i>hard</i> reference when it is safe to stop reporting RPC loads for the specified locality
386
   * in the future.
387
   */
388
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
389
      Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
390
      Locality locality) {
391
    return addClusterLocalityStats(serverInfo, clusterName, edsServiceName, locality, null);
×
392
  }
393

394
  /**
395
   * Adds load stats for the specified locality (in the specified cluster with edsServiceName) by
396
   * using the returned object to record RPCs. Load stats recorded with the returned object will
397
   * be reported to the load reporting server. The returned object is reference counted and the
398
   * caller should use {@link LoadStatsManager2.ClusterLocalityStats#release} to release its
399
   * <i>hard</i> reference when it is safe to stop reporting RPC loads for the specified locality
400
   * in the future.
401
   *
402
   * @param backendMetricPropagation Configuration for which backend metrics should be propagated
403
   *     to LRS load reports. If null, all metrics will be propagated (legacy behavior).
404
   */
405
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
406
      Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
407
      Locality locality, @Nullable BackendMetricPropagation backendMetricPropagation) {
408
    throw new UnsupportedOperationException();
×
409
  }
410

411
  /**
412
   * Returns a map of control plane server info objects to the LoadReportClients that are
413
   * responsible for sending load reports to the control plane servers.
414
   */
415
  public Map<Bootstrapper.ServerInfo, LoadReportClient> getServerLrsClientMap() {
416
    throw new UnsupportedOperationException();
×
417
  }
418

419
  /** Callback used to report a gauge metric value for server connections. */
420
  public interface ServerConnectionCallback {
421
    void reportServerConnectionGauge(boolean isConnected, String xdsServer);
422
  }
423

424
  /**
425
   * Reports whether xDS client has a "working" ADS stream to xDS server. The definition of a
426
   * working stream is defined in gRFC A78.
427
   *
428
   * @see <a
429
   *     href="https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient">
430
   *     A78-grpc-metrics-wrr-pf-xds.md</a>
431
   */
432
  public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
433
    throw new UnsupportedOperationException();
×
434
  }
435

436
  static final class ProcessingTracker {
437
    private final AtomicInteger pendingTask = new AtomicInteger(1);
1✔
438
    private final Executor executor;
439
    private final Runnable completionListener;
440

441
    ProcessingTracker(Runnable completionListener, Executor executor) {
1✔
442
      this.executor = executor;
1✔
443
      this.completionListener = completionListener;
1✔
444
    }
1✔
445

446
    void startTask() {
447
      pendingTask.incrementAndGet();
1✔
448
    }
1✔
449

450
    void onComplete() {
451
      if (pendingTask.decrementAndGet() == 0) {
1✔
452
        executor.execute(completionListener);
1✔
453
      }
454
    }
1✔
455
  }
456

457
  interface XdsResponseHandler {
458
    /** Called when a xds response is received. */
459
    void handleResourceResponse(
460
        XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
461
        List<Any> resources, String nonce, boolean isFirstResponse,
462
        ProcessingTracker processingTracker);
463

464
    /** Called when the ADS stream is closed passively. */
465
    // Must be synchronized.
466
    void handleStreamClosed(Status error, boolean shouldTryFallback);
467
  }
468

469
  interface ResourceStore {
470

471
    /**
472
     * Returns the collection of resources currently subscribed to which have an authority matching
473
     * one of those for which the ControlPlaneClient associated with the specified ServerInfo is
474
     * the active one, or {@code null} if no such resources are currently subscribed to.
475
     *
476
     * <p>Note an empty collection indicates subscribing to resources of the given type with
477
     * wildcard mode.
478
     *
479
     * @param serverInfo the xds server to get the resources from
480
     * @param type       the type of the resources that should be retrieved
481
     */
482
    // Must be synchronized.
483
    @Nullable
484
    Collection<String> getSubscribedResources(
485
        ServerInfo serverInfo, XdsResourceType<? extends ResourceUpdate> type);
486

487
    Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
488

489
    /**
490
     * For any of the subscribers to one of the specified resources, if there isn't a result or
491
     * an existing timer for the resource, start a timer for the resource.
492
     */
493
    void startMissingResourceTimers(Collection<String> resourceNames,
494
                                    XdsResourceType<?> resourceType);
495
  }
496
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc