• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18812

25 Aug 2023 11:55PM UTC coverage: 88.316% (+0.006%) from 88.31%
#18812

push

github-actions

ejona86
interop-testing: Modernize stress test channel creation

OverrideAuthority() is the modern alternative to rewriting the
InetAddress, and can also work if we change the channel creation to use
target strings instead. We can use test CAs now without resorting to the
Netty-specific APIs. After this change, only 4 classes in
interop-testing depend on Netty.

30341 of 34355 relevant lines covered (88.32%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.71
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
20
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
21

22
import com.google.common.base.Preconditions;
23
import com.google.common.util.concurrent.Futures;
24
import com.google.common.util.concurrent.ListenableFuture;
25
import io.grpc.Attributes;
26
import io.grpc.InternalChannelz;
27
import io.grpc.InternalLogId;
28
import io.grpc.InternalStatus;
29
import io.grpc.Metadata;
30
import io.grpc.ServerStreamTracer;
31
import io.grpc.Status;
32
import io.grpc.internal.GrpcUtil;
33
import io.grpc.internal.KeepAliveEnforcer;
34
import io.grpc.internal.KeepAliveManager;
35
import io.grpc.internal.LogExceptionRunnable;
36
import io.grpc.internal.MaxConnectionIdleManager;
37
import io.grpc.internal.ObjectPool;
38
import io.grpc.internal.SerializingExecutor;
39
import io.grpc.internal.ServerTransport;
40
import io.grpc.internal.ServerTransportListener;
41
import io.grpc.internal.StatsTraceContext;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.okhttp.internal.framed.ErrorCode;
44
import io.grpc.okhttp.internal.framed.FrameReader;
45
import io.grpc.okhttp.internal.framed.FrameWriter;
46
import io.grpc.okhttp.internal.framed.Header;
47
import io.grpc.okhttp.internal.framed.HeadersMode;
48
import io.grpc.okhttp.internal.framed.Http2;
49
import io.grpc.okhttp.internal.framed.Settings;
50
import io.grpc.okhttp.internal.framed.Variant;
51
import java.io.IOException;
52
import java.net.Socket;
53
import java.util.List;
54
import java.util.Locale;
55
import java.util.Map;
56
import java.util.TreeMap;
57
import java.util.concurrent.Executor;
58
import java.util.concurrent.ScheduledExecutorService;
59
import java.util.concurrent.ScheduledFuture;
60
import java.util.concurrent.TimeUnit;
61
import java.util.logging.Level;
62
import java.util.logging.Logger;
63
import javax.annotation.Nullable;
64
import javax.annotation.concurrent.GuardedBy;
65
import okio.Buffer;
66
import okio.BufferedSource;
67
import okio.ByteString;
68
import okio.Okio;
69

70
/**
71
 * OkHttp-based server transport.
72
 */
73
final class OkHttpServerTransport implements ServerTransport,
74
      ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport {
75
  private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName());
1✔
76
  private static final int GRACEFUL_SHUTDOWN_PING = 0x1111;
77

78
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);
1✔
79

80
  private static final int KEEPALIVE_PING = 0xDEAD;
81
  private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method");
1✔
82
  private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT");
1✔
83
  private static final ByteString POST_METHOD = ByteString.encodeUtf8("POST");
1✔
84
  private static final ByteString SCHEME = ByteString.encodeUtf8(":scheme");
1✔
85
  private static final ByteString PATH = ByteString.encodeUtf8(":path");
1✔
86
  private static final ByteString AUTHORITY = ByteString.encodeUtf8(":authority");
1✔
87
  private static final ByteString CONNECTION = ByteString.encodeUtf8("connection");
1✔
88
  private static final ByteString HOST = ByteString.encodeUtf8("host");
1✔
89
  private static final ByteString TE = ByteString.encodeUtf8("te");
1✔
90
  private static final ByteString TE_TRAILERS = ByteString.encodeUtf8("trailers");
1✔
91
  private static final ByteString CONTENT_TYPE = ByteString.encodeUtf8("content-type");
1✔
92
  private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
1✔
93

94
  private final Config config;
95
  private final Variant variant = new Http2();
1✔
96
  private final TransportTracer tracer;
97
  private final InternalLogId logId;
98
  private Socket socket;
99
  private ServerTransportListener listener;
100
  private Executor transportExecutor;
101
  private ScheduledExecutorService scheduledExecutorService;
102
  private Attributes attributes;
103
  private KeepAliveManager keepAliveManager;
104
  private MaxConnectionIdleManager maxConnectionIdleManager;
105
  private ScheduledFuture<?> maxConnectionAgeMonitor;
106
  private final KeepAliveEnforcer keepAliveEnforcer;
107

108
  private final Object lock = new Object();
1✔
109
  @GuardedBy("lock")
110
  private boolean abruptShutdown;
111
  @GuardedBy("lock")
112
  private boolean gracefulShutdown;
113
  @GuardedBy("lock")
114
  private boolean handshakeShutdown;
115
  @GuardedBy("lock")
116
  private InternalChannelz.Security securityInfo;
117
  @GuardedBy("lock")
118
  private ExceptionHandlingFrameWriter frameWriter;
119
  @GuardedBy("lock")
120
  private OutboundFlowController outboundFlow;
121
  @GuardedBy("lock")
1✔
122
  private final Map<Integer, StreamState> streams = new TreeMap<>();
123
  @GuardedBy("lock")
124
  private int lastStreamId;
125
  @GuardedBy("lock")
1✔
126
  private int goAwayStreamId = Integer.MAX_VALUE;
127
  /**
128
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
129
   * streams may continue.
130
   */
131
  @GuardedBy("lock")
132
  private Status goAwayStatus;
133
  /** Non-{@code null} when gracefully shutting down and have not yet sent second GOAWAY. */
134
  @GuardedBy("lock")
135
  private ScheduledFuture<?> secondGoawayTimer;
136
  /** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
137
  @GuardedBy("lock")
138
  private ScheduledFuture<?> forcefulCloseTimer;
139
  @GuardedBy("lock")
1✔
140
  private Long gracefulShutdownPeriod = null;
141

142
  public OkHttpServerTransport(Config config, Socket bareSocket) {
1✔
143
    this.config = Preconditions.checkNotNull(config, "config");
1✔
144
    this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
1✔
145

146
    tracer = config.transportTracerFactory.create();
1✔
147
    tracer.setFlowControlWindowReader(this::readFlowControlWindow);
1✔
148
    logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
1✔
149
    transportExecutor = config.transportExecutorPool.getObject();
1✔
150
    scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
1✔
151
    keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
1✔
152
        config.permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
153
  }
1✔
154

155
  public void start(ServerTransportListener listener) {
156
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
157

158
    SerializingExecutor serializingExecutor = new SerializingExecutor(transportExecutor);
1✔
159
    serializingExecutor.execute(() -> startIo(serializingExecutor));
1✔
160
  }
1✔
161

162
  private void startIo(SerializingExecutor serializingExecutor) {
163
    try {
164
      // The socket implementation is lazily initialized, but had broken thread-safety 
165
      // for that laziness https://bugs.openjdk.org/browse/JDK-8278326. 
166
      // As a workaround, we lock to synchronize initialization with shutdown().
167
      synchronized (lock) {
1✔
168
        socket.setTcpNoDelay(true);
1✔
169
      }
1✔
170
      HandshakerSocketFactory.HandshakeResult result =
1✔
171
          config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
1✔
172
      synchronized (lock) {
1✔
173
        this.socket = result.socket;
1✔
174
      }
1✔
175
      this.attributes = result.attributes;
1✔
176

177
      int maxQueuedControlFrames = 10000;
1✔
178
      AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
179
      asyncSink.becomeConnected(Okio.sink(socket), socket);
1✔
180
      FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
181
          variant.newWriter(Okio.buffer(asyncSink), false));
1✔
182
      FrameWriter writeMonitoringFrameWriter = new ForwardingFrameWriter(rawFrameWriter) {
1✔
183
        @Override
184
        public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
185
            throws IOException {
186
          keepAliveEnforcer.resetCounters();
1✔
187
          super.synReply(outFinished, streamId, headerBlock);
1✔
188
        }
1✔
189

190
        @Override
191
        public void headers(int streamId, List<Header> headerBlock) throws IOException {
192
          keepAliveEnforcer.resetCounters();
1✔
193
          super.headers(streamId, headerBlock);
1✔
194
        }
1✔
195

196
        @Override
197
        public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
198
            throws IOException {
199
          keepAliveEnforcer.resetCounters();
1✔
200
          super.data(outFinished, streamId, source, byteCount);
1✔
201
        }
1✔
202
      };
203
      synchronized (lock) {
1✔
204
        this.securityInfo = result.securityInfo;
1✔
205

206
        // Handle FrameWriter exceptions centrally, since there are many callers. Note that
207
        // errors coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink
208
        // does not propagate syscall errors through the FrameWriter. But we handle the
209
        // AsyncSink failures with the same TransportExceptionHandler instance so it is all
210
        // mixed back together.
211
        frameWriter = new ExceptionHandlingFrameWriter(this, writeMonitoringFrameWriter);
1✔
212
        outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
213

214
        // These writes will be queued in the serializingExecutor waiting for this function to
215
        // return.
216
        frameWriter.connectionPreface();
1✔
217
        Settings settings = new Settings();
1✔
218
        OkHttpSettingsUtil.set(settings,
1✔
219
            OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
220
        OkHttpSettingsUtil.set(settings,
1✔
221
            OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
222
        frameWriter.settings(settings);
1✔
223
        if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
1✔
224
          frameWriter.windowUpdate(
1✔
225
              Utils.CONNECTION_STREAM_ID, config.flowControlWindow - Utils.DEFAULT_WINDOW_SIZE);
226
        }
227
        frameWriter.flush();
1✔
228
      }
1✔
229

230
      if (config.keepAliveTimeNanos != GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
231
        keepAliveManager = new KeepAliveManager(
1✔
232
            new KeepAlivePinger(), scheduledExecutorService, config.keepAliveTimeNanos,
233
            config.keepAliveTimeoutNanos, true);
234
        keepAliveManager.onTransportStarted();
1✔
235
      }
236

237
      if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
238
        maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
1✔
239
        maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
1✔
240
      }
241

242
      if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
243
        long maxConnectionAgeInNanos =
1✔
244
            (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
1✔
245
        maxConnectionAgeMonitor = scheduledExecutorService.schedule(
1✔
246
            new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
1✔
247
            maxConnectionAgeInNanos,
248
            TimeUnit.NANOSECONDS);
249
      }
250

251
      transportExecutor.execute(new FrameHandler(
1✔
252
          variant.newReader(Okio.buffer(Okio.source(socket)), false)));
1✔
253
    } catch (Error | IOException | RuntimeException ex) {
1✔
254
      synchronized (lock) {
1✔
255
        if (!handshakeShutdown) {
1✔
256
          log.log(Level.INFO, "Socket failed to handshake", ex);
1✔
257
        }
258
      }
1✔
259
      GrpcUtil.closeQuietly(socket);
1✔
260
      terminated();
1✔
261
    }
1✔
262
  }
1✔
263

264
  @Override
265
  public void shutdown() {
266
    shutdown(null);
1✔
267
  }
1✔
268

269
  private void shutdown(@Nullable Long gracefulShutdownPeriod) {
270
    synchronized (lock) {
1✔
271
      if (gracefulShutdown || abruptShutdown) {
1✔
272
        return;
1✔
273
      }
274
      gracefulShutdown = true;
1✔
275
      this.gracefulShutdownPeriod = gracefulShutdownPeriod;
1✔
276
      if (frameWriter == null) {
1✔
277
        handshakeShutdown = true;
1✔
278
        GrpcUtil.closeQuietly(socket);
1✔
279
      } else {
280
        // RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
281
        // we also set a timer to limit the upper bound in case the PING is excessively stalled or
282
        // the client is malicious.
283
        secondGoawayTimer = scheduledExecutorService.schedule(
1✔
284
            this::triggerGracefulSecondGoaway,
285
            GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
286
        frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
1✔
287
        frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
1✔
288
        frameWriter.flush();
1✔
289
      }
290
    }
1✔
291
  }
1✔
292

293
  private void triggerGracefulSecondGoaway() {
294
    synchronized (lock) {
1✔
295
      if (secondGoawayTimer == null) {
1✔
296
        return;
×
297
      }
298
      secondGoawayTimer.cancel(false);
1✔
299
      secondGoawayTimer = null;
1✔
300
      frameWriter.goAway(lastStreamId, ErrorCode.NO_ERROR, new byte[0]);
1✔
301
      goAwayStreamId = lastStreamId;
1✔
302
      if (streams.isEmpty()) {
1✔
303
        frameWriter.close();
1✔
304
      } else {
305
        frameWriter.flush();
1✔
306
      }
307
      if (gracefulShutdownPeriod != null) {
1✔
308
        forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
309
            this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
1✔
310
      }
311
    }
1✔
312
  }
1✔
313

314
  @Override
315
  public void shutdownNow(Status reason) {
316
    synchronized (lock) {
1✔
317
      if (frameWriter == null) {
1✔
318
        handshakeShutdown = true;
1✔
319
        GrpcUtil.closeQuietly(socket);
1✔
320
        return;
1✔
321
      }
322
    }
1✔
323
    abruptShutdown(ErrorCode.NO_ERROR, "", reason, true);
1✔
324
  }
1✔
325

326
  /**
327
   * Finish all active streams due to an IOException, then close the transport.
328
   */
329
  @Override
330
  public void onException(Throwable failureCause) {
331
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
332
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
333
    abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
334
  }
1✔
335

336
  private void abruptShutdown(
337
      ErrorCode errorCode, String moreDetail, Status reason, boolean rstStreams) {
338
    synchronized (lock) {
1✔
339
      if (abruptShutdown) {
1✔
340
        return;
1✔
341
      }
342
      abruptShutdown = true;
1✔
343
      goAwayStatus = reason;
1✔
344

345
      if (secondGoawayTimer != null) {
1✔
346
        secondGoawayTimer.cancel(false);
1✔
347
        secondGoawayTimer = null;
1✔
348
      }
349
      for (Map.Entry<Integer, StreamState> entry : streams.entrySet()) {
1✔
350
        if (rstStreams) {
1✔
351
          frameWriter.rstStream(entry.getKey(), ErrorCode.CANCEL);
1✔
352
        }
353
        entry.getValue().transportReportStatus(reason);
1✔
354
      }
1✔
355
      streams.clear();
1✔
356

357
      // RFC7540 §5.4.1. Attempt to inform the client what went wrong. We try to write the GOAWAY
358
      // _and then_ close our side of the connection. But place an upper-bound for how long we wait
359
      // for I/O with a timer, which forcefully closes the socket.
360
      frameWriter.goAway(lastStreamId, errorCode, moreDetail.getBytes(GrpcUtil.US_ASCII));
1✔
361
      goAwayStreamId = lastStreamId;
1✔
362
      frameWriter.close();
1✔
363
      forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
364
          this::triggerForcefulClose, 1, TimeUnit.SECONDS);
365
    }
1✔
366
  }
1✔
367

368
  private void triggerForcefulClose() {
369
    // Safe to do unconditionally; no need to check if timer cancellation raced
370
    GrpcUtil.closeQuietly(socket);
1✔
371
  }
1✔
372

373
  private void terminated() {
374
    synchronized (lock) {
1✔
375
      if (forcefulCloseTimer != null) {
1✔
376
        forcefulCloseTimer.cancel(false);
1✔
377
        forcefulCloseTimer = null;
1✔
378
      }
379
    }
1✔
380
    if (keepAliveManager != null) {
1✔
381
      keepAliveManager.onTransportTermination();
1✔
382
    }
383
    if (maxConnectionIdleManager != null) {
1✔
384
      maxConnectionIdleManager.onTransportTermination();
1✔
385
    }
386

387
    if (maxConnectionAgeMonitor != null) {
1✔
388
      maxConnectionAgeMonitor.cancel(false);
1✔
389
    }
390
    transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
1✔
391
    scheduledExecutorService =
1✔
392
        config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
1✔
393
    listener.transportTerminated();
1✔
394
  }
1✔
395

396
  @Override
397
  public ScheduledExecutorService getScheduledExecutorService() {
398
    return scheduledExecutorService;
1✔
399
  }
400

401
  @Override
402
  public ListenableFuture<InternalChannelz.SocketStats> getStats() {
403
    synchronized (lock) {
1✔
404
      return Futures.immediateFuture(new InternalChannelz.SocketStats(
1✔
405
          tracer.getStats(),
1✔
406
          socket.getLocalSocketAddress(),
1✔
407
          socket.getRemoteSocketAddress(),
1✔
408
          Utils.getSocketOptions(socket),
1✔
409
          securityInfo));
410
    }
411
  }
412

413
  private TransportTracer.FlowControlWindows readFlowControlWindow() {
414
    synchronized (lock) {
1✔
415
      long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
416
      // connectionUnacknowledgedBytesRead is only readable by FrameHandler, so we provide a lower
417
      // bound.
418
      long remote = (long) (config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO);
1✔
419
      return new TransportTracer.FlowControlWindows(local, remote);
1✔
420
    }
421
  }
422

423
  @Override
424
  public InternalLogId getLogId() {
425
    return logId;
1✔
426
  }
427

428
  @Override
429
  public OutboundFlowController.StreamState[] getActiveStreams() {
430
    synchronized (lock) {
1✔
431
      OutboundFlowController.StreamState[] flowStreams =
1✔
432
          new OutboundFlowController.StreamState[streams.size()];
1✔
433
      int i = 0;
1✔
434
      for (StreamState stream : streams.values()) {
1✔
435
        flowStreams[i++] = stream.getOutboundFlowState();
1✔
436
      }
1✔
437
      return flowStreams;
1✔
438
    }
439
  }
440

441
  /**
442
   * Notify the transport that the stream was closed. Any frames for the stream must be enqueued
443
   * before calling.
444
   */
445
  void streamClosed(int streamId, boolean flush) {
446
    synchronized (lock) {
1✔
447
      streams.remove(streamId);
1✔
448
      if (streams.isEmpty()) {
1✔
449
        keepAliveEnforcer.onTransportIdle();
1✔
450
        if (maxConnectionIdleManager != null) {
1✔
451
          maxConnectionIdleManager.onTransportIdle();
1✔
452
        }
453
      }
454
      if (gracefulShutdown && streams.isEmpty()) {
1✔
455
        frameWriter.close();
1✔
456
      } else {
457
        if (flush) {
1✔
458
          frameWriter.flush();
1✔
459
        }
460
      }
461
    }
1✔
462
  }
1✔
463

464
  private static String asciiString(ByteString value) {
465
    // utf8() string is cached in ByteString, so we prefer it when the contents are ASCII. This
466
    // provides benefit if the header was reused via HPACK.
467
    for (int i = 0; i < value.size(); i++) {
1✔
468
      if (value.getByte(i) < 0) {
1✔
469
        return value.string(GrpcUtil.US_ASCII);
×
470
      }
471
    }
472
    return value.utf8();
1✔
473
  }
474

475
  private static int headerFind(List<Header> header, ByteString key, int startIndex) {
476
    for (int i = startIndex; i < header.size(); i++) {
1✔
477
      if (header.get(i).name.equals(key)) {
1✔
478
        return i;
1✔
479
      }
480
    }
481
    return -1;
1✔
482
  }
483

484
  private static boolean headerContains(List<Header> header, ByteString key) {
485
    return headerFind(header, key, 0) != -1;
1✔
486
  }
487

488
  private static void headerRemove(List<Header> header, ByteString key) {
489
    int i = 0;
1✔
490
    while ((i = headerFind(header, key, i)) != -1) {
1✔
491
      header.remove(i);
1✔
492
    }
493
  }
1✔
494

495
  /** Assumes that caller requires this field, so duplicates are treated as missing. */
496
  private static ByteString headerGetRequiredSingle(List<Header> header, ByteString key) {
497
    int i = headerFind(header, key, 0);
1✔
498
    if (i == -1) {
1✔
499
      return null;
1✔
500
    }
501
    if (headerFind(header, key, i + 1) != -1) {
1✔
502
      return null;
1✔
503
    }
504
    return header.get(i).value;
1✔
505
  }
506

507
  static final class Config {
508
    final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
509
    final ObjectPool<Executor> transportExecutorPool;
510
    final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
511
    final TransportTracer.Factory transportTracerFactory;
512
    final HandshakerSocketFactory handshakerSocketFactory;
513
    final long keepAliveTimeNanos;
514
    final long keepAliveTimeoutNanos;
515
    final int flowControlWindow;
516
    final int maxInboundMessageSize;
517
    final int maxInboundMetadataSize;
518
    final long maxConnectionIdleNanos;
519
    final boolean permitKeepAliveWithoutCalls;
520
    final long permitKeepAliveTimeInNanos;
521
    final long maxConnectionAgeInNanos;
522
    final long maxConnectionAgeGraceInNanos;
523

524
    public Config(
525
        OkHttpServerBuilder builder,
526
        List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
1✔
527
      this.streamTracerFactories = Preconditions.checkNotNull(
1✔
528
          streamTracerFactories, "streamTracerFactories");
529
      transportExecutorPool = Preconditions.checkNotNull(
1✔
530
          builder.transportExecutorPool, "transportExecutorPool");
531
      scheduledExecutorServicePool = Preconditions.checkNotNull(
1✔
532
          builder.scheduledExecutorServicePool, "scheduledExecutorServicePool");
533
      transportTracerFactory = Preconditions.checkNotNull(
1✔
534
          builder.transportTracerFactory, "transportTracerFactory");
535
      handshakerSocketFactory = Preconditions.checkNotNull(
1✔
536
          builder.handshakerSocketFactory, "handshakerSocketFactory");
537
      keepAliveTimeNanos = builder.keepAliveTimeNanos;
1✔
538
      keepAliveTimeoutNanos = builder.keepAliveTimeoutNanos;
1✔
539
      flowControlWindow = builder.flowControlWindow;
1✔
540
      maxInboundMessageSize = builder.maxInboundMessageSize;
1✔
541
      maxInboundMetadataSize = builder.maxInboundMetadataSize;
1✔
542
      maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
1✔
543
      permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
1✔
544
      permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
1✔
545
      maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
1✔
546
      maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
1✔
547
    }
1✔
548
  }
549

550
  /**
551
   * Runnable which reads frames and dispatches them to in flight calls.
552
   */
553
  class FrameHandler implements FrameReader.Handler, Runnable {
554
    private final OkHttpFrameLogger frameLogger =
1✔
555
        new OkHttpFrameLogger(Level.FINE, OkHttpServerTransport.class);
556
    private final FrameReader frameReader;
557
    private boolean receivedSettings;
558
    private int connectionUnacknowledgedBytesRead;
559

560
    public FrameHandler(FrameReader frameReader) {
1✔
561
      this.frameReader = frameReader;
1✔
562
    }
1✔
563

564
    @Override
565
    public void run() {
566
      String threadName = Thread.currentThread().getName();
1✔
567
      Thread.currentThread().setName("OkHttpServerTransport");
1✔
568
      try {
569
        frameReader.readConnectionPreface();
1✔
570
        if (!frameReader.nextFrame(this)) {
1✔
571
          connectionError(ErrorCode.INTERNAL_ERROR, "Failed to read initial SETTINGS");
1✔
572
          return;
1✔
573
        }
574
        if (!receivedSettings) {
1✔
575
          connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
576
              "First HTTP/2 frame must be SETTINGS. RFC7540 section 3.5");
577
          return;
1✔
578
        }
579
        // Read until the underlying socket closes.
580
        while (frameReader.nextFrame(this)) {
1✔
581
          if (keepAliveManager != null) {
1✔
582
            keepAliveManager.onDataReceived();
1✔
583
          }
584
        }
585
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
586
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
587
        // nothing, otherwise, we finish all streams since it's a real IO issue.
588
        Status status;
589
        synchronized (lock) {
1✔
590
          status = goAwayStatus;
1✔
591
        }
1✔
592
        if (status == null) {
1✔
593
          status = Status.UNAVAILABLE.withDescription("TCP connection closed or IOException");
1✔
594
        }
595
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
596
      } catch (Throwable t) {
1✔
597
        log.log(Level.WARNING, "Error decoding HTTP/2 frames", t);
1✔
598
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "Error in frame decoder",
1✔
599
            Status.INTERNAL.withDescription("Error decoding HTTP/2 frames").withCause(t), false);
1✔
600
      } finally {
601
        // Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
602
        try {
603
          GrpcUtil.exhaust(socket.getInputStream());
1✔
604
        } catch (IOException ex) {
1✔
605
          // Unable to wait, so just proceed to tear-down. The socket is probably already closed so
606
          // the GOAWAY can't be sent anyway.
607
        }
1✔
608
        GrpcUtil.closeQuietly(socket);
1✔
609
        terminated();
1✔
610
        Thread.currentThread().setName(threadName);
1✔
611
      }
612
    }
1✔
613

614
    /**
615
     * Handle HTTP2 HEADER and CONTINUATION frames.
616
     */
617
    @Override
618
    public void headers(boolean outFinished,
619
        boolean inFinished,
620
        int streamId,
621
        int associatedStreamId,
622
        List<Header> headerBlock,
623
        HeadersMode headersMode) {
624
      frameLogger.logHeaders(
1✔
625
          OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
626
      // streamId == 0 checking is in HTTP/2 decoder
627
      if ((streamId & 1) == 0) {
1✔
628
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
629
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
630
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
631
        return;
1✔
632
      }
633
      boolean newStream;
634
      synchronized (lock) {
1✔
635
        if (streamId > goAwayStreamId) {
1✔
636
          return;
×
637
        }
638
        newStream = streamId > lastStreamId;
1✔
639
        if (newStream) {
1✔
640
          lastStreamId = streamId;
1✔
641
        }
642
      }
1✔
643

644
      int metadataSize = headerBlockSize(headerBlock);
1✔
645
      if (metadataSize > config.maxInboundMetadataSize) {
1✔
646
        respondWithHttpError(streamId, inFinished, 431, Status.Code.RESOURCE_EXHAUSTED,
1✔
647
            String.format(
1✔
648
                Locale.US,
649
                "Request metadata larger than %d: %d",
650
                config.maxInboundMetadataSize,
1✔
651
                metadataSize));
1✔
652
        return;
1✔
653
      }
654

655
      headerRemove(headerBlock, ByteString.EMPTY);
1✔
656

657
      ByteString httpMethod = null;
1✔
658
      ByteString scheme = null;
1✔
659
      ByteString path = null;
1✔
660
      ByteString authority = null;
1✔
661
      while (headerBlock.size() > 0 && headerBlock.get(0).name.getByte(0) == ':') {
1✔
662
        Header header = headerBlock.remove(0);
1✔
663
        if (HTTP_METHOD.equals(header.name) && httpMethod == null) {
1✔
664
          httpMethod = header.value;
1✔
665
        } else if (SCHEME.equals(header.name) && scheme == null) {
1✔
666
          scheme = header.value;
1✔
667
        } else if (PATH.equals(header.name) && path == null) {
1✔
668
          path = header.value;
1✔
669
        } else if (AUTHORITY.equals(header.name) && authority == null) {
1✔
670
          authority = header.value;
1✔
671
        } else {
672
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
673
              "Unexpected pseudo header. RFC7540 section 8.1.2.1");
674
          return;
1✔
675
        }
676
      }
1✔
677
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
678
        if (headerBlock.get(i).name.getByte(0) == ':') {
1✔
679
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
680
              "Pseudo header not before regular headers. RFC7540 section 8.1.2.1");
681
          return;
1✔
682
        }
683
      }
684
      if (!CONNECT_METHOD.equals(httpMethod)
1✔
685
          && newStream
686
          && (httpMethod == null || scheme == null || path == null)) {
687
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
688
            "Missing required pseudo header. RFC7540 section 8.1.2.3");
689
        return;
1✔
690
      }
691
      if (headerContains(headerBlock, CONNECTION)) {
1✔
692
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
693
            "Connection-specific headers not permitted. RFC7540 section 8.1.2.2");
694
        return;
1✔
695
      }
696

697
      if (!newStream) {
1✔
698
        if (inFinished) {
1✔
699
          synchronized (lock) {
1✔
700
            StreamState stream = streams.get(streamId);
1✔
701
            if (stream == null) {
1✔
702
              streamError(streamId, ErrorCode.STREAM_CLOSED, "Received headers for closed stream");
×
703
              return;
×
704
            }
705
            if (stream.hasReceivedEndOfStream()) {
1✔
706
              streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
707
                  "Received HEADERS for half-closed (remote) stream. RFC7540 section 5.1");
708
              return;
1✔
709
            }
710
            // Ignore the trailers, but still half-close the stream
711
            stream.inboundDataReceived(new Buffer(), 0, 0, true);
1✔
712
            return;
1✔
713
          }
714
        } else {
715
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
716
              "Headers disallowed in the middle of the stream. RFC7540 section 8.1");
717
          return;
1✔
718
        }
719
      }
720

721
      if (authority == null) {
1✔
722
        int i = headerFind(headerBlock, HOST, 0);
1✔
723
        if (i != -1) {
1✔
724
          if (headerFind(headerBlock, HOST, i + 1) != -1) {
1✔
725
            respondWithHttpError(streamId, inFinished, 400, Status.Code.INTERNAL,
1✔
726
                "Multiple host headers disallowed. RFC7230 section 5.4");
727
            return;
1✔
728
          }
729
          authority = headerBlock.get(i).value;
1✔
730
        }
731
      }
732
      headerRemove(headerBlock, HOST);
1✔
733

734
      // Remove the leading slash of the path and get the fully qualified method name
735
      if (path.size() == 0 || path.getByte(0) != '/') {
1✔
736
        respondWithHttpError(streamId, inFinished, 404, Status.Code.UNIMPLEMENTED,
1✔
737
            "Expected path to start with /: " + asciiString(path));
1✔
738
        return;
1✔
739
      }
740
      String method = asciiString(path).substring(1);
1✔
741

742
      ByteString contentType = headerGetRequiredSingle(headerBlock, CONTENT_TYPE);
1✔
743
      if (contentType == null) {
1✔
744
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
745
            "Content-Type is missing or duplicated");
746
        return;
1✔
747
      }
748
      String contentTypeString = asciiString(contentType);
1✔
749
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
750
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
751
            "Content-Type is not supported: " + contentTypeString);
752
        return;
1✔
753
      }
754

755
      if (!POST_METHOD.equals(httpMethod)) {
1✔
756
        respondWithHttpError(streamId, inFinished, 405, Status.Code.INTERNAL,
1✔
757
            "HTTP Method is not supported: " + asciiString(httpMethod));
1✔
758
        return;
1✔
759
      }
760

761
      ByteString te = headerGetRequiredSingle(headerBlock, TE);
1✔
762
      if (!TE_TRAILERS.equals(te)) {
1✔
763
        respondWithGrpcError(streamId, inFinished, Status.Code.INTERNAL,
1✔
764
            String.format("Expected header TE: %s, but %s is received. "
1✔
765
              + "Some intermediate proxy may not support trailers",
766
              asciiString(TE_TRAILERS), te == null ? "<missing>" : asciiString(te)));
1✔
767
        return;
1✔
768
      }
769
      headerRemove(headerBlock, CONTENT_LENGTH);
1✔
770

771
      Metadata metadata = Utils.convertHeaders(headerBlock);
1✔
772
      StatsTraceContext statsTraceCtx =
1✔
773
          StatsTraceContext.newServerContext(config.streamTracerFactories, method, metadata);
1✔
774
      synchronized (lock) {
1✔
775
        OkHttpServerStream.TransportState stream = new OkHttpServerStream.TransportState(
1✔
776
            OkHttpServerTransport.this,
777
            streamId,
778
            config.maxInboundMessageSize,
1✔
779
            statsTraceCtx,
780
            lock,
1✔
781
            frameWriter,
1✔
782
            outboundFlow,
1✔
783
            config.flowControlWindow,
1✔
784
            tracer,
1✔
785
            method);
786
        OkHttpServerStream streamForApp = new OkHttpServerStream(
1✔
787
            stream,
788
            attributes,
1✔
789
            authority == null ? null : asciiString(authority),
1✔
790
            statsTraceCtx,
791
            tracer);
1✔
792
        if (streams.isEmpty()) {
1✔
793
          keepAliveEnforcer.onTransportActive();
1✔
794
          if (maxConnectionIdleManager != null) {
1✔
795
            maxConnectionIdleManager.onTransportActive();
1✔
796
          }
797
        }
798
        streams.put(streamId, stream);
1✔
799
        listener.streamCreated(streamForApp, method, metadata);
1✔
800
        stream.onStreamAllocated();
1✔
801
        if (inFinished) {
1✔
802
          stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
1✔
803
        }
804
      }
1✔
805
    }
1✔
806

807
    private int headerBlockSize(List<Header> headerBlock) {
808
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
809
      long size = 0;
1✔
810
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
811
        Header header = headerBlock.get(i);
1✔
812
        size += 32 + header.name.size() + header.value.size();
1✔
813
      }
814
      size = Math.min(size, Integer.MAX_VALUE);
1✔
815
      return (int) size;
1✔
816
    }
817

818
    /**
819
     * Handle an HTTP2 DATA frame.
820
     */
821
    @Override
822
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
823
                     int paddedLength)
824
        throws IOException {
825
      frameLogger.logData(
1✔
826
          OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
1✔
827
      if (streamId == 0) {
1✔
828
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
829
            "Stream 0 is reserved for control messages. RFC7540 section 5.1.1");
830
        return;
1✔
831
      }
832
      if ((streamId & 1) == 0) {
1✔
833
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
834
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
835
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
836
        return;
1✔
837
      }
838

839
      // Wait until the frame is complete. We only support 16 KiB frames, and the max permitted in
840
      // HTTP/2 is 16 MiB. This is verified in OkHttp's Http2 deframer, so we don't need to be
841
      // concerned with the window being exceeded at this point.
842
      in.require(length);
1✔
843

844
      synchronized (lock) {
1✔
845
        StreamState stream = streams.get(streamId);
1✔
846
        if (stream == null) {
1✔
847
          in.skip(length);
1✔
848
          streamError(streamId, ErrorCode.STREAM_CLOSED, "Received data for closed stream");
1✔
849
          return;
1✔
850
        }
851
        if (stream.hasReceivedEndOfStream()) {
1✔
852
          in.skip(length);
1✔
853
          streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
854
              "Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
855
          return;
1✔
856
        }
857
        if (stream.inboundWindowAvailable() < paddedLength) {
1✔
858
          in.skip(length);
1✔
859
          streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
1✔
860
              "Received DATA size exceeded window size. RFC7540 section 6.9");
861
          return;
1✔
862
        }
863
        Buffer buf = new Buffer();
1✔
864
        buf.write(in.getBuffer(), length);
1✔
865
        stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
1✔
866
      }
1✔
867

868
      // connection window update
869
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
870
      if (connectionUnacknowledgedBytesRead
1✔
871
          >= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
872
        synchronized (lock) {
1✔
873
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
874
          frameWriter.flush();
1✔
875
        }
1✔
876
        connectionUnacknowledgedBytesRead = 0;
1✔
877
      }
878
    }
1✔
879

880
    @Override
881
    public void rstStream(int streamId, ErrorCode errorCode) {
882
      frameLogger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
883
      // streamId == 0 checking is in HTTP/2 decoder
884

885
      if (!(ErrorCode.NO_ERROR.equals(errorCode)
1✔
886
            || ErrorCode.CANCEL.equals(errorCode)
1✔
887
            || ErrorCode.STREAM_CLOSED.equals(errorCode))) {
1✔
888
        log.log(Level.INFO, "Received RST_STREAM: " + errorCode);
×
889
      }
890
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
891
          .withDescription("RST_STREAM");
1✔
892
      synchronized (lock) {
1✔
893
        StreamState stream = streams.get(streamId);
1✔
894
        if (stream != null) {
1✔
895
          stream.inboundRstReceived(status);
1✔
896
          streamClosed(streamId, /*flush=*/ false);
1✔
897
        }
898
      }
1✔
899
    }
1✔
900

901
    @Override
902
    public void settings(boolean clearPrevious, Settings settings) {
903
      frameLogger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
904
      synchronized (lock) {
1✔
905
        boolean outboundWindowSizeIncreased = false;
1✔
906
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
907
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
908
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
909
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
910
        }
911

912
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
913
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
914
        // otherwise it will cause a stream error (RST_STREAM).
915
        frameWriter.ackSettings(settings);
1✔
916
        frameWriter.flush();
1✔
917
        if (!receivedSettings) {
1✔
918
          receivedSettings = true;
1✔
919
          attributes = listener.transportReady(attributes);
1✔
920
        }
921

922
        // send any pending bytes / streams
923
        if (outboundWindowSizeIncreased) {
1✔
924
          outboundFlow.writeStreams();
1✔
925
        }
926
      }
1✔
927
    }
1✔
928

929
    @Override
930
    public void ping(boolean ack, int payload1, int payload2) {
931
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
932
        abruptShutdown(ErrorCode.ENHANCE_YOUR_CALM, "too_many_pings",
1✔
933
            Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"), false);
1✔
934
        return;
1✔
935
      }
936
      long payload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
937
      if (!ack) {
1✔
938
        frameLogger.logPing(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
939
        synchronized (lock) {
1✔
940
          frameWriter.ping(true, payload1, payload2);
1✔
941
          frameWriter.flush();
1✔
942
        }
1✔
943
      } else {
944
        frameLogger.logPingAck(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
945
        if (KEEPALIVE_PING == payload) {
1✔
946
          return;
×
947
        }
948
        if (GRACEFUL_SHUTDOWN_PING == payload) {
1✔
949
          triggerGracefulSecondGoaway();
1✔
950
          return;
1✔
951
        }
952
        log.log(Level.INFO, "Received unexpected ping ack: " + payload);
×
953
      }
954
    }
1✔
955

956
    @Override
957
    public void ackSettings() {}
1✔
958

959
    @Override
960
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
961
      frameLogger.logGoAway(
1✔
962
          OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
963
      String description = String.format("Received GOAWAY: %s '%s'", errorCode, debugData.utf8());
1✔
964
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
965
          .withDescription(description);
1✔
966
      if (!ErrorCode.NO_ERROR.equals(errorCode)) {
1✔
967
        log.log(
×
968
            Level.WARNING, "Received GOAWAY: {0} {1}", new Object[] {errorCode, debugData.utf8()});
×
969
      }
970
      synchronized (lock) {
1✔
971
        goAwayStatus = status;
1✔
972
      }
1✔
973
    }
1✔
974

975
    @Override
976
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
977
        throws IOException {
978
      frameLogger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
979
          streamId, promisedStreamId, requestHeaders);
980
      // streamId == 0 checking is in HTTP/2 decoder.
981
      // The server doesn't use PUSH_PROMISE, so all even streams are IDLE, and odd streams are not
982
      // peer-initiated.
983
      connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
984
          "PUSH_PROMISE only allowed on peer-initiated streams. RFC7540 section 6.6");
985
    }
1✔
986

987
    @Override
988
    public void windowUpdate(int streamId, long delta) {
989
      frameLogger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
990
      // delta == 0 checking is in HTTP/2 decoder. And it isn't quite right, as it will always cause
991
      // a GOAWAY. RFC7540 section 6.9 says to use RST_STREAM if the stream id isn't 0. Doesn't
992
      // matter much though.
993
      synchronized (lock) {
1✔
994
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
995
          outboundFlow.windowUpdate(null, (int) delta);
1✔
996
        } else {
997
          StreamState stream = streams.get(streamId);
1✔
998
          if (stream != null) {
1✔
999
            outboundFlow.windowUpdate(stream.getOutboundFlowState(), (int) delta);
1✔
1000
          }
1001
        }
1002
      }
1✔
1003
    }
1✔
1004

1005
    @Override
1006
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1007
      frameLogger.logPriority(
1✔
1008
          OkHttpFrameLogger.Direction.INBOUND, streamId, streamDependency, weight, exclusive);
1009
      // streamId == 0 checking is in HTTP/2 decoder.
1010
      // Ignore priority change.
1011
    }
1✔
1012

1013
    @Override
1014
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1015
        int port, long maxAge) {}
×
1016

1017
    /**
1018
     * Send GOAWAY to the server, then finish all streams and close the transport. RFC7540 §5.4.1.
1019
     */
1020
    private void connectionError(ErrorCode errorCode, String moreDetail) {
1021
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1022
          .withDescription(String.format("HTTP2 connection error: %s '%s'", errorCode, moreDetail));
1✔
1023
      abruptShutdown(errorCode, moreDetail, status, false);
1✔
1024
    }
1✔
1025

1026
    /**
1027
     * Respond with RST_STREAM, making sure to kill the associated stream if it exists. Reason will
1028
     * rarely be seen. RFC7540 §5.4.2.
1029
     */
1030
    private void streamError(int streamId, ErrorCode errorCode, String reason) {
1031
      if (errorCode == ErrorCode.PROTOCOL_ERROR) {
1✔
1032
        log.log(
1✔
1033
            Level.FINE, "Responding with RST_STREAM {0}: {1}", new Object[] {errorCode, reason});
1034
      }
1035
      synchronized (lock) {
1✔
1036
        frameWriter.rstStream(streamId, errorCode);
1✔
1037
        frameWriter.flush();
1✔
1038
        StreamState stream = streams.get(streamId);
1✔
1039
        if (stream != null) {
1✔
1040
          stream.transportReportStatus(
1✔
1041
              Status.INTERNAL.withDescription(
1✔
1042
                  String.format("Responded with RST_STREAM %s: %s", errorCode, reason)));
1✔
1043
          streamClosed(streamId, /*flush=*/ false);
1✔
1044
        }
1045
      }
1✔
1046
    }
1✔
1047

1048
    private void respondWithHttpError(
1049
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg) {
1050
      Metadata metadata = new Metadata();
1✔
1051
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1052
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1053
      List<Header> headers =
1✔
1054
          Headers.createHttpResponseHeaders(httpCode, "text/plain; charset=utf-8", metadata);
1✔
1055
      Buffer data = new Buffer().writeUtf8(msg);
1✔
1056

1057
      synchronized (lock) {
1✔
1058
        Http2ErrorStreamState stream =
1✔
1059
            new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
1✔
1060
        if (streams.isEmpty()) {
1✔
1061
          keepAliveEnforcer.onTransportActive();
1✔
1062
          if (maxConnectionIdleManager != null) {
1✔
1063
            maxConnectionIdleManager.onTransportActive();
1✔
1064
          }
1065
        }
1066
        streams.put(streamId, stream);
1✔
1067
        if (inFinished) {
1✔
1068
          stream.inboundDataReceived(new Buffer(), 0, 0, true);
×
1069
        }
1070
        frameWriter.headers(streamId, headers);
1✔
1071
        outboundFlow.data(
1✔
1072
            /*outFinished=*/true, stream.getOutboundFlowState(), data, /*flush=*/true);
1✔
1073
        outboundFlow.notifyWhenNoPendingData(
1✔
1074
            stream.getOutboundFlowState(), () -> rstOkAtEndOfHttpError(stream));
1✔
1075
      }
1✔
1076
    }
1✔
1077

1078
    private void rstOkAtEndOfHttpError(Http2ErrorStreamState stream) {
1079
      synchronized (lock) {
1✔
1080
        if (!stream.hasReceivedEndOfStream()) {
1✔
1081
          frameWriter.rstStream(stream.streamId, ErrorCode.NO_ERROR);
1✔
1082
        }
1083
        streamClosed(stream.streamId, /*flush=*/ true);
1✔
1084
      }
1✔
1085
    }
1✔
1086

1087
    private void respondWithGrpcError(
1088
        int streamId, boolean inFinished, Status.Code statusCode, String msg) {
1089
      Metadata metadata = new Metadata();
1✔
1090
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1091
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1092
      List<Header> headers = Headers.createResponseTrailers(metadata, false);
1✔
1093

1094
      synchronized (lock) {
1✔
1095
        frameWriter.synReply(true, streamId, headers);
1✔
1096
        if (!inFinished) {
1✔
1097
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
1098
        }
1099
        frameWriter.flush();
1✔
1100
      }
1✔
1101
    }
1✔
1102
  }
1103

1104
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1✔
1105
    @Override
1106
    public void ping() {
1107
      synchronized (lock) {
×
1108
        frameWriter.ping(false, 0, KEEPALIVE_PING);
×
1109
        frameWriter.flush();
×
1110
      }
×
1111
      tracer.reportKeepAliveSent();
×
1112
    }
×
1113

1114
    @Override
1115
    public void onPingTimeout() {
1116
      synchronized (lock) {
×
1117
        goAwayStatus = Status.UNAVAILABLE
×
1118
            .withDescription("Keepalive failed. Considering connection dead");
×
1119
        GrpcUtil.closeQuietly(socket);
×
1120
      }
×
1121
    }
×
1122
  }
1123

1124
  interface StreamState {
1125
    /** Must be holding 'lock' when calling. */
1126
    void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
1127

1128
    /** Must be holding 'lock' when calling. */
1129
    boolean hasReceivedEndOfStream();
1130

1131
    /** Must be holding 'lock' when calling. */
1132
    int inboundWindowAvailable();
1133

1134
    /** Must be holding 'lock' when calling. */
1135
    void transportReportStatus(Status status);
1136

1137
    /** Must be holding 'lock' when calling. */
1138
    void inboundRstReceived(Status status);
1139

1140
    OutboundFlowController.StreamState getOutboundFlowState();
1141
  }
1142

1143
  static class Http2ErrorStreamState implements StreamState, OutboundFlowController.Stream {
1144
    private final int streamId;
1145
    private final Object lock;
1146
    private final OutboundFlowController.StreamState outboundFlowState;
1147
    @GuardedBy("lock")
1148
    private int window;
1149
    @GuardedBy("lock")
1150
    private boolean receivedEndOfStream;
1151

1152
    Http2ErrorStreamState(
1153
        int streamId, Object lock, OutboundFlowController outboundFlow, int initialWindowSize) {
1✔
1154
      this.streamId = streamId;
1✔
1155
      this.lock = lock;
1✔
1156
      this.outboundFlowState = outboundFlow.createState(this, streamId);
1✔
1157
      this.window = initialWindowSize;
1✔
1158
    }
1✔
1159

1160
    @Override public void onSentBytes(int frameBytes) {}
1✔
1161

1162
    @Override public void inboundDataReceived(
1163
        Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
1164
      synchronized (lock) {
×
1165
        if (endOfStream) {
×
1166
          receivedEndOfStream = true;
×
1167
        }
1168
        window -= dataLength + paddingLength;
×
1169
        try {
1170
          frame.skip(frame.size()); // Recycle segments
×
1171
        } catch (IOException ex) {
×
1172
          throw new AssertionError(ex);
×
1173
        }
×
1174
      }
×
1175
    }
×
1176

1177
    @Override public boolean hasReceivedEndOfStream() {
1178
      synchronized (lock) {
1✔
1179
        return receivedEndOfStream;
1✔
1180
      }
1181
    }
1182

1183
    @Override public int inboundWindowAvailable() {
1184
      synchronized (lock) {
×
1185
        return window;
×
1186
      }
1187
    }
1188

1189
    @Override public void transportReportStatus(Status status) {}
×
1190

1191
    @Override public void inboundRstReceived(Status status) {}
×
1192

1193
    @Override public OutboundFlowController.StreamState getOutboundFlowState() {
1194
      synchronized (lock) {
1✔
1195
        return outboundFlowState;
1✔
1196
      }
1197
    }
1198
  }
1199
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc