• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19971

01 Sep 2025 07:40AM UTC coverage: 88.568% (+0.02%) from 88.547%
#19971

push

github

web-flow
allow java21 in jre matrix (#12281)

34692 of 39170 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.91
/../core/src/main/java/io/grpc/internal/AbstractStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.errorprone.annotations.concurrent.GuardedBy;
24
import io.grpc.Codec;
25
import io.grpc.Compressor;
26
import io.grpc.Decompressor;
27
import io.perfmark.Link;
28
import io.perfmark.PerfMark;
29
import io.perfmark.TaskCloseable;
30
import java.io.InputStream;
31
import java.util.logging.Level;
32
import java.util.logging.Logger;
33

34
/**
35
 * The stream and stream state as used by the application. Must only be called from the sending
36
 * application thread.
37
 */
38
public abstract class AbstractStream implements Stream {
1✔
39
  private static final Logger log = Logger.getLogger(AbstractStream.class.getName());
1✔
40

41
  /** The framer to use for sending messages. */
42
  protected abstract Framer framer();
43

44
  /**
45
   * Obtain the transport state corresponding to this stream. Each stream must have its own unique
46
   * transport state.
47
   */
48
  protected abstract TransportState transportState();
49

50
  @Override
51
  public void optimizeForDirectExecutor() {
52
    transportState().optimizeForDirectExecutor();
1✔
53
  }
1✔
54

55
  @Override
56
  public final void setMessageCompression(boolean enable) {
57
    framer().setMessageCompression(enable);
1✔
58
  }
1✔
59

60
  @Override
61
  public final void request(int numMessages) {
62
    transportState().requestMessagesFromDeframer(numMessages);
1✔
63
  }
1✔
64

65
  @Override
66
  public final void writeMessage(InputStream message) {
67
    checkNotNull(message, "message");
1✔
68
    try {
69
      if (!framer().isClosed()) {
1✔
70
        framer().writePayload(message);
1✔
71
      }
72
    } finally {
73
      GrpcUtil.closeQuietly(message);
1✔
74
    }
75
  }
1✔
76

77
  @Override
78
  public final void flush() {
79
    if (!framer().isClosed()) {
1✔
80
      framer().flush();
1✔
81
    }
82
  }
1✔
83

84
  /**
85
   * A hint to the stream that specifies how many bytes must be queued before
86
   * {@link #isReady()} will return false. A stream may ignore this property if
87
   * unsupported. This may only be set during stream initialization before
88
   * any messages are set.
89
   *
90
   * @param numBytes The number of bytes that must be queued. Must be a
91
   *                 positive integer.
92
   */
93
  protected void setOnReadyThreshold(int numBytes) {
94
    transportState().setOnReadyThreshold(numBytes);
1✔
95
  }
1✔
96

97
  /**
98
   * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
99
   * (half closure on client; closure on server).
100
   */
101
  protected final void endOfMessages() {
102
    framer().close();
1✔
103
  }
1✔
104

105
  @Override
106
  public final void setCompressor(Compressor compressor) {
107
    framer().setCompressor(checkNotNull(compressor, "compressor"));
1✔
108
  }
1✔
109

110
  @Override
111
  public boolean isReady() {
112
    return transportState().isReady();
1✔
113
  }
114

115
  /**
116
   * Event handler to be called by the subclass when a number of bytes are being queued for sending
117
   * to the remote endpoint.
118
   *
119
   * @param numBytes the number of bytes being sent.
120
   */
121
  protected final void onSendingBytes(int numBytes) {
122
    transportState().onSendingBytes(numBytes);
1✔
123
  }
1✔
124

125
  /**
126
   * Stream state as used by the transport. This should only be called from the transport thread
127
   * (except for private interactions with {@code AbstractStream}).
128
   */
129
  public abstract static class TransportState
130
      implements ApplicationThreadDeframer.TransportExecutor, MessageDeframer.Listener {
131
    /**
132
     * The default number of queued bytes for a given stream, below which
133
     * {@link StreamListener#onReady()} will be called.
134
     */
135
    @VisibleForTesting
136
    public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
137

138
    private Deframer deframer;
139
    private final Object onReadyLock = new Object();
1✔
140
    private final StatsTraceContext statsTraceCtx;
141
    private final TransportTracer transportTracer;
142
    private final MessageDeframer rawDeframer;
143

144
    /**
145
     * The number of bytes currently queued, waiting to be sent. When this falls below
146
     * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called.
147
     */
148
    @GuardedBy("onReadyLock")
149
    private int numSentBytesQueued;
150
    /**
151
     * Indicates the stream has been created on the connection. This implies that the stream is no
152
     * longer limited by MAX_CONCURRENT_STREAMS.
153
     */
154
    @GuardedBy("onReadyLock")
155
    private boolean allocated;
156
    /**
157
     * Indicates that the stream no longer exists for the transport. Implies that the application
158
     * should be discouraged from sending, because doing so would have no effect.
159
     */
160
    @GuardedBy("onReadyLock")
161
    private boolean deallocated;
162

163
    @GuardedBy("onReadyLock")
164
    private int onReadyThreshold;
165

166
    @SuppressWarnings("this-escape")
167
    protected TransportState(
168
        int maxMessageSize,
169
        StatsTraceContext statsTraceCtx,
170
        TransportTracer transportTracer) {
1✔
171
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
172
      this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
173
      this.rawDeframer = new MessageDeframer(
1✔
174
          this,
175
          Codec.Identity.NONE,
176
          maxMessageSize,
177
          statsTraceCtx,
178
          transportTracer);
179
      // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
180
      deframer = this.rawDeframer;
1✔
181
      onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
1✔
182
    }
1✔
183

184
    final void optimizeForDirectExecutor() {
185
      rawDeframer.setListener(this);
1✔
186
      deframer = rawDeframer;
1✔
187
    }
1✔
188

189
    protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
190
      rawDeframer.setFullStreamDecompressor(fullStreamDecompressor);
1✔
191
      deframer = new ApplicationThreadDeframer(this, this, rawDeframer);
1✔
192
    }
1✔
193

194
    final void setMaxInboundMessageSize(int maxSize) {
195
      deframer.setMaxInboundMessageSize(maxSize);
1✔
196
    }
1✔
197

198
    /**
199
     * Override this method to provide a stream listener.
200
     */
201
    protected abstract StreamListener listener();
202

203
    /**
204
     * A hint to the stream that specifies how many bytes must be queued before
205
     * {@link #isReady()} will return false. A stream may ignore this property if
206
     * unsupported. This may only be set before any messages are sent.
207
     *
208
     * @param numBytes The number of bytes that must be queued. Must be a
209
     *                 positive integer.
210
     */
211
    void setOnReadyThreshold(int numBytes) {
212
      synchronized (onReadyLock) {
1✔
213
        this.onReadyThreshold = numBytes;
1✔
214
      }
1✔
215
    }
1✔
216

217
    @Override
218
    public void messagesAvailable(StreamListener.MessageProducer producer) {
219
      listener().messagesAvailable(producer);
1✔
220
    }
1✔
221

222
    /**
223
     * Closes the deframer and frees any resources. After this method is called, additional calls
224
     * will have no effect.
225
     *
226
     * <p>When {@code stopDelivery} is false, the deframer will wait to close until any already
227
     * queued messages have been delivered.
228
     *
229
     * <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing.
230
     *
231
     * @param stopDelivery interrupt pending deliveries and close immediately
232
     */
233
    protected final void closeDeframer(boolean stopDelivery) {
234
      if (stopDelivery) {
1✔
235
        deframer.close();
1✔
236
      } else {
237
        deframer.closeWhenComplete();
1✔
238
      }
239
    }
1✔
240

241
    /**
242
     * Called to parse a received frame and attempt delivery of any completed messages. Must be
243
     * called from the transport thread.
244
     */
245
    protected final void deframe(final ReadableBuffer frame) {
246
      try {
247
        deframer.deframe(frame);
1✔
248
      } catch (Throwable t) {
1✔
249
        deframeFailed(t);
1✔
250
      }
1✔
251
    }
1✔
252

253
    /**
254
     * Called to request the given number of messages from the deframer. May be called from any
255
     * thread.
256
     */
257
    private void requestMessagesFromDeframer(final int numMessages) {
258
      if (deframer instanceof ThreadOptimizedDeframer) {
1✔
259
        try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
×
260
          deframer.request(numMessages);
×
261
        }
262
        return;
×
263
      }
264
      final Link link = PerfMark.linkOut();
1✔
265
      class RequestRunnable implements Runnable {
1✔
266
        @Override public void run() {
267
          try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
1✔
268
            PerfMark.linkIn(link);
1✔
269
            deframer.request(numMessages);
1✔
270
          } catch (Throwable t) {
×
271
            deframeFailed(t);
×
272
          }
1✔
273
        }
1✔
274
      }
275

276
      runOnTransportThread(new RequestRunnable());
1✔
277
    }
1✔
278

279
    /**
280
     * Very rarely used. Prefer stream.request() instead of this; this method is only necessary if
281
     * a stream is not available.
282
     */
283
    @VisibleForTesting
284
    public final void requestMessagesFromDeframerForTesting(int numMessages) {
285
      requestMessagesFromDeframer(numMessages);
1✔
286
    }
1✔
287

288
    public final StatsTraceContext getStatsTraceContext() {
289
      return statsTraceCtx;
1✔
290
    }
291

292
    protected final void setDecompressor(Decompressor decompressor) {
293
      deframer.setDecompressor(decompressor);
1✔
294
    }
1✔
295

296
    private boolean isReady() {
297
      synchronized (onReadyLock) {
1✔
298
        return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
1✔
299
      }
300
    }
301

302
    /**
303
     * Event handler to be called by the subclass when the stream's headers have passed any
304
     * connection flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link
305
     * StreamListener#onReady()} handler if appropriate. This must be called from the transport
306
     * thread, since the listener may be called back directly.
307
     */
308
    protected void onStreamAllocated() {
309
      checkState(listener() != null);
1✔
310
      synchronized (onReadyLock) {
1✔
311
        checkState(!allocated, "Already allocated");
1✔
312
        allocated = true;
1✔
313
      }
1✔
314
      notifyIfReady();
1✔
315
    }
1✔
316

317
    /**
318
     * Notify that the stream does not exist in a usable state any longer. This causes {@link
319
     * AbstractStream#isReady()} to return {@code false} from this point forward.
320
     *
321
     * <p>This does not generally need to be called explicitly by the transport, as it is handled
322
     * implicitly by {@link AbstractClientStream} and {@link AbstractServerStream}.
323
     */
324
    protected final void onStreamDeallocated() {
325
      synchronized (onReadyLock) {
1✔
326
        deallocated = true;
1✔
327
      }
1✔
328
    }
1✔
329

330
    protected boolean isStreamDeallocated() {
331
      synchronized (onReadyLock) {
1✔
332
        return deallocated;
1✔
333
      }
334
    }
335

336
    /**
337
     * Event handler to be called by the subclass when a number of bytes are being queued for
338
     * sending to the remote endpoint.
339
     *
340
     * @param numBytes the number of bytes being sent.
341
     */
342
    private void onSendingBytes(int numBytes) {
343
      synchronized (onReadyLock) {
1✔
344
        numSentBytesQueued += numBytes;
1✔
345
      }
1✔
346
    }
1✔
347

348
    /**
349
     * Event handler to be called by the subclass when a number of bytes has been sent to the remote
350
     * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if
351
     * appropriate.  This must be called from the transport thread, since the listener may be called
352
     * back directly.
353
     *
354
     * @param numBytes the number of bytes that were sent.
355
     */
356
    public final void onSentBytes(int numBytes) {
357
      boolean doNotify;
358
      synchronized (onReadyLock) {
1✔
359
        checkState(allocated,
1✔
360
            "onStreamAllocated was not called, but it seems the stream is active");
361
        boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
1✔
362
        numSentBytesQueued -= numBytes;
1✔
363
        boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
1✔
364
        doNotify = !belowThresholdBefore && belowThresholdAfter;
1✔
365
      }
1✔
366
      if (doNotify) {
1✔
367
        notifyIfReady();
1✔
368
      }
369
    }
1✔
370

371
    protected TransportTracer getTransportTracer() {
372
      return transportTracer;
1✔
373
    }
374

375
    private void notifyIfReady() {
376
      boolean doNotify;
377
      synchronized (onReadyLock) {
1✔
378
        doNotify = isReady();
1✔
379
        if (!doNotify && log.isLoggable(Level.FINEST)) {
1✔
380
          log.log(Level.FINEST,
×
381
              "Stream not ready so skip notifying listener.\n"
382
                  + "details: allocated/deallocated:{0}/{3}, sent queued: {1}, ready thresh: {2}",
383
              new Object[] {allocated, numSentBytesQueued, onReadyThreshold, deallocated});
×
384
        }
385
      }
1✔
386
      if (doNotify) {
1✔
387
        listener().onReady();
1✔
388
      }
389
    }
1✔
390
  }
391
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc