• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19658

23 Jan 2025 04:10PM CUT coverage: 88.574% (-0.007%) from 88.581%
#19658

push

github

web-flow
xds: Include max concurrent request limit in the error status for concurre… (#11845)

Include max concurrent request limit in the error status for concurrent connections limit exceeded

33720 of 38070 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.26
/../core/src/main/java/io/grpc/internal/AbstractClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.GrpcUtil.CONTENT_ENCODING_KEY;
22
import static io.grpc.internal.GrpcUtil.MESSAGE_ENCODING_KEY;
23
import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
24
import static java.lang.Math.max;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.Preconditions;
28
import com.google.common.io.ByteStreams;
29
import io.grpc.Attributes;
30
import io.grpc.CallOptions;
31
import io.grpc.Codec;
32
import io.grpc.Compressor;
33
import io.grpc.Deadline;
34
import io.grpc.Decompressor;
35
import io.grpc.DecompressorRegistry;
36
import io.grpc.Grpc;
37
import io.grpc.Metadata;
38
import io.grpc.Status;
39
import io.grpc.internal.ClientStreamListener.RpcProgress;
40
import java.io.InputStream;
41
import java.util.concurrent.TimeUnit;
42
import java.util.logging.Level;
43
import java.util.logging.Logger;
44
import javax.annotation.Nullable;
45

46
/**
47
 * The abstract base class for {@link ClientStream} implementations. Extending classes only need to
48
 * implement {@link #transportState()} and {@link #abstractClientStreamSink()}. Must only be called
49
 * from the sending application thread.
50
 */
51
public abstract class AbstractClientStream extends AbstractStream
52
    implements ClientStream, MessageFramer.Sink {
53

54
  private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
1✔
55

56
  /**
57
   * A sink for outbound operations, separated from the stream simply to avoid name
58
   * collisions/confusion. Only called from application thread.
59
   */
60
  protected interface Sink {
61
    /** 
62
     * Sends the request headers to the remote end point.
63
     *
64
     * @param metadata the metadata to be sent
65
     * @param payload the payload needs to be sent in the headers if not null. Should only be used
66
     *     when sending an unary GET request
67
     */
68
    void writeHeaders(Metadata metadata, @Nullable byte[] payload);
69

70
    /**
71
     * Sends an outbound frame to the remote end point.
72
     *
73
     * @param frame a buffer containing the chunk of data to be sent, or {@code null} if {@code
74
     *     endOfStream} with no data to send
75
     * @param endOfStream {@code true} if this is the last frame; {@code flush} is guaranteed to be
76
     *     {@code true} if this is {@code true}
77
     * @param flush {@code true} if more data may not be arriving soon
78
     * @param numMessages the number of messages this series of frames represents, must be >= 0.
79
     */
80
    void writeFrame(
81
        @Nullable WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages);
82

83
    /**
84
     * Tears down the stream, typically in the event of a timeout. This method may be called
85
     * multiple times and from any thread.
86
     *
87
     * <p>This is a clone of {@link ClientStream#cancel(Status)};
88
     * {@link AbstractClientStream#cancel} delegates to this method.
89
     */
90
    void cancel(Status status);
91
  }
92

93
  private final TransportTracer transportTracer;
94
  private final Framer framer;
95
  private final boolean shouldBeCountedForInUse;
96
  private final boolean useGet;
97
  private Metadata headers;
98
  /**
99
   * Whether cancel() has been called. This is not strictly necessary, but removes the delay between
100
   * cancel() being called and isReady() beginning to return false, since cancel is commonly
101
   * processed asynchronously.
102
   */
103
  private volatile boolean cancelled;
104

105
  protected AbstractClientStream(
106
      WritableBufferAllocator bufferAllocator,
107
      StatsTraceContext statsTraceCtx,
108
      TransportTracer transportTracer,
109
      Metadata headers,
110
      CallOptions callOptions,
111
      boolean useGet) {
1✔
112
    checkNotNull(headers, "headers");
1✔
113
    this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
114
    this.shouldBeCountedForInUse = GrpcUtil.shouldBeCountedForInUse(callOptions);
1✔
115
    this.useGet = useGet;
1✔
116
    if (!useGet) {
1✔
117
      framer = new MessageFramer(this, bufferAllocator, statsTraceCtx);
1✔
118
      this.headers = headers;
1✔
119
    } else {
120
      framer = new GetFramer(headers, statsTraceCtx);
1✔
121
    }
122
  }
1✔
123

124
  @Override
125
  public void setDeadline(Deadline deadline) {
126
    headers.discardAll(TIMEOUT_KEY);
1✔
127
    long effectiveTimeout = max(0, deadline.timeRemaining(TimeUnit.NANOSECONDS));
1✔
128
    headers.put(TIMEOUT_KEY, effectiveTimeout);
1✔
129
  }
1✔
130

131
  @Override
132
  public void setMaxOutboundMessageSize(int maxSize) {
133
    framer.setMaxOutboundMessageSize(maxSize);
1✔
134
  }
1✔
135

136
  @Override
137
  public void setMaxInboundMessageSize(int maxSize) {
138
    transportState().setMaxInboundMessageSize(maxSize);
1✔
139
  }
1✔
140

141
  @Override
142
  public final void setFullStreamDecompression(boolean fullStreamDecompression) {
143
    transportState().setFullStreamDecompression(fullStreamDecompression);
1✔
144
  }
1✔
145

146
  @Override
147
  public final void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
148
    transportState().setDecompressorRegistry(decompressorRegistry);
1✔
149
  }
1✔
150

151
  /** {@inheritDoc} */
152
  @Override
153
  protected abstract TransportState transportState();
154

155
  @Override
156
  public final void start(ClientStreamListener listener) {
157
    transportState().setListener(listener);
1✔
158
    if (!useGet) {
1✔
159
      abstractClientStreamSink().writeHeaders(headers, null);
1✔
160
      headers = null;
1✔
161
    }
162
  }
1✔
163

164
  /**
165
   * Sink for transport to be called to perform outbound operations. Each stream must have its own
166
   * unique sink.
167
   */
168
  protected abstract Sink abstractClientStreamSink();
169

170
  @Override
171
  protected final Framer framer() {
172
    return framer;
1✔
173
  }
174

175
  /**
176
   * Returns true if this stream should be counted when determining the in-use state of the
177
   * transport.
178
   */
179
  public final boolean shouldBeCountedForInUse() {
180
    return shouldBeCountedForInUse;
1✔
181
  }
182

183
  @Override
184
  public final void deliverFrame(
185
      WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
186
    Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
1✔
187
    abstractClientStreamSink().writeFrame(frame, endOfStream, flush, numMessages);
1✔
188
  }
1✔
189

190
  @Override
191
  public final void halfClose() {
192
    if (!transportState().isOutboundClosed()) {
1✔
193
      transportState().setOutboundClosed();
1✔
194
      endOfMessages();
1✔
195
    }
196
  }
1✔
197

198
  @Override
199
  public final void cancel(Status reason) {
200
    Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
1✔
201
    cancelled = true;
1✔
202
    abstractClientStreamSink().cancel(reason);
1✔
203
  }
1✔
204

205
  @Override
206
  public final boolean isReady() {
207
    return super.isReady() && !cancelled;
1✔
208
  }
209

210
  @Override
211
  public final void appendTimeoutInsight(InsightBuilder insight) {
212
    Attributes attrs = getAttributes();
1✔
213
    insight.appendKeyValue("remote_addr", attrs.get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR));
1✔
214
  }
1✔
215

216
  protected TransportTracer getTransportTracer() {
217
    return transportTracer;
1✔
218
  }
219

220
  /** This should only be called from the transport thread. */
221
  protected abstract static class TransportState extends AbstractStream.TransportState {
222
    /** Whether listener.closed() has been called. */
223
    private final StatsTraceContext statsTraceCtx;
224
    private boolean listenerClosed;
225
    private ClientStreamListener listener;
226
    private boolean fullStreamDecompression;
227
    private DecompressorRegistry decompressorRegistry = DecompressorRegistry.getDefaultInstance();
1✔
228

229
    private boolean deframerClosed = false;
1✔
230
    private Runnable deframerClosedTask;
231

232
    /** Whether the client has half-closed the stream. */
233
    private volatile boolean outboundClosed;
234

235
    /**
236
     * Whether the stream is closed from the transport's perspective. This can differ from {@link
237
     * #listenerClosed} because there may still be messages buffered to deliver to the application.
238
     */
239
    private boolean statusReported;
240
    /** True if the status reported (set via {@link #transportReportStatus}) is OK. */
241
    private boolean statusReportedIsOk;
242

243
    protected TransportState(
244
        int maxMessageSize,
245
        StatsTraceContext statsTraceCtx,
246
        TransportTracer transportTracer,
247
        CallOptions options) {
248
      super(maxMessageSize, statsTraceCtx, transportTracer);
1✔
249
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
250
      if (options.getOnReadyThreshold() != null) {
1✔
251
        this.setOnReadyThreshold(options.getOnReadyThreshold());
1✔
252
      }
253
    }
1✔
254

255
    private void setFullStreamDecompression(boolean fullStreamDecompression) {
256
      this.fullStreamDecompression = fullStreamDecompression;
1✔
257
    }
1✔
258

259
    private void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
260
      checkState(this.listener == null, "Already called start");
1✔
261
      this.decompressorRegistry =
1✔
262
          checkNotNull(decompressorRegistry, "decompressorRegistry");
1✔
263
    }
1✔
264

265
    @VisibleForTesting
266
    public final void setListener(ClientStreamListener listener) {
267
      checkState(this.listener == null, "Already called setListener");
1✔
268
      this.listener = checkNotNull(listener, "listener");
1✔
269
    }
1✔
270

271
    @Override
272
    public void deframerClosed(boolean hasPartialMessage) {
273
      checkState(statusReported, "status should have been reported on deframer closed");
1✔
274
      deframerClosed = true;
1✔
275
      if (statusReportedIsOk && hasPartialMessage) {
1✔
276
        transportReportStatus(
1✔
277
            Status.INTERNAL.withDescription("Encountered end-of-stream mid-frame"),
1✔
278
            true,
279
            new Metadata());
280
      }
281
      if (deframerClosedTask != null) {
1✔
282
        deframerClosedTask.run();
1✔
283
        deframerClosedTask = null;
1✔
284
      }
285
    }
1✔
286

287
    @Override
288
    protected final ClientStreamListener listener() {
289
      return listener;
1✔
290
    }
291

292
    private final void setOutboundClosed() {
293
      outboundClosed = true;
1✔
294
    }
1✔
295

296
    protected final boolean isOutboundClosed() {
297
      return outboundClosed;
1✔
298
    }
299

300
    /**
301
     * Called by transport implementations when they receive headers.
302
     *
303
     * @param headers the parsed headers
304
     */
305
    protected void inboundHeadersReceived(Metadata headers) {
306
      checkState(!statusReported, "Received headers on closed stream");
1✔
307
      statsTraceCtx.clientInboundHeaders(headers);
1✔
308

309
      boolean compressedStream = false;
1✔
310
      String streamEncoding = headers.get(CONTENT_ENCODING_KEY);
1✔
311
      if (fullStreamDecompression && streamEncoding != null) {
1✔
312
        if (streamEncoding.equalsIgnoreCase("gzip")) {
1✔
313
          setFullStreamDecompressor(new GzipInflatingBuffer());
1✔
314
          compressedStream = true;
1✔
315
        } else if (!streamEncoding.equalsIgnoreCase("identity")) {
1✔
316
          deframeFailed(
1✔
317
              Status.INTERNAL
318
                  .withDescription(
1✔
319
                      String.format("Can't find full stream decompressor for %s", streamEncoding))
1✔
320
                  .asRuntimeException());
1✔
321
          return;
1✔
322
        }
323
      }
324

325
      String messageEncoding = headers.get(MESSAGE_ENCODING_KEY);
1✔
326
      if (messageEncoding != null) {
1✔
327
        Decompressor decompressor = decompressorRegistry.lookupDecompressor(messageEncoding);
1✔
328
        if (decompressor == null) {
1✔
329
          deframeFailed(
1✔
330
              Status.INTERNAL
331
                  .withDescription(String.format("Can't find decompressor for %s", messageEncoding))
1✔
332
                  .asRuntimeException());
1✔
333
          return;
1✔
334
        } else if (decompressor != Codec.Identity.NONE) {
1✔
335
          if (compressedStream) {
1✔
336
            deframeFailed(
1✔
337
                Status.INTERNAL
338
                    .withDescription("Full stream and gRPC message encoding cannot both be set")
1✔
339
                    .asRuntimeException());
1✔
340
            return;
1✔
341
          }
342
          setDecompressor(decompressor);
1✔
343
        }
344
      }
345

346
      listener().headersRead(headers);
1✔
347
    }
1✔
348

349
    /**
350
     * Processes the contents of a received data frame from the server.
351
     *
352
     * @param frame the received data frame. Its ownership is transferred to this method.
353
     */
354
    protected void inboundDataReceived(ReadableBuffer frame) {
355
      checkNotNull(frame, "frame");
1✔
356
      boolean needToCloseFrame = true;
1✔
357
      try {
358
        if (statusReported) {
1✔
359
          log.log(Level.INFO, "Received data on closed stream");
×
360
          return;
×
361
        }
362

363
        needToCloseFrame = false;
1✔
364
        deframe(frame);
1✔
365
      } finally {
366
        if (needToCloseFrame) {
1✔
367
          frame.close();
×
368
        }
369
      }
370
    }
1✔
371

372
    /**
373
     * Processes the trailers and status from the server.
374
     *
375
     * @param trailers the received trailers
376
     * @param status the status extracted from the trailers
377
     */
378
    protected void inboundTrailersReceived(Metadata trailers, Status status) {
379
      checkNotNull(status, "status");
1✔
380
      checkNotNull(trailers, "trailers");
1✔
381
      if (statusReported) {
1✔
382
        log.log(Level.INFO, "Received trailers on closed stream:\n {1}\n {2}",
1✔
383
            new Object[]{status, trailers});
384
        return;
1✔
385
      }
386
      statsTraceCtx.clientInboundTrailers(trailers);
1✔
387
      transportReportStatus(status, false, trailers);
1✔
388
    }
1✔
389

390
    /**
391
     * Report stream closure with status to the application layer if not already reported. This
392
     * method must be called from the transport thread.
393
     *
394
     * @param status the new status to set
395
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
396
     *        may already be queued up in the deframer. If {@code false}, the listener will be
397
     *        notified immediately after all currently completed messages in the deframer have been
398
     *        delivered to the application.
399
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
400
     *        server
401
     */
402
    public final void transportReportStatus(final Status status, boolean stopDelivery,
403
        final Metadata trailers) {
404
      transportReportStatus(status, RpcProgress.PROCESSED, stopDelivery, trailers);
1✔
405
    }
1✔
406

407
    /**
408
     * Report stream closure with status to the application layer if not already reported. This
409
     * method must be called from the transport thread.
410
     *
411
     * @param status the new status to set
412
     * @param rpcProgress RPC progress that the
413
     *        {@link ClientStreamListener#closed(Status, RpcProgress, Metadata)}
414
     *        will receive
415
     * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
416
     *        may already be queued up in the deframer and overrides any previously queued status.
417
     *        If {@code false}, the listener will be notified immediately after all currently
418
     *        completed messages in the deframer have been delivered to the application.
419
     * @param trailers new instance of {@code Trailers}, either empty or those returned by the
420
     *        server
421
     */
422
    public final void transportReportStatus(
423
        final Status status,
424
        final RpcProgress rpcProgress,
425
        boolean stopDelivery,
426
        final Metadata trailers) {
427
      checkNotNull(status, "status");
1✔
428
      checkNotNull(trailers, "trailers");
1✔
429
      // If stopDelivery, we continue in case previous invocation is waiting for stall
430
      if (statusReported && !stopDelivery) {
1✔
431
        return;
1✔
432
      }
433
      statusReported = true;
1✔
434
      statusReportedIsOk = status.isOk();
1✔
435
      onStreamDeallocated();
1✔
436

437
      if (deframerClosed) {
1✔
438
        deframerClosedTask = null;
1✔
439
        closeListener(status, rpcProgress, trailers);
1✔
440
      } else {
441
        deframerClosedTask =
1✔
442
            new Runnable() {
1✔
443
              @Override
444
              public void run() {
445
                closeListener(status, rpcProgress, trailers);
1✔
446
              }
1✔
447
            };
448
        closeDeframer(stopDelivery);
1✔
449
      }
450
    }
1✔
451

452
    /**
453
     * Closes the listener if not previously closed.
454
     *
455
     * @throws IllegalStateException if the call has not yet been started.
456
     */
457
    private void closeListener(
458
        Status status, RpcProgress rpcProgress, Metadata trailers) {
459
      if (!listenerClosed) {
1✔
460
        listenerClosed = true;
1✔
461
        statsTraceCtx.streamClosed(status);
1✔
462
        if (getTransportTracer() != null) {
1✔
463
          getTransportTracer().reportStreamClosed(status.isOk());
1✔
464
        }
465
        listener().closed(status, rpcProgress, trailers);
1✔
466
      }
467
    }
1✔
468
  }
469

470
  private class GetFramer implements Framer {
471
    private Metadata headers;
472
    private boolean closed;
473
    private final StatsTraceContext statsTraceCtx;
474
    private byte[] payload;
475

476
    public GetFramer(Metadata headers, StatsTraceContext statsTraceCtx) {
1✔
477
      this.headers = checkNotNull(headers, "headers");
1✔
478
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
479
    }
1✔
480

481
    @SuppressWarnings("BetaApi") // ByteStreams is not Beta in v27
482
    @Override
483
    public void writePayload(InputStream message) {
484
      checkState(payload == null, "writePayload should not be called multiple times");
1✔
485
      try {
486
        payload = ByteStreams.toByteArray(message);
1✔
487
      } catch (java.io.IOException ex) {
×
488
        throw new RuntimeException(ex);
×
489
      }
1✔
490
      statsTraceCtx.outboundMessage(0);
1✔
491
      statsTraceCtx.outboundMessageSent(0, payload.length, payload.length);
1✔
492
      statsTraceCtx.outboundUncompressedSize(payload.length);
1✔
493
      // NB(zhangkun83): this is not accurate, because the underlying transport will probably encode
494
      // it using e.g., base64.  However, we are not supposed to know such detail here.
495
      //
496
      // We don't want to move this line to where the encoding happens either, because we'd better
497
      // contain the message stats reporting in Framer as suggested in StatsTraceContext.
498
      // Scattering the reporting sites increases the risk of mis-counting or double-counting.
499
      //
500
      // Because the payload is usually very small, people shouldn't care about the size difference
501
      // caused by encoding.
502
      statsTraceCtx.outboundWireSize(payload.length);
1✔
503
    }
1✔
504

505
    @Override
506
    public void flush() {}
1✔
507

508
    @Override
509
    public boolean isClosed() {
510
      return closed;
1✔
511
    }
512

513
    /** Closes, with flush. */
514
    @Override
515
    public void close() {
516
      closed = true;
1✔
517
      checkState(payload != null,
1✔
518
          "Lack of request message. GET request is only supported for unary requests");
519
      abstractClientStreamSink().writeHeaders(headers, payload);
1✔
520
      payload = null;
1✔
521
      headers = null;
1✔
522
    }
1✔
523

524
    /** Closes, without flush. */
525
    @Override
526
    public void dispose() {
527
      closed = true;
×
528
      payload = null;
×
529
      headers = null;
×
530
    }
×
531

532
    // Compression is not supported for GET encoding.
533
    @Override
534
    public Framer setMessageCompression(boolean enable) {
535
      return this;
×
536
    }
537

538
    @Override
539
    public Framer setCompressor(Compressor compressor) {
540
      return this;
×
541
    }
542

543
    // TODO(zsurocking): support this
544
    @Override
545
    public void setMaxOutboundMessageSize(int maxSize) {}
×
546
  }
547
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc