• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18744

pending completion
#18744

push

github-actions

web-flow
Revert "Release v1.57.0 (#10417)" (#10419)

This reverts commit d23e39e64.

30630 of 34705 relevant lines covered (88.26%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.03
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import io.grpc.CallOptions;
23
import io.grpc.ClientStreamTracer;
24
import io.grpc.Context;
25
import io.grpc.InternalChannelz.SocketStats;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer.PickResult;
28
import io.grpc.LoadBalancer.PickSubchannelArgs;
29
import io.grpc.LoadBalancer.SubchannelPicker;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import java.util.ArrayList;
36
import java.util.Collection;
37
import java.util.Collections;
38
import java.util.LinkedHashSet;
39
import java.util.concurrent.Executor;
40
import javax.annotation.Nonnull;
41
import javax.annotation.Nullable;
42
import javax.annotation.concurrent.GuardedBy;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // Allocated eagerly when this transport is constructed; returned by getLogId().
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /**
73
   * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
74
   * terminated.
75
   */
76
  @GuardedBy("lock")
77
  private Status shutdownStatus;
78

79
  /**
80
   * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
81
   * to idle.
82
   */
83
  @GuardedBy("lock")
84
  @Nullable
85
  private SubchannelPicker lastPicker;
86

87
  @GuardedBy("lock")
88
  private long lastPickerVersion;
89

90
  /**
   * Builds a delayed transport that starts out with no pending streams.
   *
   * @param defaultAppExecutor executor used to run the buffered work of a pending stream once a
   *        real transport has been picked for it, unless the stream's {@link CallOptions} supplies
   *        its own executor.
   * @param syncContext the {@link SynchronizationContext} on which this transport schedules its
   *        listener callbacks.
   */
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
    this.syncContext = syncContext;
    this.defaultAppExecutor = defaultAppExecutor;
  }
1✔
103

104
  @Override
105
  public final Runnable start(final Listener listener) {
106
    this.listener = listener;
1✔
107
    reportTransportInUse = new Runnable() {
1✔
108
        @Override
109
        public void run() {
110
          listener.transportInUse(true);
1✔
111
        }
1✔
112
      };
113
    reportTransportNotInUse = new Runnable() {
1✔
114
        @Override
115
        public void run() {
116
          listener.transportInUse(false);
1✔
117
        }
1✔
118
      };
119
    reportTransportTerminated = new Runnable() {
1✔
120
        @Override
121
        public void run() {
122
          listener.transportTerminated();
1✔
123
        }
1✔
124
      };
125
    return null;
1✔
126
  }
127

128
  /**
129
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
130
   * picker will be consulted.
131
   *
132
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
133
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
134
   */
135
  @Override
136
  public final ClientStream newStream(
137
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
138
      ClientStreamTracer[] tracers) {
139
    try {
140
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1✔
141
      SubchannelPicker picker = null;
1✔
142
      long pickerVersion = -1;
1✔
143
      while (true) {
144
        synchronized (lock) {
1✔
145
          if (shutdownStatus != null) {
1✔
146
            return new FailingClientStream(shutdownStatus, tracers);
1✔
147
          }
148
          if (lastPicker == null) {
1✔
149
            return createPendingStream(args, tracers);
1✔
150
          }
151
          // Check for second time through the loop, and whether anything changed
152
          if (picker != null && pickerVersion == lastPickerVersion) {
1✔
153
            return createPendingStream(args, tracers);
1✔
154
          }
155
          picker = lastPicker;
1✔
156
          pickerVersion = lastPickerVersion;
1✔
157
        }
1✔
158
        PickResult pickResult = picker.pickSubchannel(args);
1✔
159
        ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
160
            callOptions.isWaitForReady());
1✔
161
        if (transport != null) {
1✔
162
          return transport.newStream(
1✔
163
              args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
164
              tracers);
165
        }
166
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
167
        // race with reprocess()), we will buffer it.  Otherwise, will try with the new picker.
168
      }
1✔
169
    } finally {
170
      syncContext.drain();
1✔
171
    }
172
  }
173

174
  /**
175
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
176
   * schedule tasks on syncContext.
177
   */
178
  @GuardedBy("lock")
179
  private PendingStream createPendingStream(
180
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
181
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
182
    pendingStreams.add(pendingStream);
1✔
183
    if (getPendingStreamsCount() == 1) {
1✔
184
      syncContext.executeLater(reportTransportInUse);
1✔
185
    }
186
    for (ClientStreamTracer streamTracer : tracers) {
1✔
187
      streamTracer.createPendingStream();
1✔
188
    }
189
    return pendingStream;
1✔
190
  }
191

192
  @Override
193
  public final void ping(final PingCallback callback, Executor executor) {
194
    throw new UnsupportedOperationException("This method is not expected to be called");
×
195
  }
196

197
  @Override
198
  public ListenableFuture<SocketStats> getStats() {
199
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
200
    ret.set(null);
×
201
    return ret;
×
202
  }
203

204
  /**
205
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
206
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
207
   * more buffered streams.
208
   */
209
  @Override
210
  public final void shutdown(final Status status) {
211
    synchronized (lock) {
1✔
212
      if (shutdownStatus != null) {
1✔
213
        return;
1✔
214
      }
215
      shutdownStatus = status;
1✔
216
      syncContext.executeLater(new Runnable() {
1✔
217
          @Override
218
          public void run() {
219
            listener.transportShutdown(status);
1✔
220
          }
1✔
221
        });
222
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
223
        syncContext.executeLater(reportTransportTerminated);
1✔
224
        reportTransportTerminated = null;
1✔
225
      }
226
    }
1✔
227
    syncContext.drain();
1✔
228
  }
1✔
229

230
  /**
231
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
232
   * this transport.
233
   */
234
  @Override
235
  public final void shutdownNow(Status status) {
236
    shutdown(status);
1✔
237
    Collection<PendingStream> savedPendingStreams;
238
    Runnable savedReportTransportTerminated;
239
    synchronized (lock) {
1✔
240
      savedPendingStreams = pendingStreams;
1✔
241
      savedReportTransportTerminated = reportTransportTerminated;
1✔
242
      reportTransportTerminated = null;
1✔
243
      if (!pendingStreams.isEmpty()) {
1✔
244
        pendingStreams = Collections.emptyList();
1✔
245
      }
246
    }
1✔
247
    if (savedReportTransportTerminated != null) {
1✔
248
      for (PendingStream stream : savedPendingStreams) {
1✔
249
        Runnable runnable = stream.setStream(
1✔
250
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
251
        if (runnable != null) {
1✔
252
          // Drain in-line instead of using an executor as failing stream just throws everything
253
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
254
          // before stream.start().
255
          runnable.run();
1✔
256
        }
257
      }
1✔
258
      syncContext.execute(savedReportTransportTerminated);
1✔
259
    }
260
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
261
    // shutdown().
262
  }
1✔
263

264
  public final boolean hasPendingStreams() {
265
    synchronized (lock) {
1✔
266
      return !pendingStreams.isEmpty();
1✔
267
    }
268
  }
269

270
  @VisibleForTesting
271
  final int getPendingStreamsCount() {
272
    synchronized (lock) {
1✔
273
      return pendingStreams.size();
1✔
274
    }
275
  }
276

277
  /**
278
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
279
   * pick is successful, otherwise keep it pending.
280
   *
281
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
282
   * streams will be served by the latest picker (if a same picker is given more than once, they are
283
   * considered different pickers) as soon as possible.
284
   *
285
   * <p>This method <strong>must not</strong> be called concurrently with itself.
286
   */
287
  final void reprocess(@Nullable SubchannelPicker picker) {
288
    ArrayList<PendingStream> toProcess;
289
    synchronized (lock) {
1✔
290
      lastPicker = picker;
1✔
291
      lastPickerVersion++;
1✔
292
      if (picker == null || !hasPendingStreams()) {
1✔
293
        return;
1✔
294
      }
295
      toProcess = new ArrayList<>(pendingStreams);
1✔
296
    }
1✔
297
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
298

299
    for (final PendingStream stream : toProcess) {
1✔
300
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
301
      CallOptions callOptions = stream.args.getCallOptions();
1✔
302
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
303
          callOptions.isWaitForReady());
1✔
304
      if (transport != null) {
1✔
305
        Executor executor = defaultAppExecutor;
1✔
306
        // createRealStream may be expensive. It will start real streams on the transport. If
307
        // there are pending requests, they will be serialized too, which may be expensive. Since
308
        // we are now on transport thread, we need to offload the work to an executor.
309
        if (callOptions.getExecutor() != null) {
1✔
310
          executor = callOptions.getExecutor();
1✔
311
        }
312
        Runnable runnable = stream.createRealStream(transport);
1✔
313
        if (runnable != null) {
1✔
314
          executor.execute(runnable);
1✔
315
        }
316
        toRemove.add(stream);
1✔
317
      }  // else: stay pending
318
    }
1✔
319

320
    synchronized (lock) {
1✔
321
      // Between this synchronized and the previous one:
322
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
323
      //   - shutdown() may be called, which may turn pendingStreams into null.
324
      if (!hasPendingStreams()) {
1✔
325
        return;
×
326
      }
327
      pendingStreams.removeAll(toRemove);
1✔
328
      // Because delayed transport is long-lived, we take this opportunity to down-size the
329
      // hashmap.
330
      if (pendingStreams.isEmpty()) {
1✔
331
        pendingStreams = new LinkedHashSet<>();
1✔
332
      }
333
      if (!hasPendingStreams()) {
1✔
334
        // There may be a brief gap between delayed transport clearing in-use state, and first real
335
        // transport starting streams and setting in-use state.  During the gap the whole channel's
336
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
337
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
338
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
339
        syncContext.executeLater(reportTransportNotInUse);
1✔
340
        if (shutdownStatus != null && reportTransportTerminated != null) {
1✔
341
          syncContext.executeLater(reportTransportTerminated);
1✔
342
          reportTransportTerminated = null;
1✔
343
        }
344
      }
345
    }
1✔
346
    syncContext.drain();
1✔
347
  }
1✔
348

349
  @Override
350
  public InternalLogId getLogId() {
351
    return logId;
×
352
  }
353

354
  private class PendingStream extends DelayedStream {
355
    private final PickSubchannelArgs args;
356
    private final Context context = Context.current();
1✔
357
    private final ClientStreamTracer[] tracers;
358

359
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
360
      this.args = args;
1✔
361
      this.tracers = tracers;
1✔
362
    }
1✔
363

364
    /** Runnable may be null. */
365
    private Runnable createRealStream(ClientTransport transport) {
366
      ClientStream realStream;
367
      Context origContext = context.attach();
1✔
368
      try {
369
        realStream = transport.newStream(
1✔
370
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
371
            tracers);
372
      } finally {
373
        context.detach(origContext);
1✔
374
      }
375
      return setStream(realStream);
1✔
376
    }
377

378
    @Override
379
    public void cancel(Status reason) {
380
      super.cancel(reason);
1✔
381
      synchronized (lock) {
1✔
382
        if (reportTransportTerminated != null) {
1✔
383
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
384
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
385
            syncContext.executeLater(reportTransportNotInUse);
1✔
386
            if (shutdownStatus != null) {
1✔
387
              syncContext.executeLater(reportTransportTerminated);
1✔
388
              reportTransportTerminated = null;
1✔
389
            }
390
          }
391
        }
392
      }
1✔
393
      syncContext.drain();
1✔
394
    }
1✔
395

396
    @Override
397
    protected void onEarlyCancellation(Status reason) {
398
      for (ClientStreamTracer tracer : tracers) {
1✔
399
        tracer.streamClosed(reason);
1✔
400
      }
401
    }
1✔
402

403
    @Override
404
    public void appendTimeoutInsight(InsightBuilder insight) {
405
      if (args.getCallOptions().isWaitForReady()) {
1✔
406
        insight.append("wait_for_ready");
1✔
407
      }
408
      super.appendTimeoutInsight(insight);
1✔
409
    }
1✔
410
  }
411
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc