• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19563

26 Nov 2024 12:47AM UTC coverage: 88.559% (-0.02%) from 88.582%
#19563

push

github

web-flow
xds: Add counter and gauge metrics  (#11661)

Adds the following xDS client metrics defined in [A78](https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient).

Counters
- grpc.xds_client.server_failure
- grpc.xds_client.resource_updates_valid
- grpc.xds_client.resource_updates_invalid

Gauges
- grpc.xds_client.connected
- grpc.xds_client.resources

33368 of 37679 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.85
/../xds/src/main/java/io/grpc/xds/client/XdsClient.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.client;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.xds.client.Bootstrapper.XDSTP_SCHEME;
21

22
import com.google.common.base.Joiner;
23
import com.google.common.base.Splitter;
24
import com.google.common.net.UrlEscapers;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.common.util.concurrent.MoreExecutors;
27
import com.google.protobuf.Any;
28
import io.grpc.ExperimentalApi;
29
import io.grpc.Status;
30
import io.grpc.xds.client.Bootstrapper.ServerInfo;
31
import java.net.URI;
32
import java.net.URISyntaxException;
33
import java.util.ArrayList;
34
import java.util.Collection;
35
import java.util.Collections;
36
import java.util.List;
37
import java.util.Map;
38
import java.util.concurrent.Executor;
39
import java.util.concurrent.Future;
40
import java.util.concurrent.atomic.AtomicInteger;
41
import javax.annotation.Nullable;
42

43
/**
44
 * An {@link XdsClient} instance encapsulates all of the logic for communicating with the xDS
45
 * server. It may create multiple RPC streams (or a single ADS stream) for a series of xDS
46
 * protocols (e.g., LDS, RDS, VHDS, CDS and EDS) over a single channel. Watch-based interfaces
47
 * are provided for each set of data needed by gRPC.
48
 */
49
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
50
public abstract class XdsClient {
1✔
51

52
  public static boolean isResourceNameValid(String resourceName, String typeUrl) {
53
    checkNotNull(resourceName, "resourceName");
1✔
54
    if (!resourceName.startsWith(XDSTP_SCHEME)) {
1✔
55
      return true;
1✔
56
    }
57
    URI uri;
58
    try {
59
      uri = new URI(resourceName);
1✔
60
    } catch (URISyntaxException e) {
×
61
      return false;
×
62
    }
1✔
63
    String path = uri.getPath();
1✔
64
    // path must be in the form of /{resource type}/{id/*}
65
    Splitter slashSplitter = Splitter.on('/').omitEmptyStrings();
1✔
66
    if (path == null) {
1✔
67
      return false;
×
68
    }
69
    List<String> pathSegs = slashSplitter.splitToList(path);
1✔
70
    if (pathSegs.size() < 2) {
1✔
71
      return false;
1✔
72
    }
73
    String type = pathSegs.get(0);
1✔
74
    if (!type.equals(slashSplitter.splitToList(typeUrl).get(1))) {
1✔
75
      return false;
1✔
76
    }
77
    return true;
1✔
78
  }
79

80
  /**
81
   * Convert the XDSTP resource name to its canonical version.
82
   */
83
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
84
  public static String canonifyResourceName(String resourceName) {
85
    checkNotNull(resourceName, "resourceName");
1✔
86
    if (!resourceName.startsWith(XDSTP_SCHEME)) {
1✔
87
      return resourceName;
1✔
88
    }
89
    URI uri = URI.create(resourceName);
1✔
90
    String rawQuery = uri.getRawQuery();
1✔
91
    Splitter ampSplitter = Splitter.on('&').omitEmptyStrings();
1✔
92
    if (rawQuery == null) {
1✔
93
      return resourceName;
1✔
94
    }
95
    List<String> queries = ampSplitter.splitToList(rawQuery);
1✔
96
    if (queries.size() < 2) {
1✔
97
      return resourceName;
1✔
98
    }
99
    List<String> canonicalContextParams = new ArrayList<>(queries.size());
1✔
100
    for (String query : queries) {
1✔
101
      canonicalContextParams.add(query);
1✔
102
    }
1✔
103
    Collections.sort(canonicalContextParams);
1✔
104
    String canonifiedQuery = Joiner.on('&').join(canonicalContextParams);
1✔
105
    return resourceName.replace(rawQuery, canonifiedQuery);
1✔
106
  }
107

108
  /**
109
   * Percent encode the input using the url path segment escaper.
110
   */
111
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
112
  public static String percentEncodePath(String input) {
113
    Iterable<String> pathSegs = Splitter.on('/').split(input);
1✔
114
    List<String> encodedSegs = new ArrayList<>();
1✔
115
    for (String pathSeg : pathSegs) {
1✔
116
      encodedSegs.add(UrlEscapers.urlPathSegmentEscaper().escape(pathSeg));
1✔
117
    }
1✔
118
    return Joiner.on('/').join(encodedSegs);
1✔
119
  }
120

121
  public interface ResourceUpdate {}
122

123
  /**
124
   * Watcher interface for a single requested xDS resource.
125
   */
126
  @ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
127
  public interface ResourceWatcher<T extends ResourceUpdate> {
128

129
    /**
130
     * Called when the resource discovery RPC encounters some transient error.
131
     *
132
     * <p>Note that we expect the implementer to:
133
     * - Comply with the guarantee to not generate certain statuses by the library:
134
     *   https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
135
     *   propagated to the channel, override it with {@link io.grpc.Status.Code#UNAVAILABLE}.
136
     * - Keep {@link Status} description in one form or another, as it contains valuable debugging
137
     *   information.
138
     */
139
    void onError(Status error);
140

141
    /**
142
     * Called when the requested resource is not available.
143
     *
144
     * @param resourceName name of the resource requested in discovery request.
145
     */
146
    void onResourceDoesNotExist(String resourceName);
147

148
    void onChanged(T update);
149
  }
150

151
  /**
152
   * The metadata of the xDS resource; used by the xDS config dump.
153
   */
154
  public static final class ResourceMetadata {
155
    private final String version;
156
    private final ResourceMetadataStatus status;
157
    private final long updateTimeNanos;
158
    private final boolean cached;
159
    @Nullable private final Any rawResource;
160
    @Nullable private final UpdateFailureState errorState;
161

162
    private ResourceMetadata(
163
        ResourceMetadataStatus status, String version, long updateTimeNanos, boolean cached,
164
        @Nullable Any rawResource, @Nullable UpdateFailureState errorState) {
1✔
165
      this.status = checkNotNull(status, "status");
1✔
166
      this.version = checkNotNull(version, "version");
1✔
167
      this.updateTimeNanos = updateTimeNanos;
1✔
168
      this.cached = cached;
1✔
169
      this.rawResource = rawResource;
1✔
170
      this.errorState = errorState;
1✔
171
    }
1✔
172

173
    public static ResourceMetadata newResourceMetadataUnknown() {
174
      return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, false,null, null);
1✔
175
    }
176

177
    public static ResourceMetadata newResourceMetadataRequested() {
178
      return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, false, null, null);
1✔
179
    }
180

181
    public static ResourceMetadata newResourceMetadataDoesNotExist() {
182
      return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
1✔
183
    }
184

185
    public static ResourceMetadata newResourceMetadataAcked(
186
        Any rawResource, String version, long updateTimeNanos) {
187
      checkNotNull(rawResource, "rawResource");
1✔
188
      return new ResourceMetadata(
1✔
189
          ResourceMetadataStatus.ACKED, version, updateTimeNanos, true, rawResource, null);
190
    }
191

192
    public static ResourceMetadata newResourceMetadataNacked(
193
        ResourceMetadata metadata, String failedVersion, long failedUpdateTime,
194
        String failedDetails, boolean cached) {
195
      checkNotNull(metadata, "metadata");
1✔
196
      return new ResourceMetadata(ResourceMetadataStatus.NACKED,
1✔
197
          metadata.getVersion(), metadata.getUpdateTimeNanos(), cached, metadata.getRawResource(),
1✔
198
          new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails));
199
    }
200

201
    /** The last successfully updated version of the resource. */
202
    public String getVersion() {
203
      return version;
1✔
204
    }
205

206
    /** The client status of this resource. */
207
    public ResourceMetadataStatus getStatus() {
208
      return status;
1✔
209
    }
210

211
    /** The timestamp when the resource was last successfully updated. */
212
    public long getUpdateTimeNanos() {
213
      return updateTimeNanos;
1✔
214
    }
215

216
    /** Returns whether the resource was cached. */
217
    public boolean isCached() {
218
      return cached;
1✔
219
    }
220

221
    /** The last successfully updated xDS resource as it was returned by the server. */
222
    @Nullable
223
    public Any getRawResource() {
224
      return rawResource;
1✔
225
    }
226

227
    /** The metadata capturing the error details of the last rejected update of the resource. */
228
    @Nullable
229
    public UpdateFailureState getErrorState() {
230
      return errorState;
1✔
231
    }
232

233
    /**
234
     * Resource status from the view of an xDS client, which tells the synchronization
235
     * status between the xDS client and the xDS server.
236
     *
237
     * <p>This is a native representation of xDS ConfigDump ClientResourceStatus, see
238
     * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto">
239
     * config_dump.proto</a>
240
     */
241
    public enum ResourceMetadataStatus {
1✔
242
      UNKNOWN, REQUESTED, DOES_NOT_EXIST, ACKED, NACKED
1✔
243
    }
244

245
    /**
246
     * Captures error metadata of failed resource updates.
247
     *
248
     * <p>This is a native representation of xDS ConfigDump UpdateFailureState, see
249
     * <a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/admin/v3/config_dump.proto">
250
     * config_dump.proto</a>
251
     */
252
    public static final class UpdateFailureState {
253
      private final String failedVersion;
254
      private final long failedUpdateTimeNanos;
255
      private final String failedDetails;
256

257
      private UpdateFailureState(
258
          String failedVersion, long failedUpdateTimeNanos, String failedDetails) {
1✔
259
        this.failedVersion = checkNotNull(failedVersion, "failedVersion");
1✔
260
        this.failedUpdateTimeNanos = failedUpdateTimeNanos;
1✔
261
        this.failedDetails = checkNotNull(failedDetails, "failedDetails");
1✔
262
      }
1✔
263

264
      /** The rejected version string of the last failed update attempt. */
265
      public String getFailedVersion() {
266
        return failedVersion;
1✔
267
      }
268

269
      /** Timestamp of the last failed update attempt. */
270
      public long getFailedUpdateTimeNanos() {
271
        return failedUpdateTimeNanos;
1✔
272
      }
273

274
      /** Details about the last failed update attempt. */
275
      public String getFailedDetails() {
276
        return failedDetails;
1✔
277
      }
278
    }
279
  }
280

281
  /**
282
   * Shutdown this {@link XdsClient} and release resources.
283
   */
284
  public void shutdown() {
285
    throw new UnsupportedOperationException();
×
286
  }
287

288
  /**
289
   * Returns {@code true} if {@link #shutdown()} has been called.
290
   */
291
  public boolean isShutDown() {
292
    throw new UnsupportedOperationException();
×
293
  }
294

295
  /**
296
   * Returns the {@link Bootstrapper.BootstrapInfo} config used to bootstrap this XdsClient.
297
   */
298
  public Bootstrapper.BootstrapInfo getBootstrapInfo() {
299
    throw new UnsupportedOperationException();
×
300
  }
301

302
  /**
303
   * Returns the implementation specific security configuration used in this XdsClient.
304
   */
305
  public Object getSecurityConfig() {
306
    throw new UnsupportedOperationException();
×
307
  }
308

309
  /**
310
   * For all subscribers for the specified server, if the resource hasn't yet been
311
   * resolved then start a timer for it.
312
   */
313
  protected void startSubscriberTimersIfNeeded(ServerInfo serverInfo) {
314
    throw new UnsupportedOperationException();
×
315
  }
316

317
  /**
318
   * Returns a {@link ListenableFuture} to the snapshot of the subscribed resources as
319
   * they are at the moment of the call.
320
   *
321
   * <p>The snapshot is a map from the "resource type" to
322
   * a map ("resource name": "resource metadata").
323
   */
324
  // Must be synchronized.
325
  public ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
326
      getSubscribedResourcesMetadataSnapshot() {
327
    throw new UnsupportedOperationException();
×
328
  }
329

330
  /**
331
   * Registers a data watcher for the given Xds resource.
332
   */
333
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
334
      String resourceName,
335
      ResourceWatcher<T> watcher,
336
      Executor executor) {
337
    throw new UnsupportedOperationException();
×
338
  }
339

340
  public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
341
      String resourceName,
342
      ResourceWatcher<T> watcher) {
343
    watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
1✔
344
  }
1✔
345

346
  /**
347
   * Unregisters the given resource watcher.
348
   */
349
  public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
350
      String resourceName,
351
      ResourceWatcher<T> watcher) {
352
    throw new UnsupportedOperationException();
×
353
  }
354

355
  /**
356
   * Adds drop stats for the specified cluster with edsServiceName by using the returned object
357
   * to record dropped requests. Drop stats recorded with the returned object will be reported
358
   * to the load reporting server. The returned object is reference counted and the caller should
359
   * use {@link LoadStatsManager2.ClusterDropStats#release} to release its <i>hard</i> reference
360
   * when it is safe to stop reporting dropped RPCs for the specified cluster in the future.
361
   */
362
  public LoadStatsManager2.ClusterDropStats addClusterDropStats(
363
      Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName) {
364
    throw new UnsupportedOperationException();
×
365
  }
366

367
  /**
368
   * Adds load stats for the specified locality (in the specified cluster with edsServiceName) by
369
   * using the returned object to record RPCs. Load stats recorded with the returned object will
370
   * be reported to the load reporting server. The returned object is reference counted and the
371
   * caller should use {@link LoadStatsManager2.ClusterLocalityStats#release} to release its
372
   * <i>hard</i> reference when it is safe to stop reporting RPC loads for the specified locality
373
   * in the future.
374
   */
375
  public LoadStatsManager2.ClusterLocalityStats addClusterLocalityStats(
376
      Bootstrapper.ServerInfo serverInfo, String clusterName, @Nullable String edsServiceName,
377
      Locality locality) {
378
    throw new UnsupportedOperationException();
×
379
  }
380

381
  /**
382
   * Returns a map of control plane server info objects to the LoadReportClients that are
383
   * responsible for sending load reports to the control plane servers.
384
   */
385
  public Map<Bootstrapper.ServerInfo, LoadReportClient> getServerLrsClientMap() {
386
    throw new UnsupportedOperationException();
×
387
  }
388

389
  /** Callback used to report a gauge metric value for server connections. */
390
  public interface ServerConnectionCallback {
391
    void reportServerConnectionGauge(boolean isConnected, String xdsServer);
392
  }
393

394
  /**
395
   * Reports whether xDS client has a "working" ADS stream to xDS server. The definition of a
396
   * working stream is defined in gRFC A78.
397
   *
398
   * @see <a
399
   *     href="https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient">
400
   *     A78-grpc-metrics-wrr-pf-xds.md</a>
401
   */
402
  public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
403
    throw new UnsupportedOperationException();
×
404
  }
405

406
  static final class ProcessingTracker {
407
    private final AtomicInteger pendingTask = new AtomicInteger(1);
1✔
408
    private final Executor executor;
409
    private final Runnable completionListener;
410

411
    ProcessingTracker(Runnable completionListener, Executor executor) {
1✔
412
      this.executor = executor;
1✔
413
      this.completionListener = completionListener;
1✔
414
    }
1✔
415

416
    void startTask() {
417
      pendingTask.incrementAndGet();
1✔
418
    }
1✔
419

420
    void onComplete() {
421
      if (pendingTask.decrementAndGet() == 0) {
1✔
422
        executor.execute(completionListener);
1✔
423
      }
424
    }
1✔
425
  }
426

427
  interface XdsResponseHandler {
428
    /** Called when an xDS response is received. */
429
    void handleResourceResponse(
430
        XdsResourceType<?> resourceType, ServerInfo serverInfo, String versionInfo,
431
        List<Any> resources, String nonce, ProcessingTracker processingTracker);
432

433
    /** Called when the ADS stream is closed passively. */
434
    // Must be synchronized.
435
    void handleStreamClosed(Status error);
436

437
    /** Called when the ADS stream has been recreated. */
438
    // Must be synchronized.
439
    void handleStreamRestarted(ServerInfo serverInfo);
440
  }
441

442
  public interface ResourceStore {
443
    /**
444
     * Returns the collection of resources currently subscribed to, or {@code null} if not
445
     * subscribing to any resources for the given type.
446
     *
447
     * <p>Note an empty collection indicates subscribing to resources of the given type with
448
     * wildcard mode.
449
     */
450
    // Must be synchronized.
451
    @Nullable
452
    Collection<String> getSubscribedResources(ServerInfo serverInfo,
453
                                              XdsResourceType<? extends ResourceUpdate> type);
454

455
    Map<String, XdsResourceType<?>> getSubscribedResourceTypesWithTypeUrl();
456
  }
457
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc