• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19962

28 Aug 2025 03:57AM UTC coverage: 88.537% (-0.02%) from 88.554%
#19962

push

github

web-flow
servlet: extract ServletServerStream.serializeHeaders() method (#12299)

34680 of 39170 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.22
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.Preconditions;
27
import com.google.common.io.ByteStreams;
28
import io.grpc.Attributes;
29
import io.grpc.CallOptions;
30
import io.grpc.Codec;
31
import io.grpc.Compressor;
32
import io.grpc.Deadline;
33
import io.grpc.Decompressor;
34
import io.grpc.DecompressorRegistry;
35
import io.grpc.Grpc;
36
import io.grpc.Metadata;
37
import io.grpc.Status;
38
import io.grpc.internal.ClientStreamListener.RpcProgress;
39
import java.io.InputStream;
40
import java.util.concurrent.TimeUnit;
41
import java.util.logging.Level;
42
import java.util.logging.Logger;
43
import javax.annotation.Nullable;
44

45
/**
46
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
47
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
48
 * from the sending application thread.
49
 */
50
public abstract class AbstractClientStream extends AbstractStream
51
    implements ClientStream, MessageFramer.Sink {
52

53
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
54

55
  /**
56
   * A sink for outbound operations, separated from the stream simply to avoid name
57
   * collisions/confusion. Only called from application thread.
58
   */
59
  protected interface Sink {
60
    /** 
61
     * Sends the request headers to the remote end point.
62
     *
63
     * @param metadata the metadata to be sent
64
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
65
     *     when sending an unary GET request
66
     */
67
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
68

69
    /**
70
     * Sends an outbound frame to the remote end point.
71
     *
72
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
73
     *     endOfStream} with no data to send
74
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
75
     *     {@code true} if this is {@code true}
76
     * @param flush {@code true} if more data may not be arriving soon
77
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
78
     */
79
    void writeFrame(
80
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
81

82
    /**
83
     * Tears down the stream, typically in the event of a timeout. This method may be called
84
     * multiple times and from any thread.
85
     *
86
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
87
     * {@link AbstractClientStream#cancel} delegates to this method.
88
     */
89
    void cancel(Status status);
90
  }
91

92
  private final TransportTracer transportTracer;
93
  private final Framer framer;
94
  private final boolean shouldBeCountedForInUse;
95
  private final boolean useGet;
96
  private Metadata headers;
97
  /**
98
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
99
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
100
   * processed asynchronously.
101
   */
102
  private volatile boolean cancelled;
103

104
  protected AbstractClientStream(
105
      WritableBufferAllocator bufferAllocator,
106
      StatsTraceContext statsTraceCtx,
107
      TransportTracer transportTracer,
108
      Metadata headers,
109
      CallOptions callOptions,
110
      boolean useGet) {
1✔
111
    checkNotNull(headers, "headers");
1✔
112
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
113
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
1✔
114
    this.useGet = useGet;
1✔
115
    if (!useGet) {
1✔
116
      framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
1✔
117
      this.headers = headers;
1✔
118
    } else {
119
      framer = new GetFramer(headers, statsTraceCtx);
1✔
120
    }
121
  }
1✔
122

123
  @Override
124
  public void setDeadline(Deadline deadline) {
125
    headers.discardAll(TIMEOUT_KEY);
1✔
126
    headers.put(TIMEOUT_KEY, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
127
  }
1✔
128

129
  @Override
130
  public void setMaxOutboundMessageSize(int maxSize) {
131
    framer.setMaxOutboundMessageSize(maxSize);
1✔
132
  }
1✔
133

134
  @Override
135
  public void setMaxInboundMessageSize(int maxSize) {
136
    transportState().setMaxInboundMessageSize(maxSize);
1✔
137
  }
1✔
138

139
  @Override
140
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
141
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
142
  }
1✔
143

144
  @Override
145
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
146
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
147
  }
1✔
148

149
  /** {@inheritDoc} */
150
  @Override
151
  protected abstract TransportState transportState();
152

153
  @Override
154
  public final void start(ClientStreamListener listener) {
155
    transportState().setListener(listener);
1✔
156
    if (!useGet) {
1✔
157
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
158
      headers = null;
1✔
159
    }
160
  }
1✔
161

162
  /**
163
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
164
   * unique sink.
165
   */
166
  protected abstract Sink abstractClientStreamSink();
167

168
  @Override
169
  protected final Framer framer() {
170
    return framer;
1✔
171
  }
172

173
  /**
174
   * Returns true if this stream should be counted when determining the in-use state of the
175
   * transport.
176
   */
177
  public final boolean shouldBeCountedForInUse() {
178
    return shouldBeCountedForInUse;
1✔
179
  }
180

181
  @Override
182
  public final void deliverFrame(
183
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
184
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
185
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
186
  }
1✔
187

188
  @Override
189
  public final void halfClose() {
190
    if (!transportState().isOutboundClosed()) {
1✔
191
      transportState().setOutboundClosed();
1✔
192
      endOfMessages();
1✔
193
    }
194
  }
1✔
195

196
  @Override
197
  public final void cancel(Status reason) {
198
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
199
    cancelled = true;
1✔
200
    abstractClientStreamSink().cancel(reason);
1✔
201
  }
1✔
202

203
  @Override
204
  public final boolean isReady() {
205
    return super.isReady() && !cancelled;
1✔
206
  }
207

208
  @Override
209
  public final void appendTimeoutInsight(InsightBuilder insight) {
210
    Attributes attrs = getAttributes();
1✔
211
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
212
  }
1✔
213

214
  protected TransportTracer getTransportTracer() {
215
    return transportTracer;
1✔
216
  }
217

218
  /** This should only be called from the transport thread. */
219
  protected abstract static class TransportState extends AbstractStream.TransportState {
220
    /** Whether listener.closed() has been called. */
221
    private final StatsTraceContext statsTraceCtx;
222
    private boolean listenerClosed;
223
    private ClientStreamListener listener;
224
    private boolean fullStreamDecompression;
225
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
226

227
    private boolean deframerClosed = false;
1✔
228
    private Runnable deframerClosedTask;
229

230
    /** Whether the client has half-closed the stream. */
231
    private volatile boolean outboundClosed;
232

233
    /**
234
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
235
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
236
     */
237
    private boolean statusReported;
238
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
239
    private boolean statusReportedIsOk;
240

241
    protected TransportState(
242
        int maxMessageSize,
243
        StatsTraceContext statsTraceCtx,
244
        TransportTracer transportTracer,
245
        CallOptions options) {
246
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
247
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
248
      if (options.getOnReadyThreshold() != null) {
1✔
249
        this.setOnReadyThreshold(options.getOnReadyThreshold());
1✔
250
      }
251
    }
1✔
252

253
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
254
      this.fullStreamDecompression = fullStreamDecompression;
1✔
255
    }
1✔
256

257
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
258
      checkState(this.listener == null, "Already called start");
1✔
259
      this.decompressorRegistry =
1✔
260
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
261
    }
1✔
262

263
    @VisibleForTesting
264
    public final void setListener(ClientStreamListener listener) {
265
      checkState(this.listener == null, "Already called setListener");
1✔
266
      this.listener = checkNotNull(listener, "listener");
1✔
267
    }
1✔
268

269
    @Override
270
    public void deframerClosed(boolean hasPartialMessage) {
271
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
272
      deframerClosed = true;
1✔
273
      if (statusReportedIsOk && hasPartialMessage) {
1✔
274
        transportReportStatus(
1✔
275
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
276
            true,
277
            new Metadata());
278
      }
279
      if (deframerClosedTask != null) {
1✔
280
        deframerClosedTask.run();
1✔
281
        deframerClosedTask = null;
1✔
282
      }
283
    }
1✔
284

285
    @Override
286
    protected final ClientStreamListener listener() {
287
      return listener;
1✔
288
    }
289

290
    private final void setOutboundClosed() {
291
      outboundClosed = true;
1✔
292
    }
1✔
293

294
    protected final boolean isOutboundClosed() {
295
      return outboundClosed;
1✔
296
    }
297

298
    /**
299
     * Called by transport implementations when they receive headers.
300
     *
301
     * @param headers the parsed headers
302
     */
303
    protected void inboundHeadersReceived(Metadata headers) {
304
      checkState(!statusReported, "Received headers on closed stream");
1✔
305
      statsTraceCtx.clientInboundHeaders(headers);
1✔
306

307
      boolean compressedStream = false;
1✔
308
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
309
      if (fullStreamDecompression && streamEncoding != null) {
1✔
310
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
311
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
312
          compressedStream = true;
1✔
313
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
314
          deframeFailed(
1✔
315
              Status.INTERNAL
316
                  .withDescription(
1✔
317
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
318
                  .asRuntimeException());
1✔
319
          return;
1✔
320
        }
321
      }
322

323
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
324
      if (messageEncoding != null) {
1✔
325
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
326
        if (decompressor == null) {
1✔
327
          deframeFailed(
1✔
328
              Status.INTERNAL
329
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
330
                  .asRuntimeException());
1✔
331
          return;
1✔
332
        } else if (decompressor != Codec.Identity.NONE) {
1✔
333
          if (compressedStream) {
1✔
334
            deframeFailed(
1✔
335
                Status.INTERNAL
336
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
337
                    .asRuntimeException());
1✔
338
            return;
1✔
339
          }
340
          setDecompressor(decompressor);
1✔
341
        }
342
      }
343

344
      listener().headersRead(headers);
1✔
345
    }
1✔
346

347
    /**
348
     * Processes the contents of a received data frame from the server.
349
     *
350
     * @param frame the received data frame. Its ownership is transferred to this method.
351
     */
352
    protected void inboundDataReceived(ReadableBuffer frame) {
353
      checkNotNull(frame, "frame");
1✔
354
      boolean needToCloseFrame = true;
1✔
355
      try {
356
        if (statusReported) {
1✔
357
          log.log(Level.INFO, "Received data on closed stream");
×
358
          return;
×
359
        }
360

361
        needToCloseFrame = false;
1✔
362
        deframe(frame);
1✔
363
      } finally {
364
        if (needToCloseFrame) {
1✔
365
          frame.close();
×
366
        }
367
      }
368
    }
1✔
369

370
    /**
371
     * Processes the trailers and status from the server.
372
     *
373
     * @param trailers the received trailers
374
     * @param status the status extracted from the trailers
375
     */
376
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
377
      checkNotNull(status, "status");
1✔
378
      checkNotNull(trailers, "trailers");
1✔
379
      if (statusReported) {
1✔
380
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
381
            new Object[]{status, trailers});
382
        return;
1✔
383
      }
384
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
385
      transportReportStatus(status, false, trailers);
1✔
386
    }
1✔
387

388
    /**
389
     * Report stream closure with status to the application layer if not already reported. This
390
     * method must be called from the transport thread.
391
     *
392
     * @param status the new status to set
393
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
394
     *        may already be queued up in the deframer. If {@code false}, the listener will be
395
     *        notified immediately after all currently completed messages in the deframer have been
396
     *        delivered to the application.
397
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
398
     *        server
399
     */
400
    public final void transportReportStatus(final Status status, boolean stopDelivery,
401
        final Metadata trailers) {
402
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
403
    }
1✔
404

405
    /**
406
     * Report stream closure with status to the application layer if not already reported. This
407
     * method must be called from the transport thread.
408
     *
409
     * @param status the new status to set
410
     * @param rpcProgress RPC progress that the
411
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
412
     *        will receive
413
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
414
     *        may already be queued up in the deframer and overrides any previously queued status.
415
     *        If {@code false}, the listener will be notified immediately after all currently
416
     *        completed messages in the deframer have been delivered to the application.
417
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
418
     *        server
419
     */
420
    public final void transportReportStatus(
421
        final Status status,
422
        final RpcProgress rpcProgress,
423
        boolean stopDelivery,
424
        final Metadata trailers) {
425
      checkNotNull(status, "status");
1✔
426
      checkNotNull(trailers, "trailers");
1✔
427
      // If stopDelivery, we continue in case previous invocation is waiting for stall
428
      if (statusReported && !stopDelivery) {
1✔
429
        return;
1✔
430
      }
431
      statusReported = true;
1✔
432
      statusReportedIsOk = status.isOk();
1✔
433
      onStreamDeallocated();
1✔
434

435
      if (deframerClosed) {
1✔
436
        deframerClosedTask = null;
1✔
437
        closeListener(status, rpcProgress, trailers);
1✔
438
      } else {
439
        deframerClosedTask =
1✔
440
            new Runnable() {
1✔
441
              @Override
442
              public void run() {
443
                closeListener(status, rpcProgress, trailers);
1✔
444
              }
1✔
445
            };
446
        closeDeframer(stopDelivery);
1✔
447
      }
448
    }
1✔
449

450
    /**
451
     * Closes the listener if not previously closed.
452
     *
453
     * @throws IllegalStateException if the call has not yet been started.
454
     */
455
    private void closeListener(
456
        Status status, RpcProgress rpcProgress, Metadata trailers) {
457
      if (!listenerClosed) {
1✔
458
        listenerClosed = true;
1✔
459
        statsTraceCtx.streamClosed(status);
1✔
460
        if (getTransportTracer() != null) {
1✔
461
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
462
        }
463
        listener().closed(status, rpcProgress, trailers);
1✔
464
      }
465
    }
1✔
466
  }
467

468
  private class GetFramer implements Framer {
469
    private Metadata headers;
470
    private boolean closed;
471
    private final StatsTraceContext statsTraceCtx;
472
    private byte[] payload;
473

474
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
475
      this.headers = checkNotNull(headers, "headers");
1✔
476
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
477
    }
1✔
478

479
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
480
    @Override
481
    public void writePayload(InputStream message) {
482
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
483
      try {
484
        payload = ByteStreams.toByteArray(message);
1✔
485
      } catch (java.io.IOException ex) {
×
486
        throw new RuntimeException(ex);
×
487
      }
1✔
488
      statsTraceCtx.outboundMessage(0);
1✔
489
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
490
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
491
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
492
      // it using e.g., base64.  However, we are not supposed to know such detail here.
493
      //
494
      // We don't want to move this line to where the encoding happens either, because we'd better
495
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
496
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
497
      //
498
      // Because the payload is usually very small, people shouldn't care about the size difference
499
      // caused by encoding.
500
      statsTraceCtx.outboundWireSize(payload.length);
1✔
501
    }
1✔
502

503
    @Override
504
    public void flush() {}
1✔
505

506
    @Override
507
    public boolean isClosed() {
508
      return closed;
1✔
509
    }
510

511
    /** Closes, with flush. */
512
    @Override
513
    public void close() {
514
      closed = true;
1✔
515
      checkState(payload != null,
1✔
516
          "Lack of request message. GET request is only supported for unary requests");
517
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
518
      payload = null;
1✔
519
      headers = null;
1✔
520
    }
1✔
521

522
    /** Closes, without flush. */
523
    @Override
524
    public void dispose() {
525
      closed = true;
×
526
      payload = null;
×
527
      headers = null;
×
528
    }
×
529

530
    // Compression is not supported for GET encoding.
531
    @Override
532
    public Framer setMessageCompression(boolean enable) {
533
      return this;
×
534
    }
535

536
    @Override
537
    public Framer setCompressor(Compressor compressor) {
538
      return this;
×
539
    }
540

541
    // TODO(zsurocking): support this
542
    @Override
543
    public void setMaxOutboundMessageSize(int maxSize) {}
×
544
  }
545
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc