• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19691

14 Feb 2025 06:23PM CUT coverage: 88.603% (-0.02%) from 88.626%
#19691

push

github

web-flow
xds:Cleanup to reduce test flakiness (#11895)

* don't process resourceDoesNotExist for watchers that have been cancelled.

* Change test to use an ArgumentMatcher instead of expecting that only the final result will be sent since depending on timing there may be configs sent for clusters being removed with their entries as errors.

34261 of 38668 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.51
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import com.google.errorprone.annotations.concurrent.GuardedBy;
23
import io.grpc.CallOptions;
24
import io.grpc.ClientStreamTracer;
25
import io.grpc.Context;
26
import io.grpc.InternalChannelz.SocketStats;
27
import io.grpc.InternalLogId;
28
import io.grpc.LoadBalancer.PickResult;
29
import io.grpc.LoadBalancer.PickSubchannelArgs;
30
import io.grpc.LoadBalancer.SubchannelPicker;
31
import io.grpc.Metadata;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import java.util.ArrayList;
37
import java.util.Collection;
38
import java.util.Collections;
39
import java.util.LinkedHashSet;
40
import java.util.concurrent.Executor;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43

44
/**
 * A client transport that queues requests before a real transport is available. When {@link
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
 * transport for each pending stream.
 *
 * <p>This transport owns every stream that it has created until a real transport has been picked
 * for that stream. At that point ownership of the stream is transferred to the real transport and
 * the delayed transport stops owning it.
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // lazily allocated, since it is infrequently used.
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /** Immutable state needed for picking. 'lock' must be held for writing. */
73
  private volatile PickerState pickerState = new PickerState(null, null);
1✔
74

75
  /**
   * Creates a new delayed transport.
   *
   * @param defaultAppExecutor executor in which pending streams create their real streams and run
   *        buffered operations, unless there is one provided in {@link CallOptions}.
   * @param syncContext the SynchronizationContext from which all listener callbacks of the delayed
   *        transport will be run.
   */
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
    this.syncContext = syncContext;
    this.defaultAppExecutor = defaultAppExecutor;
  }
88

89
  @Override
90
  public final Runnable start(final Listener listener) {
91
    this.listener = listener;
1✔
92
    reportTransportInUse = new Runnable() {
1✔
93
        @Override
94
        public void run() {
95
          listener.transportInUse(true);
1✔
96
        }
1✔
97
      };
98
    reportTransportNotInUse = new Runnable() {
1✔
99
        @Override
100
        public void run() {
101
          listener.transportInUse(false);
1✔
102
        }
1✔
103
      };
104
    reportTransportTerminated = new Runnable() {
1✔
105
        @Override
106
        public void run() {
107
          listener.transportTerminated();
1✔
108
        }
1✔
109
      };
110
    return null;
1✔
111
  }
112

113
  /**
114
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
115
   * picker will be consulted.
116
   *
117
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
118
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
119
   */
120
  @Override
121
  public final ClientStream newStream(
122
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
123
      ClientStreamTracer[] tracers) {
124
    try {
125
      PickSubchannelArgs args = new PickSubchannelArgsImpl(
1✔
126
          method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
127
      PickerState state = pickerState;
1✔
128
      while (true) {
129
        if (state.shutdownStatus != null) {
1✔
130
          return new FailingClientStream(state.shutdownStatus, tracers);
1✔
131
        }
132
        PickResult pickResult = null;
1✔
133
        if (state.lastPicker != null) {
1✔
134
          pickResult = state.lastPicker.pickSubchannel(args);
1✔
135
          callOptions = args.getCallOptions();
1✔
136
          // User code provided authority takes precedence over the LB provided one.
137
          if (callOptions.getAuthority() == null
1✔
138
              && pickResult.getAuthorityOverride() != null) {
1✔
139
            callOptions = callOptions.withAuthority(pickResult.getAuthorityOverride());
1✔
140
          }
141
          ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
142
              callOptions.isWaitForReady());
1✔
143
          if (transport != null) {
1✔
144
            ClientStream stream = transport.newStream(
1✔
145
                args.getMethodDescriptor(), args.getHeaders(), callOptions,
1✔
146
                tracers);
147
            // User code provided authority takes precedence over the LB provided one; this will be
148
            // overwritten by ClientCallImpl if the application sets an authority override
149
            if (pickResult.getAuthorityOverride() != null) {
1✔
150
              stream.setAuthority(pickResult.getAuthorityOverride());
1✔
151
            }
152
            return stream;
1✔
153
          }
154
        }
155
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
156
        // race with reprocess()), we will buffer the RPC.  Otherwise, will try with the new picker.
157
        synchronized (lock) {
1✔
158
          PickerState newerState = pickerState;
1✔
159
          if (state == newerState) {
1✔
160
            return createPendingStream(args, tracers, pickResult);
1✔
161
          }
162
          state = newerState;
1✔
163
        }
1✔
164
      }
1✔
165
    } finally {
166
      syncContext.drain();
1✔
167
    }
168
  }
169

170
  /**
171
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
172
   * schedule tasks on syncContext.
173
   */
174
  @GuardedBy("lock")
175
  private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
176
      PickResult pickResult) {
177
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
178
    if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
1✔
179
      pendingStream.lastPickStatus = pickResult.getStatus();
1✔
180
    }
181
    pendingStreams.add(pendingStream);
1✔
182
    if (getPendingStreamsCount() == 1) {
1✔
183
      syncContext.executeLater(reportTransportInUse);
1✔
184
    }
185
    for (ClientStreamTracer streamTracer : tracers) {
1✔
186
      streamTracer.createPendingStream();
1✔
187
    }
188
    return pendingStream;
1✔
189
  }
190

191
  @Override
192
  public final void ping(final PingCallback callback, Executor executor) {
193
    throw new UnsupportedOperationException("This method is not expected to be called");
×
194
  }
195

196
  @Override
197
  public ListenableFuture<SocketStats> getStats() {
198
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
199
    ret.set(null);
×
200
    return ret;
×
201
  }
202

203
  /**
204
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
205
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
206
   * more buffered streams.
207
   */
208
  @Override
209
  public final void shutdown(final Status status) {
210
    synchronized (lock) {
1✔
211
      if (pickerState.shutdownStatus != null) {
1✔
212
        return;
1✔
213
      }
214
      pickerState = pickerState.withShutdownStatus(status);
1✔
215
      syncContext.executeLater(new Runnable() {
1✔
216
          @Override
217
          public void run() {
218
            listener.transportShutdown(status);
1✔
219
          }
1✔
220
        });
221
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
222
        syncContext.executeLater(reportTransportTerminated);
1✔
223
        reportTransportTerminated = null;
1✔
224
      }
225
    }
1✔
226
    syncContext.drain();
1✔
227
  }
1✔
228

229
  /**
230
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
231
   * this transport.
232
   */
233
  @Override
234
  public final void shutdownNow(Status status) {
235
    shutdown(status);
1✔
236
    Collection<PendingStream> savedPendingStreams;
237
    Runnable savedReportTransportTerminated;
238
    synchronized (lock) {
1✔
239
      savedPendingStreams = pendingStreams;
1✔
240
      savedReportTransportTerminated = reportTransportTerminated;
1✔
241
      reportTransportTerminated = null;
1✔
242
      if (!pendingStreams.isEmpty()) {
1✔
243
        pendingStreams = Collections.emptyList();
1✔
244
      }
245
    }
1✔
246
    if (savedReportTransportTerminated != null) {
1✔
247
      for (PendingStream stream : savedPendingStreams) {
1✔
248
        Runnable runnable = stream.setStream(
1✔
249
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
250
        if (runnable != null) {
1✔
251
          // Drain in-line instead of using an executor as failing stream just throws everything
252
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
253
          // before stream.start().
254
          runnable.run();
1✔
255
        }
256
      }
1✔
257
      syncContext.execute(savedReportTransportTerminated);
1✔
258
    }
259
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
260
    // shutdown().
261
  }
1✔
262

263
  public final boolean hasPendingStreams() {
264
    synchronized (lock) {
1✔
265
      return !pendingStreams.isEmpty();
1✔
266
    }
267
  }
268

269
  @VisibleForTesting
270
  final int getPendingStreamsCount() {
271
    synchronized (lock) {
1✔
272
      return pendingStreams.size();
1✔
273
    }
274
  }
275

276
  /**
277
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
278
   * pick is successful, otherwise keep it pending.
279
   *
280
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
281
   * streams will be served by the latest picker (if a same picker is given more than once, they are
282
   * considered different pickers) as soon as possible.
283
   *
284
   * <p>This method <strong>must not</strong> be called concurrently with itself.
285
   */
286
  final void reprocess(@Nullable SubchannelPicker picker) {
287
    ArrayList<PendingStream> toProcess;
288
    synchronized (lock) {
1✔
289
      pickerState = pickerState.withPicker(picker);
1✔
290
      if (picker == null || !hasPendingStreams()) {
1✔
291
        return;
1✔
292
      }
293
      toProcess = new ArrayList<>(pendingStreams);
1✔
294
    }
1✔
295
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
296

297
    for (final PendingStream stream : toProcess) {
1✔
298
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
299
      CallOptions callOptions = stream.args.getCallOptions();
1✔
300
      if (callOptions.isWaitForReady() && pickResult.hasResult()) {
1✔
301
        stream.lastPickStatus = pickResult.getStatus();
1✔
302
      }
303
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
304
          callOptions.isWaitForReady());
1✔
305
      if (transport != null) {
1✔
306
        Executor executor = defaultAppExecutor;
1✔
307
        // createRealStream may be expensive. It will start real streams on the transport. If
308
        // there are pending requests, they will be serialized too, which may be expensive. Since
309
        // we are now on transport thread, we need to offload the work to an executor.
310
        if (callOptions.getExecutor() != null) {
1✔
311
          executor = callOptions.getExecutor();
1✔
312
        }
313
        Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride());
1✔
314
        if (runnable != null) {
1✔
315
          executor.execute(runnable);
1✔
316
        }
317
        toRemove.add(stream);
1✔
318
      }  // else: stay pending
319
    }
1✔
320

321
    synchronized (lock) {
1✔
322
      // Between this synchronized and the previous one:
323
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
324
      //   - shutdown() may be called, which may turn pendingStreams into null.
325
      if (!hasPendingStreams()) {
1✔
326
        return;
×
327
      }
328
      pendingStreams.removeAll(toRemove);
1✔
329
      // Because delayed transport is long-lived, we take this opportunity to down-size the
330
      // hashmap.
331
      if (pendingStreams.isEmpty()) {
1✔
332
        pendingStreams = new LinkedHashSet<>();
1✔
333
      }
334
      if (!hasPendingStreams()) {
1✔
335
        // There may be a brief gap between delayed transport clearing in-use state, and first real
336
        // transport starting streams and setting in-use state.  During the gap the whole channel's
337
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
338
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
339
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
340
        syncContext.executeLater(reportTransportNotInUse);
1✔
341
        if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
1✔
342
          syncContext.executeLater(reportTransportTerminated);
1✔
343
          reportTransportTerminated = null;
1✔
344
        }
345
      }
346
    }
1✔
347
    syncContext.drain();
1✔
348
  }
1✔
349

350
  @Override
351
  public InternalLogId getLogId() {
352
    return logId;
×
353
  }
354

355
  private class PendingStream extends DelayedStream {
356
    private final PickSubchannelArgs args;
357
    private final Context context = Context.current();
1✔
358
    private final ClientStreamTracer[] tracers;
359
    private volatile Status lastPickStatus;
360

361
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
362
      this.args = args;
1✔
363
      this.tracers = tracers;
1✔
364
    }
1✔
365

366
    /** Runnable may be null. */
367
    private Runnable createRealStream(ClientTransport transport, String authorityOverride) {
368
      ClientStream realStream;
369
      Context origContext = context.attach();
1✔
370
      try {
371
        realStream = transport.newStream(
1✔
372
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
373
            tracers);
374
      } finally {
375
        context.detach(origContext);
1✔
376
      }
377
      if (authorityOverride != null) {
1✔
378
        // User code provided authority takes precedence over the LB provided one; this will be
379
        // overwritten by an enqueud call from ClientCallImpl if the application sets an authority
380
        // override. We must call the real stream directly because stream.start() has likely already
381
        // been called on the delayed stream.
382
        realStream.setAuthority(authorityOverride);
1✔
383
      }
384
      return setStream(realStream);
1✔
385
    }
386

387
    @Override
388
    public void cancel(Status reason) {
389
      super.cancel(reason);
1✔
390
      synchronized (lock) {
1✔
391
        if (reportTransportTerminated != null) {
1✔
392
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
393
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
394
            syncContext.executeLater(reportTransportNotInUse);
1✔
395
            if (pickerState.shutdownStatus != null) {
1✔
396
              syncContext.executeLater(reportTransportTerminated);
1✔
397
              reportTransportTerminated = null;
1✔
398
            }
399
          }
400
        }
401
      }
1✔
402
      syncContext.drain();
1✔
403
    }
1✔
404

405
    @Override
406
    protected void onEarlyCancellation(Status reason) {
407
      for (ClientStreamTracer tracer : tracers) {
1✔
408
        tracer.streamClosed(reason);
1✔
409
      }
410
    }
1✔
411

412
    @Override
413
    public void appendTimeoutInsight(InsightBuilder insight) {
414
      if (args.getCallOptions().isWaitForReady()) {
1✔
415
        insight.append("wait_for_ready");
1✔
416
        Status status = lastPickStatus;
1✔
417
        if (status != null && !status.isOk()) {
1✔
418
          insight.appendKeyValue("Last Pick Failure", status);
1✔
419
        }
420
      }
421
      super.appendTimeoutInsight(insight);
1✔
422
    }
1✔
423
  }
424

425
  static final class PickerState {
426
    /**
427
     * The last picker that {@link #reprocess} has used. May be set to null when the channel has
428
     * moved to idle.
429
     */
430
    @Nullable
431
    final SubchannelPicker lastPicker;
432
    /**
433
     * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
434
     * terminated.
435
     */
436
    @Nullable
437
    final Status shutdownStatus;
438

439
    private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
1✔
440
      this.lastPicker = lastPicker;
1✔
441
      this.shutdownStatus = shutdownStatus;
1✔
442
    }
1✔
443

444
    public PickerState withPicker(SubchannelPicker newPicker) {
445
      return new PickerState(newPicker, this.shutdownStatus);
1✔
446
    }
447

448
    public PickerState withShutdownStatus(Status newShutdownStatus) {
449
      return new PickerState(this.lastPicker, newShutdownStatus);
1✔
450
    }
451
  }
452
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc