• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18940

12 Dec 2023 10:50PM CUT coverage: 88.309% (+0.005%) from 88.304%
#18940

push

github

web-flow
core: de-expermentalize pick first config parsing (#10531) (#10742)

Co-authored-by: Terry Wilson <tmwilson@google.com>

30336 of 34352 relevant lines covered (88.31%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.18
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24
import static java.lang.Math.max;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Preconditions;
28
import com.google.common.io.ByteStreams;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.Codec;
32
import io.grpc.Compressor;
33
import io.grpc.Deadline;
34
import io.grpc.Decompressor;
35
import io.grpc.DecompressorRegistry;
36
import io.grpc.Grpc;
37
import io.grpc.Metadata;
38
import io.grpc.Status;
39
import io.grpc.internal.ClientStreamListener.RpcProgress;
40
import java.io.InputStream;
41
import java.util.concurrent.TimeUnit;
42
import java.util.logging.Level;
43
import java.util.logging.Logger;
44
import javax.annotation.Nullable;
45

46
/**
47
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
48
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
49
 * from the sending application thread.
50
 */
51
public abstract class AbstractClientStream extends AbstractStream
52
    implements ClientStream, MessageFramer.Sink {
53

54
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
55

56
  /**
57
   * A sink for outbound operations, separated from the stream simply to avoid name
58
   * collisions/confusion. Only called from application thread.
59
   */
60
  protected interface Sink {
61
    /** 
62
     * Sends the request headers to the remote end point.
63
     *
64
     * @param metadata the metadata to be sent
65
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
66
     *     when sending an unary GET request
67
     */
68
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
69

70
    /**
71
     * Sends an outbound frame to the remote end point.
72
     *
73
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
74
     *     endOfStream} with no data to send
75
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
76
     *     {@code true} if this is {@code true}
77
     * @param flush {@code true} if more data may not be arriving soon
78
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
79
     */
80
    void writeFrame(
81
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
82

83
    /**
84
     * Tears down the stream, typically in the event of a timeout. This method may be called
85
     * multiple times and from any thread.
86
     *
87
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
88
     * {@link AbstractClientStream#cancel} delegates to this method.
89
     */
90
    void cancel(Status status);
91
  }
92

93
  private final TransportTracer transportTracer;
94
  private final Framer framer;
95
  private boolean shouldBeCountedForInUse;
96
  private boolean useGet;
97
  private Metadata headers;
98
  /**
99
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
100
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
101
   * processed asynchronously.
102
   */
103
  private volatile boolean cancelled;
104

105
  /**
   * Creates the stream and selects the framer that matches the request style.
   *
   * <p>For a regular request a {@code MessageFramer} is installed and {@code headers} are kept
   * on this stream until {@link #start}. For a GET request the headers are handed to a
   * {@code GetFramer} instead and this stream's own {@code headers} field is left unset.
   *
   * @param bufferAllocator allocator given to the {@code MessageFramer}; not used for GET
   * @param statsTraceCtx stats context given to whichever framer is created
   * @param transportTracer tracer for this stream, must not be null
   * @param headers request headers, must not be null
   * @param callOptions options consulted to decide whether the stream counts as in-use
   * @param useGet whether the request is sent as a GET
   */
  protected AbstractClientStream(
      WritableBufferAllocator bufferAllocator,
      StatsTraceContext statsTraceCtx,
      TransportTracer transportTracer,
      Metadata headers,
      CallOptions callOptions,
      boolean useGet) {
    checkNotNull(headers, "headers");
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
    this.useGet = useGet;
    if (useGet) {
      this.framer = new GetFramer(headers, statsTraceCtx);
    } else {
      this.framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
      this.headers = headers;
    }
  }
1✔
123

124
  @Override
125
  public void setDeadline(Deadline deadline) {
126
    headers.discardAll(TIMEOUT_KEY);
1✔
127
    long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
128
    headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
129
  }
1✔
130

131
  @Override
132
  public void setMaxOutboundMessageSize(int maxSize) {
133
    framer.setMaxOutboundMessageSize(maxSize);
1✔
134
  }
1✔
135

136
  @Override
137
  public void setMaxInboundMessageSize(int maxSize) {
138
    transportState().setMaxInboundMessageSize(maxSize);
1✔
139
  }
1✔
140

141
  @Override
142
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
143
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
144
  }
1✔
145

146
  @Override
147
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
148
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
149
  }
1✔
150

151
  /** {@inheritDoc} */
152
  @Override
153
  protected abstract TransportState transportState();
154

155
  @Override
156
  public final void start(ClientStreamListener listener) {
157
    transportState().setListener(listener);
1✔
158
    if (!useGet) {
1✔
159
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
160
      headers = null;
1✔
161
    }
162
  }
1✔
163

164
  /**
165
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
166
   * unique sink.
167
   */
168
  protected abstract Sink abstractClientStreamSink();
169

170
  @Override
171
  protected final Framer framer() {
172
    return framer;
1✔
173
  }
174

175
  /**
176
   * Returns true if this stream should be counted when determining the in-use state of the
177
   * transport.
178
   */
179
  public final boolean shouldBeCountedForInUse() {
180
    return shouldBeCountedForInUse;
1✔
181
  }
182

183
  @Override
184
  public final void deliverFrame(
185
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
186
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
187
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
188
  }
1✔
189

190
  @Override
191
  public final void halfClose() {
192
    if (!transportState().isOutboundClosed()) {
1✔
193
      transportState().setOutboundClosed();
1✔
194
      endOfMessages();
1✔
195
    }
196
  }
1✔
197

198
  @Override
199
  public final void cancel(Status reason) {
200
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
201
    cancelled = true;
1✔
202
    abstractClientStreamSink().cancel(reason);
1✔
203
  }
1✔
204

205
  @Override
206
  public final boolean isReady() {
207
    return super.isReady() && !cancelled;
1✔
208
  }
209

210
  @Override
211
  public final void appendTimeoutInsight(InsightBuilder insight) {
212
    Attributes attrs = getAttributes();
1✔
213
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
214
  }
1✔
215

216
  protected TransportTracer getTransportTracer() {
217
    return transportTracer;
1✔
218
  }
219

220
  /** This should only be called from the transport thread. */
221
  protected abstract static class TransportState extends AbstractStream.TransportState {
222
    /** Whether listener.closed() has been called. */
223
    private final StatsTraceContext statsTraceCtx;
224
    private boolean listenerClosed;
225
    private ClientStreamListener listener;
226
    private boolean fullStreamDecompression;
227
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
228

229
    private boolean deframerClosed = false;
1✔
230
    private Runnable deframerClosedTask;
231

232
    /** Whether the client has half-closed the stream. */
233
    private volatile boolean outboundClosed;
234

235
    /**
236
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
237
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
238
     */
239
    private boolean statusReported;
240
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
241
    private boolean statusReportedIsOk;
242

243
    protected TransportState(
244
        int maxMessageSize,
245
        StatsTraceContext statsTraceCtx,
246
        TransportTracer transportTracer) {
247
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
248
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
249
    }
1✔
250

251
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
252
      this.fullStreamDecompression = fullStreamDecompression;
1✔
253
    }
1✔
254

255
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
256
      checkState(this.listener == null, "Already called start");
1✔
257
      this.decompressorRegistry =
1✔
258
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
259
    }
1✔
260

261
    @VisibleForTesting
262
    public final void setListener(ClientStreamListener listener) {
263
      checkState(this.listener == null, "Already called setListener");
1✔
264
      this.listener = checkNotNull(listener, "listener");
1✔
265
    }
1✔
266

267
    @Override
268
    public void deframerClosed(boolean hasPartialMessage) {
269
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
270
      deframerClosed = true;
1✔
271
      if (statusReportedIsOk && hasPartialMessage) {
1✔
272
        transportReportStatus(
1✔
273
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
274
            true,
275
            new Metadata());
276
      }
277
      if (deframerClosedTask != null) {
1✔
278
        deframerClosedTask.run();
1✔
279
        deframerClosedTask = null;
1✔
280
      }
281
    }
1✔
282

283
    @Override
284
    protected final ClientStreamListener listener() {
285
      return listener;
1✔
286
    }
287

288
    private final void setOutboundClosed() {
289
      outboundClosed = true;
1✔
290
    }
1✔
291

292
    protected final boolean isOutboundClosed() {
293
      return outboundClosed;
1✔
294
    }
295

296
    /**
297
     * Called by transport implementations when they receive headers.
298
     *
299
     * @param headers the parsed headers
300
     */
301
    protected void inboundHeadersReceived(Metadata headers) {
302
      checkState(!statusReported, "Received headers on closed stream");
1✔
303
      statsTraceCtx.clientInboundHeaders();
1✔
304

305
      boolean compressedStream = false;
1✔
306
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
307
      if (fullStreamDecompression && streamEncoding != null) {
1✔
308
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
309
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
310
          compressedStream = true;
1✔
311
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
312
          deframeFailed(
1✔
313
              Status.INTERNAL
314
                  .withDescription(
1✔
315
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
316
                  .asRuntimeException());
1✔
317
          return;
1✔
318
        }
319
      }
320

321
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
322
      if (messageEncoding != null) {
1✔
323
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
324
        if (decompressor == null) {
1✔
325
          deframeFailed(
1✔
326
              Status.INTERNAL
327
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
328
                  .asRuntimeException());
1✔
329
          return;
1✔
330
        } else if (decompressor != Codec.Identity.NONE) {
1✔
331
          if (compressedStream) {
1✔
332
            deframeFailed(
1✔
333
                Status.INTERNAL
334
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
335
                    .asRuntimeException());
1✔
336
            return;
1✔
337
          }
338
          setDecompressor(decompressor);
1✔
339
        }
340
      }
341

342
      listener().headersRead(headers);
1✔
343
    }
1✔
344

345
    /**
346
     * Processes the contents of a received data frame from the server.
347
     *
348
     * @param frame the received data frame. Its ownership is transferred to this method.
349
     */
350
    protected void inboundDataReceived(ReadableBuffer frame) {
351
      checkNotNull(frame, "frame");
1✔
352
      boolean needToCloseFrame = true;
1✔
353
      try {
354
        if (statusReported) {
1✔
355
          log.log(Level.INFO, "Received data on closed stream");
×
356
          return;
×
357
        }
358

359
        needToCloseFrame = false;
1✔
360
        deframe(frame);
1✔
361
      } finally {
362
        if (needToCloseFrame) {
1✔
363
          frame.close();
×
364
        }
365
      }
366
    }
1✔
367

368
    /**
369
     * Processes the trailers and status from the server.
370
     *
371
     * @param trailers the received trailers
372
     * @param status the status extracted from the trailers
373
     */
374
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
375
      checkNotNull(status, "status");
1✔
376
      checkNotNull(trailers, "trailers");
1✔
377
      if (statusReported) {
1✔
378
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
379
            new Object[]{status, trailers});
380
        return;
1✔
381
      }
382
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
383
      transportReportStatus(status, false, trailers);
1✔
384
    }
1✔
385

386
    /**
387
     * Report stream closure with status to the application layer if not already reported. This
388
     * method must be called from the transport thread.
389
     *
390
     * @param status the new status to set
391
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
392
     *        may already be queued up in the deframer. If {@code false}, the listener will be
393
     *        notified immediately after all currently completed messages in the deframer have been
394
     *        delivered to the application.
395
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
396
     *        server
397
     */
398
    public final void transportReportStatus(final Status status, boolean stopDelivery,
399
        final Metadata trailers) {
400
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
401
    }
1✔
402

403
    /**
404
     * Report stream closure with status to the application layer if not already reported. This
405
     * method must be called from the transport thread.
406
     *
407
     * @param status the new status to set
408
     * @param rpcProgress RPC progress that the
409
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
410
     *        will receive
411
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
412
     *        may already be queued up in the deframer and overrides any previously queued status.
413
     *        If {@code false}, the listener will be notified immediately after all currently
414
     *        completed messages in the deframer have been delivered to the application.
415
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
416
     *        server
417
     */
418
    public final void transportReportStatus(
419
        final Status status,
420
        final RpcProgress rpcProgress,
421
        boolean stopDelivery,
422
        final Metadata trailers) {
423
      checkNotNull(status, "status");
1✔
424
      checkNotNull(trailers, "trailers");
1✔
425
      // If stopDelivery, we continue in case previous invocation is waiting for stall
426
      if (statusReported && !stopDelivery) {
1✔
427
        return;
1✔
428
      }
429
      statusReported = true;
1✔
430
      statusReportedIsOk = status.isOk();
1✔
431
      onStreamDeallocated();
1✔
432

433
      if (deframerClosed) {
1✔
434
        deframerClosedTask = null;
1✔
435
        closeListener(status, rpcProgress, trailers);
1✔
436
      } else {
437
        deframerClosedTask =
1✔
438
            new Runnable() {
1✔
439
              @Override
440
              public void run() {
441
                closeListener(status, rpcProgress, trailers);
1✔
442
              }
1✔
443
            };
444
        closeDeframer(stopDelivery);
1✔
445
      }
446
    }
1✔
447

448
    /**
449
     * Closes the listener if not previously closed.
450
     *
451
     * @throws IllegalStateException if the call has not yet been started.
452
     */
453
    private void closeListener(
454
        Status status, RpcProgress rpcProgress, Metadata trailers) {
455
      if (!listenerClosed) {
1✔
456
        listenerClosed = true;
1✔
457
        statsTraceCtx.streamClosed(status);
1✔
458
        listener().closed(status, rpcProgress, trailers);
1✔
459
        if (getTransportTracer() != null) {
1✔
460
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
461
        }
462
      }
463
    }
1✔
464
  }
465

466
  private class GetFramer implements Framer {
467
    private Metadata headers;
468
    private boolean closed;
469
    private final StatsTraceContext statsTraceCtx;
470
    private byte[] payload;
471

472
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
473
      this.headers = checkNotNull(headers, "headers");
1✔
474
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
475
    }
1✔
476

477
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
478
    @Override
479
    public void writePayload(InputStream message) {
480
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
481
      try {
482
        payload = ByteStreams.toByteArray(message);
1✔
483
      } catch (java.io.IOException ex) {
×
484
        throw new RuntimeException(ex);
×
485
      }
1✔
486
      statsTraceCtx.outboundMessage(0);
1✔
487
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
488
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
489
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
490
      // it using e.g., base64.  However, we are not supposed to know such detail here.
491
      //
492
      // We don't want to move this line to where the encoding happens either, because we'd better
493
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
494
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
495
      //
496
      // Because the payload is usually very small, people shouldn't care about the size difference
497
      // caused by encoding.
498
      statsTraceCtx.outboundWireSize(payload.length);
1✔
499
    }
1✔
500

501
    @Override
502
    public void flush() {}
1✔
503

504
    @Override
505
    public boolean isClosed() {
506
      return closed;
1✔
507
    }
508

509
    /** Closes, with flush. */
510
    @Override
511
    public void close() {
512
      closed = true;
1✔
513
      checkState(payload != null,
1✔
514
          "Lack of request message. GET request is only supported for unary requests");
515
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
516
      payload = null;
1✔
517
      headers = null;
1✔
518
    }
1✔
519

520
    /** Closes, without flush. */
521
    @Override
522
    public void dispose() {
523
      closed = true;
×
524
      payload = null;
×
525
      headers = null;
×
526
    }
×
527

528
    // Compression is not supported for GET encoding.
529
    @Override
530
    public Framer setMessageCompression(boolean enable) {
531
      return this;
×
532
    }
533

534
    @Override
535
    public Framer setCompressor(Compressor compressor) {
536
      return this;
×
537
    }
538

539
    // TODO(zsurocking): support this
540
    @Override
541
    public void setMaxOutboundMessageSize(int maxSize) {}
×
542
  }
543
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc