• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19621

06 Jan 2025 09:09PM UTC coverage: 88.568%. Remained the same
#19621

push

github

web-flow
xds:Move creating the retry timer in handleRpcStreamClosed to as late as possible and call close() (#11776)

* Move creating the retry timer in handleRpcStreamClosed to as late as possible and call `close` so that the `call` is cancelled.
Also add some debug logging.

33630 of 37971 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.91
/../core/src/main/java/io/grpc/internal/AbstractStream.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import io.grpc.Codec;
24
import io.grpc.Compressor;
25
import io.grpc.Decompressor;
26
import io.perfmark.Link;
27
import io.perfmark.PerfMark;
28
import io.perfmark.TaskCloseable;
29
import java.io.InputStream;
30
import java.util.logging.Level;
31
import java.util.logging.Logger;
32
import javax.annotation.concurrent.GuardedBy;
33

34
/**
35
 * The stream and stream state as used by the application. Must only be called from the sending
36
 * application thread.
37
 */
38
public abstract class AbstractStream implements Stream {
1✔
39
  private static final Logger log = Logger.getLogger(AbstractStream.class.getName());
1✔
40

41
  /** The framer to use for sending messages. */
42
  protected abstract Framer framer();
43

44
  /**
45
   * Obtain the transport state corresponding to this stream. Each stream must have its own unique
46
   * transport state.
47
   */
48
  protected abstract TransportState transportState();
49

50
  @Override
51
  public void optimizeForDirectExecutor() {
52
    transportState().optimizeForDirectExecutor();
1✔
53
  }
1✔
54

55
  @Override
56
  public final void setMessageCompression(boolean enable) {
57
    framer().setMessageCompression(enable);
1✔
58
  }
1✔
59

60
  @Override
61
  public final void request(int numMessages) {
62
    transportState().requestMessagesFromDeframer(numMessages);
1✔
63
  }
1✔
64

65
  @Override
66
  public final void writeMessage(InputStream message) {
67
    checkNotNull(message, "message");
1✔
68
    try {
69
      if (!framer().isClosed()) {
1✔
70
        framer().writePayload(message);
1✔
71
      }
72
    } finally {
73
      GrpcUtil.closeQuietly(message);
1✔
74
    }
75
  }
1✔
76

77
  @Override
78
  public final void flush() {
79
    if (!framer().isClosed()) {
1✔
80
      framer().flush();
1✔
81
    }
82
  }
1✔
83

84
  /**
85
   * A hint to the stream that specifies how many bytes must be queued before
86
   * {@link #isReady()} will return false. A stream may ignore this property if
87
   * unsupported. This may only be set during stream initialization before
88
   * any messages are set.
89
   *
90
   * @param numBytes The number of bytes that must be queued. Must be a
91
   *                 positive integer.
92
   */
93
  protected void setOnReadyThreshold(int numBytes) {
94
    transportState().setOnReadyThreshold(numBytes);
1✔
95
  }
1✔
96

97
  /**
98
   * Closes the underlying framer. Should be called when the outgoing stream is gracefully closed
99
   * (half closure on client; closure on server).
100
   */
101
  protected final void endOfMessages() {
102
    framer().close();
1✔
103
  }
1✔
104

105
  @Override
106
  public final void setCompressor(Compressor compressor) {
107
    framer().setCompressor(checkNotNull(compressor, "compressor"));
1✔
108
  }
1✔
109

110
  @Override
111
  public boolean isReady() {
112
    return transportState().isReady();
1✔
113
  }
114

115
  /**
116
   * Event handler to be called by the subclass when a number of bytes are being queued for sending
117
   * to the remote endpoint.
118
   *
119
   * @param numBytes the number of bytes being sent.
120
   */
121
  protected final void onSendingBytes(int numBytes) {
122
    transportState().onSendingBytes(numBytes);
1✔
123
  }
1✔
124

125
  /**
126
   * Stream state as used by the transport. This should only be called from the transport thread
127
   * (except for private interactions with {@code AbstractStream}).
128
   */
129
  public abstract static class TransportState
130
      implements ApplicationThreadDeframer.TransportExecutor, MessageDeframer.Listener {
131
    /**
132
     * The default number of queued bytes for a given stream, below which
133
     * {@link StreamListener#onReady()} will be called.
134
     */
135
    @VisibleForTesting
136
    public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
137

138
    private Deframer deframer;
139
    private final Object onReadyLock = new Object();
1✔
140
    private final StatsTraceContext statsTraceCtx;
141
    private final TransportTracer transportTracer;
142
    private final MessageDeframer rawDeframer;
143

144
    /**
145
     * The number of bytes currently queued, waiting to be sent. When this falls below
146
     * DEFAULT_ONREADY_THRESHOLD, {@link StreamListener#onReady()} will be called.
147
     */
148
    @GuardedBy("onReadyLock")
149
    private int numSentBytesQueued;
150
    /**
151
     * Indicates the stream has been created on the connection. This implies that the stream is no
152
     * longer limited by MAX_CONCURRENT_STREAMS.
153
     */
154
    @GuardedBy("onReadyLock")
155
    private boolean allocated;
156
    /**
157
     * Indicates that the stream no longer exists for the transport. Implies that the application
158
     * should be discouraged from sending, because doing so would have no effect.
159
     */
160
    @GuardedBy("onReadyLock")
161
    private boolean deallocated;
162

163
    @GuardedBy("onReadyLock")
164
    private int onReadyThreshold;
165

166
    protected TransportState(
167
        int maxMessageSize,
168
        StatsTraceContext statsTraceCtx,
169
        TransportTracer transportTracer) {
1✔
170
      this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
1✔
171
      this.transportTracer = checkNotNull(transportTracer, "transportTracer");
1✔
172
      rawDeframer = new MessageDeframer(
1✔
173
          this,
174
          Codec.Identity.NONE,
175
          maxMessageSize,
176
          statsTraceCtx,
177
          transportTracer);
178
      // TODO(#7168): use MigratingThreadDeframer when enabling retry doesn't break.
179
      deframer = rawDeframer;
1✔
180
      onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
1✔
181
    }
1✔
182

183
    final void optimizeForDirectExecutor() {
184
      rawDeframer.setListener(this);
1✔
185
      deframer = rawDeframer;
1✔
186
    }
1✔
187

188
    protected void setFullStreamDecompressor(GzipInflatingBuffer fullStreamDecompressor) {
189
      rawDeframer.setFullStreamDecompressor(fullStreamDecompressor);
1✔
190
      deframer = new ApplicationThreadDeframer(this, this, rawDeframer);
1✔
191
    }
1✔
192

193
    final void setMaxInboundMessageSize(int maxSize) {
194
      deframer.setMaxInboundMessageSize(maxSize);
1✔
195
    }
1✔
196

197
    /**
198
     * Override this method to provide a stream listener.
199
     */
200
    protected abstract StreamListener listener();
201

202
    /**
203
     * A hint to the stream that specifies how many bytes must be queued before
204
     * {@link #isReady()} will return false. A stream may ignore this property if
205
     * unsupported. This may only be set before any messages are sent.
206
     *
207
     * @param numBytes The number of bytes that must be queued. Must be a
208
     *                 positive integer.
209
     */
210
    void setOnReadyThreshold(int numBytes) {
211
      synchronized (onReadyLock) {
1✔
212
        this.onReadyThreshold = numBytes;
1✔
213
      }
1✔
214
    }
1✔
215

216
    @Override
217
    public void messagesAvailable(StreamListener.MessageProducer producer) {
218
      listener().messagesAvailable(producer);
1✔
219
    }
1✔
220

221
    /**
222
     * Closes the deframer and frees any resources. After this method is called, additional calls
223
     * will have no effect.
224
     *
225
     * <p>When {@code stopDelivery} is false, the deframer will wait to close until any already
226
     * queued messages have been delivered.
227
     *
228
     * <p>The deframer will invoke {@link #deframerClosed(boolean)} upon closing.
229
     *
230
     * @param stopDelivery interrupt pending deliveries and close immediately
231
     */
232
    protected final void closeDeframer(boolean stopDelivery) {
233
      if (stopDelivery) {
1✔
234
        deframer.close();
1✔
235
      } else {
236
        deframer.closeWhenComplete();
1✔
237
      }
238
    }
1✔
239

240
    /**
241
     * Called to parse a received frame and attempt delivery of any completed messages. Must be
242
     * called from the transport thread.
243
     */
244
    protected final void deframe(final ReadableBuffer frame) {
245
      try {
246
        deframer.deframe(frame);
1✔
247
      } catch (Throwable t) {
1✔
248
        deframeFailed(t);
1✔
249
      }
1✔
250
    }
1✔
251

252
    /**
253
     * Called to request the given number of messages from the deframer. May be called from any
254
     * thread.
255
     */
256
    private void requestMessagesFromDeframer(final int numMessages) {
257
      if (deframer instanceof ThreadOptimizedDeframer) {
1✔
258
        try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
×
259
          deframer.request(numMessages);
×
260
        }
261
        return;
×
262
      }
263
      final Link link = PerfMark.linkOut();
1✔
264
      class RequestRunnable implements Runnable {
1✔
265
        @Override public void run() {
266
          try (TaskCloseable ignore = PerfMark.traceTask("AbstractStream.request")) {
1✔
267
            PerfMark.linkIn(link);
1✔
268
            deframer.request(numMessages);
1✔
269
          } catch (Throwable t) {
×
270
            deframeFailed(t);
×
271
          }
1✔
272
        }
1✔
273
      }
274

275
      runOnTransportThread(new RequestRunnable());
1✔
276
    }
1✔
277

278
    /**
279
     * Very rarely used. Prefer stream.request() instead of this; this method is only necessary if
280
     * a stream is not available.
281
     */
282
    @VisibleForTesting
283
    public final void requestMessagesFromDeframerForTesting(int numMessages) {
284
      requestMessagesFromDeframer(numMessages);
1✔
285
    }
1✔
286

287
    public final StatsTraceContext getStatsTraceContext() {
288
      return statsTraceCtx;
1✔
289
    }
290

291
    protected final void setDecompressor(Decompressor decompressor) {
292
      deframer.setDecompressor(decompressor);
1✔
293
    }
1✔
294

295
    private boolean isReady() {
296
      synchronized (onReadyLock) {
1✔
297
        return allocated && numSentBytesQueued < onReadyThreshold && !deallocated;
1✔
298
      }
299
    }
300

301
    /**
302
     * Event handler to be called by the subclass when the stream's headers have passed any
303
     * connection flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link
304
     * StreamListener#onReady()} handler if appropriate. This must be called from the transport
305
     * thread, since the listener may be called back directly.
306
     */
307
    protected void onStreamAllocated() {
308
      checkState(listener() != null);
1✔
309
      synchronized (onReadyLock) {
1✔
310
        checkState(!allocated, "Already allocated");
1✔
311
        allocated = true;
1✔
312
      }
1✔
313
      notifyIfReady();
1✔
314
    }
1✔
315

316
    /**
317
     * Notify that the stream does not exist in a usable state any longer. This causes {@link
318
     * AbstractStream#isReady()} to return {@code false} from this point forward.
319
     *
320
     * <p>This does not generally need to be called explicitly by the transport, as it is handled
321
     * implicitly by {@link AbstractClientStream} and {@link AbstractServerStream}.
322
     */
323
    protected final void onStreamDeallocated() {
324
      synchronized (onReadyLock) {
1✔
325
        deallocated = true;
1✔
326
      }
1✔
327
    }
1✔
328

329
    protected boolean isStreamDeallocated() {
330
      synchronized (onReadyLock) {
1✔
331
        return deallocated;
1✔
332
      }
333
    }
334

335
    /**
336
     * Event handler to be called by the subclass when a number of bytes are being queued for
337
     * sending to the remote endpoint.
338
     *
339
     * @param numBytes the number of bytes being sent.
340
     */
341
    private void onSendingBytes(int numBytes) {
342
      synchronized (onReadyLock) {
1✔
343
        numSentBytesQueued += numBytes;
1✔
344
      }
1✔
345
    }
1✔
346

347
    /**
348
     * Event handler to be called by the subclass when a number of bytes has been sent to the remote
349
     * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if
350
     * appropriate.  This must be called from the transport thread, since the listener may be called
351
     * back directly.
352
     *
353
     * @param numBytes the number of bytes that were sent.
354
     */
355
    public final void onSentBytes(int numBytes) {
356
      boolean doNotify;
357
      synchronized (onReadyLock) {
1✔
358
        checkState(allocated,
1✔
359
            "onStreamAllocated was not called, but it seems the stream is active");
360
        boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
1✔
361
        numSentBytesQueued -= numBytes;
1✔
362
        boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
1✔
363
        doNotify = !belowThresholdBefore && belowThresholdAfter;
1✔
364
      }
1✔
365
      if (doNotify) {
1✔
366
        notifyIfReady();
1✔
367
      }
368
    }
1✔
369

370
    protected TransportTracer getTransportTracer() {
371
      return transportTracer;
1✔
372
    }
373

374
    private void notifyIfReady() {
375
      boolean doNotify;
376
      synchronized (onReadyLock) {
1✔
377
        doNotify = isReady();
1✔
378
        if (!doNotify && log.isLoggable(Level.FINEST)) {
1✔
379
          log.log(Level.FINEST,
×
380
              "Stream not ready so skip notifying listener.\n"
381
                  + "details: allocated/deallocated:{0}/{3}, sent queued: {1}, ready thresh: {2}",
382
              new Object[] {allocated, numSentBytesQueued, onReadyThreshold, deallocated});
×
383
        }
384
      }
1✔
385
      if (doNotify) {
1✔
386
        listener().onReady();
1✔
387
      }
388
    }
1✔
389
  }
390
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc