• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19398

02 Aug 2024 07:31PM CUT coverage: 84.447% (-0.001%) from 84.448%
#19398

push

github

ejona86
xds: XdsClient should unsubscribe on last resource (#11264)

Otherwise, the server will continue sending updates and if we
re-subscribe to the last resource, the server won't re-send it. Also
completely remove the per-type state, as it could only add confusion.

33234 of 39355 relevant lines covered (84.45%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/../netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import io.grpc.Status;
22
import io.netty.buffer.ByteBuf;
23
import io.netty.buffer.ByteBufUtil;
24
import io.netty.channel.ChannelDuplexHandler;
25
import io.netty.channel.ChannelFuture;
26
import io.netty.channel.ChannelFutureListener;
27
import io.netty.channel.ChannelHandler;
28
import io.netty.channel.ChannelHandlerContext;
29
import io.netty.channel.ChannelPromise;
30
import io.netty.util.ReferenceCountUtil;
31
import java.net.SocketAddress;
32
import java.util.ArrayDeque;
33
import java.util.Queue;
34
import java.util.logging.Level;
35
import java.util.logging.Logger;
36

37
/**
38
 * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
39
 * {@link #failWrites(Throwable)} is called. This handler allows us to
40
 * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
41
 * i.e.  before it's active or the TLS Handshake is complete.
42
 */
43
final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {
44
  private static final Logger logger =
1✔
45
      Logger.getLogger(WriteBufferingAndExceptionHandler.class.getName());
1✔
46

47
  private final Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>();
1✔
48
  private final ChannelHandler next;
49
  private boolean writing;
50
  private boolean flushRequested;
51
  private Throwable failCause;
52

53
  WriteBufferingAndExceptionHandler(ChannelHandler next) {
1✔
54
    this.next = checkNotNull(next, "next");
1✔
55
  }
1✔
56

57
  @Override
58
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
59
    ctx.pipeline().addBefore(ctx.name(), null, next);
1✔
60
    super.handlerAdded(ctx);
1✔
61
    // kick off protocol negotiation.
62
    ctx.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
1✔
63
  }
1✔
64

65
  @Override
66
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
67
    if (!bufferedWrites.isEmpty()) {
1✔
68
      Status status = Status.INTERNAL.withDescription("Buffer removed before draining writes");
1✔
69
      failWrites(status.asRuntimeException());
1✔
70
    }
71
    super.handlerRemoved(ctx);
1✔
72
  }
1✔
73

74
  /**
75
   * If this channel becomes inactive, then notify all buffered writes that we failed.
76
   */
77
  @Override
78
  public void channelInactive(ChannelHandlerContext ctx) {
79
    Status status = Status.UNAVAILABLE.withDescription(
1✔
80
        "Connection closed while performing protocol negotiation for " + ctx.pipeline().names());
1✔
81
    failWrites(status.asRuntimeException());
1✔
82
  }
1✔
83

84
  @Override
85
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
86
    assert cause != null;
1✔
87
    Throwable previousFailure = failCause;
1✔
88
    Status status = Utils.statusFromThrowable(cause)
1✔
89
        .augmentDescription("Channel Pipeline: " + ctx.pipeline().names());
1✔
90
    failWrites(status.asRuntimeException());
1✔
91
    // Check to see if the channel is active and this is the first failure.  If a downstream
92
    // handler triggers an exception in close(), avoid being reentrant.  This is not obviously
93
    // correct, so here are the cases and how they are correctly handled:
94
    // 1. !active, prev==null: the channel is inactive, no-op
95
    // 2. !active, prev!=null: the channel is inactive, no-op
96
    // 3.  active, prev==null: this is the first error, close
97
    // 4a. active, prev!=null[channelInactive]: impossible, no-op
98
    // 4b. active, prev!=null[close]: close() cannot succeed, no point in calling ctx.close().
99
    // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write.
100
    // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect.
101
    if (ctx.channel().isActive() && previousFailure == null) {
1✔
102
      final class LogOnFailure implements ChannelFutureListener {
1✔
103
        @Override
104
        public void operationComplete(ChannelFuture future) {
105
          if (!future.isSuccess()) {
1✔
106
            logger.log(Level.FINE, "Failed closing channel", future.cause());
×
107
          }
108
        }
1✔
109
      }
110

111
      ctx.close().addListener(new LogOnFailure());
1✔
112
    }
113
  }
1✔
114

115
  /**
116
   * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
117
   * called, or we have somehow failed. If we have already failed in the past, then the write
118
   * will fail immediately.
119
   */
120
  @Override
121
  @SuppressWarnings("FutureReturnValueIgnored")
122
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
123
    if (failCause != null) {
1✔
124
      promise.setFailure(failCause);
1✔
125
      ReferenceCountUtil.release(msg);
1✔
126
    } else {
127
      // Do not special case GracefulServerCloseCommand, as we don't want to cause handshake
128
      // failures.
129
      if (msg instanceof GracefulCloseCommand || msg instanceof ForcefulCloseCommand) {
1✔
130
        // No point in continuing negotiation
131
        ctx.close();
1✔
132
        // Still enqueue the command in case the HTTP/2 handler is already on the pipeline
133
      }
134
      bufferedWrites.add(new ChannelWrite(msg, promise));
1✔
135
    }
136
  }
1✔
137

138
  /**
139
   * Connect failures do not show up as {@link #channelInactive} or {@link #exceptionCaught}, so
140
   * it needs to be watched.
141
   */
142
  @Override
143
  public void connect(
144
      ChannelHandlerContext ctx,
145
      SocketAddress remoteAddress,
146
      SocketAddress localAddress,
147
      ChannelPromise promise) throws Exception {
148
    final class ConnectListener implements ChannelFutureListener {
1✔
149
      @Override
150
      public void operationComplete(ChannelFuture future) {
151
        if (!future.isSuccess()) {
1✔
152
          failWrites(future.cause());
1✔
153
        }
154
      }
1✔
155
    }
156

157
    super.connect(ctx, remoteAddress, localAddress, promise);
1✔
158
    promise.addListener(new ConnectListener());
1✔
159
  }
1✔
160

161
  @Override
162
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
163
    try {
164
      if (logger.isLoggable(Level.FINE)) {
1✔
165
        Object loggedMsg = msg instanceof ByteBuf ? ByteBufUtil.hexDump((ByteBuf) msg) : msg;
×
166
        logger.log(
×
167
            Level.FINE,
168
            "Unexpected channelRead()->{0} reached end of pipeline {1}",
169
            new Object[] {loggedMsg, ctx.pipeline().names()});
×
170
      }
171
      exceptionCaught(
1✔
172
          ctx,
173
          Status.INTERNAL.withDescription(
1✔
174
              "channelRead() missed by ProtocolNegotiator handler: " + msg)
175
              .asRuntimeException());
1✔
176
    } finally {
177
      ReferenceCountUtil.safeRelease(msg);
1✔
178
    }
179
  }
1✔
180

181
  /**
182
   * Calls to this method will not trigger an immediate flush. The flush will be deferred until
183
   * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
184
   */
185
  @Override
186
  public void flush(ChannelHandlerContext ctx) {
187
    /*
188
     * Swallowing any flushes is not only an optimization but also required
189
     * for the SslHandler to work correctly. If the SslHandler receives multiple
190
     * flushes while the handshake is still ongoing, then the handshake "randomly"
191
     * times out. Not sure at this point why this is happening. Doing a single flush
192
     * seems to work but multiple flushes don't ...
193
     */
194
    flushRequested = true;
1✔
195
  }
1✔
196

197
  /**
198
   * If we are still performing protocol negotiation, then this will propagate failures to all
199
   * buffered writes.
200
   */
201
  @Override
202
  public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
203
    Status status = Status.UNAVAILABLE.withDescription(
1✔
204
        "Connection closing while performing protocol negotiation for " + ctx.pipeline().names());
1✔
205
    failWrites(status.asRuntimeException());
1✔
206
    super.close(ctx, future);
1✔
207
  }
1✔
208

209
  @SuppressWarnings("FutureReturnValueIgnored")
210
  final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
211
    // TODO(carl-mastrangelo): remove the isActive check and just fail if not yet ready.
212
    if (!ctx.channel().isActive() || writing) {
1✔
213
      return;
×
214
    }
215
    // Make sure that method can't be reentered, so that the ordering
216
    // in the queue can't be messed up.
217
    writing = true;
1✔
218
    while (!bufferedWrites.isEmpty()) {
1✔
219
      ChannelWrite write = bufferedWrites.poll();
1✔
220
      ctx.write(write.msg, write.promise);
1✔
221
    }
1✔
222
    if (flushRequested) {
1✔
223
      ctx.flush();
1✔
224
    }
225
    // Removal has to happen last as the above writes will likely trigger
226
    // new writes that have to be added to the end of queue in order to not
227
    // mess up the ordering.
228
    ctx.pipeline().remove(this);
1✔
229
  }
1✔
230

231
  /**
232
   * Propagate failures to all buffered writes.
233
   */
234
  @SuppressWarnings("FutureReturnValueIgnored")
235
  private void failWrites(Throwable cause) {
236
    if (failCause == null) {
1✔
237
      failCause = cause;
1✔
238
    } else {
239
      logger.log(Level.FINE, "Ignoring duplicate failure", cause);
1✔
240
    }
241
    while (!bufferedWrites.isEmpty()) {
1✔
242
      ChannelWrite write = bufferedWrites.poll();
1✔
243
      write.promise.setFailure(cause);
1✔
244
      ReferenceCountUtil.release(write.msg);
1✔
245
    }
1✔
246
  }
1✔
247

248
  private static final class ChannelWrite {
249
    final Object msg;
250
    final ChannelPromise promise;
251

252
    ChannelWrite(Object msg, ChannelPromise promise) {
1✔
253
      this.msg = msg;
1✔
254
      this.promise = promise;
1✔
255
    }
1✔
256
  }
257
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc