• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19437

23 Aug 2024 08:05PM UTC coverage: 84.514% (+0.01%) from 84.503%
#19437

push

github

web-flow
Xds client split (#11484)

33415 of 39538 relevant lines covered (84.51%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.38
/../xds/src/main/java/io/grpc/xds/CsdsService.java
1
/*
2
 * Copyright 2021 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Verify.verifyNotNull;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.util.concurrent.ListenableFuture;
24
import com.google.protobuf.util.Timestamps;
25
import io.envoyproxy.envoy.admin.v3.ClientResourceStatus;
26
import io.envoyproxy.envoy.service.status.v3.ClientConfig;
27
import io.envoyproxy.envoy.service.status.v3.ClientConfig.GenericXdsConfig;
28
import io.envoyproxy.envoy.service.status.v3.ClientStatusDiscoveryServiceGrpc;
29
import io.envoyproxy.envoy.service.status.v3.ClientStatusRequest;
30
import io.envoyproxy.envoy.service.status.v3.ClientStatusResponse;
31
import io.grpc.BindableService;
32
import io.grpc.ServerServiceDefinition;
33
import io.grpc.Status;
34
import io.grpc.StatusException;
35
import io.grpc.internal.ObjectPool;
36
import io.grpc.stub.StreamObserver;
37
import io.grpc.xds.client.XdsClient;
38
import io.grpc.xds.client.XdsClient.ResourceMetadata;
39
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
40
import io.grpc.xds.client.XdsClient.ResourceMetadata.UpdateFailureState;
41
import io.grpc.xds.client.XdsResourceType;
42
import java.util.ArrayList;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.concurrent.ExecutionException;
46
import java.util.concurrent.TimeUnit;
47
import java.util.concurrent.TimeoutException;
48
import java.util.logging.Level;
49
import java.util.logging.Logger;
50

51
/**
52
 * The CSDS service provides information about the status of a running xDS client.
53
 *
54
 * <p><a href="https://github.com/envoyproxy/envoy/blob/main/api/envoy/service/status/v3/csds.proto">
55
 * Client Status Discovery Service</a> is a service that exposes xDS config of a given client. See
56
 * the full design at <a href="https://github.com/grpc/proposal/blob/master/A40-csds-support.md">
57
 * gRFC A40: xDS Configuration Dump via Client Status Discovery Service in gRPC</a>.
58
 *
59
 * @since 1.37.0
60
 */
61
public final class CsdsService implements BindableService {
62
  private static final Logger logger = Logger.getLogger(CsdsService.class.getName());
1✔
63
  private final XdsClientPoolFactory xdsClientPoolFactory;
64
  private final CsdsServiceInternal delegate = new CsdsServiceInternal();
1✔
65

66
  @VisibleForTesting
67
  CsdsService(XdsClientPoolFactory xdsClientPoolFactory) {
1✔
68
    this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolProvider");
1✔
69
  }
1✔
70

71
  private CsdsService() {
72
    this(SharedXdsClientPoolProvider.getDefaultProvider());
1✔
73
  }
1✔
74

75
  /** Creates an instance. */
76
  public static CsdsService newInstance() {
77
    return new CsdsService();
1✔
78
  }
79

80
  @Override
81
  public ServerServiceDefinition bindService() {
82
    return delegate.bindService();
1✔
83
  }
84

85
  /** Hide protobuf from being exposed via the API. */
86
  private final class CsdsServiceInternal
1✔
87
      extends ClientStatusDiscoveryServiceGrpc.ClientStatusDiscoveryServiceImplBase {
88
    @Override
89
    public void fetchClientStatus(
90
        ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
91
      if (handleRequest(request, responseObserver)) {
1✔
92
        responseObserver.onCompleted();
1✔
93
      }
94
      // TODO(sergiitk): Add a case covering mutating handleRequest return false to true - to verify
95
      //   that responseObserver.onCompleted() isn't erroneously called on error.
96
    }
1✔
97

98
    @Override
99
    public StreamObserver<ClientStatusRequest> streamClientStatus(
100
        final StreamObserver<ClientStatusResponse> responseObserver) {
101
      return new StreamObserver<ClientStatusRequest>() {
1✔
102
        @Override
103
        public void onNext(ClientStatusRequest request) {
104
          handleRequest(request, responseObserver);
1✔
105
        }
1✔
106

107
        @Override
108
        public void onError(Throwable t) {
109
          onCompleted();
1✔
110
        }
1✔
111

112
        @Override
113
        public void onCompleted() {
114
          responseObserver.onCompleted();
1✔
115
        }
1✔
116
      };
117
    }
118
  }
119

120
  private boolean handleRequest(
121
      ClientStatusRequest request, StreamObserver<ClientStatusResponse> responseObserver) {
122
    StatusException error = null;
1✔
123

124
    if (request.getNodeMatchersCount() > 0) {
1✔
125
      error = new StatusException(
1✔
126
          Status.INVALID_ARGUMENT.withDescription("node_matchers not supported"));
1✔
127
    } else {
128
      List<String> targets = xdsClientPoolFactory.getTargets();
1✔
129
      List<ClientConfig> clientConfigs = new ArrayList<>(targets.size());
1✔
130

131
      for (int i = 0; i < targets.size() && error == null; i++) {
1✔
132
        try {
133
          ClientConfig clientConfig = getConfigForRequest(targets.get(i));
1✔
134
          if (clientConfig != null) {
1✔
135
            clientConfigs.add(clientConfig);
1✔
136
          }
137
        } catch (InterruptedException e) {
1✔
138
          Thread.currentThread().interrupt();
1✔
139
          logger.log(Level.FINE, "Server interrupted while building CSDS config dump", e);
1✔
140
          error = Status.ABORTED.withDescription("Thread interrupted").withCause(e).asException();
1✔
141
        } catch (RuntimeException e) {
1✔
142
          logger.log(Level.WARNING, "Unexpected error while building CSDS config dump", e);
1✔
143
          error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
1✔
144
              .asException();
1✔
145
        }
1✔
146
      }
147

148
      try {
149
        responseObserver.onNext(getStatusResponse(clientConfigs));
1✔
150
      } catch (RuntimeException e) {
×
151
        logger.log(Level.WARNING, "Unexpected error while processing CSDS config dump", e);
×
152
        error = Status.INTERNAL.withDescription("Unexpected internal error").withCause(e)
×
153
            .asException();
×
154
      }
1✔
155
    }
156

157
    if (error == null) {
1✔
158
      return true; // All clients reported without error
1✔
159
    }
160
    responseObserver.onError(error);
1✔
161
    return false;
1✔
162
  }
163

164
  private ClientConfig getConfigForRequest(String target) throws InterruptedException {
165
    ObjectPool<XdsClient> xdsClientPool = xdsClientPoolFactory.get(target);
1✔
166
    if (xdsClientPool == null) {
1✔
167
      return null;
1✔
168
    }
169

170
    XdsClient xdsClient = null;
1✔
171
    try {
172
      xdsClient = xdsClientPool.getObject();
1✔
173
      return getClientConfigForXdsClient(xdsClient, target);
1✔
174
    } finally {
175
      if (xdsClient != null) {
1✔
176
        xdsClientPool.returnObject(xdsClient);
1✔
177
      }
178
    }
179
  }
180

181
  private ClientStatusResponse getStatusResponse(List<ClientConfig> clientConfigs) {
182
    if (clientConfigs.isEmpty()) {
1✔
183
      return ClientStatusResponse.getDefaultInstance();
1✔
184
    }
185
    return ClientStatusResponse.newBuilder().addAllConfig(clientConfigs).build();
1✔
186
  }
187

188
  @VisibleForTesting
189
  static ClientConfig getClientConfigForXdsClient(XdsClient xdsClient, String target)
190
      throws InterruptedException {
191
    ClientConfig.Builder builder = ClientConfig.newBuilder()
1✔
192
        .setClientScope(target)
1✔
193
        .setNode(xdsClient.getBootstrapInfo().node().toEnvoyProtoNode());
1✔
194

195
    Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
1✔
196
        awaitSubscribedResourcesMetadata(xdsClient.getSubscribedResourcesMetadataSnapshot());
1✔
197

198
    for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry
199
        : metadataByType.entrySet()) {
1✔
200
      XdsResourceType<?> type = metadataByTypeEntry.getKey();
1✔
201
      Map<String, ResourceMetadata> metadataByResourceName = metadataByTypeEntry.getValue();
1✔
202
      for (Map.Entry<String, ResourceMetadata> metadataEntry : metadataByResourceName.entrySet()) {
1✔
203
        String resourceName = metadataEntry.getKey();
1✔
204
        ResourceMetadata metadata = metadataEntry.getValue();
1✔
205
        GenericXdsConfig.Builder genericXdsConfigBuilder = GenericXdsConfig.newBuilder()
1✔
206
            .setTypeUrl(type.typeUrl())
1✔
207
            .setName(resourceName)
1✔
208
            .setClientStatus(metadataStatusToClientStatus(metadata.getStatus()));
1✔
209
        if (metadata.getRawResource() != null) {
1✔
210
          genericXdsConfigBuilder
1✔
211
              .setVersionInfo(metadata.getVersion())
1✔
212
              .setLastUpdated(Timestamps.fromNanos(metadata.getUpdateTimeNanos()))
1✔
213
              .setXdsConfig(metadata.getRawResource());
1✔
214
        }
215
        if (metadata.getStatus() == ResourceMetadataStatus.NACKED) {
1✔
216
          verifyNotNull(metadata.getErrorState(), "resource %s getErrorState", resourceName);
×
217
          genericXdsConfigBuilder
×
218
              .setErrorState(metadataUpdateFailureStateToProto(metadata.getErrorState()));
×
219
        }
220
        builder.addGenericXdsConfigs(genericXdsConfigBuilder);
1✔
221
      }
1✔
222
    }
1✔
223
    return builder.build();
1✔
224
  }
225

226
  private static Map<XdsResourceType<?>, Map<String, ResourceMetadata>>
227
      awaitSubscribedResourcesMetadata(
228
      ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>> future)
229
      throws InterruptedException {
230
    try {
231
      // Normally this shouldn't take long, but add some slack for cases like a cold JVM.
232
      return future.get(20, TimeUnit.SECONDS);
1✔
233
    } catch (ExecutionException | TimeoutException e) {
1✔
234
      // For CSDS' purposes, the exact reason why metadata not loaded isn't important.
235
      throw new RuntimeException(e);
1✔
236
    }
237
  }
238

239
  @VisibleForTesting
240
  static ClientResourceStatus metadataStatusToClientStatus(ResourceMetadataStatus status) {
241
    switch (status) {
1✔
242
      case UNKNOWN:
243
        return ClientResourceStatus.UNKNOWN;
1✔
244
      case DOES_NOT_EXIST:
245
        return ClientResourceStatus.DOES_NOT_EXIST;
1✔
246
      case REQUESTED:
247
        return ClientResourceStatus.REQUESTED;
1✔
248
      case ACKED:
249
        return ClientResourceStatus.ACKED;
1✔
250
      case NACKED:
251
        return ClientResourceStatus.NACKED;
1✔
252
      default:
253
        throw new AssertionError("Unexpected ResourceMetadataStatus: " + status);
×
254
    }
255
  }
256

257
  private static io.envoyproxy.envoy.admin.v3.UpdateFailureState metadataUpdateFailureStateToProto(
258
      UpdateFailureState errorState) {
259
    return io.envoyproxy.envoy.admin.v3.UpdateFailureState.newBuilder()
×
260
        .setLastUpdateAttempt(Timestamps.fromNanos(errorState.getFailedUpdateTimeNanos()))
×
261
        .setDetails(errorState.getFailedDetails())
×
262
        .setVersionInfo(errorState.getFailedVersion())
×
263
        .build();
×
264
  }
265
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc