• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19139

02 Apr 2024 04:53PM UTC coverage: 88.259% (+0.02%) from 88.239%
#19139

push

github

web-flow
Use empty string instead of null for endpoint identification algorithm to disable server hostname verification, since null value gets ignored in Sun's SSLEngine implementation. (#11058)

Co-authored-by: Kannan J <kannanjgithub@google.com>

31210 of 35362 relevant lines covered (88.26%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.41
/../core/src/main/java/io/grpc/internal/AbstractStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import io.grpc.Codec;
24
import io.grpc.Compressor;
25
import io.grpc.Decompressor;
26
import io.perfmark.Link;
27
import io.perfmark.PerfMark;
28
import io.perfmark.TaskCloseable;
29
import java.io.InputStream;
30
import javax.annotation.concurrent.GuardedBy;
31

32
/**
33
 * The stream and stream state as used by the application. Must only be called from the sending
34
 * application thread.
35
 */
36
public abstract class AbstractStream implements Stream {
1✔
37
  /** The framer to use for sending messages. */
38
  protected abstract Framer framer();
39

40
  /**
41
   * Obtain the transport state corresponding to this stream. Each stream must have its own unique
42
   * transport state.
43
   */
44
  protected abstract TransportState transportState();
45

46
  @Override
47
  public void optimizeForDirectExecutor() {
48
    transportState().optimizeForDirectExecutor();
1✔
49
  }
1✔
50

51
  @Override
52
  public final void setMessageCompression(boolean enable) {
53
    framer().setMessageCompression(enable);
1✔
54
  }
1✔
55

56
  @Override
57
  public final void request(int numMessages) {
58
    transportState().requestMessagesFromDeframer(numMessages);
1✔
59
  }
1✔
60

61
  @Override
62
  public final void writeMessage(InputStream message) {
63
    checkNotNull(message, "message");
1✔
64
    try {
65
      if (!framer().isClosed()) {
1✔
66
        framer().writePayload(message);
1✔
67
      }
68
    } finally {
69
      GrpcUtil.closeQuietly(message);
1✔
70
    }
71
  }
1✔
72

73
  @Override
74
  public final void flush() {
75
    if (!framer().isClosed()) {
1✔
76
      framer().flush();
1✔
77
    }
78
  }
1✔
79

80
  /**
81
   * A hint to the stream that specifies how many bytes must be queued before
82
   * {@link #isReady()} will return false. A stream may ignore this property if
83
   * unsupported. This may only be set during stream initialization before
84
   * any messages are set.
85
   *
86
   * @param numBytes The number of bytes that must be queued. Must be a
87
   *                 positive integer.
88
   */
89
  protected void setOnReadyThreshold(int numBytes) {
90
    transportState().setOnReadyThreshold(numBytes);
1✔
91
  }
1✔
92

93
  /**
94
   * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
95
   * (half closure on client; closure on server).
96
   */
97
  protected final void endOfMessages() {
98
    framer().close();
1✔
99
  }
1✔
100

101
  @Override
102
  public final void setCompressor(Compressor compressor) {
103
    framer().setCompressor(checkNotNull(compressor, "compressor"));
1✔
104
  }
1✔
105

106
  @Override
107
  public boolean isReady() {
108
    return transportState().isReady();
1✔
109
  }
110

111
  /**
112
   * Event handler to be called by the subclass when a number of bytes are being queued for sending
113
   * to the remote endpoint.
114
   *
115
   * @param numBytes the number of bytes being sent.
116
   */
117
  protected final void onSendingBytes(int numBytes) {
118
    transportState().onSendingBytes(numBytes);
1✔
119
  }
1✔
120

121
  /**
122
   * Stream state as used by the transport. This should only be called from the transport thread
123
   * (except for private interactions with {@code AbstractStream}).
124
   */
125
  public abstract static class TransportState
126
      implements ApplicationThreadDeframer.TransportExecutor, MessageDeframer.Listener {
127
    /**
128
     * The default number of queued bytes for a given stream, below which
129
     * {@link StreamListener#onReady()} will be called.
130
     */
131
    @VisibleForTesting
132
    public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
133

134
    private Deframer deframer;
135
    private final Object onReadyLock = new Object();
1✔
136
    private final StatsTraceContext statsTraceCtx;
137
    private final TransportTracer transportTracer;
138
    private final MessageDeframer rawDeframer;
139

140
    /**
141
     * The number of bytes currently queued, waiting to be sent. When this falls below
142
     * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called.
143
     */
144
    @GuardedBy("onReadyLock")
145
    private int numSentBytesQueued;
146
    /**
147
     * Indicates the stream has been created on the connection. This implies that the stream is no
148
     * longer limited by MAX_CONCURRENT_STREAMS.
149
     */
150
    @GuardedBy("onReadyLock")
151
    private boolean allocated;
152
    /**
153
     * Indicates that the stream no longer exists for the transport. Implies that the application
154
     * should be discouraged from sending, because doing so would have no effect.
155
     */
156
    @GuardedBy("onReadyLock")
157
    private boolean deallocated;
158

159
    @GuardedBy("onReadyLock")
160
    private int onReadyThreshold;
161

162
    protected TransportState(
163
        int maxMessageSize,
164
        StatsTraceContext statsTraceCtx,
165
        TransportTracer transportTracer) {
1✔
166
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
167
      this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
168
      rawDeframer = new MessageDeframer(
1✔
169
          this,
170
          Codec.Identity.NONE,
171
          maxMessageSize,
172
          statsTraceCtx,
173
          transportTracer);
174
      // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
175
      deframer = rawDeframer;
1✔
176
      onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
1✔
177
    }
1✔
178

179
    final void optimizeForDirectExecutor() {
180
      rawDeframer.setListener(this);
1✔
181
      deframer = rawDeframer;
1✔
182
    }
1✔
183

184
    protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
185
      rawDeframer.setFullStreamDecompressor(fullStreamDecompressor);
1✔
186
      deframer = new ApplicationThreadDeframer(this, this, rawDeframer);
1✔
187
    }
1✔
188

189
    final void setMaxInboundMessageSize(int maxSize) {
190
      deframer.setMaxInboundMessageSize(maxSize);
1✔
191
    }
1✔
192

193
    /**
194
     * Override this method to provide a stream listener.
195
     */
196
    protected abstract StreamListener listener();
197

198
    /**
199
     * A hint to the stream that specifies how many bytes must be queued before
200
     * {@link #isReady()} will return false. A stream may ignore this property if
201
     * unsupported. This may only be set before any messages are sent.
202
     *
203
     * @param numBytes The number of bytes that must be queued. Must be a
204
     *                 positive integer.
205
     */
206
    void setOnReadyThreshold(int numBytes) {
207
      synchronized (onReadyLock) {
1✔
208
        this.onReadyThreshold = numBytes;
1✔
209
      }
1✔
210
    }
1✔
211

212
    @Override
213
    public void messagesAvailable(StreamListener.MessageProducer producer) {
214
      listener().messagesAvailable(producer);
1✔
215
    }
1✔
216

217
    /**
218
     * Closes the deframer and frees any resources. After this method is called, additional calls
219
     * will have no effect.
220
     *
221
     * <p>When {@code stopDelivery} is false, the deframer will wait to close until any already
222
     * queued messages have been delivered.
223
     *
224
     * <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing.
225
     *
226
     * @param stopDelivery interrupt pending deliveries and close immediately
227
     */
228
    protected final void closeDeframer(boolean stopDelivery) {
229
      if (stopDelivery) {
1✔
230
        deframer.close();
1✔
231
      } else {
232
        deframer.closeWhenComplete();
1✔
233
      }
234
    }
1✔
235

236
    /**
237
     * Called to parse a received frame and attempt delivery of any completed messages. Must be
238
     * called from the transport thread.
239
     */
240
    protected final void deframe(final ReadableBuffer frame) {
241
      try {
242
        deframer.deframe(frame);
1✔
243
      } catch (Throwable t) {
1✔
244
        deframeFailed(t);
1✔
245
      }
1✔
246
    }
1✔
247

248
    /**
249
     * Called to request the given number of messages from the deframer. May be called from any
250
     * thread.
251
     */
252
    private void requestMessagesFromDeframer(final int numMessages) {
253
      if (deframer instanceof ThreadOptimizedDeframer) {
1✔
254
        try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
×
255
          deframer.request(numMessages);
×
256
        }
257
        return;
×
258
      }
259
      final Link link = PerfMark.linkOut();
1✔
260
      class RequestRunnable implements Runnable {
1✔
261
        @Override public void run() {
262
          try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
1✔
263
            PerfMark.linkIn(link);
1✔
264
            deframer.request(numMessages);
1✔
265
          } catch (Throwable t) {
×
266
            deframeFailed(t);
×
267
          }
1✔
268
        }
1✔
269
      }
270

271
      runOnTransportThread(new RequestRunnable());
1✔
272
    }
1✔
273

274
    /**
275
     * Very rarely used. Prefer stream.request() instead of this; this method is only necessary if
276
     * a stream is not available.
277
     */
278
    @VisibleForTesting
279
    public final void requestMessagesFromDeframerForTesting(int numMessages) {
280
      requestMessagesFromDeframer(numMessages);
1✔
281
    }
1✔
282

283
    public final StatsTraceContext getStatsTraceContext() {
284
      return statsTraceCtx;
1✔
285
    }
286

287
    protected final void setDecompressor(Decompressor decompressor) {
288
      deframer.setDecompressor(decompressor);
1✔
289
    }
1✔
290

291
    private boolean isReady() {
292
      synchronized (onReadyLock) {
1✔
293
        return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
1✔
294
      }
295
    }
296

297
    /**
298
     * Event handler to be called by the subclass when the stream's headers have passed any
299
     * connection flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link
300
     * StreamListener#onReady()} handler if appropriate. This must be called from the transport
301
     * thread, since the listener may be called back directly.
302
     */
303
    protected void onStreamAllocated() {
304
      checkState(listener() != null);
1✔
305
      synchronized (onReadyLock) {
1✔
306
        checkState(!allocated, "Already allocated");
1✔
307
        allocated = true;
1✔
308
      }
1✔
309
      notifyIfReady();
1✔
310
    }
1✔
311

312
    /**
313
     * Notify that the stream does not exist in a usable state any longer. This causes {@link
314
     * AbstractStream#isReady()} to return {@code false} from this point forward.
315
     *
316
     * <p>This does not generally need to be called explicitly by the transport, as it is handled
317
     * implicitly by {@link AbstractClientStream} and {@link AbstractServerStream}.
318
     */
319
    protected final void onStreamDeallocated() {
320
      synchronized (onReadyLock) {
1✔
321
        deallocated = true;
1✔
322
      }
1✔
323
    }
1✔
324

325
    /**
326
     * Event handler to be called by the subclass when a number of bytes are being queued for
327
     * sending to the remote endpoint.
328
     *
329
     * @param numBytes the number of bytes being sent.
330
     */
331
    private void onSendingBytes(int numBytes) {
332
      synchronized (onReadyLock) {
1✔
333
        numSentBytesQueued += numBytes;
1✔
334
      }
1✔
335
    }
1✔
336

337
    /**
338
     * Event handler to be called by the subclass when a number of bytes has been sent to the remote
339
     * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if
340
     * appropriate.  This must be called from the transport thread, since the listener may be called
341
     * back directly.
342
     *
343
     * @param numBytes the number of bytes that were sent.
344
     */
345
    public final void onSentBytes(int numBytes) {
346
      boolean doNotify;
347
      synchronized (onReadyLock) {
1✔
348
        checkState(allocated,
1✔
349
            "onStreamAllocated was not called, but it seems the stream is active");
350
        boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
1✔
351
        numSentBytesQueued -= numBytes;
1✔
352
        boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
1✔
353
        doNotify = !belowThresholdBefore && belowThresholdAfter;
1✔
354
      }
1✔
355
      if (doNotify) {
1✔
356
        notifyIfReady();
1✔
357
      }
358
    }
1✔
359

360
    protected TransportTracer getTransportTracer() {
361
      return transportTracer;
1✔
362
    }
363

364
    private void notifyIfReady() {
365
      boolean doNotify;
366
      synchronized (onReadyLock) {
1✔
367
        doNotify = isReady();
1✔
368
      }
1✔
369
      if (doNotify) {
1✔
370
        listener().onReady();
1✔
371
      }
372
    }
1✔
373
  }
374
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc