• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20066

12 Nov 2025 06:39PM UTC coverage: 88.552% (+0.002%) from 88.55%
#20066

push

github

web-flow
alts: Metadata server address modification to account for default port

Fixing the utilization of the GCE Metadata host server address
environment variable to account for the case where the user does not
specify a port (defaults to port 8080)

b/451639946

35080 of 39615 relevant lines covered (88.55%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.21
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
20
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
21

22
import com.google.common.base.Preconditions;
23
import com.google.common.collect.Lists;
24
import com.google.common.util.concurrent.Futures;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.InternalChannelz;
29
import io.grpc.InternalLogId;
30
import io.grpc.InternalStatus;
31
import io.grpc.Metadata;
32
import io.grpc.ServerStreamTracer;
33
import io.grpc.Status;
34
import io.grpc.internal.GrpcUtil;
35
import io.grpc.internal.KeepAliveEnforcer;
36
import io.grpc.internal.KeepAliveManager;
37
import io.grpc.internal.LogExceptionRunnable;
38
import io.grpc.internal.MaxConnectionIdleManager;
39
import io.grpc.internal.ObjectPool;
40
import io.grpc.internal.SerializingExecutor;
41
import io.grpc.internal.ServerTransport;
42
import io.grpc.internal.ServerTransportListener;
43
import io.grpc.internal.StatsTraceContext;
44
import io.grpc.internal.TransportTracer;
45
import io.grpc.okhttp.internal.framed.ErrorCode;
46
import io.grpc.okhttp.internal.framed.FrameReader;
47
import io.grpc.okhttp.internal.framed.FrameWriter;
48
import io.grpc.okhttp.internal.framed.Header;
49
import io.grpc.okhttp.internal.framed.HeadersMode;
50
import io.grpc.okhttp.internal.framed.Http2;
51
import io.grpc.okhttp.internal.framed.Settings;
52
import io.grpc.okhttp.internal.framed.Variant;
53
import java.io.IOException;
54
import java.net.Socket;
55
import java.net.SocketException;
56
import java.util.Collections;
57
import java.util.List;
58
import java.util.Locale;
59
import java.util.Map;
60
import java.util.TreeMap;
61
import java.util.concurrent.Executor;
62
import java.util.concurrent.ScheduledExecutorService;
63
import java.util.concurrent.ScheduledFuture;
64
import java.util.concurrent.TimeUnit;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68
import okio.Buffer;
69
import okio.BufferedSource;
70
import okio.ByteString;
71
import okio.Okio;
72

73
/**
74
 * OkHttp-based server transport.
75
 */
76
final class OkHttpServerTransport implements ServerTransport,
77
      ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport {
78
  private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName());
1✔
79
  private static final int GRACEFUL_SHUTDOWN_PING = 0x1111;
80

81
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);
1✔
82

83
  private static final int KEEPALIVE_PING = 0xDEAD;
84
  private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method");
1✔
85
  private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT");
1✔
86
  private static final ByteString POST_METHOD = ByteString.encodeUtf8("POST");
1✔
87
  private static final ByteString SCHEME = ByteString.encodeUtf8(":scheme");
1✔
88
  private static final ByteString PATH = ByteString.encodeUtf8(":path");
1✔
89
  private static final ByteString AUTHORITY = ByteString.encodeUtf8(":authority");
1✔
90
  private static final ByteString CONNECTION = ByteString.encodeUtf8("connection");
1✔
91
  private static final ByteString HOST = ByteString.encodeUtf8("host");
1✔
92
  private static final ByteString TE = ByteString.encodeUtf8("te");
1✔
93
  private static final ByteString TE_TRAILERS = ByteString.encodeUtf8("trailers");
1✔
94
  private static final ByteString CONTENT_TYPE = ByteString.encodeUtf8("content-type");
1✔
95
  private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
1✔
96
  private static final ByteString ALLOW = ByteString.encodeUtf8("allow");
1✔
97

98
  private final Config config;
99
  private final Variant variant = new Http2();
1✔
100
  private final TransportTracer tracer;
101
  private final InternalLogId logId;
102
  private Socket socket;
103
  private ServerTransportListener listener;
104
  private Executor transportExecutor;
105
  private ScheduledExecutorService scheduledExecutorService;
106
  private Attributes attributes;
107
  private KeepAliveManager keepAliveManager;
108
  private MaxConnectionIdleManager maxConnectionIdleManager;
109
  private ScheduledFuture<?> maxConnectionAgeMonitor;
110
  private final KeepAliveEnforcer keepAliveEnforcer;
111

112
  private final Object lock = new Object();
1✔
113
  @GuardedBy("lock")
114
  private boolean abruptShutdown;
115
  @GuardedBy("lock")
116
  private boolean gracefulShutdown;
117
  @GuardedBy("lock")
118
  private boolean handshakeShutdown;
119
  @GuardedBy("lock")
120
  private InternalChannelz.Security securityInfo;
121
  @GuardedBy("lock")
122
  private ExceptionHandlingFrameWriter frameWriter;
123
  @GuardedBy("lock")
124
  private OutboundFlowController outboundFlow;
125
  @GuardedBy("lock")
1✔
126
  private final Map<Integer, StreamState> streams = new TreeMap<>();
127
  @GuardedBy("lock")
128
  private int lastStreamId;
129
  @GuardedBy("lock")
1✔
130
  private int goAwayStreamId = Integer.MAX_VALUE;
131
  /**
132
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
133
   * streams may continue.
134
   */
135
  @GuardedBy("lock")
136
  private Status goAwayStatus;
137
  /** Non-{@code null} when gracefully shutting down and have not yet sent second GOAWAY. */
138
  @GuardedBy("lock")
139
  private ScheduledFuture<?> secondGoawayTimer;
140
  /** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
141
  @GuardedBy("lock")
142
  private ScheduledFuture<?> forcefulCloseTimer;
143
  @GuardedBy("lock")
1✔
144
  private Long gracefulShutdownPeriod = null;
145

146
  public OkHttpServerTransport(Config config, Socket bareSocket) {
1✔
147
    this.config = Preconditions.checkNotNull(config, "config");
1✔
148
    this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
1✔
149

150
    tracer = config.transportTracerFactory.create();
1✔
151
    tracer.setFlowControlWindowReader(this::readFlowControlWindow);
1✔
152
    logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
1✔
153
    transportExecutor = config.transportExecutorPool.getObject();
1✔
154
    scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
1✔
155
    keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
1✔
156
        config.permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
157
  }
1✔
158

159
  public void start(ServerTransportListener listener) {
160
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
161

162
    SerializingExecutor serializingExecutor = new SerializingExecutor(transportExecutor);
1✔
163
    serializingExecutor.execute(() -> startIo(serializingExecutor));
1✔
164
  }
1✔
165

166
  private void startIo(SerializingExecutor serializingExecutor) {
167
    try {
168
      // The socket implementation is lazily initialized, but had broken thread-safety 
169
      // for that laziness https://bugs.openjdk.org/browse/JDK-8278326. 
170
      // As a workaround, we lock to synchronize initialization with shutdown().
171
      synchronized (lock) {
1✔
172
        socket.setTcpNoDelay(true);
1✔
173
      }
1✔
174
      HandshakerSocketFactory.HandshakeResult result =
1✔
175
          config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
1✔
176
      synchronized (lock) {
1✔
177
        if (socket.isClosed()) {
1✔
178
          // The wrapped socket may not handle the underlying socket being closed by shutdown(). In
179
          // particular, SSLSocket hangs future reads if the underlying socket is already closed at
180
          // this point, even if you call sslSocket.close() later.
181
          result.socket.close();
×
182
          throw new SocketException("Socket close raced with handshake");
×
183
        }
184
        this.socket = result.socket;
1✔
185
      }
1✔
186
      this.attributes = result.attributes;
1✔
187

188
      int maxQueuedControlFrames = 10000;
1✔
189
      AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
190
      asyncSink.becomeConnected(Okio.sink(socket), socket);
1✔
191
      FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
192
          variant.newWriter(Okio.buffer(asyncSink), false));
1✔
193
      FrameWriter writeMonitoringFrameWriter = new ForwardingFrameWriter(rawFrameWriter) {
1✔
194
        @Override
195
        public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
196
            throws IOException {
197
          keepAliveEnforcer.resetCounters();
1✔
198
          super.synReply(outFinished, streamId, headerBlock);
1✔
199
        }
1✔
200

201
        @Override
202
        public void headers(int streamId, List<Header> headerBlock) throws IOException {
203
          keepAliveEnforcer.resetCounters();
1✔
204
          super.headers(streamId, headerBlock);
1✔
205
        }
1✔
206

207
        @Override
208
        public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
209
            throws IOException {
210
          keepAliveEnforcer.resetCounters();
1✔
211
          super.data(outFinished, streamId, source, byteCount);
1✔
212
        }
1✔
213
      };
214
      synchronized (lock) {
1✔
215
        this.securityInfo = result.securityInfo;
1✔
216

217
        // Handle FrameWriter exceptions centrally, since there are many callers. Note that
218
        // errors coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink
219
        // does not propagate syscall errors through the FrameWriter. But we handle the
220
        // AsyncSink failures with the same TransportExceptionHandler instance so it is all
221
        // mixed back together.
222
        frameWriter = new ExceptionHandlingFrameWriter(this, writeMonitoringFrameWriter);
1✔
223
        outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
224

225
        // These writes will be queued in the serializingExecutor waiting for this function to
226
        // return.
227
        frameWriter.connectionPreface();
1✔
228
        Settings settings = new Settings();
1✔
229
        OkHttpSettingsUtil.set(settings,
1✔
230
            OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
231
        OkHttpSettingsUtil.set(settings,
1✔
232
            OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
233
        if (config.maxConcurrentStreams != Integer.MAX_VALUE) {
1✔
234
          OkHttpSettingsUtil.set(settings,
1✔
235
              OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams);
236
        }
237
        frameWriter.settings(settings);
1✔
238
        if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
1✔
239
          frameWriter.windowUpdate(
1✔
240
              Utils.CONNECTION_STREAM_ID, config.flowControlWindow - Utils.DEFAULT_WINDOW_SIZE);
241
        }
242
        frameWriter.flush();
1✔
243
      }
1✔
244

245
      if (config.keepAliveTimeNanos != GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
246
        keepAliveManager = new KeepAliveManager(
1✔
247
            new KeepAlivePinger(), scheduledExecutorService, config.keepAliveTimeNanos,
248
            config.keepAliveTimeoutNanos, true);
249
        keepAliveManager.onTransportStarted();
1✔
250
      }
251

252
      if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
253
        maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
1✔
254
        maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
1✔
255
      }
256

257
      if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
258
        long maxConnectionAgeInNanos =
1✔
259
            (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
1✔
260
        maxConnectionAgeMonitor = scheduledExecutorService.schedule(
1✔
261
            new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
1✔
262
            maxConnectionAgeInNanos,
263
            TimeUnit.NANOSECONDS);
264
      }
265

266
      transportExecutor.execute(new FrameHandler(
1✔
267
          variant.newReader(Okio.buffer(Okio.source(socket)), false)));
1✔
268
    } catch (Error | IOException | RuntimeException ex) {
1✔
269
      synchronized (lock) {
1✔
270
        if (!handshakeShutdown) {
1✔
271
          log.log(Level.INFO, "Socket failed to handshake", ex);
1✔
272
        }
273
      }
1✔
274
      GrpcUtil.closeQuietly(socket);
1✔
275
      terminated();
1✔
276
    }
1✔
277
  }
1✔
278

279
  @Override
280
  public void shutdown() {
281
    shutdown(null);
1✔
282
  }
1✔
283

284
  private void shutdown(@Nullable Long gracefulShutdownPeriod) {
285
    synchronized (lock) {
1✔
286
      if (gracefulShutdown || abruptShutdown) {
1✔
287
        return;
1✔
288
      }
289
      gracefulShutdown = true;
1✔
290
      this.gracefulShutdownPeriod = gracefulShutdownPeriod;
1✔
291
      if (frameWriter == null) {
1✔
292
        handshakeShutdown = true;
1✔
293
        GrpcUtil.closeQuietly(socket);
1✔
294
      } else {
295
        // RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
296
        // we also set a timer to limit the upper bound in case the PING is excessively stalled or
297
        // the client is malicious.
298
        secondGoawayTimer = scheduledExecutorService.schedule(
1✔
299
            this::triggerGracefulSecondGoaway,
300
            GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
301
        frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
1✔
302
        frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
1✔
303
        frameWriter.flush();
1✔
304
      }
305
    }
1✔
306
  }
1✔
307

308
  private void triggerGracefulSecondGoaway() {
309
    synchronized (lock) {
1✔
310
      if (secondGoawayTimer == null) {
1✔
311
        return;
×
312
      }
313
      secondGoawayTimer.cancel(false);
1✔
314
      secondGoawayTimer = null;
1✔
315
      frameWriter.goAway(lastStreamId, ErrorCode.NO_ERROR, new byte[0]);
1✔
316
      goAwayStreamId = lastStreamId;
1✔
317
      if (streams.isEmpty()) {
1✔
318
        frameWriter.close();
1✔
319
      } else {
320
        frameWriter.flush();
1✔
321
      }
322
      if (gracefulShutdownPeriod != null) {
1✔
323
        forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
324
            this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
1✔
325
      }
326
    }
1✔
327
  }
1✔
328

329
  @Override
330
  public void shutdownNow(Status reason) {
331
    synchronized (lock) {
1✔
332
      if (frameWriter == null) {
1✔
333
        handshakeShutdown = true;
1✔
334
        GrpcUtil.closeQuietly(socket);
1✔
335
        return;
1✔
336
      }
337
    }
1✔
338
    abruptShutdown(ErrorCode.NO_ERROR, "", reason, true);
1✔
339
  }
1✔
340

341
  /**
342
   * Finish all active streams due to an IOException, then close the transport.
343
   */
344
  @Override
345
  public void onException(Throwable failureCause) {
346
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
347
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
348
    abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
349
  }
1✔
350

351
  private void abruptShutdown(
352
      ErrorCode errorCode, String moreDetail, Status reason, boolean rstStreams) {
353
    synchronized (lock) {
1✔
354
      if (abruptShutdown) {
1✔
355
        return;
1✔
356
      }
357
      abruptShutdown = true;
1✔
358
      goAwayStatus = reason;
1✔
359

360
      if (secondGoawayTimer != null) {
1✔
361
        secondGoawayTimer.cancel(false);
1✔
362
        secondGoawayTimer = null;
1✔
363
      }
364
      for (Map.Entry<Integer, StreamState> entry : streams.entrySet()) {
1✔
365
        if (rstStreams) {
1✔
366
          frameWriter.rstStream(entry.getKey(), ErrorCode.CANCEL);
1✔
367
        }
368
        entry.getValue().transportReportStatus(reason);
1✔
369
      }
1✔
370
      streams.clear();
1✔
371

372
      // RFC7540 §5.4.1. Attempt to inform the client what went wrong. We try to write the GOAWAY
373
      // _and then_ close our side of the connection. But place an upper-bound for how long we wait
374
      // for I/O with a timer, which forcefully closes the socket.
375
      frameWriter.goAway(lastStreamId, errorCode, moreDetail.getBytes(GrpcUtil.US_ASCII));
1✔
376
      goAwayStreamId = lastStreamId;
1✔
377
      frameWriter.close();
1✔
378
      forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
379
          this::triggerForcefulClose, 1, TimeUnit.SECONDS);
380
    }
1✔
381
  }
1✔
382

383
  private void triggerForcefulClose() {
384
    // Safe to do unconditionally; no need to check if timer cancellation raced
385
    GrpcUtil.closeQuietly(socket);
1✔
386
  }
1✔
387

388
  private void terminated() {
389
    synchronized (lock) {
1✔
390
      if (forcefulCloseTimer != null) {
1✔
391
        forcefulCloseTimer.cancel(false);
1✔
392
        forcefulCloseTimer = null;
1✔
393
      }
394
    }
1✔
395
    if (keepAliveManager != null) {
1✔
396
      keepAliveManager.onTransportTermination();
1✔
397
    }
398
    if (maxConnectionIdleManager != null) {
1✔
399
      maxConnectionIdleManager.onTransportTermination();
1✔
400
    }
401

402
    if (maxConnectionAgeMonitor != null) {
1✔
403
      maxConnectionAgeMonitor.cancel(false);
1✔
404
    }
405
    transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
1✔
406
    scheduledExecutorService =
1✔
407
        config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
1✔
408
    listener.transportTerminated();
1✔
409
  }
1✔
410

411
  @Override
412
  public ScheduledExecutorService getScheduledExecutorService() {
413
    return scheduledExecutorService;
1✔
414
  }
415

416
  @Override
417
  public ListenableFuture<InternalChannelz.SocketStats> getStats() {
418
    synchronized (lock) {
1✔
419
      return Futures.immediateFuture(new InternalChannelz.SocketStats(
1✔
420
          tracer.getStats(),
1✔
421
          socket.getLocalSocketAddress(),
1✔
422
          socket.getRemoteSocketAddress(),
1✔
423
          Utils.getSocketOptions(socket),
1✔
424
          securityInfo));
425
    }
426
  }
427

428
  private TransportTracer.FlowControlWindows readFlowControlWindow() {
429
    synchronized (lock) {
1✔
430
      long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
431
      // connectionUnacknowledgedBytesRead is only readable by FrameHandler, so we provide a lower
432
      // bound.
433
      long remote = (long) (config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO);
1✔
434
      return new TransportTracer.FlowControlWindows(local, remote);
1✔
435
    }
436
  }
437

438
  @Override
439
  public InternalLogId getLogId() {
440
    return logId;
1✔
441
  }
442

443
  @Override
444
  public OutboundFlowController.StreamState[] getActiveStreams() {
445
    synchronized (lock) {
1✔
446
      OutboundFlowController.StreamState[] flowStreams =
1✔
447
          new OutboundFlowController.StreamState[streams.size()];
1✔
448
      int i = 0;
1✔
449
      for (StreamState stream : streams.values()) {
1✔
450
        flowStreams[i++] = stream.getOutboundFlowState();
1✔
451
      }
1✔
452
      return flowStreams;
1✔
453
    }
454
  }
455

456
  /**
457
   * Notify the transport that the stream was closed. Any frames for the stream must be enqueued
458
   * before calling.
459
   */
460
  void streamClosed(int streamId, boolean flush) {
461
    synchronized (lock) {
1✔
462
      streams.remove(streamId);
1✔
463
      if (streams.isEmpty()) {
1✔
464
        keepAliveEnforcer.onTransportIdle();
1✔
465
        if (maxConnectionIdleManager != null) {
1✔
466
          maxConnectionIdleManager.onTransportIdle();
1✔
467
        }
468
      }
469
      if (gracefulShutdown && streams.isEmpty()) {
1✔
470
        frameWriter.close();
1✔
471
      } else {
472
        if (flush) {
1✔
473
          frameWriter.flush();
1✔
474
        }
475
      }
476
    }
1✔
477
  }
1✔
478

479
  private static String asciiString(ByteString value) {
480
    // utf8() string is cached in ByteString, so we prefer it when the contents are ASCII. This
481
    // provides benefit if the header was reused via HPACK.
482
    for (int i = 0; i < value.size(); i++) {
1✔
483
      if (value.getByte(i) < 0) {
1✔
484
        return value.string(GrpcUtil.US_ASCII);
×
485
      }
486
    }
487
    return value.utf8();
1✔
488
  }
489

490
  private static int headerFind(List<Header> header, ByteString key, int startIndex) {
491
    for (int i = startIndex; i < header.size(); i++) {
1✔
492
      if (header.get(i).name.equals(key)) {
1✔
493
        return i;
1✔
494
      }
495
    }
496
    return -1;
1✔
497
  }
498

499
  private static boolean headerContains(List<Header> header, ByteString key) {
500
    return headerFind(header, key, 0) != -1;
1✔
501
  }
502

503
  private static void headerRemove(List<Header> header, ByteString key) {
504
    int i = 0;
1✔
505
    while ((i = headerFind(header, key, i)) != -1) {
1✔
506
      header.remove(i);
1✔
507
    }
508
  }
1✔
509

510
  /** Assumes that caller requires this field, so duplicates are treated as missing. */
511
  private static ByteString headerGetRequiredSingle(List<Header> header, ByteString key) {
512
    int i = headerFind(header, key, 0);
1✔
513
    if (i == -1) {
1✔
514
      return null;
1✔
515
    }
516
    if (headerFind(header, key, i + 1) != -1) {
1✔
517
      return null;
1✔
518
    }
519
    return header.get(i).value;
1✔
520
  }
521

522
  static final class Config {
523
    final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
524
    final ObjectPool<Executor> transportExecutorPool;
525
    final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
526
    final TransportTracer.Factory transportTracerFactory;
527
    final HandshakerSocketFactory handshakerSocketFactory;
528
    final long keepAliveTimeNanos;
529
    final long keepAliveTimeoutNanos;
530
    final int flowControlWindow;
531
    final int maxInboundMessageSize;
532
    final int maxInboundMetadataSize;
533
    final long maxConnectionIdleNanos;
534
    final boolean permitKeepAliveWithoutCalls;
535
    final long permitKeepAliveTimeInNanos;
536
    final long maxConnectionAgeInNanos;
537
    final long maxConnectionAgeGraceInNanos;
538
    final int maxConcurrentStreams;
539

540
    public Config(
541
        OkHttpServerBuilder builder,
542
        List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
1✔
543
      this.streamTracerFactories = Preconditions.checkNotNull(
1✔
544
          streamTracerFactories, "streamTracerFactories");
545
      transportExecutorPool = Preconditions.checkNotNull(
1✔
546
          builder.transportExecutorPool, "transportExecutorPool");
547
      scheduledExecutorServicePool = Preconditions.checkNotNull(
1✔
548
          builder.scheduledExecutorServicePool, "scheduledExecutorServicePool");
549
      transportTracerFactory = Preconditions.checkNotNull(
1✔
550
          builder.transportTracerFactory, "transportTracerFactory");
551
      handshakerSocketFactory = Preconditions.checkNotNull(
1✔
552
          builder.handshakerSocketFactory, "handshakerSocketFactory");
553
      keepAliveTimeNanos = builder.keepAliveTimeNanos;
1✔
554
      keepAliveTimeoutNanos = builder.keepAliveTimeoutNanos;
1✔
555
      flowControlWindow = builder.flowControlWindow;
1✔
556
      maxInboundMessageSize = builder.maxInboundMessageSize;
1✔
557
      maxInboundMetadataSize = builder.maxInboundMetadataSize;
1✔
558
      maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
1✔
559
      permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
1✔
560
      permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
1✔
561
      maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
1✔
562
      maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
1✔
563
      maxConcurrentStreams = builder.maxConcurrentCallsPerConnection;
1✔
564
    }
1✔
565
  }
566

567
  /**
568
   * Runnable which reads frames and dispatches them to in flight calls.
569
   */
570
  class FrameHandler implements FrameReader.Handler, Runnable {
571
    private final OkHttpFrameLogger frameLogger =
1✔
572
        new OkHttpFrameLogger(Level.FINE, OkHttpServerTransport.class);
573
    private final FrameReader frameReader;
574
    private boolean receivedSettings;
575
    private int connectionUnacknowledgedBytesRead;
576

577
    /**
     * Creates a handler that reads frames from the given reader.
     *
     * @param frameReader source of inbound HTTP/2 frames
     */
    public FrameHandler(FrameReader frameReader) {
      this.frameReader = frameReader;
    }
1✔
580

581
    @Override
582
    public void run() {
583
      String threadName = Thread.currentThread().getName();
1✔
584
      Thread.currentThread().setName("OkHttpServerTransport");
1✔
585
      try {
586
        frameReader.readConnectionPreface();
1✔
587
        if (!frameReader.nextFrame(this)) {
1✔
588
          connectionError(ErrorCode.INTERNAL_ERROR, "Failed to read initial SETTINGS");
1✔
589
          return;
1✔
590
        }
591
        if (!receivedSettings) {
1✔
592
          connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
593
              "First HTTP/2 frame must be SETTINGS. RFC7540 section 3.5");
594
          return;
1✔
595
        }
596
        // Read until the underlying socket closes.
597
        while (frameReader.nextFrame(this)) {
1✔
598
          if (keepAliveManager != null) {
1✔
599
            keepAliveManager.onDataReceived();
1✔
600
          }
601
        }
602
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
603
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
604
        // nothing, otherwise, we finish all streams since it's a real IO issue.
605
        Status status;
606
        synchronized (lock) {
1✔
607
          status = goAwayStatus;
1✔
608
        }
1✔
609
        if (status == null) {
1✔
610
          status = Status.UNAVAILABLE.withDescription("TCP connection closed or IOException");
1✔
611
        }
612
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
613
      } catch (Throwable t) {
1✔
614
        log.log(Level.WARNING, "Error decoding HTTP/2 frames", t);
1✔
615
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "Error in frame decoder",
1✔
616
            Status.INTERNAL.withDescription("Error decoding HTTP/2 frames").withCause(t), false);
1✔
617
      } finally {
618
        // Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
619
        try {
620
          GrpcUtil.exhaust(socket.getInputStream());
1✔
621
        } catch (IOException ex) {
1✔
622
          // Unable to wait, so just proceed to tear-down. The socket is probably already closed so
623
          // the GOAWAY can't be sent anyway.
624
        }
1✔
625
        GrpcUtil.closeQuietly(socket);
1✔
626
        terminated();
1✔
627
        Thread.currentThread().setName(threadName);
1✔
628
      }
629
    }
1✔
630

631
    /**
632
     * Handle HTTP2 HEADER and CONTINUATION frames.
633
     */
634
    @Override
635
    public void headers(boolean outFinished,
636
        boolean inFinished,
637
        int streamId,
638
        int associatedStreamId,
639
        List<Header> headerBlock,
640
        HeadersMode headersMode) {
641
      frameLogger.logHeaders(
1✔
642
          OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
643
      // streamId == 0 checking is in HTTP/2 decoder
644
      if ((streamId & 1) == 0) {
1✔
645
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
646
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
647
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
648
        return;
1✔
649
      }
650
      boolean newStream;
651
      synchronized (lock) {
1✔
652
        if (streamId > goAwayStreamId) {
1✔
653
          return;
×
654
        }
655
        newStream = streamId > lastStreamId;
1✔
656
        if (newStream) {
1✔
657
          lastStreamId = streamId;
1✔
658
          if (config.maxConcurrentStreams <= streams.size()) {
1✔
659
            streamError(streamId, ErrorCode.REFUSED_STREAM,
1✔
660
                "Max concurrent stream reached. RFC7540 section 5.1.2");
661
            return;
1✔
662
          }
663
        }
664
      }
1✔
665

666
      int metadataSize = headerBlockSize(headerBlock);
1✔
667
      if (metadataSize > config.maxInboundMetadataSize) {
1✔
668
        respondWithHttpError(streamId, inFinished, 431, Status.Code.RESOURCE_EXHAUSTED,
1✔
669
            String.format(
1✔
670
                Locale.US,
671
                "Request metadata larger than %d: %d",
672
                config.maxInboundMetadataSize,
1✔
673
                metadataSize));
1✔
674
        return;
1✔
675
      }
676

677
      headerRemove(headerBlock, ByteString.EMPTY);
1✔
678

679
      ByteString httpMethod = null;
1✔
680
      ByteString scheme = null;
1✔
681
      ByteString path = null;
1✔
682
      ByteString authority = null;
1✔
683
      while (headerBlock.size() > 0 && headerBlock.get(0).name.getByte(0) == ':') {
1✔
684
        Header header = headerBlock.remove(0);
1✔
685
        if (HTTP_METHOD.equals(header.name) && httpMethod == null) {
1✔
686
          httpMethod = header.value;
1✔
687
        } else if (SCHEME.equals(header.name) && scheme == null) {
1✔
688
          scheme = header.value;
1✔
689
        } else if (PATH.equals(header.name) && path == null) {
1✔
690
          path = header.value;
1✔
691
        } else if (AUTHORITY.equals(header.name) && authority == null) {
1✔
692
          authority = header.value;
1✔
693
        } else {
694
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
695
              "Unexpected pseudo header. RFC7540 section 8.1.2.1");
696
          return;
1✔
697
        }
698
      }
1✔
699
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
700
        if (headerBlock.get(i).name.getByte(0) == ':') {
1✔
701
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
702
              "Pseudo header not before regular headers. RFC7540 section 8.1.2.1");
703
          return;
1✔
704
        }
705
      }
706
      if (!CONNECT_METHOD.equals(httpMethod)
1✔
707
          && newStream
708
          && (httpMethod == null || scheme == null || path == null)) {
709
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
710
            "Missing required pseudo header. RFC7540 section 8.1.2.3");
711
        return;
1✔
712
      }
713
      if (headerContains(headerBlock, CONNECTION)) {
1✔
714
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
715
            "Connection-specific headers not permitted. RFC7540 section 8.1.2.2");
716
        return;
1✔
717
      }
718

719
      if (!newStream) {
1✔
720
        if (inFinished) {
1✔
721
          synchronized (lock) {
1✔
722
            StreamState stream = streams.get(streamId);
1✔
723
            if (stream == null) {
1✔
724
              streamError(streamId, ErrorCode.STREAM_CLOSED, "Received headers for closed stream");
×
725
              return;
×
726
            }
727
            if (stream.hasReceivedEndOfStream()) {
1✔
728
              streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
729
                  "Received HEADERS for half-closed (remote) stream. RFC7540 section 5.1");
730
              return;
1✔
731
            }
732
            // Ignore the trailers, but still half-close the stream
733
            stream.inboundDataReceived(new Buffer(), 0, 0, true);
1✔
734
            return;
1✔
735
          }
736
        } else {
737
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
738
              "Headers disallowed in the middle of the stream. RFC7540 section 8.1");
739
          return;
1✔
740
        }
741
      }
742

743
      if (authority == null) {
1✔
744
        int i = headerFind(headerBlock, HOST, 0);
1✔
745
        if (i != -1) {
1✔
746
          if (headerFind(headerBlock, HOST, i + 1) != -1) {
1✔
747
            respondWithHttpError(streamId, inFinished, 400, Status.Code.INTERNAL,
1✔
748
                "Multiple host headers disallowed. RFC7230 section 5.4");
749
            return;
1✔
750
          }
751
          authority = headerBlock.get(i).value;
1✔
752
        }
753
      }
754
      headerRemove(headerBlock, HOST);
1✔
755

756
      // Remove the leading slash of the path and get the fully qualified method name
757
      if (path.size() == 0 || path.getByte(0) != '/') {
1✔
758
        respondWithHttpError(streamId, inFinished, 404, Status.Code.UNIMPLEMENTED,
1✔
759
            "Expected path to start with /: " + asciiString(path));
1✔
760
        return;
1✔
761
      }
762
      String method = asciiString(path).substring(1);
1✔
763

764
      ByteString contentType = headerGetRequiredSingle(headerBlock, CONTENT_TYPE);
1✔
765
      if (contentType == null) {
1✔
766
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
767
            "Content-Type is missing or duplicated");
768
        return;
1✔
769
      }
770
      String contentTypeString = asciiString(contentType);
1✔
771
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
772
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
773
            "Content-Type is not supported: " + contentTypeString);
774
        return;
1✔
775
      }
776

777
      if (!POST_METHOD.equals(httpMethod)) {
1✔
778
        List<Header> extraHeaders = Lists.newArrayList(new Header(ALLOW, POST_METHOD));
1✔
779
        respondWithHttpError(streamId, inFinished, 405, Status.Code.INTERNAL,
1✔
780
            "HTTP Method is not supported: " + asciiString(httpMethod), extraHeaders);
1✔
781
        return;
1✔
782
      }
783

784
      ByteString te = headerGetRequiredSingle(headerBlock, TE);
1✔
785
      if (!TE_TRAILERS.equals(te)) {
1✔
786
        respondWithGrpcError(streamId, inFinished, Status.Code.INTERNAL,
1✔
787
            String.format("Expected header TE: %s, but %s is received. "
1✔
788
              + "Some intermediate proxy may not support trailers",
789
              asciiString(TE_TRAILERS), te == null ? "<missing>" : asciiString(te)));
1✔
790
        return;
1✔
791
      }
792
      headerRemove(headerBlock, CONTENT_LENGTH);
1✔
793

794
      Metadata metadata = Utils.convertHeaders(headerBlock);
1✔
795
      StatsTraceContext statsTraceCtx =
1✔
796
          StatsTraceContext.newServerContext(config.streamTracerFactories, method, metadata);
1✔
797
      synchronized (lock) {
1✔
798
        OkHttpServerStream.TransportState stream = new OkHttpServerStream.TransportState(
1✔
799
            OkHttpServerTransport.this,
800
            streamId,
801
            config.maxInboundMessageSize,
1✔
802
            statsTraceCtx,
803
            lock,
1✔
804
            frameWriter,
1✔
805
            outboundFlow,
1✔
806
            config.flowControlWindow,
1✔
807
            tracer,
1✔
808
            method);
809
        OkHttpServerStream streamForApp = new OkHttpServerStream(
1✔
810
            stream,
811
            attributes,
1✔
812
            authority == null ? null : asciiString(authority),
1✔
813
            statsTraceCtx,
814
            tracer);
1✔
815
        if (streams.isEmpty()) {
1✔
816
          keepAliveEnforcer.onTransportActive();
1✔
817
          if (maxConnectionIdleManager != null) {
1✔
818
            maxConnectionIdleManager.onTransportActive();
1✔
819
          }
820
        }
821
        streams.put(streamId, stream);
1✔
822
        listener.streamCreated(streamForApp, method, metadata);
1✔
823
        stream.onStreamAllocated();
1✔
824
        if (inFinished) {
1✔
825
          stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
1✔
826
        }
827
      }
1✔
828
    }
1✔
829

830
    private int headerBlockSize(List<Header> headerBlock) {
831
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
832
      long size = 0;
1✔
833
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
834
        Header header = headerBlock.get(i);
1✔
835
        size += 32 + header.name.size() + header.value.size();
1✔
836
      }
837
      size = Math.min(size, Integer.MAX_VALUE);
1✔
838
      return (int) size;
1✔
839
    }
840

841
    /**
842
     * Handle an HTTP2 DATA frame.
843
     */
844
    @Override
845
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
846
                     int paddedLength)
847
        throws IOException {
848
      frameLogger.logData(
1✔
849
          OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
1✔
850
      if (streamId == 0) {
1✔
851
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
852
            "Stream 0 is reserved for control messages. RFC7540 section 5.1.1");
853
        return;
1✔
854
      }
855
      if ((streamId & 1) == 0) {
1✔
856
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
857
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
858
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
859
        return;
1✔
860
      }
861

862
      // Wait until the frame is complete. We only support 16 KiB frames, and the max permitted in
863
      // HTTP/2 is 16 MiB. This is verified in OkHttp's Http2 deframer, so we don't need to be
864
      // concerned with the window being exceeded at this point.
865
      in.require(length);
1✔
866

867
      synchronized (lock) {
1✔
868
        StreamState stream = streams.get(streamId);
1✔
869
        if (stream == null) {
1✔
870
          in.skip(length);
1✔
871
          streamError(streamId, ErrorCode.STREAM_CLOSED, "Received data for closed stream");
1✔
872
          return;
1✔
873
        }
874
        if (stream.hasReceivedEndOfStream()) {
1✔
875
          in.skip(length);
1✔
876
          streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
877
              "Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
878
          return;
1✔
879
        }
880
        if (stream.inboundWindowAvailable() < paddedLength) {
1✔
881
          in.skip(length);
1✔
882
          streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
1✔
883
              "Received DATA size exceeded window size. RFC7540 section 6.9");
884
          return;
1✔
885
        }
886
        Buffer buf = new Buffer();
1✔
887
        buf.write(in.getBuffer(), length);
1✔
888
        stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
1✔
889
      }
1✔
890

891
      // connection window update
892
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
893
      if (connectionUnacknowledgedBytesRead
1✔
894
          >= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
895
        synchronized (lock) {
1✔
896
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
897
          frameWriter.flush();
1✔
898
        }
1✔
899
        connectionUnacknowledgedBytesRead = 0;
1✔
900
      }
901
    }
1✔
902

903
    @Override
904
    public void rstStream(int streamId, ErrorCode errorCode) {
905
      frameLogger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
906
      // streamId == 0 checking is in HTTP/2 decoder
907

908
      if (!(ErrorCode.NO_ERROR.equals(errorCode)
1✔
909
            || ErrorCode.CANCEL.equals(errorCode)
1✔
910
            || ErrorCode.STREAM_CLOSED.equals(errorCode))) {
1✔
911
        log.log(Level.INFO, "Received RST_STREAM: " + errorCode);
×
912
      }
913
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
914
          .withDescription("RST_STREAM");
1✔
915
      synchronized (lock) {
1✔
916
        StreamState stream = streams.get(streamId);
1✔
917
        if (stream != null) {
1✔
918
          stream.inboundRstReceived(status);
1✔
919
          streamClosed(streamId, /*flush=*/ false);
1✔
920
        }
921
      }
1✔
922
    }
1✔
923

924
    @Override
925
    public void settings(boolean clearPrevious, Settings settings) {
926
      frameLogger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
927
      synchronized (lock) {
1✔
928
        boolean outboundWindowSizeIncreased = false;
1✔
929
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
930
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
931
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
932
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
933
        }
934

935
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
936
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
937
        // otherwise it will cause a stream error (RST_STREAM).
938
        frameWriter.ackSettings(settings);
1✔
939
        frameWriter.flush();
1✔
940
        if (!receivedSettings) {
1✔
941
          receivedSettings = true;
1✔
942
          attributes = listener.transportReady(attributes);
1✔
943
        }
944

945
        // send any pending bytes / streams
946
        if (outboundWindowSizeIncreased) {
1✔
947
          outboundFlow.writeStreams();
1✔
948
        }
949
      }
1✔
950
    }
1✔
951

952
    @Override
953
    public void ping(boolean ack, int payload1, int payload2) {
954
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
955
        abruptShutdown(ErrorCode.ENHANCE_YOUR_CALM, "too_many_pings",
1✔
956
            Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"), false);
1✔
957
        return;
1✔
958
      }
959
      long payload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
960
      if (!ack) {
1✔
961
        frameLogger.logPing(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
962
        synchronized (lock) {
1✔
963
          frameWriter.ping(true, payload1, payload2);
1✔
964
          frameWriter.flush();
1✔
965
        }
1✔
966
      } else {
967
        frameLogger.logPingAck(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
968
        if (KEEPALIVE_PING == payload) {
1✔
969
          return;
×
970
        }
971
        if (GRACEFUL_SHUTDOWN_PING == payload) {
1✔
972
          triggerGracefulSecondGoaway();
1✔
973
          return;
1✔
974
        }
975
        log.log(Level.INFO, "Received unexpected ping ack: " + payload);
×
976
      }
977
    }
1✔
978

979
    @Override
980
    public void ackSettings() {}
1✔
981

982
    @Override
983
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
984
      frameLogger.logGoAway(
1✔
985
          OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
986
      String description = String.format("Received GOAWAY: %s '%s'", errorCode, debugData.utf8());
1✔
987
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
988
          .withDescription(description);
1✔
989
      if (!ErrorCode.NO_ERROR.equals(errorCode)) {
1✔
990
        log.log(
×
991
            Level.WARNING, "Received GOAWAY: {0} {1}", new Object[] {errorCode, debugData.utf8()});
×
992
      }
993
      synchronized (lock) {
1✔
994
        goAwayStatus = status;
1✔
995
      }
1✔
996
    }
1✔
997

998
    @Override
999
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1000
        throws IOException {
1001
      frameLogger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1002
          streamId, promisedStreamId, requestHeaders);
1003
      // streamId == 0 checking is in HTTP/2 decoder.
1004
      // The server doesn't use PUSH_PROMISE, so all even streams are IDLE, and odd streams are not
1005
      // peer-initiated.
1006
      connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
1007
          "PUSH_PROMISE only allowed on peer-initiated streams. RFC7540 section 6.6");
1008
    }
1✔
1009

1010
    @Override
1011
    public void windowUpdate(int streamId, long delta) {
1012
      frameLogger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1013
      // delta == 0 checking is in HTTP/2 decoder. And it isn't quite right, as it will always cause
1014
      // a GOAWAY. RFC7540 section 6.9 says to use RST_STREAM if the stream id isn't 0. Doesn't
1015
      // matter much though.
1016
      synchronized (lock) {
1✔
1017
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1018
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1019
        } else {
1020
          StreamState stream = streams.get(streamId);
1✔
1021
          if (stream != null) {
1✔
1022
            outboundFlow.windowUpdate(stream.getOutboundFlowState(), (int) delta);
1✔
1023
          }
1024
        }
1025
      }
1✔
1026
    }
1✔
1027

1028
    @Override
1029
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1030
      frameLogger.logPriority(
×
1031
          OkHttpFrameLogger.Direction.INBOUND, streamId, streamDependency, weight, exclusive);
1032
      // streamId == 0 checking is in HTTP/2 decoder.
1033
      // Ignore priority change.
1034
    }
×
1035

1036
    @Override
1037
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1038
        int port, long maxAge) {}
×
1039

1040
    /**
1041
     * Send GOAWAY to the server, then finish all streams and close the transport. RFC7540 §5.4.1.
1042
     */
1043
    private void connectionError(ErrorCode errorCode, String moreDetail) {
1044
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1045
          .withDescription(String.format("HTTP2 connection error: %s '%s'", errorCode, moreDetail));
1✔
1046
      abruptShutdown(errorCode, moreDetail, status, false);
1✔
1047
    }
1✔
1048

1049
    /**
1050
     * Respond with RST_STREAM, making sure to kill the associated stream if it exists. Reason will
1051
     * rarely be seen. RFC7540 §5.4.2.
1052
     */
1053
    private void streamError(int streamId, ErrorCode errorCode, String reason) {
1054
      if (errorCode == ErrorCode.PROTOCOL_ERROR) {
1✔
1055
        log.log(
1✔
1056
            Level.FINE, "Responding with RST_STREAM {0}: {1}", new Object[] {errorCode, reason});
1057
      }
1058
      synchronized (lock) {
1✔
1059
        frameWriter.rstStream(streamId, errorCode);
1✔
1060
        frameWriter.flush();
1✔
1061
        StreamState stream = streams.get(streamId);
1✔
1062
        if (stream != null) {
1✔
1063
          stream.transportReportStatus(
1✔
1064
              Status.INTERNAL.withDescription(
1✔
1065
                  String.format("Responded with RST_STREAM %s: %s", errorCode, reason)));
1✔
1066
          streamClosed(streamId, /*flush=*/ false);
1✔
1067
        }
1068
      }
1✔
1069
    }
1✔
1070

1071
    private void respondWithHttpError(
1072
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg) {
1073
      respondWithHttpError(streamId, inFinished, httpCode, statusCode, msg,
1✔
1074
              Collections.emptyList());
1✔
1075
    }
1✔
1076

1077
    private void respondWithHttpError(
1078
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg,
1079
        List<Header> extraHeaders) {
1080
      Metadata metadata = new Metadata();
1✔
1081
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1082
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1083
      List<Header> headers =
1✔
1084
          Headers.createHttpResponseHeaders(httpCode, "text/plain; charset=utf-8", metadata);
1✔
1085
      headers.addAll(extraHeaders);
1✔
1086
      Buffer data = new Buffer().writeUtf8(msg);
1✔
1087

1088
      synchronized (lock) {
1✔
1089
        Http2ErrorStreamState stream =
1✔
1090
            new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
1✔
1091
        if (streams.isEmpty()) {
1✔
1092
          keepAliveEnforcer.onTransportActive();
1✔
1093
          if (maxConnectionIdleManager != null) {
1✔
1094
            maxConnectionIdleManager.onTransportActive();
1✔
1095
          }
1096
        }
1097
        streams.put(streamId, stream);
1✔
1098
        if (inFinished) {
1✔
1099
          stream.inboundDataReceived(new Buffer(), 0, 0, true);
×
1100
        }
1101
        frameWriter.headers(streamId, headers);
1✔
1102
        outboundFlow.data(
1✔
1103
            /*outFinished=*/true, stream.getOutboundFlowState(), data, /*flush=*/true);
1✔
1104
        outboundFlow.notifyWhenNoPendingData(
1✔
1105
            stream.getOutboundFlowState(), () -> rstOkAtEndOfHttpError(stream));
1✔
1106
      }
1✔
1107
    }
1✔
1108

1109
    private void rstOkAtEndOfHttpError(Http2ErrorStreamState stream) {
1110
      synchronized (lock) {
1✔
1111
        if (!stream.hasReceivedEndOfStream()) {
1✔
1112
          frameWriter.rstStream(stream.streamId, ErrorCode.NO_ERROR);
1✔
1113
        }
1114
        streamClosed(stream.streamId, /*flush=*/ true);
1✔
1115
      }
1✔
1116
    }
1✔
1117

1118
    private void respondWithGrpcError(
1119
        int streamId, boolean inFinished, Status.Code statusCode, String msg) {
1120
      Metadata metadata = new Metadata();
1✔
1121
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1122
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1123
      List<Header> headers = Headers.createResponseTrailers(metadata, false);
1✔
1124

1125
      synchronized (lock) {
1✔
1126
        frameWriter.synReply(true, streamId, headers);
1✔
1127
        if (!inFinished) {
1✔
1128
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
1129
        }
1130
        frameWriter.flush();
1✔
1131
      }
1✔
1132
    }
1✔
1133
  }
1134

1135
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1✔
1136
    @Override
1137
    public void ping() {
1138
      synchronized (lock) {
×
1139
        frameWriter.ping(false, 0, KEEPALIVE_PING);
×
1140
        frameWriter.flush();
×
1141
      }
×
1142
      tracer.reportKeepAliveSent();
×
1143
    }
×
1144

1145
    @Override
1146
    public void onPingTimeout() {
1147
      synchronized (lock) {
×
1148
        goAwayStatus = Status.UNAVAILABLE
×
1149
            .withDescription("Keepalive failed. Considering connection dead");
×
1150
        GrpcUtil.closeQuietly(socket);
×
1151
      }
×
1152
    }
×
1153
  }
1154

1155
  interface StreamState {
1156
    /** Must be holding 'lock' when calling. */
1157
    void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
1158

1159
    /** Must be holding 'lock' when calling. */
1160
    boolean hasReceivedEndOfStream();
1161

1162
    /** Must be holding 'lock' when calling. */
1163
    int inboundWindowAvailable();
1164

1165
    /** Must be holding 'lock' when calling. */
1166
    void transportReportStatus(Status status);
1167

1168
    /** Must be holding 'lock' when calling. */
1169
    void inboundRstReceived(Status status);
1170

1171
    OutboundFlowController.StreamState getOutboundFlowState();
1172
  }
1173

1174
  static class Http2ErrorStreamState implements StreamState, OutboundFlowController.Stream {
1175
    private final int streamId;
1176
    private final Object lock;
1177
    private final OutboundFlowController.StreamState outboundFlowState;
1178
    @GuardedBy("lock")
1179
    private int window;
1180
    @GuardedBy("lock")
1181
    private boolean receivedEndOfStream;
1182

1183
    Http2ErrorStreamState(
1184
        int streamId, Object lock, OutboundFlowController outboundFlow, int initialWindowSize) {
1✔
1185
      this.streamId = streamId;
1✔
1186
      this.lock = lock;
1✔
1187
      this.outboundFlowState = outboundFlow.createState(this, streamId);
1✔
1188
      this.window = initialWindowSize;
1✔
1189
    }
1✔
1190

1191
    @Override public void onSentBytes(int frameBytes) {}
1✔
1192

1193
    @Override public void inboundDataReceived(
1194
        Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
1195
      synchronized (lock) {
×
1196
        if (endOfStream) {
×
1197
          receivedEndOfStream = true;
×
1198
        }
1199
        window -= dataLength + paddingLength;
×
1200
        try {
1201
          frame.skip(frame.size()); // Recycle segments
×
1202
        } catch (IOException ex) {
×
1203
          throw new AssertionError(ex);
×
1204
        }
×
1205
      }
×
1206
    }
×
1207

1208
    @Override public boolean hasReceivedEndOfStream() {
1209
      synchronized (lock) {
1✔
1210
        return receivedEndOfStream;
1✔
1211
      }
1212
    }
1213

1214
    @Override public int inboundWindowAvailable() {
1215
      synchronized (lock) {
×
1216
        return window;
×
1217
      }
1218
    }
1219

1220
    @Override public void transportReportStatus(Status status) {}
×
1221

1222
    @Override public void inboundRstReceived(Status status) {}
×
1223

1224
    @Override public OutboundFlowController.StreamState getOutboundFlowState() {
1225
      synchronized (lock) {
1✔
1226
        return outboundFlowState;
1✔
1227
      }
1228
    }
1229
  }
1230
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc