• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19619

04 Jan 2025 12:01AM UTC coverage: 88.558% (+0.008%) from 88.55%
#19619

push

github

ejona86
xds: Avoid depending on io.grpc.xds.Internal* classes

Internal* classes should generally be accessors that are used outside of
the package/project. Only one attribute was used outside of xds, so
leave only that one attribute in InternalXdsAttributes. One attribute
was used by the internal.security package, so move the definition to the
same package to reduce the circular dependencies.

33621 of 37965 relevant lines covered (88.56%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.25
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import io.grpc.CallOptions;
23
import io.grpc.ClientStreamTracer;
24
import io.grpc.Context;
25
import io.grpc.InternalChannelz.SocketStats;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer.PickResult;
28
import io.grpc.LoadBalancer.PickSubchannelArgs;
29
import io.grpc.LoadBalancer.SubchannelPicker;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import java.util.ArrayList;
36
import java.util.Collection;
37
import java.util.Collections;
38
import java.util.LinkedHashSet;
39
import java.util.concurrent.Executor;
40
import javax.annotation.Nonnull;
41
import javax.annotation.Nullable;
42
import javax.annotation.concurrent.GuardedBy;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // Allocated eagerly when the transport is constructed; only returned from getLogId().
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /** Immutable state needed for picking. 'lock' must be held for writing. */
73
  private volatile PickerState pickerState = new PickerState(null, null);
1✔
74

75
  /**
76
   * Creates a new delayed transport.
77
   *
78
   * @param defaultAppExecutor pending streams will create real streams and run buffered operations
79
   *        in an application executor, which will be this executor, unless there is one provided in
80
   *        {@link CallOptions}.
81
   * @param syncContext all listener callbacks of the delayed transport will be run from this
82
   *        SynchronizationContext.
83
   */
84
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
1✔
85
    this.defaultAppExecutor = defaultAppExecutor;
1✔
86
    this.syncContext = syncContext;
1✔
87
  }
1✔
88

89
  @Override
90
  public final Runnable start(final Listener listener) {
91
    this.listener = listener;
1✔
92
    reportTransportInUse = new Runnable() {
1✔
93
        @Override
94
        public void run() {
95
          listener.transportInUse(true);
1✔
96
        }
1✔
97
      };
98
    reportTransportNotInUse = new Runnable() {
1✔
99
        @Override
100
        public void run() {
101
          listener.transportInUse(false);
1✔
102
        }
1✔
103
      };
104
    reportTransportTerminated = new Runnable() {
1✔
105
        @Override
106
        public void run() {
107
          listener.transportTerminated();
1✔
108
        }
1✔
109
      };
110
    return null;
1✔
111
  }
112

113
  /**
114
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
115
   * picker will be consulted.
116
   *
117
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
118
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
119
   */
120
  @Override
121
  public final ClientStream newStream(
122
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
123
      ClientStreamTracer[] tracers) {
124
    try {
125
      PickSubchannelArgs args = new PickSubchannelArgsImpl(
1✔
126
          method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
127
      PickerState state = pickerState;
1✔
128
      while (true) {
129
        if (state.shutdownStatus != null) {
1✔
130
          return new FailingClientStream(state.shutdownStatus, tracers);
1✔
131
        }
132
        if (state.lastPicker != null) {
1✔
133
          PickResult pickResult = state.lastPicker.pickSubchannel(args);
1✔
134
          callOptions = args.getCallOptions();
1✔
135
          // User code provided authority takes precedence over the LB provided one.
136
          if (callOptions.getAuthority() == null
1✔
137
              && pickResult.getAuthorityOverride() != null) {
1✔
138
            callOptions = callOptions.withAuthority(pickResult.getAuthorityOverride());
1✔
139
          }
140
          ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
141
              callOptions.isWaitForReady());
1✔
142
          if (transport != null) {
1✔
143
            return transport.newStream(
1✔
144
                args.getMethodDescriptor(), args.getHeaders(), callOptions,
1✔
145
                tracers);
146
          }
147
        }
148
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
149
        // race with reprocess()), we will buffer the RPC.  Otherwise, will try with the new picker.
150
        synchronized (lock) {
1✔
151
          PickerState newerState = pickerState;
1✔
152
          if (state == newerState) {
1✔
153
            return createPendingStream(args, tracers);
1✔
154
          }
155
          state = newerState;
1✔
156
        }
1✔
157
      }
158
    } finally {
159
      syncContext.drain();
1✔
160
    }
161
  }
162

163
  /**
164
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
165
   * schedule tasks on syncContext.
166
   */
167
  @GuardedBy("lock")
168
  private PendingStream createPendingStream(
169
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
170
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
171
    pendingStreams.add(pendingStream);
1✔
172
    if (getPendingStreamsCount() == 1) {
1✔
173
      syncContext.executeLater(reportTransportInUse);
1✔
174
    }
175
    for (ClientStreamTracer streamTracer : tracers) {
1✔
176
      streamTracer.createPendingStream();
1✔
177
    }
178
    return pendingStream;
1✔
179
  }
180

181
  @Override
182
  public final void ping(final PingCallback callback, Executor executor) {
183
    throw new UnsupportedOperationException("This method is not expected to be called");
×
184
  }
185

186
  @Override
187
  public ListenableFuture<SocketStats> getStats() {
188
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
189
    ret.set(null);
×
190
    return ret;
×
191
  }
192

193
  /**
194
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
195
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there are no
196
   * more buffered streams.
197
   */
198
  @Override
199
  public final void shutdown(final Status status) {
200
    synchronized (lock) {
1✔
201
      if (pickerState.shutdownStatus != null) {
1✔
202
        return;
1✔
203
      }
204
      pickerState = pickerState.withShutdownStatus(status);
1✔
205
      syncContext.executeLater(new Runnable() {
1✔
206
          @Override
207
          public void run() {
208
            listener.transportShutdown(status);
1✔
209
          }
1✔
210
        });
211
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
212
        syncContext.executeLater(reportTransportTerminated);
1✔
213
        reportTransportTerminated = null;
1✔
214
      }
215
    }
1✔
216
    syncContext.drain();
1✔
217
  }
1✔
218

219
  /**
220
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
221
   * this transport.
222
   */
223
  @Override
224
  public final void shutdownNow(Status status) {
225
    shutdown(status);
1✔
226
    Collection<PendingStream> savedPendingStreams;
227
    Runnable savedReportTransportTerminated;
228
    synchronized (lock) {
1✔
229
      savedPendingStreams = pendingStreams;
1✔
230
      savedReportTransportTerminated = reportTransportTerminated;
1✔
231
      reportTransportTerminated = null;
1✔
232
      if (!pendingStreams.isEmpty()) {
1✔
233
        pendingStreams = Collections.emptyList();
1✔
234
      }
235
    }
1✔
236
    if (savedReportTransportTerminated != null) {
1✔
237
      for (PendingStream stream : savedPendingStreams) {
1✔
238
        Runnable runnable = stream.setStream(
1✔
239
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
240
        if (runnable != null) {
1✔
241
          // Drain in-line instead of using an executor as failing stream just throws everything
242
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
243
          // before stream.start().
244
          runnable.run();
1✔
245
        }
246
      }
1✔
247
      syncContext.execute(savedReportTransportTerminated);
1✔
248
    }
249
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
250
    // shutdown().
251
  }
1✔
252

253
  public final boolean hasPendingStreams() {
254
    synchronized (lock) {
1✔
255
      return !pendingStreams.isEmpty();
1✔
256
    }
257
  }
258

259
  @VisibleForTesting
260
  final int getPendingStreamsCount() {
261
    synchronized (lock) {
1✔
262
      return pendingStreams.size();
1✔
263
    }
264
  }
265

266
  /**
267
   * Uses the picker to try picking a transport for every pending stream. If the pick succeeds,
268
   * the stream proceeds; otherwise it stays pending.
269
   *
270
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
271
   * streams will be served by the latest picker (if a same picker is given more than once, they are
272
   * considered different pickers) as soon as possible.
273
   *
274
   * <p>This method <strong>must not</strong> be called concurrently with itself.
275
   */
276
  final void reprocess(@Nullable SubchannelPicker picker) {
277
    ArrayList<PendingStream> toProcess;
278
    synchronized (lock) {
1✔
279
      pickerState = pickerState.withPicker(picker);
1✔
280
      if (picker == null || !hasPendingStreams()) {
1✔
281
        return;
1✔
282
      }
283
      toProcess = new ArrayList<>(pendingStreams);
1✔
284
    }
1✔
285
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
286

287
    for (final PendingStream stream : toProcess) {
1✔
288
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
289
      CallOptions callOptions = stream.args.getCallOptions();
1✔
290
      // User code provided authority takes precedence over the LB provided one.
291
      if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
1✔
292
        stream.setAuthority(pickResult.getAuthorityOverride());
1✔
293
      }
294
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
295
          callOptions.isWaitForReady());
1✔
296
      if (transport != null) {
1✔
297
        Executor executor = defaultAppExecutor;
1✔
298
        // createRealStream may be expensive. It will start real streams on the transport. If
299
        // there are pending requests, they will be serialized too, which may be expensive. Since
300
        // we are now on transport thread, we need to offload the work to an executor.
301
        if (callOptions.getExecutor() != null) {
1✔
302
          executor = callOptions.getExecutor();
1✔
303
        }
304
        Runnable runnable = stream.createRealStream(transport);
1✔
305
        if (runnable != null) {
1✔
306
          executor.execute(runnable);
1✔
307
        }
308
        toRemove.add(stream);
1✔
309
      }  // else: stay pending
310
    }
1✔
311

312
    synchronized (lock) {
1✔
313
      // Between this synchronized and the previous one:
314
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
315
      //   - shutdownNow() may be called, which replaces pendingStreams with an empty list.
316
      if (!hasPendingStreams()) {
1✔
317
        return;
×
318
      }
319
      pendingStreams.removeAll(toRemove);
1✔
320
      // Because delayed transport is long-lived, we take this opportunity to down-size the
321
      // hash set (pendingStreams is a LinkedHashSet).
322
      if (pendingStreams.isEmpty()) {
1✔
323
        pendingStreams = new LinkedHashSet<>();
1✔
324
      }
325
      if (!hasPendingStreams()) {
1✔
326
        // There may be a brief gap between delayed transport clearing in-use state, and first real
327
        // transport starting streams and setting in-use state.  During the gap the whole channel's
328
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
329
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
330
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
331
        syncContext.executeLater(reportTransportNotInUse);
1✔
332
        if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
1✔
333
          syncContext.executeLater(reportTransportTerminated);
1✔
334
          reportTransportTerminated = null;
1✔
335
        }
336
      }
337
    }
1✔
338
    syncContext.drain();
1✔
339
  }
1✔
340

341
  @Override
342
  public InternalLogId getLogId() {
343
    return logId;
×
344
  }
345

346
  private class PendingStream extends DelayedStream {
347
    private final PickSubchannelArgs args;
348
    private final Context context = Context.current();
1✔
349
    private final ClientStreamTracer[] tracers;
350

351
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
352
      this.args = args;
1✔
353
      this.tracers = tracers;
1✔
354
    }
1✔
355

356
    /** Runnable may be null. */
357
    private Runnable createRealStream(ClientTransport transport) {
358
      ClientStream realStream;
359
      Context origContext = context.attach();
1✔
360
      try {
361
        realStream = transport.newStream(
1✔
362
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
363
            tracers);
364
      } finally {
365
        context.detach(origContext);
1✔
366
      }
367
      return setStream(realStream);
1✔
368
    }
369

370
    @Override
371
    public void cancel(Status reason) {
372
      super.cancel(reason);
1✔
373
      synchronized (lock) {
1✔
374
        if (reportTransportTerminated != null) {
1✔
375
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
376
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
377
            syncContext.executeLater(reportTransportNotInUse);
1✔
378
            if (pickerState.shutdownStatus != null) {
1✔
379
              syncContext.executeLater(reportTransportTerminated);
1✔
380
              reportTransportTerminated = null;
1✔
381
            }
382
          }
383
        }
384
      }
1✔
385
      syncContext.drain();
1✔
386
    }
1✔
387

388
    @Override
389
    protected void onEarlyCancellation(Status reason) {
390
      for (ClientStreamTracer tracer : tracers) {
1✔
391
        tracer.streamClosed(reason);
1✔
392
      }
393
    }
1✔
394

395
    @Override
396
    public void appendTimeoutInsight(InsightBuilder insight) {
397
      if (args.getCallOptions().isWaitForReady()) {
1✔
398
        insight.append("wait_for_ready");
1✔
399
      }
400
      super.appendTimeoutInsight(insight);
1✔
401
    }
1✔
402
  }
403

404
  static final class PickerState {
405
    /**
406
     * The last picker that {@link #reprocess} has used. May be set to null when the channel has
407
     * moved to idle.
408
     */
409
    @Nullable
410
    final SubchannelPicker lastPicker;
411
    /**
412
     * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
413
     * terminated.
414
     */
415
    @Nullable
416
    final Status shutdownStatus;
417

418
    private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
1✔
419
      this.lastPicker = lastPicker;
1✔
420
      this.shutdownStatus = shutdownStatus;
1✔
421
    }
1✔
422

423
    public PickerState withPicker(SubchannelPicker newPicker) {
424
      return new PickerState(newPicker, this.shutdownStatus);
1✔
425
    }
426

427
    public PickerState withShutdownStatus(Status newShutdownStatus) {
428
      return new PickerState(this.lastPicker, newShutdownStatus);
1✔
429
    }
430
  }
431
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc