• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19693

14 Feb 2025 09:20PM CUT coverage: 88.626% (-0.008%) from 88.634%
#19693

push

github

ejona86
kokoro: Increase gradle mem in android-interop

To try to address the following failure when building android-interop-testing:
```
The Daemon will expire after the build after running out of JVM heap space.
The project memory settings are likely not configured or are configured to an insufficient value.
The daemon will restart for the next build, which may increase subsequent build times.
These settings can be adjusted by setting 'org.gradle.jvmargs' in 'gradle.properties'.
The currently configured max heap space is '512 MiB' and the configured max metaspace is '384 MiB'.
...
Exception in thread "Daemon client event forwarder" java.lang.OutOfMemoryError: Java heap space
...
> Task :grpc-android-interop-testing:mergeDexDebug FAILED
ERROR:D8: java.lang.OutOfMemoryError: Java heap space
com.android.builder.dexing.DexArchiveMergerException: Error while merging dex archives:
```

34270 of 38668 relevant lines covered (88.63%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.12
/../netty/src/main/java/io/grpc/netty/WriteBufferingAndExceptionHandler.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.netty;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import io.grpc.Status;
22
import io.netty.buffer.ByteBuf;
23
import io.netty.buffer.ByteBufUtil;
24
import io.netty.channel.ChannelDuplexHandler;
25
import io.netty.channel.ChannelFuture;
26
import io.netty.channel.ChannelFutureListener;
27
import io.netty.channel.ChannelHandler;
28
import io.netty.channel.ChannelHandlerContext;
29
import io.netty.channel.ChannelPromise;
30
import io.netty.util.ReferenceCountUtil;
31
import java.net.SocketAddress;
32
import java.util.ArrayDeque;
33
import java.util.Queue;
34
import java.util.logging.Level;
35
import java.util.logging.Logger;
36

37
/**
38
 * Buffers all writes until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} or
39
 * {@link #failWrites(Throwable)} is called. This handler allows us to
40
 * write to a {@link io.netty.channel.Channel} before we are allowed to write to it officially
41
 * i.e.  before it's active or the TLS Handshake is complete.
42
 */
43
final class WriteBufferingAndExceptionHandler extends ChannelDuplexHandler {
44
  private static final Logger logger =
1✔
45
      Logger.getLogger(WriteBufferingAndExceptionHandler.class.getName());
1✔
46

47
  private final Queue<ChannelWrite> bufferedWrites = new ArrayDeque<>();
1✔
48
  private final ChannelHandler next;
49
  private boolean writing;
50
  private boolean flushRequested;
51
  private Throwable failCause;
52

53
  WriteBufferingAndExceptionHandler(ChannelHandler next) {
1✔
54
    this.next = checkNotNull(next, "next");
1✔
55
  }
1✔
56

57
  @Override
58
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
59
    ctx.pipeline().addBefore(ctx.name(), null, next);
1✔
60
    super.handlerAdded(ctx);
1✔
61
    // kick off protocol negotiation.
62
    ctx.pipeline().fireUserEventTriggered(ProtocolNegotiationEvent.DEFAULT);
1✔
63
  }
1✔
64

65
  @Override
66
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
67
    if (!bufferedWrites.isEmpty()) {
1✔
68
      Status status = Status.INTERNAL.withDescription("Buffer removed before draining writes");
1✔
69
      failWrites(status.asRuntimeException());
1✔
70
    }
71
    super.handlerRemoved(ctx);
1✔
72
  }
1✔
73

74
  /**
75
   * If this channel becomes inactive, then notify all buffered writes that we failed.
76
   */
77
  @Override
78
  public void channelInactive(ChannelHandlerContext ctx) {
79
    Status status = Status.UNAVAILABLE.withDescription(
1✔
80
        "Connection closed while performing protocol negotiation for " + ctx.pipeline().names());
1✔
81
    failWrites(status.asRuntimeException());
1✔
82
  }
1✔
83

84
  @Override
85
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
86
    assert cause != null;
1✔
87
    Throwable previousFailure = failCause;
1✔
88
    Status status = Utils.statusFromThrowable(cause)
1✔
89
        .augmentDescription("Channel Pipeline: " + ctx.pipeline().names());
1✔
90
    failWrites(status.asRuntimeException());
1✔
91
    // Check to see if the channel is active and this is the first failure.  If a downstream
92
    // handler triggers an exception in close(), avoid being reentrant.  This is not obviously
93
    // correct, so here are the cases and how they are correctly handled:
94
    // 1. !active, prev==null: the channel is inactive, no-op
95
    // 2. !active, prev!=null: the channel is inactive, no-op
96
    // 3.  active, prev==null: this is the first error, close
97
    // 4a. active, prev!=null[channelInactive]: impossible, no-op
98
    // 4b. active, prev!=null[close]: close() cannot succeed, no point in calling ctx.close().
99
    // 4c. active, prev!=null[handlerRemoved]: channel will be closed out-of-band by buffered write.
100
    // 4d. active, prev!=null[connect]: impossible, channel can't be active after a failed connect.
101
    if (ctx.channel().isActive() && previousFailure == null) {
1✔
102
      final class LogOnFailure implements ChannelFutureListener {
1✔
103
        @Override
104
        public void operationComplete(ChannelFuture future) {
105
          if (!future.isSuccess()) {
1✔
106
            logger.log(Level.FINE, "Failed closing channel", future.cause());
×
107
          }
108
        }
1✔
109
      }
110

111
      ctx.close().addListener(new LogOnFailure());
1✔
112
    }
113
  }
1✔
114

115
  /**
116
   * Buffers the write until either {@link #writeBufferedAndRemove(ChannelHandlerContext)} is
117
   * called, or we have somehow failed. If we have already failed in the past, then the write
118
   * will fail immediately.
119
   */
120
  @Override
121
  @SuppressWarnings("FutureReturnValueIgnored")
122
  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
123
    if (failCause != null) {
1✔
124
      promise.setFailure(failCause);
1✔
125
      ReferenceCountUtil.release(msg);
1✔
126
    } else {
127
      // Do not special case GracefulServerCloseCommand, as we don't want to cause handshake
128
      // failures.
129
      if (msg instanceof GracefulCloseCommand || msg instanceof ForcefulCloseCommand) {
1✔
130
        // No point in continuing negotiation
131
        ctx.close();
1✔
132
        // Still enqueue the command in case the HTTP/2 handler is already on the pipeline
133
      }
134
      bufferedWrites.add(new ChannelWrite(msg, promise));
1✔
135
    }
136
  }
1✔
137

138
  /**
139
   * Connect failures do not show up as {@link #channelInactive} or {@link #exceptionCaught}, so
140
   * it needs to be watched.
141
   */
142
  @Override
143
  public void connect(
144
      ChannelHandlerContext ctx,
145
      SocketAddress remoteAddress,
146
      SocketAddress localAddress,
147
      ChannelPromise promise) throws Exception {
148
    final class ConnectListener implements ChannelFutureListener {
1✔
149
      @Override
150
      public void operationComplete(ChannelFuture future) {
151
        if (!future.isSuccess()) {
1✔
152
          failWrites(future.cause());
1✔
153
        }
154
      }
1✔
155
    }
156

157
    super.connect(ctx, remoteAddress, localAddress, promise);
1✔
158
    promise.addListener(new ConnectListener());
1✔
159
  }
1✔
160

161
  @Override
162
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
163
    try {
164
      if (logger.isLoggable(Level.FINE)) {
1✔
165
        Object loggedMsg = msg instanceof ByteBuf ? ByteBufUtil.hexDump((ByteBuf) msg) : msg;
×
166
        logger.log(
×
167
            Level.FINE,
168
            "Unexpected channelRead()->{0} reached end of pipeline {1}",
169
            new Object[] {loggedMsg, ctx.pipeline().names()});
×
170
      }
171
      exceptionCaught(
1✔
172
          ctx,
173
          Status.INTERNAL.withDescription(
1✔
174
              "channelRead() missed by ProtocolNegotiator handler: " + msg)
175
              .asRuntimeException());
1✔
176
    } finally {
177
      ReferenceCountUtil.safeRelease(msg);
1✔
178
    }
179
  }
1✔
180

181
  /**
182
   * Calls to this method will not trigger an immediate flush. The flush will be deferred until
183
   * {@link #writeBufferedAndRemove(ChannelHandlerContext)}.
184
   */
185
  @Override
186
  public void flush(ChannelHandlerContext ctx) {
187
    /*
188
     * Swallowing any flushes is not only an optimization but also required
189
     * for the SslHandler to work correctly. If the SslHandler receives multiple
190
     * flushes while the handshake is still ongoing, then the handshake "randomly"
191
     * times out. Not sure at this point why this is happening. Doing a single flush
192
     * seems to work but multiple flushes don't ...
193
     */
194
    flushRequested = true;
1✔
195
  }
1✔
196

197
  /**
198
   * If we are still performing protocol negotiation, then this will propagate failures to all
199
   * buffered writes.
200
   */
201
  @Override
202
  public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Exception {
203
    Status status = Status.UNAVAILABLE.withDescription(
1✔
204
        "Connection closing while performing protocol negotiation for " + ctx.pipeline().names());
1✔
205
    failWrites(status.asRuntimeException());
1✔
206
    super.close(ctx, future);
1✔
207
  }
1✔
208

209
  @SuppressWarnings("FutureReturnValueIgnored")
210
  final void writeBufferedAndRemove(ChannelHandlerContext ctx) {
211
    // TODO(carl-mastrangelo): remove the isActive check and just fail if not yet ready.
212
    if (!ctx.channel().isActive() || writing) {
1✔
213
      return;
×
214
    }
215
    // Make sure that method can't be reentered, so that the ordering
216
    // in the queue can't be messed up.
217
    writing = true;
1✔
218
    while (!bufferedWrites.isEmpty()) {
1✔
219
      ChannelWrite write = bufferedWrites.poll();
1✔
220
      ctx.write(write.msg, write.promise);
1✔
221
    }
1✔
222
    if (flushRequested) {
1✔
223
      ctx.flush();
1✔
224
    }
225
    // Removal has to happen last as the above writes will likely trigger
226
    // new writes that have to be added to the end of queue in order to not
227
    // mess up the ordering.
228
    ctx.pipeline().remove(this);
1✔
229
  }
1✔
230

231
  /**
232
   * Propagate failures to all buffered writes.
233
   */
234
  @SuppressWarnings("FutureReturnValueIgnored")
235
  private void failWrites(Throwable cause) {
236
    if (failCause == null) {
1✔
237
      failCause = cause;
1✔
238
    } else {
239
      logger.log(Level.FINE, "Ignoring duplicate failure", cause);
1✔
240
    }
241
    while (!bufferedWrites.isEmpty()) {
1✔
242
      ChannelWrite write = bufferedWrites.poll();
1✔
243
      write.promise.setFailure(cause);
1✔
244
      ReferenceCountUtil.release(write.msg);
1✔
245
    }
1✔
246
  }
1✔
247

248
  private static final class ChannelWrite {
249
    final Object msg;
250
    final ChannelPromise promise;
251

252
    ChannelWrite(Object msg, ChannelPromise promise) {
1✔
253
      this.msg = msg;
1✔
254
      this.promise = promise;
1✔
255
    }
1✔
256
  }
257
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc