• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19965

28 Aug 2025 10:58PM UTC coverage: 88.565% (+0.005%) from 88.56%
#19965

push

github

web-flow
binder: Replace queryIntentServices() hack with the new SystemApis.createContextAsUser()  (#12280)

createContextAsUser() wrapper makes the call to PackageManager's
resolveService() look the same in both the same-user and cross-user
cases. This is how the Android team recommends accessing XXXAsUser() APIs in
general.

We can also remove all the apologies for using reflection since
SystemApis already explains all that.

34691 of 39170 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.22
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Preconditions;
27
import com.google.common.io.ByteStreams;
28
import io.grpc.Attributes;
29
import io.grpc.CallOptions;
30
import io.grpc.Codec;
31
import io.grpc.Compressor;
32
import io.grpc.Deadline;
33
import io.grpc.Decompressor;
34
import io.grpc.DecompressorRegistry;
35
import io.grpc.Grpc;
36
import io.grpc.Metadata;
37
import io.grpc.Status;
38
import io.grpc.internal.ClientStreamListener.RpcProgress;
39
import java.io.InputStream;
40
import java.util.concurrent.TimeUnit;
41
import java.util.logging.Level;
42
import java.util.logging.Logger;
43
import javax.annotation.Nullable;
44

45
/**
46
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
47
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
48
 * from the sending application thread.
49
 */
50
public abstract class AbstractClientStream extends AbstractStream
51
    implements ClientStream, MessageFramer.Sink {
52

53
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
54

55
  /**
56
   * A sink for outbound operations, separated from the stream simply to avoid name
57
   * collisions/confusion. Only called from application thread.
58
   */
59
  protected interface Sink {
60
    /** 
61
     * Sends the request headers to the remote end point.
62
     *
63
     * @param metadata the metadata to be sent
64
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
65
     *     when sending an unary GET request
66
     */
67
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
68

69
    /**
70
     * Sends an outbound frame to the remote end point.
71
     *
72
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
73
     *     endOfStream} with no data to send
74
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
75
     *     {@code true} if this is {@code true}
76
     * @param flush {@code true} if more data may not be arriving soon
77
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
78
     */
79
    void writeFrame(
80
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
81

82
    /**
83
     * Tears down the stream, typically in the event of a timeout. This method may be called
84
     * multiple times and from any thread.
85
     *
86
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
87
     * {@link AbstractClientStream#cancel} delegates to this method.
88
     */
89
    void cancel(Status status);
90
  }
91

92
  private final TransportTracer transportTracer;
93
  private final Framer framer;
94
  private final boolean shouldBeCountedForInUse;
95
  private final boolean useGet;
96
  private Metadata headers;
97
  /**
98
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
99
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
100
   * processed asynchronously.
101
   */
102
  private volatile boolean cancelled;
103

104
  /**
   * Creates the stream and selects its framer.
   *
   * @param bufferAllocator allocator handed to the {@link MessageFramer} (unused for GET)
   * @param statsTraceCtx stats context handed to whichever framer is created
   * @param transportTracer tracer returned by {@link #getTransportTracer()}; must not be null
   * @param headers request headers; must not be null
   * @param callOptions used to decide whether this stream counts towards transport in-use state
   * @param useGet if {@code true}, a {@code GetFramer} is used and the headers are written
   *     together with the payload when that framer is closed; otherwise a {@link MessageFramer}
   *     is used and the headers are written by {@link #start}
   */
  protected AbstractClientStream(
      WritableBufferAllocator bufferAllocator,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer,
      Metadata headers,
      CallOptions callOptions,
      boolean useGet) {
    checkNotNull(headers, "headers");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
    this.useGet = useGet;
    // Keep the reference in both modes. setDeadline() edits these headers before they are sent;
    // previously the field stayed null for GET streams, so setDeadline() threw a
    // NullPointerException there. The GetFramer shares the same Metadata instance, so a deadline
    // set before the request is written is still included in what it sends.
    this.headers = headers;
    if (useGet) {
      framer = new GetFramer(headers, statsTraceCtx);
    } else {
      framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
    }
  }
1✔
122

123
  @Override
124
  public void setDeadline(Deadline deadline) {
125
    headers.discardAll(TIMEOUT_KEY);
1✔
126
    headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
127
  }
1✔
128

129
  @Override
130
  public void setMaxOutboundMessageSize(int maxSize) {
131
    framer.setMaxOutboundMessageSize(maxSize);
1✔
132
  }
1✔
133

134
  @Override
135
  public void setMaxInboundMessageSize(int maxSize) {
136
    transportState().setMaxInboundMessageSize(maxSize);
1✔
137
  }
1✔
138

139
  @Override
140
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
141
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
142
  }
1✔
143

144
  @Override
145
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
146
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
147
  }
1✔
148

149
  /** {@inheritDoc} */
150
  @Override
151
  protected abstract TransportState transportState();
152

153
  @Override
154
  public final void start(ClientStreamListener listener) {
155
    transportState().setListener(listener);
1✔
156
    if (!useGet) {
1✔
157
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
158
      headers = null;
1✔
159
    }
160
  }
1✔
161

162
  /**
163
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
164
   * unique sink.
165
   */
166
  protected abstract Sink abstractClientStreamSink();
167

168
  @Override
169
  protected final Framer framer() {
170
    return framer;
1✔
171
  }
172

173
  /**
174
   * Returns true if this stream should be counted when determining the in-use state of the
175
   * transport.
176
   */
177
  public final boolean shouldBeCountedForInUse() {
178
    return shouldBeCountedForInUse;
1✔
179
  }
180

181
  @Override
182
  public final void deliverFrame(
183
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
184
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
185
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
186
  }
1✔
187

188
  @Override
189
  public final void halfClose() {
190
    if (!transportState().isOutboundClosed()) {
1✔
191
      transportState().setOutboundClosed();
1✔
192
      endOfMessages();
1✔
193
    }
194
  }
1✔
195

196
  @Override
197
  public final void cancel(Status reason) {
198
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
199
    cancelled = true;
1✔
200
    abstractClientStreamSink().cancel(reason);
1✔
201
  }
1✔
202

203
  @Override
204
  public final boolean isReady() {
205
    return super.isReady() && !cancelled;
1✔
206
  }
207

208
  @Override
209
  public final void appendTimeoutInsight(InsightBuilder insight) {
210
    Attributes attrs = getAttributes();
1✔
211
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
212
  }
1✔
213

214
  protected TransportTracer getTransportTracer() {
215
    return transportTracer;
1✔
216
  }
217

218
  /** This should only be called from the transport thread. */
219
  protected abstract static class TransportState extends AbstractStream.TransportState {
220
    /** Whether listener.closed() has been called. */
221
    private final StatsTraceContext statsTraceCtx;
222
    private boolean listenerClosed;
223
    private ClientStreamListener listener;
224
    private boolean fullStreamDecompression;
225
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
226

227
    private boolean deframerClosed = false;
1✔
228
    private Runnable deframerClosedTask;
229

230
    /** Whether the client has half-closed the stream. */
231
    private volatile boolean outboundClosed;
232

233
    /**
234
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
235
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
236
     */
237
    private boolean statusReported;
238
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
239
    private boolean statusReportedIsOk;
240

241
    protected TransportState(
242
        int maxMessageSize,
243
        StatsTraceContext statsTraceCtx,
244
        TransportTracer transportTracer,
245
        CallOptions options) {
246
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
247
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
248
      if (options.getOnReadyThreshold() != null) {
1✔
249
        this.setOnReadyThreshold(options.getOnReadyThreshold());
1✔
250
      }
251
    }
1✔
252

253
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
254
      this.fullStreamDecompression = fullStreamDecompression;
1✔
255
    }
1✔
256

257
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
258
      checkState(this.listener == null, "Already called start");
1✔
259
      this.decompressorRegistry =
1✔
260
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
261
    }
1✔
262

263
    @VisibleForTesting
264
    public final void setListener(ClientStreamListener listener) {
265
      checkState(this.listener == null, "Already called setListener");
1✔
266
      this.listener = checkNotNull(listener, "listener");
1✔
267
    }
1✔
268

269
    @Override
270
    public void deframerClosed(boolean hasPartialMessage) {
271
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
272
      deframerClosed = true;
1✔
273
      if (statusReportedIsOk && hasPartialMessage) {
1✔
274
        transportReportStatus(
1✔
275
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
276
            true,
277
            new Metadata());
278
      }
279
      if (deframerClosedTask != null) {
1✔
280
        deframerClosedTask.run();
1✔
281
        deframerClosedTask = null;
1✔
282
      }
283
    }
1✔
284

285
    @Override
286
    protected final ClientStreamListener listener() {
287
      return listener;
1✔
288
    }
289

290
    private final void setOutboundClosed() {
291
      outboundClosed = true;
1✔
292
    }
1✔
293

294
    protected final boolean isOutboundClosed() {
295
      return outboundClosed;
1✔
296
    }
297

298
    /**
299
     * Called by transport implementations when they receive headers.
300
     *
301
     * @param headers the parsed headers
302
     */
303
    protected void inboundHeadersReceived(Metadata headers) {
304
      checkState(!statusReported, "Received headers on closed stream");
1✔
305
      statsTraceCtx.clientInboundHeaders(headers);
1✔
306

307
      boolean compressedStream = false;
1✔
308
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
309
      if (fullStreamDecompression && streamEncoding != null) {
1✔
310
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
311
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
312
          compressedStream = true;
1✔
313
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
314
          deframeFailed(
1✔
315
              Status.INTERNAL
316
                  .withDescription(
1✔
317
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
318
                  .asRuntimeException());
1✔
319
          return;
1✔
320
        }
321
      }
322

323
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
324
      if (messageEncoding != null) {
1✔
325
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
326
        if (decompressor == null) {
1✔
327
          deframeFailed(
1✔
328
              Status.INTERNAL
329
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
330
                  .asRuntimeException());
1✔
331
          return;
1✔
332
        } else if (decompressor != Codec.Identity.NONE) {
1✔
333
          if (compressedStream) {
1✔
334
            deframeFailed(
1✔
335
                Status.INTERNAL
336
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
337
                    .asRuntimeException());
1✔
338
            return;
1✔
339
          }
340
          setDecompressor(decompressor);
1✔
341
        }
342
      }
343

344
      listener().headersRead(headers);
1✔
345
    }
1✔
346

347
    /**
348
     * Processes the contents of a received data frame from the server.
349
     *
350
     * @param frame the received data frame. Its ownership is transferred to this method.
351
     */
352
    protected void inboundDataReceived(ReadableBuffer frame) {
353
      checkNotNull(frame, "frame");
1✔
354
      boolean needToCloseFrame = true;
1✔
355
      try {
356
        if (statusReported) {
1✔
357
          log.log(Level.INFO, "Received data on closed stream");
×
358
          return;
×
359
        }
360

361
        needToCloseFrame = false;
1✔
362
        deframe(frame);
1✔
363
      } finally {
364
        if (needToCloseFrame) {
1✔
365
          frame.close();
×
366
        }
367
      }
368
    }
1✔
369

370
    /**
371
     * Processes the trailers and status from the server.
372
     *
373
     * @param trailers the received trailers
374
     * @param status the status extracted from the trailers
375
     */
376
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
377
      checkNotNull(status, "status");
1✔
378
      checkNotNull(trailers, "trailers");
1✔
379
      if (statusReported) {
1✔
380
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
381
            new Object[]{status, trailers});
382
        return;
1✔
383
      }
384
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
385
      transportReportStatus(status, false, trailers);
1✔
386
    }
1✔
387

388
    /**
389
     * Report stream closure with status to the application layer if not already reported. This
390
     * method must be called from the transport thread.
391
     *
392
     * @param status the new status to set
393
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
394
     *        may already be queued up in the deframer. If {@code false}, the listener will be
395
     *        notified immediately after all currently completed messages in the deframer have been
396
     *        delivered to the application.
397
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
398
     *        server
399
     */
400
    public final void transportReportStatus(final Status status, boolean stopDelivery,
401
        final Metadata trailers) {
402
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
403
    }
1✔
404

405
    /**
406
     * Report stream closure with status to the application layer if not already reported. This
407
     * method must be called from the transport thread.
408
     *
409
     * @param status the new status to set
410
     * @param rpcProgress RPC progress that the
411
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
412
     *        will receive
413
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
414
     *        may already be queued up in the deframer and overrides any previously queued status.
415
     *        If {@code false}, the listener will be notified immediately after all currently
416
     *        completed messages in the deframer have been delivered to the application.
417
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
418
     *        server
419
     */
420
    public final void transportReportStatus(
421
        final Status status,
422
        final RpcProgress rpcProgress,
423
        boolean stopDelivery,
424
        final Metadata trailers) {
425
      checkNotNull(status, "status");
1✔
426
      checkNotNull(trailers, "trailers");
1✔
427
      // If stopDelivery, we continue in case previous invocation is waiting for stall
428
      if (statusReported && !stopDelivery) {
1✔
429
        return;
1✔
430
      }
431
      statusReported = true;
1✔
432
      statusReportedIsOk = status.isOk();
1✔
433
      onStreamDeallocated();
1✔
434

435
      if (deframerClosed) {
1✔
436
        deframerClosedTask = null;
1✔
437
        closeListener(status, rpcProgress, trailers);
1✔
438
      } else {
439
        deframerClosedTask =
1✔
440
            new Runnable() {
1✔
441
              @Override
442
              public void run() {
443
                closeListener(status, rpcProgress, trailers);
1✔
444
              }
1✔
445
            };
446
        closeDeframer(stopDelivery);
1✔
447
      }
448
    }
1✔
449

450
    /**
451
     * Closes the listener if not previously closed.
452
     *
453
     * @throws IllegalStateException if the call has not yet been started.
454
     */
455
    private void closeListener(
456
        Status status, RpcProgress rpcProgress, Metadata trailers) {
457
      if (!listenerClosed) {
1✔
458
        listenerClosed = true;
1✔
459
        statsTraceCtx.streamClosed(status);
1✔
460
        if (getTransportTracer() != null) {
1✔
461
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
462
        }
463
        listener().closed(status, rpcProgress, trailers);
1✔
464
      }
465
    }
1✔
466
  }
467

468
  private class GetFramer implements Framer {
469
    private Metadata headers;
470
    private boolean closed;
471
    private final StatsTraceContext statsTraceCtx;
472
    private byte[] payload;
473

474
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
475
      this.headers = checkNotNull(headers, "headers");
1✔
476
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
477
    }
1✔
478

479
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
480
    @Override
481
    public void writePayload(InputStream message) {
482
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
483
      try {
484
        payload = ByteStreams.toByteArray(message);
1✔
485
      } catch (java.io.IOException ex) {
×
486
        throw new RuntimeException(ex);
×
487
      }
1✔
488
      statsTraceCtx.outboundMessage(0);
1✔
489
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
490
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
491
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
492
      // it using e.g., base64.  However, we are not supposed to know such detail here.
493
      //
494
      // We don't want to move this line to where the encoding happens either, because we'd better
495
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
496
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
497
      //
498
      // Because the payload is usually very small, people shouldn't care about the size difference
499
      // caused by encoding.
500
      statsTraceCtx.outboundWireSize(payload.length);
1✔
501
    }
1✔
502

503
    @Override
504
    public void flush() {}
1✔
505

506
    @Override
507
    public boolean isClosed() {
508
      return closed;
1✔
509
    }
510

511
    /** Closes, with flush. */
512
    @Override
513
    public void close() {
514
      closed = true;
1✔
515
      checkState(payload != null,
1✔
516
          "Lack of request message. GET request is only supported for unary requests");
517
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
518
      payload = null;
1✔
519
      headers = null;
1✔
520
    }
1✔
521

522
    /** Closes, without flush. */
523
    @Override
524
    public void dispose() {
525
      closed = true;
×
526
      payload = null;
×
527
      headers = null;
×
528
    }
×
529

530
    // Compression is not supported for GET encoding.
531
    @Override
532
    public Framer setMessageCompression(boolean enable) {
533
      return this;
×
534
    }
535

536
    @Override
537
    public Framer setCompressor(Compressor compressor) {
538
      return this;
×
539
    }
540

541
    // TODO(zsurocking): support this
542
    @Override
543
    public void setMaxOutboundMessageSize(int maxSize) {}
×
544
  }
545
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc