• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19965

28 Aug 2025 10:58PM UTC coverage: 88.565% (+0.005%) from 88.56%
#19965

push

github

web-flow
binder: Replace queryIntentServices() hack with the new SystemApis.createContextAsUser()  (#12280)

createContextAsUser() wrapper makes the call to PackageManager's
resolveService() look the same in both the same-user and cross-user
cases. This is how the Android team recommends accessing XXXAsUser() APIs in
general.

We can also remove all the apologies for using reflection since
SystemApis already explains all that.

34691 of 39170 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.55
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import com.google.errorprone.annotations.concurrent.GuardedBy;
23
import io.grpc.CallOptions;
24
import io.grpc.ClientStreamTracer;
25
import io.grpc.Context;
26
import io.grpc.InternalChannelz.SocketStats;
27
import io.grpc.InternalLogId;
28
import io.grpc.LoadBalancer.PickResult;
29
import io.grpc.LoadBalancer.PickSubchannelArgs;
30
import io.grpc.LoadBalancer.SubchannelPicker;
31
import io.grpc.Metadata;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import java.util.ArrayList;
37
import java.util.Collection;
38
import java.util.Collections;
39
import java.util.LinkedHashSet;
40
import java.util.concurrent.Executor;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // lazily allocated, since it is infrequently used.
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /** Immutable state needed for picking. 'lock' must be held for writing. */
73
  private volatile PickerState pickerState = new PickerState(null, null);
1✔
74

75
  /**
76
   * Creates a new delayed transport.
77
   *
78
   * @param defaultAppExecutor pending streams will create real streams and run buffered operations
79
   *        in an application executor, which will be this executor, unless there is on provided in
80
   *        {@link CallOptions}.
81
   * @param syncContext all listener callbacks of the delayed transport will be run from this
82
   *        SynchronizationContext.
83
   */
84
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
1✔
85
    this.defaultAppExecutor = defaultAppExecutor;
1✔
86
    this.syncContext = syncContext;
1✔
87
  }
1✔
88

89
  @Override
90
  public final Runnable start(final Listener listener) {
91
    this.listener = listener;
1✔
92
    reportTransportInUse = new Runnable() {
1✔
93
        @Override
94
        public void run() {
95
          listener.transportInUse(true);
1✔
96
        }
1✔
97
      };
98
    reportTransportNotInUse = new Runnable() {
1✔
99
        @Override
100
        public void run() {
101
          listener.transportInUse(false);
1✔
102
        }
1✔
103
      };
104
    reportTransportTerminated = new Runnable() {
1✔
105
        @Override
106
        public void run() {
107
          listener.transportTerminated();
1✔
108
        }
1✔
109
      };
110
    return null;
1✔
111
  }
112

113
  /**
114
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
115
   * picker will be consulted.
116
   *
117
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
118
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
119
   */
120
  @Override
121
  public final ClientStream newStream(
122
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
123
      ClientStreamTracer[] tracers) {
124
    try {
125
      PickSubchannelArgs args = new PickSubchannelArgsImpl(
1✔
126
          method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
127
      PickerState state = pickerState;
1✔
128
      while (true) {
129
        if (state.shutdownStatus != null) {
1✔
130
          return new FailingClientStream(state.shutdownStatus, tracers);
1✔
131
        }
132
        PickResult pickResult = null;
1✔
133
        if (state.lastPicker != null) {
1✔
134
          pickResult = state.lastPicker.pickSubchannel(args);
1✔
135
          callOptions = args.getCallOptions();
1✔
136
          // User code provided authority takes precedence over the LB provided one.
137
          if (callOptions.getAuthority() == null
1✔
138
              && pickResult.getAuthorityOverride() != null) {
1✔
139
            callOptions = callOptions.withAuthority(pickResult.getAuthorityOverride());
1✔
140
          }
141
          ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
142
              callOptions.isWaitForReady());
1✔
143
          if (transport != null) {
1✔
144
            ClientStream stream = transport.newStream(
1✔
145
                args.getMethodDescriptor(), args.getHeaders(), callOptions,
1✔
146
                tracers);
147
            // User code provided authority takes precedence over the LB provided one; this will be
148
            // overwritten by ClientCallImpl if the application sets an authority override
149
            if (pickResult.getAuthorityOverride() != null) {
1✔
150
              stream.setAuthority(pickResult.getAuthorityOverride());
1✔
151
            }
152
            return stream;
1✔
153
          }
154
        }
155
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
156
        // race with reprocess()), we will buffer the RPC.  Otherwise, will try with the new picker.
157
        synchronized (lock) {
1✔
158
          PickerState newerState = pickerState;
1✔
159
          if (state == newerState) {
1✔
160
            return createPendingStream(args, tracers, pickResult);
1✔
161
          }
162
          state = newerState;
1✔
163
        }
1✔
164
      }
1✔
165
    } finally {
166
      syncContext.drain();
1✔
167
    }
168
  }
169

170
  /**
171
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
172
   * schedule tasks on syncContext.
173
   */
174
  @GuardedBy("lock")
175
  private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers,
176
      PickResult pickResult) {
177
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
178
    if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) {
1✔
179
      pendingStream.lastPickStatus = pickResult.getStatus();
1✔
180
    }
181
    pendingStreams.add(pendingStream);
1✔
182
    if (getPendingStreamsCount() == 1) {
1✔
183
      syncContext.executeLater(reportTransportInUse);
1✔
184
    }
185
    for (ClientStreamTracer streamTracer : tracers) {
1✔
186
      streamTracer.createPendingStream();
1✔
187
    }
188
    return pendingStream;
1✔
189
  }
190

191
  @Override
192
  public final void ping(final PingCallback callback, Executor executor) {
193
    throw new UnsupportedOperationException("This method is not expected to be called");
×
194
  }
195

196
  @Override
197
  public ListenableFuture<SocketStats> getStats() {
198
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
199
    ret.set(null);
×
200
    return ret;
×
201
  }
202

203
  /**
204
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
205
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
206
   * more buffered streams.
207
   */
208
  @Override
209
  public final void shutdown(final Status status) {
210
    synchronized (lock) {
1✔
211
      if (pickerState.shutdownStatus != null) {
1✔
212
        return;
1✔
213
      }
214
      pickerState = pickerState.withShutdownStatus(status);
1✔
215
      syncContext.executeLater(new Runnable() {
1✔
216
          @Override
217
          public void run() {
218
            listener.transportShutdown(status);
1✔
219
          }
1✔
220
        });
221
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
222
        syncContext.executeLater(reportTransportTerminated);
1✔
223
        reportTransportTerminated = null;
1✔
224
      }
225
    }
1✔
226
    syncContext.drain();
1✔
227
  }
1✔
228

229
  /**
230
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
231
   * this transport.
232
   */
233
  @Override
234
  public final void shutdownNow(Status status) {
235
    shutdown(status);
1✔
236
    Collection<PendingStream> savedPendingStreams;
237
    Runnable savedReportTransportTerminated;
238
    synchronized (lock) {
1✔
239
      savedPendingStreams = pendingStreams;
1✔
240
      savedReportTransportTerminated = reportTransportTerminated;
1✔
241
      reportTransportTerminated = null;
1✔
242
      if (!pendingStreams.isEmpty()) {
1✔
243
        pendingStreams = Collections.emptyList();
1✔
244
      }
245
    }
1✔
246
    if (savedReportTransportTerminated != null) {
1✔
247
      for (PendingStream stream : savedPendingStreams) {
1✔
248
        Runnable runnable = stream.setStream(
1✔
249
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
250
        if (runnable != null) {
1✔
251
          // Drain in-line instead of using an executor as failing stream just throws everything
252
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
253
          // before stream.start().
254
          runnable.run();
1✔
255
        }
256
      }
1✔
257
      syncContext.execute(savedReportTransportTerminated);
1✔
258
    }
259
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
260
    // shutdown().
261
  }
1✔
262

263
  public final boolean hasPendingStreams() {
264
    synchronized (lock) {
1✔
265
      return !pendingStreams.isEmpty();
1✔
266
    }
267
  }
268

269
  @VisibleForTesting
270
  final int getPendingStreamsCount() {
271
    synchronized (lock) {
1✔
272
      return pendingStreams.size();
1✔
273
    }
274
  }
275

276
  /**
277
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
278
   * pick is successful, otherwise keep it pending.
279
   *
280
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
281
   * streams will be served by the latest picker (if a same picker is given more than once, they are
282
   * considered different pickers) as soon as possible.
283
   *
284
   * <p>This method <strong>must not</strong> be called concurrently with itself.
285
   */
286
  final void reprocess(@Nullable SubchannelPicker picker) {
287
    ArrayList<PendingStream> toProcess;
288
    synchronized (lock) {
1✔
289
      pickerState = pickerState.withPicker(picker);
1✔
290
      if (picker == null || !hasPendingStreams()) {
1✔
291
        return;
1✔
292
      }
293
      toProcess = new ArrayList<>(pendingStreams);
1✔
294
    }
1✔
295
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
296

297
    for (final PendingStream stream : toProcess) {
1✔
298
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
299
      CallOptions callOptions = stream.args.getCallOptions();
1✔
300
      if (callOptions.isWaitForReady() && pickResult.hasResult()) {
1✔
301
        stream.lastPickStatus = pickResult.getStatus();
1✔
302
      }
303
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
304
          callOptions.isWaitForReady());
1✔
305
      if (transport != null) {
1✔
306
        Executor executor = defaultAppExecutor;
1✔
307
        // createRealStream may be expensive. It will start real streams on the transport. If
308
        // there are pending requests, they will be serialized too, which may be expensive. Since
309
        // we are now on transport thread, we need to offload the work to an executor.
310
        if (callOptions.getExecutor() != null) {
1✔
311
          executor = callOptions.getExecutor();
1✔
312
        }
313
        Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride());
1✔
314
        if (runnable != null) {
1✔
315
          executor.execute(runnable);
1✔
316
        }
317
        toRemove.add(stream);
1✔
318
      }  // else: stay pending
319
    }
1✔
320

321
    synchronized (lock) {
1✔
322
      // Between this synchronized and the previous one:
323
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
324
      //   - shutdown() may be called, which may turn pendingStreams into null.
325
      if (!hasPendingStreams()) {
1✔
326
        return;
×
327
      }
328
      // Avoid pendingStreams.removeAll() as it can degrade to calling toRemove.contains() for each
329
      // element in pendingStreams.
330
      for (PendingStream stream : toRemove) {
1✔
331
        pendingStreams.remove(stream);
1✔
332
      }
1✔
333
      // Because delayed transport is long-lived, we take this opportunity to down-size the
334
      // hashmap.
335
      if (pendingStreams.isEmpty()) {
1✔
336
        pendingStreams = new LinkedHashSet<>();
1✔
337
      }
338
      if (!hasPendingStreams()) {
1✔
339
        // There may be a brief gap between delayed transport clearing in-use state, and first real
340
        // transport starting streams and setting in-use state.  During the gap the whole channel's
341
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
342
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
343
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
344
        syncContext.executeLater(reportTransportNotInUse);
1✔
345
        if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
1✔
346
          syncContext.executeLater(reportTransportTerminated);
1✔
347
          reportTransportTerminated = null;
1✔
348
        }
349
      }
350
    }
1✔
351
    syncContext.drain();
1✔
352
  }
1✔
353

354
  @Override
355
  public InternalLogId getLogId() {
356
    return logId;
×
357
  }
358

359
  private class PendingStream extends DelayedStream {
360
    private final PickSubchannelArgs args;
361
    private final Context context = Context.current();
1✔
362
    private final ClientStreamTracer[] tracers;
363
    private volatile Status lastPickStatus;
364

365
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
366
      this.args = args;
1✔
367
      this.tracers = tracers;
1✔
368
    }
1✔
369

370
    /** Runnable may be null. */
371
    private Runnable createRealStream(ClientTransport transport, String authorityOverride) {
372
      ClientStream realStream;
373
      Context origContext = context.attach();
1✔
374
      try {
375
        realStream = transport.newStream(
1✔
376
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
377
            tracers);
378
      } finally {
379
        context.detach(origContext);
1✔
380
      }
381
      if (authorityOverride != null) {
1✔
382
        // User code provided authority takes precedence over the LB provided one; this will be
383
        // overwritten by an enqueud call from ClientCallImpl if the application sets an authority
384
        // override. We must call the real stream directly because stream.start() has likely already
385
        // been called on the delayed stream.
386
        realStream.setAuthority(authorityOverride);
1✔
387
      }
388
      return setStream(realStream);
1✔
389
    }
390

391
    @Override
392
    public void cancel(Status reason) {
393
      super.cancel(reason);
1✔
394
      synchronized (lock) {
1✔
395
        if (reportTransportTerminated != null) {
1✔
396
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
397
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
398
            syncContext.executeLater(reportTransportNotInUse);
1✔
399
            if (pickerState.shutdownStatus != null) {
1✔
400
              syncContext.executeLater(reportTransportTerminated);
1✔
401
              reportTransportTerminated = null;
1✔
402
            }
403
          }
404
        }
405
      }
1✔
406
      syncContext.drain();
1✔
407
    }
1✔
408

409
    @Override
410
    protected void onEarlyCancellation(Status reason) {
411
      for (ClientStreamTracer tracer : tracers) {
1✔
412
        tracer.streamClosed(reason);
1✔
413
      }
414
    }
1✔
415

416
    @Override
417
    public void appendTimeoutInsight(InsightBuilder insight) {
418
      if (args.getCallOptions().isWaitForReady()) {
1✔
419
        insight.append("wait_for_ready");
1✔
420
        Status status = lastPickStatus;
1✔
421
        if (status != null && !status.isOk()) {
1✔
422
          insight.appendKeyValue("Last Pick Failure", status);
1✔
423
        }
424
      }
425
      super.appendTimeoutInsight(insight);
1✔
426
    }
1✔
427
  }
428

429
  static final class PickerState {
430
    /**
431
     * The last picker that {@link #reprocess} has used. May be set to null when the channel has
432
     * moved to idle.
433
     */
434
    @Nullable
435
    final SubchannelPicker lastPicker;
436
    /**
437
     * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
438
     * terminated.
439
     */
440
    @Nullable
441
    final Status shutdownStatus;
442

443
    private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
1✔
444
      this.lastPicker = lastPicker;
1✔
445
      this.shutdownStatus = shutdownStatus;
1✔
446
    }
1✔
447

448
    public PickerState withPicker(SubchannelPicker newPicker) {
449
      return new PickerState(newPicker, this.shutdownStatus);
1✔
450
    }
451

452
    public PickerState withShutdownStatus(Status newShutdownStatus) {
453
      return new PickerState(this.lastPicker, newShutdownStatus);
1✔
454
    }
455
  }
456
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc