• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19108

21 Mar 2024 10:37PM UTC coverage: 88.277% (-0.002%) from 88.279%
#19108

push

github

web-flow
Allow configuration of the queued byte threshold at which a Stream is considered not ready (#10977)

* Allow the queued byte threshold for a Stream to be ready to be configurable

- on clients this is exposed by setting a CallOption
- on servers this is configured by calling a method on ServerCall or ServerStreamListener

31190 of 35332 relevant lines covered (88.28%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.68
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
22

23
import com.google.common.io.BaseEncoding;
24
import io.grpc.Attributes;
25
import io.grpc.CallOptions;
26
import io.grpc.Metadata;
27
import io.grpc.MethodDescriptor;
28
import io.grpc.Status;
29
import io.grpc.internal.AbstractClientStream;
30
import io.grpc.internal.Http2ClientStreamTransportState;
31
import io.grpc.internal.StatsTraceContext;
32
import io.grpc.internal.TransportTracer;
33
import io.grpc.internal.WritableBuffer;
34
import io.grpc.okhttp.internal.framed.ErrorCode;
35
import io.grpc.okhttp.internal.framed.Header;
36
import io.perfmark.PerfMark;
37
import io.perfmark.Tag;
38
import io.perfmark.TaskCloseable;
39
import java.util.List;
40
import javax.annotation.concurrent.GuardedBy;
41
import okio.Buffer;
42

43
/**
44
 * Client stream for the okhttp transport.
45
 */
46
class OkHttpClientStream extends AbstractClientStream {
47

48
  private static final Buffer EMPTY_BUFFER = new Buffer();
1✔
49

50
  public static final int ABSENT_ID = -1;
51

52
  private final MethodDescriptor<?, ?> method;
53

54
  private final String userAgent;
55
  private final StatsTraceContext statsTraceCtx;
56
  private String authority;
57
  private final TransportState state;
58
  private final Sink sink = new Sink();
1✔
59
  private final Attributes attributes;
60

61
  private boolean useGet = false;
1✔
62

63
  OkHttpClientStream(
64
      MethodDescriptor<?, ?> method,
65
      Metadata headers,
66
      ExceptionHandlingFrameWriter frameWriter,
67
      OkHttpClientTransport transport,
68
      OutboundFlowController outboundFlow,
69
      Object lock,
70
      int maxMessageSize,
71
      int initialWindowSize,
72
      String authority,
73
      String userAgent,
74
      StatsTraceContext statsTraceCtx,
75
      TransportTracer transportTracer,
76
      CallOptions callOptions,
77
      boolean useGetForSafeMethods) {
78
    super(
1✔
79
        new OkHttpWritableBufferAllocator(),
80
        statsTraceCtx,
81
        transportTracer,
82
        headers,
83
        callOptions,
84
        useGetForSafeMethods && method.isSafe());
1✔
85
    this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
86
    this.method = method;
1✔
87
    this.authority = authority;
1✔
88
    this.userAgent = userAgent;
1✔
89
    // OkHttpClientStream is only created after the transport has finished connecting,
90
    // so it is safe to read the transport attributes.
91
    // We make a copy here for convenience, even though we can ask the transport.
92
    this.attributes = transport.getAttributes();
1✔
93
    this.state =
1✔
94
        new TransportState(
95
            maxMessageSize,
96
            statsTraceCtx,
97
            lock,
98
            frameWriter,
99
            outboundFlow,
100
            transport,
101
            initialWindowSize,
102
            method.getFullMethodName(),
1✔
103
            callOptions);
104
  }
1✔
105

106
  @Override
107
  protected TransportState transportState() {
108
    return state;
1✔
109
  }
110

111
  @Override
112
  protected Sink abstractClientStreamSink() {
113
    return sink;
1✔
114
  }
115

116
  /**
117
   * Returns the type of this stream.
118
   */
119
  public MethodDescriptor.MethodType getType() {
120
    return method.getType();
1✔
121
  }
122

123
  /**
124
   * Returns whether the stream uses GET. This is not known until after {@link Sink#writeHeaders} is
125
   * invoked.
126
   */
127
  boolean useGet() {
128
    return useGet;
1✔
129
  }
130

131
  @Override
132
  public void setAuthority(String authority) {
133
    this.authority = checkNotNull(authority, "authority");
×
134
  }
×
135

136
  @Override
137
  public Attributes getAttributes() {
138
    return attributes;
1✔
139
  }
140

141
  class Sink implements AbstractClientStream.Sink {
1✔
142
    @Override
143
    public void writeHeaders(Metadata metadata, byte[] payload) {
144
      try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeHeaders")) {
1✔
145
        String defaultPath = "/" + method.getFullMethodName();
1✔
146
        if (payload != null) {
1✔
147
          useGet = true;
1✔
148
          defaultPath += "?" + BaseEncoding.base64().encode(payload);
1✔
149
        }
150
        synchronized (state.lock) {
1✔
151
          state.streamReady(metadata, defaultPath);
1✔
152
        }
1✔
153
      }
154
    }
1✔
155

156
    @Override
157
    public void writeFrame(
158
        WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
159
      try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.writeFrame")) {
1✔
160
        Buffer buffer;
161
        if (frame == null) {
1✔
162
          buffer = EMPTY_BUFFER;
1✔
163
        } else {
164
          buffer = ((OkHttpWritableBuffer) frame).buffer();
1✔
165
          int size = (int) buffer.size();
1✔
166
          if (size > 0) {
1✔
167
            onSendingBytes(size);
1✔
168
          }
169
        }
170

171
        synchronized (state.lock) {
1✔
172
          state.sendBuffer(buffer, endOfStream, flush);
1✔
173
          getTransportTracer().reportMessageSent(numMessages);
1✔
174
        }
1✔
175
      }
176
    }
1✔
177

178
    @Override
179
    public void cancel(Status reason) {
180
      try (TaskCloseable ignore = PerfMark.traceTask("OkHttpClientStream$Sink.cancel")) {
1✔
181
        synchronized (state.lock) {
1✔
182
          state.cancel(reason, true, null);
1✔
183
        }
1✔
184
      }
185
    }
1✔
186
  }
187

188
  class TransportState extends Http2ClientStreamTransportState
189
      implements OutboundFlowController.Stream {
190
    private final int initialWindowSize;
191
    private final Object lock;
192
    @GuardedBy("lock")
193
    private List<Header> requestHeaders;
194
    @GuardedBy("lock")
1✔
195
    private Buffer pendingData = new Buffer();
196
    private boolean pendingDataHasEndOfStream = false;
1✔
197
    private boolean flushPendingData = false;
1✔
198
    @GuardedBy("lock")
1✔
199
    private boolean cancelSent = false;
200
    @GuardedBy("lock")
201
    private int window;
202
    @GuardedBy("lock")
203
    private int processedWindow;
204
    @GuardedBy("lock")
205
    private final ExceptionHandlingFrameWriter frameWriter;
206
    @GuardedBy("lock")
207
    private final OutboundFlowController outboundFlow;
208
    @GuardedBy("lock")
209
    private final OkHttpClientTransport transport;
210
    /** True iff neither {@link #cancel} nor {@link #start(int)} have been called. */
211
    @GuardedBy("lock")
1✔
212
    private boolean canStart = true;
213
    private final Tag tag;
214
    @GuardedBy("lock")
215
    private OutboundFlowController.StreamState outboundFlowState;
216
    private int id = ABSENT_ID;
1✔
217

218
    public TransportState(
219
        int maxMessageSize,
220
        StatsTraceContext statsTraceCtx,
221
        Object lock,
222
        ExceptionHandlingFrameWriter frameWriter,
223
        OutboundFlowController outboundFlow,
224
        OkHttpClientTransport transport,
225
        int initialWindowSize,
226
        String methodName,
227
        CallOptions options) {
1✔
228
      super(maxMessageSize, statsTraceCtx, OkHttpClientStream.this.getTransportTracer(), options);
1✔
229
      this.lock = checkNotNull(lock, "lock");
1✔
230
      this.frameWriter = frameWriter;
1✔
231
      this.outboundFlow = outboundFlow;
1✔
232
      this.transport = transport;
1✔
233
      this.window = initialWindowSize;
1✔
234
      this.processedWindow = initialWindowSize;
1✔
235
      this.initialWindowSize = initialWindowSize;
1✔
236
      tag = PerfMark.createTag(methodName);
1✔
237
    }
1✔
238

239
    @SuppressWarnings("GuardedBy")
240
    @GuardedBy("lock")
241
    public void start(int streamId) {
242
      checkState(id == ABSENT_ID, "the stream has been started with id %s", streamId);
1✔
243
      id = streamId;
1✔
244
      outboundFlowState = outboundFlow.createState(this, streamId);
1✔
245
      // TODO(b/145386688): This access should be guarded by 'OkHttpClientStream.this.state.lock';
246
      // instead found: 'this.lock'
247
      state.onStreamAllocated();
1✔
248

249
      if (canStart) {
1✔
250
        // Only happens when the stream has neither been started nor cancelled.
251
        frameWriter.synStream(useGet, false, id, 0, requestHeaders);
1✔
252
        statsTraceCtx.clientOutboundHeaders();
1✔
253
        requestHeaders = null;
1✔
254

255
        if (pendingData.size() > 0) {
1✔
256
          outboundFlow.data(
1✔
257
              pendingDataHasEndOfStream, outboundFlowState, pendingData, flushPendingData);
258

259
        }
260
        canStart = false;
1✔
261
      }
262
    }
1✔
263

264
    @GuardedBy("lock")
265
    @Override
266
    protected void onStreamAllocated() {
267
      super.onStreamAllocated();
1✔
268
      getTransportTracer().reportLocalStreamStarted();
1✔
269
    }
1✔
270

271
    @GuardedBy("lock")
272
    @Override
273
    protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
274
      cancel(status, stopDelivery, trailers);
1✔
275
    }
1✔
276

277
    @Override
278
    @GuardedBy("lock")
279
    public void deframeFailed(Throwable cause) {
280
      http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
1✔
281
    }
1✔
282

283
    @Override
284
    @GuardedBy("lock")
285
    public void bytesRead(int processedBytes) {
286
      processedWindow -= processedBytes;
1✔
287
      if (processedWindow <= initialWindowSize * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
288
        int delta = initialWindowSize - processedWindow;
1✔
289
        window += delta;
1✔
290
        processedWindow += delta;
1✔
291
        frameWriter.windowUpdate(id(), delta);
1✔
292
      }
293
    }
1✔
294

295
    @Override
296
    @GuardedBy("lock")
297
    public void deframerClosed(boolean hasPartialMessage) {
298
      onEndOfStream();
1✔
299
      super.deframerClosed(hasPartialMessage);
1✔
300
    }
1✔
301

302
    @Override
303
    @GuardedBy("lock")
304
    public void runOnTransportThread(final Runnable r) {
305
      synchronized (lock) {
1✔
306
        r.run();
1✔
307
      }
1✔
308
    }
1✔
309

310
    /**
311
     * Must be called with holding the transport lock.
312
     */
313
    @GuardedBy("lock")
314
    public void transportHeadersReceived(List<Header> headers, boolean endOfStream) {
315
      if (endOfStream) {
1✔
316
        transportTrailersReceived(Utils.convertTrailers(headers));
1✔
317
      } else {
318
        transportHeadersReceived(Utils.convertHeaders(headers));
1✔
319
      }
320
    }
1✔
321

322
    /**
323
     * Must be called with holding the transport lock.
324
     */
325
    @GuardedBy("lock")
326
    public void transportDataReceived(okio.Buffer frame, boolean endOfStream, int paddingLen) {
327
      // We only support 16 KiB frames, and the max permitted in HTTP/2 is 16 MiB. This is verified
328
      // in OkHttp's Http2 deframer. In addition, this code is after the data has been read.
329
      int length = (int) frame.size();
1✔
330
      window -= length + paddingLen;
1✔
331
      processedWindow -= paddingLen;
1✔
332
      if (window < 0) {
1✔
333
        frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
1✔
334
        transport.finishStream(
1✔
335
            id(),
1✔
336
            Status.INTERNAL.withDescription(
1✔
337
                "Received data size exceeded our receiving window size"),
338
            PROCESSED, false, null, null);
339
        return;
1✔
340
      }
341
      super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
1✔
342
    }
1✔
343

344
    @GuardedBy("lock")
345
    private void onEndOfStream() {
346
      if (!isOutboundClosed()) {
1✔
347
        // If server's end-of-stream is received before client sends end-of-stream, we just send a
348
        // reset to server to fully close the server side stream.
349
        transport.finishStream(id(),null, PROCESSED, false, ErrorCode.CANCEL, null);
1✔
350
      } else {
351
        transport.finishStream(id(), null, PROCESSED, false, null, null);
1✔
352
      }
353
    }
1✔
354

355
    @SuppressWarnings("GuardedBy")
356
    @GuardedBy("lock")
357
    private void cancel(Status reason, boolean stopDelivery, Metadata trailers) {
358
      if (cancelSent) {
1✔
359
        return;
1✔
360
      }
361
      cancelSent = true;
1✔
362
      if (canStart) {
1✔
363
        // stream is pending.
364
        // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found:
365
        // 'this.lock'
366
        transport.removePendingStream(OkHttpClientStream.this);
1✔
367
        // release holding data, so they can be GCed or returned to pool earlier.
368
        requestHeaders = null;
1✔
369
        pendingData.clear();
1✔
370
        canStart = false;
1✔
371
        transportReportStatus(reason, true, trailers != null ? trailers : new Metadata());
1✔
372
      } else {
373
        // If pendingData is null, start must have already been called, which means synStream has
374
        // been called as well.
375
        transport.finishStream(
1✔
376
            id(), reason, PROCESSED, stopDelivery, ErrorCode.CANCEL, trailers);
1✔
377
      }
378
    }
1✔
379

380
    @GuardedBy("lock")
381
    private void sendBuffer(Buffer buffer, boolean endOfStream, boolean flush) {
382
      if (cancelSent) {
1✔
383
        return;
1✔
384
      }
385
      if (canStart) {
1✔
386
        // Stream is pending start, queue the data.
387
        int dataSize = (int) buffer.size();
1✔
388
        pendingData.write(buffer, dataSize);
1✔
389
        pendingDataHasEndOfStream |= endOfStream;
1✔
390
        flushPendingData |= flush;
1✔
391
      } else {
1✔
392
        checkState(id() != ABSENT_ID, "streamId should be set");
1✔
393
        // If buffer > frameWriter.maxDataLength() the flow-controller will ensure that it is
394
        // properly chunked.
395
        outboundFlow.data(endOfStream, outboundFlowState, buffer, flush);
1✔
396
      }
397
    }
1✔
398

399
    @SuppressWarnings("GuardedBy")
400
    @GuardedBy("lock")
401
    private void streamReady(Metadata metadata, String path) {
402
      requestHeaders =
1✔
403
          Headers.createRequestHeaders(
1✔
404
              metadata,
405
              path,
406
              authority,
1✔
407
              userAgent,
1✔
408
              useGet,
1✔
409
              transport.isUsingPlaintext());
1✔
410
      // TODO(b/145386688): This access should be guarded by 'this.transport.lock'; instead found:
411
      // 'this.lock'
412
      transport.streamReadyToStart(OkHttpClientStream.this);
1✔
413
    }
1✔
414

415
    Tag tag() {
416
      return tag;
1✔
417
    }
418

419
    int id() {
420
      return id;
1✔
421
    }
422

423
    OutboundFlowController.StreamState getOutboundFlowState() {
424
      synchronized (lock) {
1✔
425
        return outboundFlowState;
1✔
426
      }
427
    }
428
  }
429
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc