• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19161

16 Apr 2024 11:27PM UTC coverage: 88.117% (-0.01%) from 88.13%
#19161

push

github

web-flow
netty: Handle write queue promise failures (#11016)

Handles Netty write frame failures caused by issues in Netty
itself.

Normally we don't need to do anything on frame write failures because
the cause of a failed future would be an IO error that resulted in
the stream closure.  Prior to this PR we treated these issues as a
noop, except the initial headers write on the client side.

However, a case like netty/netty#13805 (a bug in generating next
stream id) resulted in an unclosed stream on our side. This PR adds
write frame future failure handlers that ensure the stream is
cancelled, and the cause is propagated via Status.

Fixes #10849

31182 of 35387 relevant lines covered (88.12%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.5
/../netty/src/main/java/io/grpc/netty/NettyClientStream.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
23

24
import com.google.common.base.Preconditions;
25
import com.google.common.io.BaseEncoding;
26
import io.grpc.Attributes;
27
import io.grpc.CallOptions;
28
import io.grpc.InternalKnownTransport;
29
import io.grpc.InternalMethodDescriptor;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.internal.AbstractClientStream;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import io.grpc.internal.Http2ClientStreamTransportState;
36
import io.grpc.internal.StatsTraceContext;
37
import io.grpc.internal.TransportTracer;
38
import io.grpc.internal.WritableBuffer;
39
import io.netty.buffer.ByteBuf;
40
import io.netty.channel.Channel;
41
import io.netty.channel.ChannelFuture;
42
import io.netty.channel.ChannelFutureListener;
43
import io.netty.channel.EventLoop;
44
import io.netty.handler.codec.http2.Http2Headers;
45
import io.netty.handler.codec.http2.Http2Stream;
46
import io.netty.util.AsciiString;
47
import io.perfmark.PerfMark;
48
import io.perfmark.Tag;
49
import io.perfmark.TaskCloseable;
50
import javax.annotation.Nullable;
51

52
/**
53
 * Client stream for a Netty transport. Must only be called from the sending application
54
 * thread.
55
 */
56
class NettyClientStream extends AbstractClientStream {
57
  // Accessor used by Sink.writeHeadersInternal to read and cache the raw method name on a
  // MethodDescriptor. The transport slot is NETTY_SHADED when NettyClientTransport's class name
  // contains "grpc.netty.shaded", and NETTY otherwise.
  private static final InternalMethodDescriptor methodDescriptorAccessor =
1✔
58
      new InternalMethodDescriptor(
59
          NettyClientTransport.class.getName().contains("grpc.netty.shaded")
1✔
60
              ? InternalKnownTransport.NETTY_SHADED : InternalKnownTransport.NETTY);
1✔
61

62
  private final Sink sink = new Sink();
1✔
63
  private final TransportState state;
64
  private final WriteQueue writeQueue;
65
  private final MethodDescriptor<?, ?> method;
66
  private AsciiString authority;
67
  private final AsciiString scheme;
68
  private final AsciiString userAgent;
69

70
  /**
   * Creates a client stream on top of the given transport state.
   *
   * <p>{@code state}, {@code method}, {@code authority} and {@code scheme} are null-checked;
   * {@code userAgent} is stored as given. The write queue is taken from {@code state.handler}.
   * The last argument passed to the superclass is {@code useGetForSafeMethods && method.isSafe()}.
   */
  NettyClientStream(
71
      TransportState state,
72
      MethodDescriptor<?, ?> method,
73
      Metadata headers,
74
      Channel channel,
75
      AsciiString authority,
76
      AsciiString scheme,
77
      AsciiString userAgent,
78
      StatsTraceContext statsTraceCtx,
79
      TransportTracer transportTracer,
80
      CallOptions callOptions,
81
      boolean useGetForSafeMethods) {
82
    super(
1✔
83
        new NettyWritableBufferAllocator(channel.alloc()),
1✔
84
        statsTraceCtx,
85
        transportTracer,
86
        headers,
87
        callOptions,
88
        useGetForSafeMethods && method.isSafe());
1✔
89
    this.state = checkNotNull(state, "transportState");
1✔
90
    this.writeQueue = state.handler.getWriteQueue();
1✔
91
    this.method = checkNotNull(method, "method");
1✔
92
    this.authority = checkNotNull(authority, "authority");
1✔
93
    this.scheme = checkNotNull(scheme, "scheme");
1✔
94
    this.userAgent = userAgent;
1✔
95
  }
1✔
96

97
  /** Returns the {@link TransportState} that was supplied to the constructor. */
  @Override
98
  protected TransportState transportState() {
99
    return state;
1✔
100
  }
101

102
  /** Returns the {@link Sink} instance owned by this stream. */
  @Override
103
  protected Sink abstractClientStreamSink() {
104
    return sink;
1✔
105
  }
106

107
  /**
   * Replaces the authority of this stream with {@code authority}, converted to an
   * {@link AsciiString}. The argument is null-checked.
   */
  @Override
108
  public void setAuthority(String authority) {
109
    this.authority = AsciiString.of(checkNotNull(authority, "authority"));
×
110
  }
×
111

112
  /** Returns the attributes reported by the transport state's handler. */
  @Override
113
  public Attributes getAttributes() {
114
    return state.handler.getAttributes();
1✔
115
  }
116

117
  private class Sink implements AbstractClientStream.Sink {
1✔
118

119
    /** Runs {@code writeHeadersInternal} inside a PerfMark trace task. */
    @Override
120
    public void writeHeaders(Metadata headers, byte[] requestPayload) {
121
      try (TaskCloseable ignore =
1✔
122
               PerfMark.traceTask("NettyClientStream$Sink.writeHeaders")) {
1✔
123
        writeHeadersInternal(headers, requestPayload);
1✔
124
      }
125
    }
1✔
126

127
    /**
     * Builds the HTTP/2 request headers and enqueues a {@link CreateStreamCommand}.
     *
     * <p>A non-null {@code requestPayload} selects the GET form: the payload is base64-encoded and
     * appended to the path as a query string. If the enqueued write fails, the attached listener
     * reports the status to the transport state, using {@code MISCARRIED} progress when the
     * stream is non-existent and {@code PROCESSED} otherwise.
     */
    private void writeHeadersInternal(Metadata headers, byte[] requestPayload) {
128
      // Convert the headers into Netty HTTP/2 headers.
      // The path is read from the method descriptor; when absent it is computed once and stored
      // back on the descriptor.
129
      AsciiString defaultPath = (AsciiString) methodDescriptorAccessor.geRawMethodName(method);
1✔
130
      if (defaultPath == null) {
1✔
131
        defaultPath = new AsciiString("/" + method.getFullMethodName());
1✔
132
        methodDescriptorAccessor.setRawMethodName(method, defaultPath);
1✔
133
      }
134
      boolean get = (requestPayload != null);
1✔
135
      AsciiString httpMethod;
136
      if (get) {
1✔
137
        // Forge the query string
138
        // TODO(ericgribkoff) Add the key back to the query string
        // Only the local path is extended here; the value cached on the descriptor is unchanged.
139
        defaultPath =
1✔
140
            new AsciiString(defaultPath + "?" + BaseEncoding.base64().encode(requestPayload));
1✔
141
        httpMethod = Utils.HTTP_GET_METHOD;
1✔
142
      } else {
143
        httpMethod = Utils.HTTP_METHOD;
1✔
144
      }
145
      Http2Headers http2Headers = Utils.convertClientHeaders(headers, scheme, defaultPath,
1✔
146
          authority, httpMethod, userAgent);
1✔
147

148
      ChannelFutureListener failureListener = new ChannelFutureListener() {
1✔
149
        @Override
150
        public void operationComplete(ChannelFuture future) throws Exception {
151
          if (!future.isSuccess()) {
1✔
152
            // Stream creation failed. Close the stream if not already closed.
153
            // When the channel is shutdown, the lifecycle manager has a better view of the failure,
154
            // especially before negotiation completes (because the negotiator commonly doesn't
155
            // receive the exceptionCaught because NettyClientHandler does not propagate it).
156
            Status s = transportState().handler.getLifecycleManager().getShutdownStatus();
1✔
157
            if (s == null) {
1✔
              // No shutdown status available; derive the status from the failed future instead.
158
              s = transportState().statusFromFailedFuture(future);
1✔
159
            }
160
            if (transportState().isNonExistent()) {
1✔
161
              transportState().transportReportStatus(
1✔
162
                  s, RpcProgress.MISCARRIED, true, new Metadata());
163
            } else {
164
              transportState().transportReportStatus(
1✔
165
                  s, RpcProgress.PROCESSED, true, new Metadata());
166
            }
167
          }
168
        }
1✔
169
      };
170
      // Write the command requesting the creation of the stream.
      // The second enqueue argument (the flush flag, as in writeFrameInternal) is true for GET
      // requests and whenever clientSendsOneMessage() is false.
171
      writeQueue.enqueue(
1✔
172
          new CreateStreamCommand(http2Headers, transportState(), shouldBeCountedForInUse(), get),
1✔
173
          !method.getType().clientSendsOneMessage() || get).addListener(failureListener);
1✔
174
    }
1✔
175

176
    /**
     * Enqueues a {@link SendGrpcFrameCommand} for {@code frame}; a null frame is sent as
     * {@code EMPTY_BUFFER}.
     *
     * <p>Non-empty frames are counted via {@code onSendingBytes} and get a listener that forwards
     * the write result to {@code TransportState.onWriteFrameData}. Empty frames are enqueued
     * without a listener.
     *
     * @throws IllegalArgumentException if {@code numMessages} is negative
     */
    private void writeFrameInternal(
177
        WritableBuffer frame, boolean endOfStream, boolean flush, final int numMessages) {
178
      Preconditions.checkArgument(numMessages >= 0);
1✔
179
      ByteBuf bytebuf =
180
          frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf().touch();
1✔
181
      final int numBytes = bytebuf.readableBytes();
1✔
182
      if (numBytes > 0) {
1✔
183
        // Add the bytes to outbound flow control.
184
        onSendingBytes(numBytes);
1✔
        // Despite its name, this listener is notified for both outcomes; onWriteFrameData
        // inspects the future and handles success as well as failure.
185
        ChannelFutureListener failureListener =
1✔
186
            future -> transportState().onWriteFrameData(future, numMessages, numBytes);
1✔
187
        writeQueue.enqueue(new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush)
1✔
188
            .addListener(failureListener);
1✔
189
      } else {
1✔
190
        // The frame is empty and will not impact outbound flow control. Just send it.
191
        writeQueue.enqueue(
1✔
192
            new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream), flush);
1✔
193
      }
194
    }
1✔
195

196
    /** Runs {@code writeFrameInternal} inside a PerfMark trace task. */
    @Override
197
    public void writeFrame(
198
        WritableBuffer frame, boolean endOfStream, boolean flush, int numMessages) {
199
      try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.writeFrame")) {
1✔
200
        writeFrameInternal(frame, endOfStream, flush, numMessages);
1✔
201
      }
202
    }
1✔
203

204
    /**
     * Enqueues a {@link CancelClientStreamCommand} carrying {@code status} inside a PerfMark
     * trace task; the second enqueue argument (the flush flag) is {@code true}.
     */
    @Override
205
    public void cancel(Status status) {
206
      try (TaskCloseable ignore = PerfMark.traceTask("NettyClientStream$Sink.cancel")) {
1✔
207
        writeQueue.enqueue(new CancelClientStreamCommand(transportState(), status), true);
1✔
208
      }
209
    }
1✔
210
  }
211

212
  /** This should only be called from the transport thread. */
213
  public abstract static class TransportState extends Http2ClientStreamTransportState
214
      implements StreamIdHolder {
215
    private static final int NON_EXISTENT_ID = -1;
216

217
    private final String methodName;
218
    private final NettyClientHandler handler;
219
    private final EventLoop eventLoop;
220
    private int id;
221
    private Http2Stream http2Stream;
222
    private Tag tag;
223

224
    /**
     * Creates the transport state.
     *
     * <p>{@code methodName}, {@code handler} and {@code eventLoop} are null-checked. The PerfMark
     * tag is initially created from the method name alone; {@code setId} later replaces it with
     * one that also carries the stream id.
     */
    protected TransportState(
225
        NettyClientHandler handler,
226
        EventLoop eventLoop,
227
        int maxMessageSize,
228
        StatsTraceContext statsTraceCtx,
229
        TransportTracer transportTracer,
230
        String methodName,
231
        CallOptions options) {
232
      super(maxMessageSize, statsTraceCtx, transportTracer, options);
1✔
233
      this.methodName = checkNotNull(methodName, "methodName");
1✔
234
      this.handler = checkNotNull(handler, "handler");
1✔
235
      this.eventLoop = checkNotNull(eventLoop, "eventLoop");
1✔
236
      tag = PerfMark.createTag(methodName);
1✔
237
    }
1✔
238

239
    /**
     * Returns the current stream id: {@code 0} until {@code setId} or {@code setNonExistent} has
     * been called, and {@code NON_EXISTENT_ID} after {@code setNonExistent}.
     */
    @Override
240
    public int id() {
241
      // Positive only once setId has been called; no validation is performed here.
242
      return id;
1✔
243
    }
244

245
    /**
     * Assigns the stream id and re-creates the PerfMark tag so that it includes the id.
     *
     * @throws IllegalArgumentException if {@code id} is not positive
     * @throws IllegalStateException if the id field is no longer {@code 0}, i.e. an id or the
     *     non-existent marker was already set
     */
    public void setId(int id) {
246
      checkArgument(id > 0, "id must be positive %s", id);
1✔
247
      checkState(this.id == 0, "id has been previously set: %s", this.id);
1✔
248
      this.id = id;
1✔
249
      this.tag = PerfMark.createTag(methodName, id);
1✔
250
    }
1✔
251

252
    /**
253
     * Marks the stream state as if it had never existed.  This can happen if the stream is
254
     * cancelled after it is created, but before it has been started.
     *
     * @throws IllegalStateException if the id field is no longer {@code 0}
255
     */
256
    void setNonExistent() {
257
      checkState(this.id == 0, "Id has been previously set: %s", this.id);
1✔
258
      this.id = NON_EXISTENT_ID;
1✔
259
    }
1✔
260

261
    /**
     * Returns {@code true} if the stream was marked non-existent or has not been assigned an id
     * yet (the id field is still {@code 0}).
     */
    boolean isNonExistent() {
262
      return this.id == NON_EXISTENT_ID || this.id == 0;
1✔
263
    }
264

265
    /**
266
     * Sets the underlying Netty {@link Http2Stream} for this stream. This must be called in the
267
     * context of the transport thread.
     *
     * <p>After storing the stream this invokes {@code onStreamAllocated()} and reports a locally
     * started stream to the transport tracer.
     *
     * @throws IllegalStateException if the stream has already been set
268
     */
269
    public void setHttp2Stream(Http2Stream http2Stream) {
270
      checkNotNull(http2Stream, "http2Stream");
1✔
271
      checkState(this.http2Stream == null, "Can only set http2Stream once");
1✔
272
      this.http2Stream = http2Stream;
1✔
273

274
      // Now that the stream has actually been initialized, call the listener's onReady callback if
275
      // appropriate.
276
      onStreamAllocated();
1✔
277
      getTransportTracer().reportLocalStreamStarted();
1✔
278
    }
1✔
279

280
    /**
281
     * Gets the underlying Netty {@link Http2Stream} for this stream, or {@code null} if
     * {@code setHttp2Stream} has not been called.
282
     */
283
    @Nullable
284
    public Http2Stream http2Stream() {
285
      return http2Stream;
1✔
286
    }
287

288
    /**
     * Maps a failed {@link ChannelFuture} to the {@link Status} to report for this stream.
     *
289
     * <p>Intended to be overridden by NettyClientTransport, which has more information about
     * failures.
290
     * May only be called from event loop.
291
     */
292
    protected abstract Status statusFromFailedFuture(ChannelFuture f);
293

294
    /**
     * Reports {@code status} via {@code transportReportStatus} and then enqueues a
     * {@link CancelClientStreamCommand} for this stream with the flush flag set.
     */
    @Override
295
    protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
296
      transportReportStatus(status, stopDelivery, trailers);
1✔
297
      handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, status), true);
1✔
298
    }
1✔
299

300
    /**
     * Completion callback for a DATA frame write.
     *
     * <p>On success the bytes are released from outbound flow control and the sent messages are
     * reported to the transport tracer, unless no {@link Http2Stream} has been set, in which case
     * nothing is done. On failure the RPC is failed through {@code http2ProcessingFailed}, but
     * only while the stream has not been deallocated.
     */
    private void onWriteFrameData(ChannelFuture future, int numMessages, int numBytes) {
301
      // If the future succeeds when http2stream is null, the stream has been cancelled
302
      // before it began and Netty is purging pending writes from the flow-controller.
303
      if (future.isSuccess() && http2Stream() == null) {
1✔
304
        return;
×
305
      }
306

307
      if (future.isSuccess()) {
1✔
308
        // Remove the bytes from outbound flow control, optionally notifying
309
        // the client that they can send more bytes.
310
        onSentBytes(numBytes);
1✔
311
        getTransportTracer().reportMessageSent(numMessages);
1✔
312
      } else if (!isStreamDeallocated()) {
1✔
313
        // Future failed, fail RPC.
314
        // Normally we don't need to do anything here because the cause of a failed future
315
        // while writing DATA frames would be an IO error and the stream is already closed.
316
        // However, we still need to handle any unexpected failures raised in Netty.
317
        // Note: isStreamDeallocated() protects from spamming stream resets by scheduling multiple
318
        // CancelClientStreamCommand commands.
319
        http2ProcessingFailed(statusFromFailedFuture(future), true, new Metadata());
1✔
320
      }
321
    }
1✔
322

323
    /**
     * Runs {@code r} immediately when the caller is already on the event loop; otherwise hands it
     * to the event loop for execution.
     */
    @Override
324
    public void runOnTransportThread(final Runnable r) {
325
      if (eventLoop.inEventLoop()) {
1✔
326
        r.run();
1✔
327
      } else {
328
        eventLoop.execute(r);
1✔
329
      }
330
    }
1✔
331

332
    /**
     * Returns {@code processedBytes} to the handler for this stream's {@link Http2Stream} and
     * schedules a flush of the handler's write queue.
     */
    @Override
333
    public void bytesRead(int processedBytes) {
334
      handler.returnProcessedBytes(http2Stream, processedBytes);
1✔
335
      handler.getWriteQueue().scheduleFlush();
1✔
336
    }
1✔
337

338
    /**
     * Fails the stream through {@code http2ProcessingFailed} with the {@link Status} derived from
     * {@code cause}, {@code stopDelivery} set to {@code true} and empty trailers.
     */
    @Override
339
    public void deframeFailed(Throwable cause) {
340
      http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
1✔
341
    }
1✔
342

343
    /**
     * Handles a received HTTP/2 headers frame.
     *
     * <p>With {@code endOfStream} set, the headers are converted and delivered as trailers; if
     * the outbound side is not closed yet, a {@link CancelClientStreamCommand} with a null status
     * is enqueued first. Otherwise the headers are converted and delivered as response headers.
     */
    void transportHeadersReceived(Http2Headers headers, boolean endOfStream) {
344
      if (endOfStream) {
1✔
345
        if (!isOutboundClosed()) {
1✔
346
          handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true);
1✔
347
        }
348
        transportTrailersReceived(Utils.convertTrailers(headers));
1✔
349
      } else {
350
        transportHeadersReceived(Utils.convertHeaders(headers));
1✔
351
      }
352
    }
1✔
353

354
    /**
     * Retains {@code frame}, wraps it in a {@link NettyReadableBuffer} and passes it on to the
     * readable-buffer overload of {@code transportDataReceived}.
     */
    void transportDataReceived(ByteBuf frame, boolean endOfStream) {
355
      transportDataReceived(new NettyReadableBuffer(frame.retain()), endOfStream);
1✔
356
    }
1✔
357

358
    /**
     * Returns the current PerfMark tag: created from the method name in the constructor and
     * replaced by {@code setId} with one that also carries the stream id.
     */
    @Override
359
    public final Tag tag() {
360
      return tag;
1✔
361
    }
362
  }
363
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc