• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19099

11 Mar 2024 11:14PM UTC coverage: 88.284% (+0.02%) from 88.263%
#19099

push

github

web-flow
xds: Stabilize CsdsService (#11003)

To make it stable, this PR hides protobuf from being exposed via the
API.

Note: this breaks ABI of `CsdsService.streamClientStatus` and
`CsdsService.fetchClientStatus`, but these methods should not
normally be called by the user.

Closes #8016.

31159 of 35294 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/../xds/src/main/java/io/grpc/xds/CsdsService.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Verify.verifyNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.util.concurrent.ListenableFuture;
24
import com.google.protobuf.util.Timestamps;
25
import io.envoyproxy.envoy.admin.v3.ClientResourceStatus;
26
import io.envoyproxy.envoy.service.status.v3.ClientConfig;
27
import io.envoyproxy.envoy.service.status.v3.ClientConfig.GenericXdsConfig;
28
import io.envoyproxy.envoy.service.status.v3.ClientStatusDiscoveryServiceGrpc;
29
import io.envoyproxy.envoy.service.status.v3.ClientStatusRequest;
30
import io.envoyproxy.envoy.service.status.v3.ClientStatusResponse;
31
import io.grpc.BindableService;
32
import io.grpc.ServerServiceDefinition;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ObjectPool;
36
import io.grpc.stub.StreamObserver;
37
import io.grpc.xds.client.XdsClient;
38
import io.grpc.xds.client.XdsClient.ResourceMetadata;
39
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
40
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
41
import io.grpc.xds.client.XdsResourceType;
42
import java.util.Map;
43
import java.util.concurrent.ExecutionException;
44
import java.util.concurrent.TimeUnit;
45
import java.util.concurrent.TimeoutException;
46
import java.util.logging.Level;
47
import java.util.logging.Logger;
48

49
/**
50
 * The CSDS service provides information about the status of a running xDS client.
51
 *
52
 * <p><a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.proto">
53
 * Client Status Discovery Service</a> is a service that exposes xDS config of a given client. See
54
 * the full design at <a href="https://github.com/grpc/proposal/blob/master/A40-csds-support.md">
55
 * gRFC A40: xDS Configuration Dump via Client Status Discovery Service in gRPC</a>.
56
 *
57
 * @since 1.37.0
58
 */
59
public final class CsdsService implements BindableService {
60
  // Class-wide logger; used by handleRequest to report interrupted or unexpected failures.
  private static final Logger logger = Logger.getLogger(CsdsService.class.getName());
1✔
61
  // Supplies the xDS client pool that is queried when a config dump is built.
  private final XdsClientPoolFactory xdsClientPoolFactory;
62
  // Inner gRPC service implementation; bindService() forwards to it.
  private final CsdsServiceInternal delegate = new CsdsServiceInternal();
1✔
63

64
  /**
   * Creates a service that obtains its xDS client pool from the given factory.
   *
   * @param xdsClientPoolFactory factory consulted for the xDS client pool when a config dump is
   *     built; must not be null
   * @throws NullPointerException if {@code xdsClientPoolFactory} is null
   */
  @VisibleForTesting
  CsdsService(XdsClientPoolFactory xdsClientPoolFactory) {
    // The null-check message names the actual parameter so a failure is easy to trace.
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
  }
1✔
68

69
  /** Creates a service backed by {@code SharedXdsClientPoolProvider.getDefaultProvider()}. */
  private CsdsService() {
70
    this(SharedXdsClientPoolProvider.getDefaultProvider());
1✔
71
  }
1✔
72

73
  /**
   * Creates an instance.
   *
   * @return a new {@code CsdsService} constructed with the default xDS client pool provider
   */
74
  public static CsdsService newInstance() {
75
    return new CsdsService();
1✔
76
  }
77

78
  /**
   * Returns the service definition produced by the internal delegate, so that the generated
   * protobuf service base class does not appear in this class's public type hierarchy.
   */
  @Override
79
  public ServerServiceDefinition bindService() {
80
    return delegate.bindService();
1✔
81
  }
82

83
  /** Hide protobuf from being exposed via the API. */
84
  private final class CsdsServiceInternal
1✔
85
      extends ClientStatusDiscoveryServiceGrpc.ClientStatusDiscoveryServiceImplBase {
86
    @Override
87
    public void fetchClientStatus(
88
        ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
89
      if (handleRequest(request, responseObserver)) {
1✔
90
        responseObserver.onCompleted();
1✔
91
      }
92
      // TODO(sergiitk): Add a case covering mutating handleRequest return false to true - to verify
93
      //   that responseObserver.onCompleted() isn't erroneously called on error.
94
    }
1✔
95

96
    @Override
97
    public StreamObserver<ClientStatusRequest> streamClientStatus(
98
        final StreamObserver<ClientStatusResponse> responseObserver) {
99
      return new StreamObserver<ClientStatusRequest>() {
1✔
100
        @Override
101
        public void onNext(ClientStatusRequest request) {
102
          handleRequest(request, responseObserver);
1✔
103
        }
1✔
104

105
        @Override
106
        public void onError(Throwable t) {
107
          onCompleted();
1✔
108
        }
1✔
109

110
        @Override
111
        public void onCompleted() {
112
          responseObserver.onCompleted();
1✔
113
        }
1✔
114
      };
115
    }
116
  }
117

118
  private boolean handleRequest(
119
      ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
120
    StatusException error;
121
    try {
122
      responseObserver.onNext(getConfigDumpForRequest(request));
1✔
123
      return true;
1✔
124
    } catch (StatusException e) {
1✔
125
      error = e;
1✔
126
    } catch (InterruptedException e) {
1✔
127
      Thread.currentThread().interrupt();
1✔
128
      logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
1✔
129
      error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
1✔
130
    } catch (RuntimeException e) {
1✔
131
      logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
1✔
132
      error =
1✔
133
          Status.INTERNAL.withDescription("Unexpected internal error").withCause(e).asException();
1✔
134
    }
1✔
135
    responseObserver.onError(error);
1✔
136
    return false;
1✔
137
  }
138

139
  private ClientStatusResponse getConfigDumpForRequest(ClientStatusRequest request)
140
      throws StatusException, InterruptedException {
141
    if (request.getNodeMatchersCount() > 0) {
1✔
142
      throw new StatusException(
1✔
143
          Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
1✔
144
    }
145

146
    ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get();
1✔
147
    if (xdsClientPool == null) {
1✔
148
      return ClientStatusResponse.getDefaultInstance();
1✔
149
    }
150

151
    XdsClient xdsClient = null;
1✔
152
    try {
153
      xdsClient = xdsClientPool.getObject();
1✔
154
      return ClientStatusResponse.newBuilder()
1✔
155
          .addConfig(getClientConfigForXdsClient(xdsClient))
1✔
156
          .build();
1✔
157
    } finally {
158
      if (xdsClient != null) {
1✔
159
        xdsClientPool.returnObject(xdsClient);
1✔
160
      }
161
    }
162
  }
163

164
  @VisibleForTesting
165
  static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient) throws InterruptedException {
166
    ClientConfig.Builder builder = ClientConfig.newBuilder()
1✔
167
        .setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());
1✔
168

169
    Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
1✔
170
        awaitSubscribedResourcesMetadata(xdsClient.getSubscribedResourcesMetadataSnapshot());
1✔
171

172
    for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry
173
        : metadataByType.entrySet()) {
1✔
174
      XdsResourceType<?> type = metadataByTypeEntry.getKey();
1✔
175
      Map<String, ResourceMetadata> metadataByResourceName = metadataByTypeEntry.getValue();
1✔
176
      for (Map.Entry<String, ResourceMetadata> metadataEntry : metadataByResourceName.entrySet()) {
1✔
177
        String resourceName = metadataEntry.getKey();
1✔
178
        ResourceMetadata metadata = metadataEntry.getValue();
1✔
179
        GenericXdsConfig.Builder genericXdsConfigBuilder = GenericXdsConfig.newBuilder()
1✔
180
            .setTypeUrl(type.typeUrl())
1✔
181
            .setName(resourceName)
1✔
182
            .setClientStatus(metadataStatusToClientStatus(metadata.getStatus()));
1✔
183
        if (metadata.getRawResource() != null) {
1✔
184
          genericXdsConfigBuilder
1✔
185
              .setVersionInfo(metadata.getVersion())
1✔
186
              .setLastUpdated(Timestamps.fromNanos(metadata.getUpdateTimeNanos()))
1✔
187
              .setXdsConfig(metadata.getRawResource());
1✔
188
        }
189
        if (metadata.getStatus() == ResourceMetadataStatus.NACKED) {
1✔
190
          verifyNotNull(metadata.getErrorState(), "resource %s getErrorState", resourceName);
×
191
          genericXdsConfigBuilder
×
192
              .setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
×
193
        }
194
        builder.addGenericXdsConfigs(genericXdsConfigBuilder);
1✔
195
      }
1✔
196
    }
1✔
197
    return builder.build();
1✔
198
  }
199

200
  private static Map<XdsResourceType<?>, Map<String, ResourceMetadata>>
201
      awaitSubscribedResourcesMetadata(
202
      ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future)
203
      throws InterruptedException {
204
    try {
205
      // Normally this shouldn't take long, but add some slack for cases like a cold JVM.
206
      return future.get(20, TimeUnit.SECONDS);
1✔
207
    } catch (ExecutionException | TimeoutException e) {
1✔
208
      // For CSDS' purposes, the exact reason why metadata not loaded isn't important.
209
      throw new RuntimeException(e);
1✔
210
    }
211
  }
212

213
  @VisibleForTesting
214
  static ClientResourceStatus metadataStatusToClientStatus(ResourceMetadataStatus status) {
215
    switch (status) {
1✔
216
      case UNKNOWN:
217
        return ClientResourceStatus.UNKNOWN;
1✔
218
      case DOES_NOT_EXIST:
219
        return ClientResourceStatus.DOES_NOT_EXIST;
1✔
220
      case REQUESTED:
221
        return ClientResourceStatus.REQUESTED;
1✔
222
      case ACKED:
223
        return ClientResourceStatus.ACKED;
1✔
224
      case NACKED:
225
        return ClientResourceStatus.NACKED;
1✔
226
      default:
227
        throw new AssertionError("Unexpected ResourceMetadataStatus: " + status);
×
228
    }
229
  }
230

231
  private static io.envoyproxy.envoy.admin.v3.UpdateFailureState metadataUpdateFailureStateToProto(
232
      UpdateFailureState errorState) {
233
    return io.envoyproxy.envoy.admin.v3.UpdateFailureState.newBuilder()
×
234
        .setLastUpdateAttempt(Timestamps.fromNanos(errorState.getFailedUpdateTimeNanos()))
×
235
        .setDetails(errorState.getFailedDetails())
×
236
        .setVersionInfo(errorState.getFailedVersion())
×
237
        .build();
×
238
  }
239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc