• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19203

06 May 2024 11:11PM CUT coverage: 88.25% (+0.006%) from 88.244%
#19203

push

github

web-flow
buildscripts: simplify PSM interop Kokoro buildscripts (#11121) (#11165)

Integrates the new features of the Kokoro PSM Interop install library introduced in grpc/psm-interop#73.

Nearly all common functionality was moved from per-language/per-branch PSM Interop build scripts to [psm_interop_kokoro_lib.sh](https://github.com/grpc/psm-interop/blob/main/.kokoro/psm_interop_kokoro_lib.sh):
1. The list of tests in each test suite
2. Per-test-suite flag customization
3. `run_test` methods
4. `build_docker_images_if_needed` methods
5. Generic `build_test_app_docker_images` methods (simple docker build + docker push + docker tag). grpc-java is one exception, as it doesn't run docker directly, but a cloudbuild flow.

Now all PSM Interop jobs, across all test suites, share the same buildscripts:
1.  buildscript that invokes the test: `psm-interop-test-{language}.sh` (configured as `build_file` in the build cfg)
2. buildscript that builds the xDS test client/server and publishes them as a Docker image: `psm-interop-build-{language}.sh` (conventional name called from `psm_interop_kokoro_lib.sh`)

`psm-interop-test-{language}.sh`:
1. Sets `GRPC_LANGUAGE`, `BUILD_SCRIPT_DIR` environment variables.
2. Downloads the shared `psm_interop_kokoro_lib.sh` from the main branch of the psm-interop repo.
3. Sources `psm-interop-build-{language}.sh`
4. Calls `psm::run "${PSM_TEST_SUITE}"` (`PSM_TEST_SUITE` configured in the cfg file).

`psm-interop-build-{language}.sh`:
1. Defines `psm::lang::build_docker_images` which is called from `psm_interop_kokoro_lib.sh`.
2. Invokes any repo-specific logic.
3. May use `psm::build::docker_images_generic` for generic Docker build, tag, push, or implement its own build/publish method.

References:
- b/288578634
- See the full list of the new features at grpc/psm-interop#73.
- Additional fixes to the shared lib: grpc/psm-interop#78, grpc/psm-interop#79

30629 of 34707 relevant lines covered (88.25%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.06
/../xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.Stopwatch;
25
import com.google.common.base.Supplier;
26
import com.google.protobuf.Any;
27
import com.google.rpc.Code;
28
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
29
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
30
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
31
import io.grpc.Channel;
32
import io.grpc.Context;
33
import io.grpc.InternalLogId;
34
import io.grpc.ManagedChannel;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext;
37
import io.grpc.SynchronizationContext.ScheduledHandle;
38
import io.grpc.internal.BackoffPolicy;
39
import io.grpc.stub.ClientCallStreamObserver;
40
import io.grpc.stub.ClientResponseObserver;
41
import io.grpc.stub.StreamObserver;
42
import io.grpc.xds.Bootstrapper.ServerInfo;
43
import io.grpc.xds.EnvoyProtoData.Node;
44
import io.grpc.xds.XdsClient.ResourceStore;
45
import io.grpc.xds.XdsClient.XdsResponseHandler;
46
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
47
import io.grpc.xds.XdsLogger.XdsLogLevel;
48
import java.util.Collection;
49
import java.util.Collections;
50
import java.util.HashMap;
51
import java.util.HashSet;
52
import java.util.List;
53
import java.util.Map;
54
import java.util.Set;
55
import java.util.concurrent.ScheduledExecutorService;
56
import java.util.concurrent.TimeUnit;
57
import javax.annotation.Nullable;
58

59
/**
60
 * Common base type for XdsClient implementations, which encapsulates the layer abstraction of
61
 * the xDS RPC stream.
62
 */
63
final class ControlPlaneClient {
64

65
  public static final String CLOSED_BY_SERVER = "Closed by server";
66
  private final SynchronizationContext syncContext;
67
  private final InternalLogId logId;
68
  private final XdsLogger logger;
69
  private final ServerInfo serverInfo;
70
  private final ManagedChannel channel;
71
  private final XdsResponseHandler xdsResponseHandler;
72
  private final ResourceStore resourceStore;
73
  private final Context context;
74
  private final ScheduledExecutorService timeService;
75
  private final BackoffPolicy.Provider backoffPolicyProvider;
76
  private final Stopwatch stopwatch;
77
  private final Node bootstrapNode;
78
  private final XdsClient.TimerLaunch timerLaunch;
79

80
  // Last successfully applied version_info for each resource type. Starts with empty string.
81
  // A version_info is used to update management server with client's most recent knowledge of
82
  // resources.
83
  private final Map<XdsResourceType<?>, String> versions = new HashMap<>();
1✔
84

85
  private boolean shutdown;
86
  @Nullable
87
  private AbstractAdsStream adsStream;
88
  @Nullable
89
  private BackoffPolicy retryBackoffPolicy;
90
  @Nullable
91
  private ScheduledHandle rpcRetryTimer;
92

93
  /**
   * Creates an entity that manages ADS RPCs over a single channel to {@code serverInfo}.
   *
   * <p>Every argument is required: a {@link NullPointerException} whose message names the
   * offending argument is thrown if one of them is {@code null}. The channel is obtained from
   * {@code xdsChannelFactory} only after all arguments have been validated, so a rejected
   * argument does not leave behind a channel that nothing would ever shut down.
   */
  // TODO: rename to XdsChannel
  ControlPlaneClient(
      XdsChannelFactory xdsChannelFactory,
      ServerInfo serverInfo,
      Node bootstrapNode,
      XdsResponseHandler xdsResponseHandler,
      ResourceStore resourceStore,
      Context context,
      ScheduledExecutorService timeService,
      SynchronizationContext syncContext,
      BackoffPolicy.Provider backoffPolicyProvider,
      Supplier<Stopwatch> stopwatchSupplier,
      XdsClient.TimerLaunch timerLaunch) {
    // Validate everything up front, in the same precedence as before, without side effects.
    this.serverInfo = checkNotNull(serverInfo, "serverInfo");
    checkNotNull(xdsChannelFactory, "xdsChannelFactory");
    this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
    this.resourceStore = checkNotNull(resourceStore, "resourceStore");
    this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
    this.context = checkNotNull(context, "context");
    this.timeService = checkNotNull(timeService, "timeService");
    this.syncContext = checkNotNull(syncContext, "syncContext");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
    this.timerLaunch = checkNotNull(timerLaunch, "timerLaunch");
    stopwatch = checkNotNull(stopwatchSupplier, "stopwatchSupplier").get();
    logId = InternalLogId.allocate("xds-client", serverInfo.target());
    logger = XdsLogger.withLogId(logId);
    // Create the channel last: it is the only resource acquired here that needs a shutdown.
    this.channel = xdsChannelFactory.create(serverInfo);
    logger.log(XdsLogLevel.INFO, "Created");
  }
123

124
  /** The underlying channel. */
125
  // Currently, only externally used for LrsClient.
126
  Channel channel() {
127
    return channel;
1✔
128
  }
129

130
  void shutdown() {
131
    syncContext.execute(new Runnable() {
1✔
132
      @Override
133
      public void run() {
134
        shutdown = true;
1✔
135
        logger.log(XdsLogLevel.INFO, "Shutting down");
1✔
136
        if (adsStream != null) {
1✔
137
          adsStream.close(Status.CANCELLED.withDescription("shutdown").asException());
1✔
138
        }
139
        if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
1✔
140
          rpcRetryTimer.cancel();
1✔
141
        }
142
        channel.shutdown();
1✔
143
      }
1✔
144
    });
145
  }
1✔
146

147
  @Override
148
  public String toString() {
149
    return logId.toString();
×
150
  }
151

152
  /**
153
   * Updates the resource subscription for the given resource type.
154
   */
155
  // Must be synchronized.
156
  void adjustResourceSubscription(XdsResourceType<?> resourceType) {
157
    if (isInBackoff()) {
1✔
158
      return;
1✔
159
    }
160
    if (adsStream == null) {
1✔
161
      startRpcStream();
1✔
162
    }
163
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, resourceType);
1✔
164
    if (resources != null) {
1✔
165
      adsStream.sendDiscoveryRequest(resourceType, resources);
1✔
166
    }
167
  }
1✔
168

169
  /**
170
   * Accepts the update for the given resource type by updating the latest resource version
171
   * and sends an ACK request to the management server.
172
   */
173
  // Must be synchronized.
174
  void ackResponse(XdsResourceType<?> type, String versionInfo, String nonce) {
175
    versions.put(type, versionInfo);
1✔
176
    logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
1✔
177
        type.typeName(), nonce, versionInfo);
1✔
178
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
179
    if (resources == null) {
1✔
180
      resources = Collections.emptyList();
×
181
    }
182
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, null);
1✔
183
  }
1✔
184

185
  /**
186
   * Rejects the update for the given resource type and sends an NACK request (request with last
187
   * accepted version) to the management server.
188
   */
189
  // Must be synchronized.
190
  void nackResponse(XdsResourceType<?> type, String nonce, String errorDetail) {
191
    String versionInfo = versions.getOrDefault(type, "");
1✔
192
    logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
1✔
193
        type.typeName(), nonce, versionInfo);
1✔
194
    Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
195
    if (resources == null) {
1✔
196
      resources = Collections.emptyList();
×
197
    }
198
    adsStream.sendDiscoveryRequest(type, versionInfo, resources, nonce, errorDetail);
1✔
199
  }
1✔
200

201
  /**
202
   * Returns {@code true} if the resource discovery is currently in backoff.
203
   */
204
  // Must be synchronized.
205
  boolean isInBackoff() {
206
    return rpcRetryTimer != null && rpcRetryTimer.isPending();
1✔
207
  }
208

209
  boolean isReady() {
210
    return adsStream != null && adsStream.isReady();
1✔
211
  }
212

213
  /**
214
   * Starts a timer for each requested resource that hasn't been responded to and
215
   * has been waiting for the channel to get ready.
216
   */
217
  void readyHandler() {
218
    if (!isReady()) {
1✔
219
      return;
1✔
220
    }
221

222
    if (isInBackoff()) {
1✔
223
      rpcRetryTimer.cancel();
×
224
      rpcRetryTimer = null;
×
225
    }
226

227
    timerLaunch.startSubscriberTimersIfNeeded(serverInfo);
1✔
228
  }
1✔
229

230
  /**
231
   * Establishes the RPC connection by creating a new RPC stream on the given channel for
232
   * xDS protocol communication.
233
   */
234
  // Must be synchronized.
235
  private void startRpcStream() {
236
    checkState(adsStream == null, "Previous adsStream has not been cleared yet");
1✔
237
    adsStream = new AdsStreamV3();
1✔
238
    Context prevContext = context.attach();
1✔
239
    try {
240
      adsStream.start();
1✔
241
    } finally {
242
      context.detach(prevContext);
1✔
243
    }
244
    logger.log(XdsLogLevel.INFO, "ADS stream started");
1✔
245
    stopwatch.reset().start();
1✔
246
  }
1✔
247

248
  @VisibleForTesting
249
  final class RpcRetryTask implements Runnable {
1✔
250
    @Override
251
    public void run() {
252
      if (shutdown) {
1✔
253
        return;
×
254
      }
255
      startRpcStream();
1✔
256
      Set<XdsResourceType<?>> subscribedResourceTypes =
1✔
257
          new HashSet<>(resourceStore.getSubscribedResourceTypesWithTypeUrl().values());
1✔
258
      for (XdsResourceType<?> type : subscribedResourceTypes) {
1✔
259
        Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
1✔
260
        if (resources != null) {
1✔
261
          adsStream.sendDiscoveryRequest(type, resources);
1✔
262
        }
263
      }
1✔
264
      xdsResponseHandler.handleStreamRestarted(serverInfo);
1✔
265
    }
1✔
266
  }
267

268
  @VisibleForTesting
269
  @Nullable
270
  XdsResourceType<?> fromTypeUrl(String typeUrl) {
271
    return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
1✔
272
  }
273

274
  private abstract class AbstractAdsStream {
1✔
275
    private boolean responseReceived;
276
    private boolean closed;
277
    // Response nonce for the most recently received discovery responses of each resource type.
278
    // Client initiated requests start response nonce with empty string.
279
    // Nonce in each response is echoed back in the following ACK/NACK request. It is
280
    // used for management server to identify which response the client is ACKing/NACking.
281
    // To avoid confusion, client-initiated requests will always use the nonce in
282
    // most recently received responses of each resource type.
283
    private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();
1✔
284

285
    abstract void start();
286

287
    abstract void sendError(Exception error);
288

289
    abstract boolean isReady();
290

291
    /**
292
     * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
293
     * {@code errorDetail}. Used for reacting to a specific discovery response. For
294
     * client-initiated discovery requests, use {@link
295
     * #sendDiscoveryRequest(XdsResourceType, Collection)}.
296
     */
297
    abstract void sendDiscoveryRequest(XdsResourceType<?> type, String version,
298
        Collection<String> resources, String nonce, @Nullable String errorDetail);
299

300
    /**
301
     * Sends a client-initiated discovery request.
302
     */
303
    final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> resources) {
304
      logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
1✔
305
      sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
1✔
306
          respNonces.getOrDefault(type, ""), null);
1✔
307
    }
1✔
308

309
    final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
310
                                 String nonce) {
311
      checkNotNull(type, "type");
1✔
312
      if (closed) {
1✔
313
        return;
×
314
      }
315
      responseReceived = true;
1✔
316
      respNonces.put(type, nonce);
1✔
317
      xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
1✔
318
    }
1✔
319

320
    final void handleRpcError(Throwable t) {
321
      handleRpcStreamClosed(Status.fromThrowable(t));
1✔
322
    }
1✔
323

324
    final void handleRpcCompleted() {
325
      handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
×
326
    }
×
327

328
    private void handleRpcStreamClosed(Status error) {
329
      if (closed) {
1✔
330
        return;
1✔
331
      }
332

333
      if (responseReceived || retryBackoffPolicy == null) {
1✔
334
        // Reset the backoff sequence if had received a response, or backoff sequence
335
        // has never been initialized.
336
        retryBackoffPolicy = backoffPolicyProvider.get();
1✔
337
      }
338
      // Need this here to avoid tsan race condition in XdsClientImplTestBase.sendToNonexistentHost
339
      long elapsed = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
340
      long delayNanos = Math.max(0, retryBackoffPolicy.nextBackoffNanos() - elapsed);
1✔
341
      rpcRetryTimer = syncContext.schedule(
1✔
342
          new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
343

344
      checkArgument(!error.isOk(), "unexpected OK status");
1✔
345
      String errorMsg = error.getDescription() != null
1✔
346
          && error.getDescription().equals(CLOSED_BY_SERVER)
1✔
347
              ? "ADS stream closed with status {0}: {1}. Cause: {2}"
×
348
              : "ADS stream failed with status {0}: {1}. Cause: {2}";
1✔
349
      logger.log(
1✔
350
          XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause());
1✔
351
      closed = true;
1✔
352
      xdsResponseHandler.handleStreamClosed(error);
1✔
353
      cleanUp();
1✔
354

355
      logger.log(XdsLogLevel.INFO, "Retry ADS stream in {0} ns", delayNanos);
1✔
356
    }
1✔
357

358
    private void close(Exception error) {
359
      if (closed) {
1✔
360
        return;
×
361
      }
362
      closed = true;
1✔
363
      cleanUp();
1✔
364
      sendError(error);
1✔
365
    }
1✔
366

367
    private void cleanUp() {
368
      if (adsStream == this) {
1✔
369
        adsStream = null;
1✔
370
      }
371
    }
1✔
372
  }
373

374
  private final class AdsStreamV3 extends AbstractAdsStream {
1✔
375
    private StreamObserver<DiscoveryRequest> requestWriter;
376

377
    @Override
378
    public boolean isReady() {
379
      return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
1✔
380
    }
381

382
    @Override
383
    void start() {
384
      AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
1✔
385
          AggregatedDiscoveryServiceGrpc.newStub(channel);
1✔
386
      StreamObserver<DiscoveryResponse> responseReader =
1✔
387
          new ClientResponseObserver<DiscoveryRequest,DiscoveryResponse>() {
1✔
388

389
        @Override
390
        public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
391
          requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
1✔
392
        }
1✔
393

394
        @Override
395
        public void onNext(final DiscoveryResponse response) {
396
          syncContext.execute(new Runnable() {
1✔
397
            @Override
398
            public void run() {
399
              XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
1✔
400
              if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
401
                logger.log(
×
402
                    XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
403
                    MessagePrinter.print(response));
×
404
              }
405
              if (type == null) {
1✔
406
                logger.log(
1✔
407
                    XdsLogLevel.WARNING,
408
                    "Ignore an unknown type of DiscoveryResponse: {0}",
409
                    response.getTypeUrl());
1✔
410
                return;
1✔
411
              }
412
              handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
1✔
413
                  response.getNonce());
1✔
414
            }
1✔
415
          });
416
        }
1✔
417

418
        @Override
419
        public void onError(final Throwable t) {
420
          syncContext.execute(new Runnable() {
1✔
421
            @Override
422
            public void run() {
423
              handleRpcError(t);
1✔
424
            }
1✔
425
          });
426
        }
1✔
427

428
        @Override
429
        public void onCompleted() {
430
          syncContext.execute(new Runnable() {
×
431
            @Override
432
            public void run() {
433
              handleRpcCompleted();
×
434
            }
×
435
          });
436
        }
×
437
      };
438
      requestWriter = stub.streamAggregatedResources(responseReader);
1✔
439
    }
1✔
440

441
    @Override
442
    void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
443
                              Collection<String> resources, String nonce,
444
                              @Nullable String errorDetail) {
445
      checkState(requestWriter != null, "ADS stream has not been started");
1✔
446
      DiscoveryRequest.Builder builder =
447
          DiscoveryRequest.newBuilder()
1✔
448
              .setVersionInfo(versionInfo)
1✔
449
              .setNode(bootstrapNode.toEnvoyProtoNode())
1✔
450
              .addAllResourceNames(resources)
1✔
451
              .setTypeUrl(type.typeUrl())
1✔
452
              .setResponseNonce(nonce);
1✔
453
      if (errorDetail != null) {
1✔
454
        com.google.rpc.Status error =
455
            com.google.rpc.Status.newBuilder()
1✔
456
                .setCode(Code.INVALID_ARGUMENT_VALUE)  // FIXME(chengyuanzhang): use correct code
1✔
457
                .setMessage(errorDetail)
1✔
458
                .build();
1✔
459
        builder.setErrorDetail(error);
1✔
460
      }
461
      DiscoveryRequest request = builder.build();
1✔
462
      requestWriter.onNext(request);
1✔
463
      if (logger.isLoggable(XdsLogLevel.DEBUG)) {
1✔
464
        logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request));
×
465
      }
466
    }
1✔
467

468
    @Override
469
    void sendError(Exception error) {
470
      requestWriter.onError(error);
1✔
471
    }
1✔
472
  }
473
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc