• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19676

04 Feb 2025 06:51PM CUT coverage: 88.592% (+0.006%) from 88.586%
#19676

push

github

web-flow
Replace Kokoro ARM build with GitHub Actions

The Kokoro aarch64 build runs on x86 with an emulator, and has always
been flaky due to the slow execution speed. At present it is continually
failing due to deadline exceededs. GitHub Actions is running on aarch64
hardware, so is much faster (4 minutes vs 30 minutes, without including
the speedup from GitHub Action's caching).

33764 of 38112 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.32
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import com.google.errorprone.annotations.concurrent.GuardedBy;
23
import io.grpc.CallOptions;
24
import io.grpc.ClientStreamTracer;
25
import io.grpc.Context;
26
import io.grpc.InternalChannelz.SocketStats;
27
import io.grpc.InternalLogId;
28
import io.grpc.LoadBalancer.PickResult;
29
import io.grpc.LoadBalancer.PickSubchannelArgs;
30
import io.grpc.LoadBalancer.SubchannelPicker;
31
import io.grpc.Metadata;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import java.util.ArrayList;
37
import java.util.Collection;
38
import java.util.Collections;
39
import java.util.LinkedHashSet;
40
import java.util.concurrent.Executor;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // lazily allocated, since it is infrequently used.
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /** Immutable state needed for picking. 'lock' must be held for writing. */
73
  private volatile PickerState pickerState = new PickerState(null, null);
1✔
74

75
  /**
76
   * Creates a new delayed transport.
77
   *
78
   * @param defaultAppExecutor pending streams will create real streams and run buffered operations
79
   *        in an application executor, which will be this executor, unless there is on provided in
80
   *        {@link CallOptions}.
81
   * @param syncContext all listener callbacks of the delayed transport will be run from this
82
   *        SynchronizationContext.
83
   */
84
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
1✔
85
    this.defaultAppExecutor = defaultAppExecutor;
1✔
86
    this.syncContext = syncContext;
1✔
87
  }
1✔
88

89
  @Override
90
  public final Runnable start(final Listener listener) {
91
    this.listener = listener;
1✔
92
    reportTransportInUse = new Runnable() {
1✔
93
        @Override
94
        public void run() {
95
          listener.transportInUse(true);
1✔
96
        }
1✔
97
      };
98
    reportTransportNotInUse = new Runnable() {
1✔
99
        @Override
100
        public void run() {
101
          listener.transportInUse(false);
1✔
102
        }
1✔
103
      };
104
    reportTransportTerminated = new Runnable() {
1✔
105
        @Override
106
        public void run() {
107
          listener.transportTerminated();
1✔
108
        }
1✔
109
      };
110
    return null;
1✔
111
  }
112

113
  /**
114
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
115
   * picker will be consulted.
116
   *
117
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
118
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
119
   */
120
  @Override
121
  public final ClientStream newStream(
122
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
123
      ClientStreamTracer[] tracers) {
124
    try {
125
      PickSubchannelArgs args = new PickSubchannelArgsImpl(
1✔
126
          method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
127
      PickerState state = pickerState;
1✔
128
      while (true) {
129
        if (state.shutdownStatus != null) {
1✔
130
          return new FailingClientStream(state.shutdownStatus, tracers);
1✔
131
        }
132
        if (state.lastPicker != null) {
1✔
133
          PickResult pickResult = state.lastPicker.pickSubchannel(args);
1✔
134
          callOptions = args.getCallOptions();
1✔
135
          // User code provided authority takes precedence over the LB provided one.
136
          if (callOptions.getAuthority() == null
1✔
137
              && pickResult.getAuthorityOverride() != null) {
1✔
138
            callOptions = callOptions.withAuthority(pickResult.getAuthorityOverride());
1✔
139
          }
140
          ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
141
              callOptions.isWaitForReady());
1✔
142
          if (transport != null) {
1✔
143
            ClientStream stream = transport.newStream(
1✔
144
                args.getMethodDescriptor(), args.getHeaders(), callOptions,
1✔
145
                tracers);
146
            // User code provided authority takes precedence over the LB provided one; this will be
147
            // overwritten by ClientCallImpl if the application sets an authority override
148
            if (pickResult.getAuthorityOverride() != null) {
1✔
149
              stream.setAuthority(pickResult.getAuthorityOverride());
1✔
150
            }
151
            return stream;
1✔
152
          }
153
        }
154
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
155
        // race with reprocess()), we will buffer the RPC.  Otherwise, will try with the new picker.
156
        synchronized (lock) {
1✔
157
          PickerState newerState = pickerState;
1✔
158
          if (state == newerState) {
1✔
159
            return createPendingStream(args, tracers);
1✔
160
          }
161
          state = newerState;
1✔
162
        }
1✔
163
      }
164
    } finally {
165
      syncContext.drain();
1✔
166
    }
167
  }
168

169
  /**
170
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
171
   * schedule tasks on syncContext.
172
   */
173
  @GuardedBy("lock")
174
  private PendingStream createPendingStream(
175
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
176
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
177
    pendingStreams.add(pendingStream);
1✔
178
    if (getPendingStreamsCount() == 1) {
1✔
179
      syncContext.executeLater(reportTransportInUse);
1✔
180
    }
181
    for (ClientStreamTracer streamTracer : tracers) {
1✔
182
      streamTracer.createPendingStream();
1✔
183
    }
184
    return pendingStream;
1✔
185
  }
186

187
  @Override
188
  public final void ping(final PingCallback callback, Executor executor) {
189
    throw new UnsupportedOperationException("This method is not expected to be called");
×
190
  }
191

192
  @Override
193
  public ListenableFuture<SocketStats> getStats() {
194
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
195
    ret.set(null);
×
196
    return ret;
×
197
  }
198

199
  /**
200
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
201
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
202
   * more buffered streams.
203
   */
204
  @Override
205
  public final void shutdown(final Status status) {
206
    synchronized (lock) {
1✔
207
      if (pickerState.shutdownStatus != null) {
1✔
208
        return;
1✔
209
      }
210
      pickerState = pickerState.withShutdownStatus(status);
1✔
211
      syncContext.executeLater(new Runnable() {
1✔
212
          @Override
213
          public void run() {
214
            listener.transportShutdown(status);
1✔
215
          }
1✔
216
        });
217
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
218
        syncContext.executeLater(reportTransportTerminated);
1✔
219
        reportTransportTerminated = null;
1✔
220
      }
221
    }
1✔
222
    syncContext.drain();
1✔
223
  }
1✔
224

225
  /**
226
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
227
   * this transport.
228
   */
229
  @Override
230
  public final void shutdownNow(Status status) {
231
    shutdown(status);
1✔
232
    Collection<PendingStream> savedPendingStreams;
233
    Runnable savedReportTransportTerminated;
234
    synchronized (lock) {
1✔
235
      savedPendingStreams = pendingStreams;
1✔
236
      savedReportTransportTerminated = reportTransportTerminated;
1✔
237
      reportTransportTerminated = null;
1✔
238
      if (!pendingStreams.isEmpty()) {
1✔
239
        pendingStreams = Collections.emptyList();
1✔
240
      }
241
    }
1✔
242
    if (savedReportTransportTerminated != null) {
1✔
243
      for (PendingStream stream : savedPendingStreams) {
1✔
244
        Runnable runnable = stream.setStream(
1✔
245
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
246
        if (runnable != null) {
1✔
247
          // Drain in-line instead of using an executor as failing stream just throws everything
248
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
249
          // before stream.start().
250
          runnable.run();
1✔
251
        }
252
      }
1✔
253
      syncContext.execute(savedReportTransportTerminated);
1✔
254
    }
255
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
256
    // shutdown().
257
  }
1✔
258

259
  public final boolean hasPendingStreams() {
260
    synchronized (lock) {
1✔
261
      return !pendingStreams.isEmpty();
1✔
262
    }
263
  }
264

265
  @VisibleForTesting
266
  final int getPendingStreamsCount() {
267
    synchronized (lock) {
1✔
268
      return pendingStreams.size();
1✔
269
    }
270
  }
271

272
  /**
273
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
274
   * pick is successful, otherwise keep it pending.
275
   *
276
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
277
   * streams will be served by the latest picker (if a same picker is given more than once, they are
278
   * considered different pickers) as soon as possible.
279
   *
280
   * <p>This method <strong>must not</strong> be called concurrently with itself.
281
   */
282
  final void reprocess(@Nullable SubchannelPicker picker) {
283
    ArrayList<PendingStream> toProcess;
284
    synchronized (lock) {
1✔
285
      pickerState = pickerState.withPicker(picker);
1✔
286
      if (picker == null || !hasPendingStreams()) {
1✔
287
        return;
1✔
288
      }
289
      toProcess = new ArrayList<>(pendingStreams);
1✔
290
    }
1✔
291
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
292

293
    for (final PendingStream stream : toProcess) {
1✔
294
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
295
      CallOptions callOptions = stream.args.getCallOptions();
1✔
296
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
297
          callOptions.isWaitForReady());
1✔
298
      if (transport != null) {
1✔
299
        Executor executor = defaultAppExecutor;
1✔
300
        // createRealStream may be expensive. It will start real streams on the transport. If
301
        // there are pending requests, they will be serialized too, which may be expensive. Since
302
        // we are now on transport thread, we need to offload the work to an executor.
303
        if (callOptions.getExecutor() != null) {
1✔
304
          executor = callOptions.getExecutor();
1✔
305
        }
306
        Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride());
1✔
307
        if (runnable != null) {
1✔
308
          executor.execute(runnable);
1✔
309
        }
310
        toRemove.add(stream);
1✔
311
      }  // else: stay pending
312
    }
1✔
313

314
    synchronized (lock) {
1✔
315
      // Between this synchronized and the previous one:
316
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
317
      //   - shutdown() may be called, which may turn pendingStreams into null.
318
      if (!hasPendingStreams()) {
1✔
319
        return;
×
320
      }
321
      pendingStreams.removeAll(toRemove);
1✔
322
      // Because delayed transport is long-lived, we take this opportunity to down-size the
323
      // hashmap.
324
      if (pendingStreams.isEmpty()) {
1✔
325
        pendingStreams = new LinkedHashSet<>();
1✔
326
      }
327
      if (!hasPendingStreams()) {
1✔
328
        // There may be a brief gap between delayed transport clearing in-use state, and first real
329
        // transport starting streams and setting in-use state.  During the gap the whole channel's
330
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
331
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
332
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
333
        syncContext.executeLater(reportTransportNotInUse);
1✔
334
        if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
1✔
335
          syncContext.executeLater(reportTransportTerminated);
1✔
336
          reportTransportTerminated = null;
1✔
337
        }
338
      }
339
    }
1✔
340
    syncContext.drain();
1✔
341
  }
1✔
342

343
  @Override
344
  public InternalLogId getLogId() {
345
    return logId;
×
346
  }
347

348
  private class PendingStream extends DelayedStream {
349
    private final PickSubchannelArgs args;
350
    private final Context context = Context.current();
1✔
351
    private final ClientStreamTracer[] tracers;
352

353
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
354
      this.args = args;
1✔
355
      this.tracers = tracers;
1✔
356
    }
1✔
357

358
    /** Runnable may be null. */
359
    private Runnable createRealStream(ClientTransport transport, String authorityOverride) {
360
      ClientStream realStream;
361
      Context origContext = context.attach();
1✔
362
      try {
363
        realStream = transport.newStream(
1✔
364
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
365
            tracers);
366
      } finally {
367
        context.detach(origContext);
1✔
368
      }
369
      if (authorityOverride != null) {
1✔
370
        // User code provided authority takes precedence over the LB provided one; this will be
371
        // overwritten by an enqueud call from ClientCallImpl if the application sets an authority
372
        // override. We must call the real stream directly because stream.start() has likely already
373
        // been called on the delayed stream.
374
        realStream.setAuthority(authorityOverride);
1✔
375
      }
376
      return setStream(realStream);
1✔
377
    }
378

379
    @Override
380
    public void cancel(Status reason) {
381
      super.cancel(reason);
1✔
382
      synchronized (lock) {
1✔
383
        if (reportTransportTerminated != null) {
1✔
384
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
385
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
386
            syncContext.executeLater(reportTransportNotInUse);
1✔
387
            if (pickerState.shutdownStatus != null) {
1✔
388
              syncContext.executeLater(reportTransportTerminated);
1✔
389
              reportTransportTerminated = null;
1✔
390
            }
391
          }
392
        }
393
      }
1✔
394
      syncContext.drain();
1✔
395
    }
1✔
396

397
    @Override
398
    protected void onEarlyCancellation(Status reason) {
399
      for (ClientStreamTracer tracer : tracers) {
1✔
400
        tracer.streamClosed(reason);
1✔
401
      }
402
    }
1✔
403

404
    @Override
405
    public void appendTimeoutInsight(InsightBuilder insight) {
406
      if (args.getCallOptions().isWaitForReady()) {
1✔
407
        insight.append("wait_for_ready");
1✔
408
      }
409
      super.appendTimeoutInsight(insight);
1✔
410
    }
1✔
411
  }
412

413
  static final class PickerState {
414
    /**
415
     * The last picker that {@link #reprocess} has used. May be set to null when the channel has
416
     * moved to idle.
417
     */
418
    @Nullable
419
    final SubchannelPicker lastPicker;
420
    /**
421
     * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
422
     * terminated.
423
     */
424
    @Nullable
425
    final Status shutdownStatus;
426

427
    private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
1✔
428
      this.lastPicker = lastPicker;
1✔
429
      this.shutdownStatus = shutdownStatus;
1✔
430
    }
1✔
431

432
    public PickerState withPicker(SubchannelPicker newPicker) {
433
      return new PickerState(newPicker, this.shutdownStatus);
1✔
434
    }
435

436
    public PickerState withShutdownStatus(Status newShutdownStatus) {
437
      return new PickerState(this.lastPicker, newShutdownStatus);
1✔
438
    }
439
  }
440
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc