• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18863

19 Oct 2023 10:49PM CUT coverage: 88.249% (-0.01%) from 88.261%
#18863

push

github-actions

web-flow
Release v1.59.0 (#10616)

30295 of 34329 relevant lines covered (88.25%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.71
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
20
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
21

22
import com.google.common.base.Preconditions;
23
import com.google.common.util.concurrent.Futures;
24
import com.google.common.util.concurrent.ListenableFuture;
25
import io.grpc.Attributes;
26
import io.grpc.InternalChannelz;
27
import io.grpc.InternalLogId;
28
import io.grpc.InternalStatus;
29
import io.grpc.Metadata;
30
import io.grpc.ServerStreamTracer;
31
import io.grpc.Status;
32
import io.grpc.internal.GrpcUtil;
33
import io.grpc.internal.KeepAliveEnforcer;
34
import io.grpc.internal.KeepAliveManager;
35
import io.grpc.internal.LogExceptionRunnable;
36
import io.grpc.internal.MaxConnectionIdleManager;
37
import io.grpc.internal.ObjectPool;
38
import io.grpc.internal.SerializingExecutor;
39
import io.grpc.internal.ServerTransport;
40
import io.grpc.internal.ServerTransportListener;
41
import io.grpc.internal.StatsTraceContext;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.okhttp.internal.framed.ErrorCode;
44
import io.grpc.okhttp.internal.framed.FrameReader;
45
import io.grpc.okhttp.internal.framed.FrameWriter;
46
import io.grpc.okhttp.internal.framed.Header;
47
import io.grpc.okhttp.internal.framed.HeadersMode;
48
import io.grpc.okhttp.internal.framed.Http2;
49
import io.grpc.okhttp.internal.framed.Settings;
50
import io.grpc.okhttp.internal.framed.Variant;
51
import java.io.IOException;
52
import java.net.Socket;
53
import java.util.List;
54
import java.util.Locale;
55
import java.util.Map;
56
import java.util.TreeMap;
57
import java.util.concurrent.Executor;
58
import java.util.concurrent.ScheduledExecutorService;
59
import java.util.concurrent.ScheduledFuture;
60
import java.util.concurrent.TimeUnit;
61
import java.util.logging.Level;
62
import java.util.logging.Logger;
63
import javax.annotation.Nullable;
64
import javax.annotation.concurrent.GuardedBy;
65
import okio.Buffer;
66
import okio.BufferedSource;
67
import okio.ByteString;
68
import okio.Okio;
69

70
/**
71
 * OkHttp-based server transport.
72
 */
73
final class OkHttpServerTransport implements ServerTransport,
74
      ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport {
75
  private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName());
1✔
76
  private static final int GRACEFUL_SHUTDOWN_PING = 0x1111;
77

78
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);
1✔
79

80
  private static final int KEEPALIVE_PING = 0xDEAD;
81
  private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method");
1✔
82
  private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT");
1✔
83
  private static final ByteString POST_METHOD = ByteString.encodeUtf8("POST");
1✔
84
  private static final ByteString SCHEME = ByteString.encodeUtf8(":scheme");
1✔
85
  private static final ByteString PATH = ByteString.encodeUtf8(":path");
1✔
86
  private static final ByteString AUTHORITY = ByteString.encodeUtf8(":authority");
1✔
87
  private static final ByteString CONNECTION = ByteString.encodeUtf8("connection");
1✔
88
  private static final ByteString HOST = ByteString.encodeUtf8("host");
1✔
89
  private static final ByteString TE = ByteString.encodeUtf8("te");
1✔
90
  private static final ByteString TE_TRAILERS = ByteString.encodeUtf8("trailers");
1✔
91
  private static final ByteString CONTENT_TYPE = ByteString.encodeUtf8("content-type");
1✔
92
  private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
1✔
93

94
  private final Config config;
95
  private final Variant variant = new Http2();
1✔
96
  private final TransportTracer tracer;
97
  private final InternalLogId logId;
98
  private Socket socket;
99
  private ServerTransportListener listener;
100
  private Executor transportExecutor;
101
  private ScheduledExecutorService scheduledExecutorService;
102
  private Attributes attributes;
103
  private KeepAliveManager keepAliveManager;
104
  private MaxConnectionIdleManager maxConnectionIdleManager;
105
  private ScheduledFuture<?> maxConnectionAgeMonitor;
106
  private final KeepAliveEnforcer keepAliveEnforcer;
107

108
  private final Object lock = new Object();
1✔
109
  @GuardedBy("lock")
110
  private boolean abruptShutdown;
111
  @GuardedBy("lock")
112
  private boolean gracefulShutdown;
113
  @GuardedBy("lock")
114
  private boolean handshakeShutdown;
115
  @GuardedBy("lock")
116
  private InternalChannelz.Security securityInfo;
117
  @GuardedBy("lock")
118
  private ExceptionHandlingFrameWriter frameWriter;
119
  @GuardedBy("lock")
120
  private OutboundFlowController outboundFlow;
121
  @GuardedBy("lock")
1✔
122
  private final Map<Integer, StreamState> streams = new TreeMap<>();
123
  @GuardedBy("lock")
124
  private int lastStreamId;
125
  @GuardedBy("lock")
1✔
126
  private int goAwayStreamId = Integer.MAX_VALUE;
127
  /**
128
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
129
   * streams may continue.
130
   */
131
  @GuardedBy("lock")
132
  private Status goAwayStatus;
133
  /** Non-{@code null} when gracefully shutting down and have not yet sent second GOAWAY. */
134
  @GuardedBy("lock")
135
  private ScheduledFuture<?> secondGoawayTimer;
136
  /** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
137
  @GuardedBy("lock")
138
  private ScheduledFuture<?> forcefulCloseTimer;
139
  @GuardedBy("lock")
1✔
140
  private Long gracefulShutdownPeriod = null;
141

142
  /**
   * Creates a transport around an accepted, not-yet-handshaked socket. No I/O is started here;
   * that happens in {@link #start}.
   *
   * @param config transport settings; must not be {@code null}
   * @param bareSocket the accepted socket; must not be {@code null}
   */
  public OkHttpServerTransport(Config config, Socket bareSocket) {
    this.config = Preconditions.checkNotNull(config, "config");
    this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");

    this.tracer = config.transportTracerFactory.create();
    this.tracer.setFlowControlWindowReader(this::readFlowControlWindow);
    this.logId =
        InternalLogId.allocate(getClass(), this.socket.getRemoteSocketAddress().toString());

    // Both executors are borrowed from pools and handed back in terminated().
    this.transportExecutor = config.transportExecutorPool.getObject();
    this.scheduledExecutorService = config.scheduledExecutorServicePool.getObject();

    this.keepAliveEnforcer = new KeepAliveEnforcer(
        config.permitKeepAliveWithoutCalls,
        config.permitKeepAliveTimeInNanos,
        TimeUnit.NANOSECONDS);
  }
1✔
154

155
  public void start(ServerTransportListener listener) {
156
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
157

158
    SerializingExecutor serializingExecutor = new SerializingExecutor(transportExecutor);
1✔
159
    serializingExecutor.execute(() -> startIo(serializingExecutor));
1✔
160
  }
1✔
161

162
  private void startIo(SerializingExecutor serializingExecutor) {
163
    try {
164
      // The socket implementation is lazily initialized, but had broken thread-safety 
165
      // for that laziness https://bugs.openjdk.org/browse/JDK-8278326. 
166
      // As a workaround, we lock to synchronize initialization with shutdown().
167
      synchronized (lock) {
1✔
168
        socket.setTcpNoDelay(true);
1✔
169
      }
1✔
170
      HandshakerSocketFactory.HandshakeResult result =
1✔
171
          config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
1✔
172
      synchronized (lock) {
1✔
173
        this.socket = result.socket;
1✔
174
      }
1✔
175
      this.attributes = result.attributes;
1✔
176

177
      int maxQueuedControlFrames = 10000;
1✔
178
      AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
179
      asyncSink.becomeConnected(Okio.sink(socket), socket);
1✔
180
      FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
181
          variant.newWriter(Okio.buffer(asyncSink), false));
1✔
182
      FrameWriter writeMonitoringFrameWriter = new ForwardingFrameWriter(rawFrameWriter) {
1✔
183
        @Override
184
        public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
185
            throws IOException {
186
          keepAliveEnforcer.resetCounters();
1✔
187
          super.synReply(outFinished, streamId, headerBlock);
1✔
188
        }
1✔
189

190
        @Override
191
        public void headers(int streamId, List<Header> headerBlock) throws IOException {
192
          keepAliveEnforcer.resetCounters();
1✔
193
          super.headers(streamId, headerBlock);
1✔
194
        }
1✔
195

196
        @Override
197
        public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
198
            throws IOException {
199
          keepAliveEnforcer.resetCounters();
1✔
200
          super.data(outFinished, streamId, source, byteCount);
1✔
201
        }
1✔
202
      };
203
      synchronized (lock) {
1✔
204
        this.securityInfo = result.securityInfo;
1✔
205

206
        // Handle FrameWriter exceptions centrally, since there are many callers. Note that
207
        // errors coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink
208
        // does not propagate syscall errors through the FrameWriter. But we handle the
209
        // AsyncSink failures with the same TransportExceptionHandler instance so it is all
210
        // mixed back together.
211
        frameWriter = new ExceptionHandlingFrameWriter(this, writeMonitoringFrameWriter);
1✔
212
        outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
213

214
        // These writes will be queued in the serializingExecutor waiting for this function to
215
        // return.
216
        frameWriter.connectionPreface();
1✔
217
        Settings settings = new Settings();
1✔
218
        OkHttpSettingsUtil.set(settings,
1✔
219
            OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
220
        OkHttpSettingsUtil.set(settings,
1✔
221
            OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
222
        frameWriter.settings(settings);
1✔
223
        if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
1✔
224
          frameWriter.windowUpdate(
1✔
225
              Utils.CONNECTION_STREAM_ID, config.flowControlWindow - Utils.DEFAULT_WINDOW_SIZE);
226
        }
227
        frameWriter.flush();
1✔
228
      }
1✔
229

230
      if (config.keepAliveTimeNanos != GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
231
        keepAliveManager = new KeepAliveManager(
1✔
232
            new KeepAlivePinger(), scheduledExecutorService, config.keepAliveTimeNanos,
233
            config.keepAliveTimeoutNanos, true);
234
        keepAliveManager.onTransportStarted();
1✔
235
      }
236

237
      if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
238
        maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
1✔
239
        maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
1✔
240
      }
241

242
      if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
243
        long maxConnectionAgeInNanos =
1✔
244
            (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
1✔
245
        maxConnectionAgeMonitor = scheduledExecutorService.schedule(
1✔
246
            new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
1✔
247
            maxConnectionAgeInNanos,
248
            TimeUnit.NANOSECONDS);
249
      }
250

251
      transportExecutor.execute(new FrameHandler(
1✔
252
          variant.newReader(Okio.buffer(Okio.source(socket)), false)));
1✔
253
    } catch (Error | IOException | RuntimeException ex) {
1✔
254
      synchronized (lock) {
1✔
255
        if (!handshakeShutdown) {
1✔
256
          log.log(Level.INFO, "Socket failed to handshake", ex);
1✔
257
        }
258
      }
1✔
259
      GrpcUtil.closeQuietly(socket);
1✔
260
      terminated();
1✔
261
    }
1✔
262
  }
1✔
263

264
  @Override
265
  public void shutdown() {
266
    shutdown(null);
1✔
267
  }
1✔
268

269
  private void shutdown(@Nullable Long gracefulShutdownPeriod) {
270
    synchronized (lock) {
1✔
271
      if (gracefulShutdown || abruptShutdown) {
1✔
272
        return;
1✔
273
      }
274
      gracefulShutdown = true;
1✔
275
      this.gracefulShutdownPeriod = gracefulShutdownPeriod;
1✔
276
      if (frameWriter == null) {
1✔
277
        handshakeShutdown = true;
1✔
278
        GrpcUtil.closeQuietly(socket);
1✔
279
      } else {
280
        // RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
281
        // we also set a timer to limit the upper bound in case the PING is excessively stalled or
282
        // the client is malicious.
283
        secondGoawayTimer = scheduledExecutorService.schedule(
1✔
284
            this::triggerGracefulSecondGoaway,
285
            GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
286
        frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
1✔
287
        frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
1✔
288
        frameWriter.flush();
1✔
289
      }
290
    }
1✔
291
  }
1✔
292

293
  private void triggerGracefulSecondGoaway() {
294
    synchronized (lock) {
1✔
295
      if (secondGoawayTimer == null) {
1✔
296
        return;
×
297
      }
298
      secondGoawayTimer.cancel(false);
1✔
299
      secondGoawayTimer = null;
1✔
300
      frameWriter.goAway(lastStreamId, ErrorCode.NO_ERROR, new byte[0]);
1✔
301
      goAwayStreamId = lastStreamId;
1✔
302
      if (streams.isEmpty()) {
1✔
303
        frameWriter.close();
1✔
304
      } else {
305
        frameWriter.flush();
1✔
306
      }
307
      if (gracefulShutdownPeriod != null) {
1✔
308
        forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
309
            this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
1✔
310
      }
311
    }
1✔
312
  }
1✔
313

314
  @Override
315
  public void shutdownNow(Status reason) {
316
    synchronized (lock) {
1✔
317
      if (frameWriter == null) {
1✔
318
        handshakeShutdown = true;
1✔
319
        GrpcUtil.closeQuietly(socket);
1✔
320
        return;
1✔
321
      }
322
    }
1✔
323
    abruptShutdown(ErrorCode.NO_ERROR, "", reason, true);
1✔
324
  }
1✔
325

326
  /**
327
   * Finish all active streams due to an IOException, then close the transport.
328
   */
329
  @Override
330
  public void onException(Throwable failureCause) {
331
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
332
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
333
    abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
334
  }
1✔
335

336
  private void abruptShutdown(
337
      ErrorCode errorCode, String moreDetail, Status reason, boolean rstStreams) {
338
    synchronized (lock) {
1✔
339
      if (abruptShutdown) {
1✔
340
        return;
1✔
341
      }
342
      abruptShutdown = true;
1✔
343
      goAwayStatus = reason;
1✔
344

345
      if (secondGoawayTimer != null) {
1✔
346
        secondGoawayTimer.cancel(false);
1✔
347
        secondGoawayTimer = null;
1✔
348
      }
349
      for (Map.Entry<Integer, StreamState> entry : streams.entrySet()) {
1✔
350
        if (rstStreams) {
1✔
351
          frameWriter.rstStream(entry.getKey(), ErrorCode.CANCEL);
1✔
352
        }
353
        entry.getValue().transportReportStatus(reason);
1✔
354
      }
1✔
355
      streams.clear();
1✔
356

357
      // RFC7540 §5.4.1. Attempt to inform the client what went wrong. We try to write the GOAWAY
358
      // _and then_ close our side of the connection. But place an upper-bound for how long we wait
359
      // for I/O with a timer, which forcefully closes the socket.
360
      frameWriter.goAway(lastStreamId, errorCode, moreDetail.getBytes(GrpcUtil.US_ASCII));
1✔
361
      goAwayStreamId = lastStreamId;
1✔
362
      frameWriter.close();
1✔
363
      forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
364
          this::triggerForcefulClose, 1, TimeUnit.SECONDS);
365
    }
1✔
366
  }
1✔
367

368
  private void triggerForcefulClose() {
369
    // Safe to do unconditionally; no need to check if timer cancellation raced
370
    GrpcUtil.closeQuietly(socket);
1✔
371
  }
1✔
372

373
  private void terminated() {
374
    synchronized (lock) {
1✔
375
      if (forcefulCloseTimer != null) {
1✔
376
        forcefulCloseTimer.cancel(false);
1✔
377
        forcefulCloseTimer = null;
1✔
378
      }
379
    }
1✔
380
    if (keepAliveManager != null) {
1✔
381
      keepAliveManager.onTransportTermination();
1✔
382
    }
383
    if (maxConnectionIdleManager != null) {
1✔
384
      maxConnectionIdleManager.onTransportTermination();
1✔
385
    }
386

387
    if (maxConnectionAgeMonitor != null) {
1✔
388
      maxConnectionAgeMonitor.cancel(false);
1✔
389
    }
390
    transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
1✔
391
    scheduledExecutorService =
1✔
392
        config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
1✔
393
    listener.transportTerminated();
1✔
394
  }
1✔
395

396
  @Override
397
  public ScheduledExecutorService getScheduledExecutorService() {
398
    return scheduledExecutorService;
1✔
399
  }
400

401
  @Override
402
  public ListenableFuture<InternalChannelz.SocketStats> getStats() {
403
    synchronized (lock) {
1✔
404
      return Futures.immediateFuture(new InternalChannelz.SocketStats(
1✔
405
          tracer.getStats(),
1✔
406
          socket.getLocalSocketAddress(),
1✔
407
          socket.getRemoteSocketAddress(),
1✔
408
          Utils.getSocketOptions(socket),
1✔
409
          securityInfo));
410
    }
411
  }
412

413
  private TransportTracer.FlowControlWindows readFlowControlWindow() {
414
    synchronized (lock) {
1✔
415
      long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
416
      // connectionUnacknowledgedBytesRead is only readable by FrameHandler, so we provide a lower
417
      // bound.
418
      long remote = (long) (config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO);
1✔
419
      return new TransportTracer.FlowControlWindows(local, remote);
1✔
420
    }
421
  }
422

423
  @Override
424
  public InternalLogId getLogId() {
425
    return logId;
1✔
426
  }
427

428
  @Override
429
  public OutboundFlowController.StreamState[] getActiveStreams() {
430
    synchronized (lock) {
1✔
431
      OutboundFlowController.StreamState[] flowStreams =
1✔
432
          new OutboundFlowController.StreamState[streams.size()];
1✔
433
      int i = 0;
1✔
434
      for (StreamState stream : streams.values()) {
1✔
435
        flowStreams[i++] = stream.getOutboundFlowState();
1✔
436
      }
1✔
437
      return flowStreams;
1✔
438
    }
439
  }
440

441
  /**
442
   * Notify the transport that the stream was closed. Any frames for the stream must be enqueued
443
   * before calling.
444
   */
445
  void streamClosed(int streamId, boolean flush) {
446
    synchronized (lock) {
1✔
447
      streams.remove(streamId);
1✔
448
      if (streams.isEmpty()) {
1✔
449
        keepAliveEnforcer.onTransportIdle();
1✔
450
        if (maxConnectionIdleManager != null) {
1✔
451
          maxConnectionIdleManager.onTransportIdle();
1✔
452
        }
453
      }
454
      if (gracefulShutdown && streams.isEmpty()) {
1✔
455
        frameWriter.close();
1✔
456
      } else {
457
        if (flush) {
1✔
458
          frameWriter.flush();
1✔
459
        }
460
      }
461
    }
1✔
462
  }
1✔
463

464
  private static String asciiString(ByteString value) {
465
    // utf8() string is cached in ByteString, so we prefer it when the contents are ASCII. This
466
    // provides benefit if the header was reused via HPACK.
467
    for (int i = 0; i < value.size(); i++) {
1✔
468
      if (value.getByte(i) < 0) {
1✔
469
        return value.string(GrpcUtil.US_ASCII);
×
470
      }
471
    }
472
    return value.utf8();
1✔
473
  }
474

475
  private static int headerFind(List<Header> header, ByteString key, int startIndex) {
476
    for (int i = startIndex; i < header.size(); i++) {
1✔
477
      if (header.get(i).name.equals(key)) {
1✔
478
        return i;
1✔
479
      }
480
    }
481
    return -1;
1✔
482
  }
483

484
  private static boolean headerContains(List<Header> header, ByteString key) {
485
    return headerFind(header, key, 0) != -1;
1✔
486
  }
487

488
  private static void headerRemove(List<Header> header, ByteString key) {
489
    int i = 0;
1✔
490
    while ((i = headerFind(header, key, i)) != -1) {
1✔
491
      header.remove(i);
1✔
492
    }
493
  }
1✔
494

495
  /** Assumes that caller requires this field, so duplicates are treated as missing. */
496
  private static ByteString headerGetRequiredSingle(List<Header> header, ByteString key) {
497
    int i = headerFind(header, key, 0);
1✔
498
    if (i == -1) {
1✔
499
      return null;
1✔
500
    }
501
    if (headerFind(header, key, i + 1) != -1) {
1✔
502
      return null;
1✔
503
    }
504
    return header.get(i).value;
1✔
505
  }
506

507
  static final class Config {
508
    final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
509
    final ObjectPool<Executor> transportExecutorPool;
510
    final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
511
    final TransportTracer.Factory transportTracerFactory;
512
    final HandshakerSocketFactory handshakerSocketFactory;
513
    final long keepAliveTimeNanos;
514
    final long keepAliveTimeoutNanos;
515
    final int flowControlWindow;
516
    final int maxInboundMessageSize;
517
    final int maxInboundMetadataSize;
518
    final long maxConnectionIdleNanos;
519
    final boolean permitKeepAliveWithoutCalls;
520
    final long permitKeepAliveTimeInNanos;
521
    final long maxConnectionAgeInNanos;
522
    final long maxConnectionAgeGraceInNanos;
523

524
    public Config(
525
        OkHttpServerBuilder builder,
526
        List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
1✔
527
      this.streamTracerFactories = Preconditions.checkNotNull(
1✔
528
          streamTracerFactories, "streamTracerFactories");
529
      transportExecutorPool = Preconditions.checkNotNull(
1✔
530
          builder.transportExecutorPool, "transportExecutorPool");
531
      scheduledExecutorServicePool = Preconditions.checkNotNull(
1✔
532
          builder.scheduledExecutorServicePool, "scheduledExecutorServicePool");
533
      transportTracerFactory = Preconditions.checkNotNull(
1✔
534
          builder.transportTracerFactory, "transportTracerFactory");
535
      handshakerSocketFactory = Preconditions.checkNotNull(
1✔
536
          builder.handshakerSocketFactory, "handshakerSocketFactory");
537
      keepAliveTimeNanos = builder.keepAliveTimeNanos;
1✔
538
      keepAliveTimeoutNanos = builder.keepAliveTimeoutNanos;
1✔
539
      flowControlWindow = builder.flowControlWindow;
1✔
540
      maxInboundMessageSize = builder.maxInboundMessageSize;
1✔
541
      maxInboundMetadataSize = builder.maxInboundMetadataSize;
1✔
542
      maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
1✔
543
      permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
1✔
544
      permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
1✔
545
      maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
1✔
546
      maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
1✔
547
    }
1✔
548
  }
549

550
  /**
551
   * Runnable which reads frames and dispatches them to in flight calls.
552
   */
553
  class FrameHandler implements FrameReader.Handler, Runnable {
554
    private final OkHttpFrameLogger frameLogger =
1✔
555
        new OkHttpFrameLogger(Level.FINE, OkHttpServerTransport.class);
556
    private final FrameReader frameReader;
557
    private boolean receivedSettings;
558
    private int connectionUnacknowledgedBytesRead;
559

560
    /**
     * Creates a handler that reads inbound frames from the given reader.
     *
     * @param frameReader source of inbound HTTP/2 frames; stored as-is without a null check
     */
    public FrameHandler(FrameReader frameReader) {
      super();
      this.frameReader = frameReader;
    }
1✔
563

564
    @Override
565
    public void run() {
566
      String threadName = Thread.currentThread().getName();
1✔
567
      Thread.currentThread().setName("OkHttpServerTransport");
1✔
568
      try {
569
        frameReader.readConnectionPreface();
1✔
570
        if (!frameReader.nextFrame(this)) {
1✔
571
          connectionError(ErrorCode.INTERNAL_ERROR, "Failed to read initial SETTINGS");
1✔
572
          return;
1✔
573
        }
574
        if (!receivedSettings) {
1✔
575
          connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
576
              "First HTTP/2 frame must be SETTINGS. RFC7540 section 3.5");
577
          return;
1✔
578
        }
579
        // Read until the underlying socket closes.
580
        while (frameReader.nextFrame(this)) {
1✔
581
          if (keepAliveManager != null) {
1✔
582
            keepAliveManager.onDataReceived();
1✔
583
          }
584
        }
585
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
586
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
587
        // nothing, otherwise, we finish all streams since it's a real IO issue.
588
        Status status;
589
        synchronized (lock) {
1✔
590
          status = goAwayStatus;
1✔
591
        }
1✔
592
        if (status == null) {
1✔
593
          status = Status.UNAVAILABLE.withDescription("TCP connection closed or IOException");
1✔
594
        }
595
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
596
      } catch (Throwable t) {
1✔
597
        log.log(Level.WARNING, "Error decoding HTTP/2 frames", t);
1✔
598
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "Error in frame decoder",
1✔
599
            Status.INTERNAL.withDescription("Error decoding HTTP/2 frames").withCause(t), false);
1✔
600
      } finally {
601
        // Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
602
        try {
603
          GrpcUtil.exhaust(socket.getInputStream());
1✔
604
        } catch (IOException ex) {
1✔
605
          // Unable to wait, so just proceed to tear-down. The socket is probably already closed so
606
          // the GOAWAY can't be sent anyway.
607
        }
1✔
608
        GrpcUtil.closeQuietly(socket);
1✔
609
        terminated();
1✔
610
        Thread.currentThread().setName(threadName);
1✔
611
      }
612
    }
1✔
613

614
    /**
615
     * Handle HTTP2 HEADER and CONTINUATION frames.
616
     */
617
    @Override
618
    public void headers(boolean outFinished,
619
        boolean inFinished,
620
        int streamId,
621
        int associatedStreamId,
622
        List<Header> headerBlock,
623
        HeadersMode headersMode) {
624
      frameLogger.logHeaders(
1✔
625
          OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
626
      // streamId == 0 checking is in HTTP/2 decoder
627
      if ((streamId & 1) == 0) {
1✔
628
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
629
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
630
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
631
        return;
1✔
632
      }
633
      boolean newStream;
634
      synchronized (lock) {
1✔
635
        if (streamId > goAwayStreamId) {
1✔
636
          return;
×
637
        }
638
        newStream = streamId > lastStreamId;
1✔
639
        if (newStream) {
1✔
640
          lastStreamId = streamId;
1✔
641
        }
642
      }
1✔
643

644
      int metadataSize = headerBlockSize(headerBlock);
1✔
645
      if (metadataSize > config.maxInboundMetadataSize) {
1✔
646
        respondWithHttpError(streamId, inFinished, 431, Status.Code.RESOURCE_EXHAUSTED,
1✔
647
            String.format(
1✔
648
                Locale.US,
649
                "Request metadata larger than %d: %d",
650
                config.maxInboundMetadataSize,
1✔
651
                metadataSize));
1✔
652
        return;
1✔
653
      }
654

655
      headerRemove(headerBlock, ByteString.EMPTY);
1✔
656

657
      ByteString httpMethod = null;
1✔
658
      ByteString scheme = null;
1✔
659
      ByteString path = null;
1✔
660
      ByteString authority = null;
1✔
661
      while (headerBlock.size() > 0 && headerBlock.get(0).name.getByte(0) == ':') {
1✔
662
        Header header = headerBlock.remove(0);
1✔
663
        if (HTTP_METHOD.equals(header.name) && httpMethod == null) {
1✔
664
          httpMethod = header.value;
1✔
665
        } else if (SCHEME.equals(header.name) && scheme == null) {
1✔
666
          scheme = header.value;
1✔
667
        } else if (PATH.equals(header.name) && path == null) {
1✔
668
          path = header.value;
1✔
669
        } else if (AUTHORITY.equals(header.name) && authority == null) {
1✔
670
          authority = header.value;
1✔
671
        } else {
672
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
673
              "Unexpected pseudo header. RFC7540 section 8.1.2.1");
674
          return;
1✔
675
        }
676
      }
1✔
677
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
678
        if (headerBlock.get(i).name.getByte(0) == ':') {
1✔
679
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
680
              "Pseudo header not before regular headers. RFC7540 section 8.1.2.1");
681
          return;
1✔
682
        }
683
      }
684
      if (!CONNECT_METHOD.equals(httpMethod)
1✔
685
          && newStream
686
          && (httpMethod == null || scheme == null || path == null)) {
687
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
688
            "Missing required pseudo header. RFC7540 section 8.1.2.3");
689
        return;
1✔
690
      }
691
      if (headerContains(headerBlock, CONNECTION)) {
1✔
692
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
693
            "Connection-specific headers not permitted. RFC7540 section 8.1.2.2");
694
        return;
1✔
695
      }
696

697
      if (!newStream) {
1✔
698
        if (inFinished) {
1✔
699
          synchronized (lock) {
1✔
700
            StreamState stream = streams.get(streamId);
1✔
701
            if (stream == null) {
1✔
702
              streamError(streamId, ErrorCode.STREAM_CLOSED, "Received headers for closed stream");
×
703
              return;
×
704
            }
705
            if (stream.hasReceivedEndOfStream()) {
1✔
706
              streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
707
                  "Received HEADERS for half-closed (remote) stream. RFC7540 section 5.1");
708
              return;
1✔
709
            }
710
            // Ignore the trailers, but still half-close the stream
711
            stream.inboundDataReceived(new Buffer(), 0, 0, true);
1✔
712
            return;
1✔
713
          }
714
        } else {
715
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
716
              "Headers disallowed in the middle of the stream. RFC7540 section 8.1");
717
          return;
1✔
718
        }
719
      }
720

721
      if (authority == null) {
1✔
722
        int i = headerFind(headerBlock, HOST, 0);
1✔
723
        if (i != -1) {
1✔
724
          if (headerFind(headerBlock, HOST, i + 1) != -1) {
1✔
725
            respondWithHttpError(streamId, inFinished, 400, Status.Code.INTERNAL,
1✔
726
                "Multiple host headers disallowed. RFC7230 section 5.4");
727
            return;
1✔
728
          }
729
          authority = headerBlock.get(i).value;
1✔
730
        }
731
      }
732
      headerRemove(headerBlock, HOST);
1✔
733

734
      // Remove the leading slash of the path and get the fully qualified method name
735
      if (path.size() == 0 || path.getByte(0) != '/') {
1✔
736
        respondWithHttpError(streamId, inFinished, 404, Status.Code.UNIMPLEMENTED,
1✔
737
            "Expected path to start with /: " + asciiString(path));
1✔
738
        return;
1✔
739
      }
740
      String method = asciiString(path).substring(1);
1✔
741

742
      ByteString contentType = headerGetRequiredSingle(headerBlock, CONTENT_TYPE);
1✔
743
      if (contentType == null) {
1✔
744
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
745
            "Content-Type is missing or duplicated");
746
        return;
1✔
747
      }
748
      String contentTypeString = asciiString(contentType);
1✔
749
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
750
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
751
            "Content-Type is not supported: " + contentTypeString);
752
        return;
1✔
753
      }
754

755
      if (!POST_METHOD.equals(httpMethod)) {
1✔
756
        respondWithHttpError(streamId, inFinished, 405, Status.Code.INTERNAL,
1✔
757
            "HTTP Method is not supported: " + asciiString(httpMethod));
1✔
758
        return;
1✔
759
      }
760

761
      ByteString te = headerGetRequiredSingle(headerBlock, TE);
1✔
762
      if (!TE_TRAILERS.equals(te)) {
1✔
763
        respondWithGrpcError(streamId, inFinished, Status.Code.INTERNAL,
1✔
764
            String.format("Expected header TE: %s, but %s is received. "
1✔
765
              + "Some intermediate proxy may not support trailers",
766
              asciiString(TE_TRAILERS), te == null ? "<missing>" : asciiString(te)));
1✔
767
        return;
1✔
768
      }
769
      headerRemove(headerBlock, CONTENT_LENGTH);
1✔
770

771
      Metadata metadata = Utils.convertHeaders(headerBlock);
1✔
772
      StatsTraceContext statsTraceCtx =
1✔
773
          StatsTraceContext.newServerContext(config.streamTracerFactories, method, metadata);
1✔
774
      synchronized (lock) {
1✔
775
        OkHttpServerStream.TransportState stream = new OkHttpServerStream.TransportState(
1✔
776
            OkHttpServerTransport.this,
777
            streamId,
778
            config.maxInboundMessageSize,
1✔
779
            statsTraceCtx,
780
            lock,
1✔
781
            frameWriter,
1✔
782
            outboundFlow,
1✔
783
            config.flowControlWindow,
1✔
784
            tracer,
1✔
785
            method);
786
        OkHttpServerStream streamForApp = new OkHttpServerStream(
1✔
787
            stream,
788
            attributes,
1✔
789
            authority == null ? null : asciiString(authority),
1✔
790
            statsTraceCtx,
791
            tracer);
1✔
792
        if (streams.isEmpty()) {
1✔
793
          keepAliveEnforcer.onTransportActive();
1✔
794
          if (maxConnectionIdleManager != null) {
1✔
795
            maxConnectionIdleManager.onTransportActive();
1✔
796
          }
797
        }
798
        streams.put(streamId, stream);
1✔
799
        listener.streamCreated(streamForApp, method, metadata);
1✔
800
        stream.onStreamAllocated();
1✔
801
        if (inFinished) {
1✔
802
          stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
1✔
803
        }
804
      }
1✔
805
    }
1✔
806

807
    private int headerBlockSize(List<Header> headerBlock) {
808
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
809
      long size = 0;
1✔
810
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
811
        Header header = headerBlock.get(i);
1✔
812
        size += 32 + header.name.size() + header.value.size();
1✔
813
      }
814
      size = Math.min(size, Integer.MAX_VALUE);
1✔
815
      return (int) size;
1✔
816
    }
817

818
    /**
819
     * Handle an HTTP2 DATA frame.
820
     */
821
    @Override
822
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
823
                     int paddedLength)
824
        throws IOException {
825
      frameLogger.logData(
1✔
826
          OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
1✔
827
      if (streamId == 0) {
1✔
828
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
829
            "Stream 0 is reserved for control messages. RFC7540 section 5.1.1");
830
        return;
1✔
831
      }
832
      if ((streamId & 1) == 0) {
1✔
833
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
834
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
835
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
836
        return;
1✔
837
      }
838

839
      // Wait until the frame is complete. We only support 16 KiB frames, and the max permitted in
840
      // HTTP/2 is 16 MiB. This is verified in OkHttp's Http2 deframer, so we don't need to be
841
      // concerned with the window being exceeded at this point.
842
      in.require(length);
1✔
843

844
      synchronized (lock) {
1✔
845
        StreamState stream = streams.get(streamId);
1✔
846
        if (stream == null) {
1✔
847
          in.skip(length);
1✔
848
          streamError(streamId, ErrorCode.STREAM_CLOSED, "Received data for closed stream");
1✔
849
          return;
1✔
850
        }
851
        if (stream.hasReceivedEndOfStream()) {
1✔
852
          in.skip(length);
1✔
853
          streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
854
              "Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
855
          return;
1✔
856
        }
857
        if (stream.inboundWindowAvailable() < paddedLength) {
1✔
858
          in.skip(length);
1✔
859
          streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
1✔
860
              "Received DATA size exceeded window size. RFC7540 section 6.9");
861
          return;
1✔
862
        }
863
        Buffer buf = new Buffer();
1✔
864
        buf.write(in.getBuffer(), length);
1✔
865
        stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
1✔
866
      }
1✔
867

868
      // connection window update
869
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
870
      if (connectionUnacknowledgedBytesRead
1✔
871
          >= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
872
        synchronized (lock) {
1✔
873
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
874
          frameWriter.flush();
1✔
875
        }
1✔
876
        connectionUnacknowledgedBytesRead = 0;
1✔
877
      }
878
    }
1✔
879

880
    @Override
881
    public void rstStream(int streamId, ErrorCode errorCode) {
882
      frameLogger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
883
      // streamId == 0 checking is in HTTP/2 decoder
884

885
      if (!(ErrorCode.NO_ERROR.equals(errorCode)
1✔
886
            || ErrorCode.CANCEL.equals(errorCode)
1✔
887
            || ErrorCode.STREAM_CLOSED.equals(errorCode))) {
1✔
888
        log.log(Level.INFO, "Received RST_STREAM: " + errorCode);
×
889
      }
890
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
891
          .withDescription("RST_STREAM");
1✔
892
      synchronized (lock) {
1✔
893
        StreamState stream = streams.get(streamId);
1✔
894
        if (stream != null) {
1✔
895
          stream.inboundRstReceived(status);
1✔
896
          streamClosed(streamId, /*flush=*/ false);
1✔
897
        }
898
      }
1✔
899
    }
1✔
900

901
    @Override
902
    public void settings(boolean clearPrevious, Settings settings) {
903
      frameLogger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
904
      synchronized (lock) {
1✔
905
        boolean outboundWindowSizeIncreased = false;
1✔
906
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
907
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
908
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
909
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
910
        }
911

912
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
913
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
914
        // otherwise it will cause a stream error (RST_STREAM).
915
        frameWriter.ackSettings(settings);
1✔
916
        frameWriter.flush();
1✔
917
        if (!receivedSettings) {
1✔
918
          receivedSettings = true;
1✔
919
          attributes = listener.transportReady(attributes);
1✔
920
        }
921

922
        // send any pending bytes / streams
923
        if (outboundWindowSizeIncreased) {
1✔
924
          outboundFlow.writeStreams();
1✔
925
        }
926
      }
1✔
927
    }
1✔
928

929
    @Override
930
    public void ping(boolean ack, int payload1, int payload2) {
931
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
932
        abruptShutdown(ErrorCode.ENHANCE_YOUR_CALM, "too_many_pings",
1✔
933
            Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"), false);
1✔
934
        return;
1✔
935
      }
936
      long payload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
937
      if (!ack) {
1✔
938
        frameLogger.logPing(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
939
        synchronized (lock) {
1✔
940
          frameWriter.ping(true, payload1, payload2);
1✔
941
          frameWriter.flush();
1✔
942
        }
1✔
943
      } else {
944
        frameLogger.logPingAck(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
945
        if (KEEPALIVE_PING == payload) {
1✔
946
          return;
×
947
        }
948
        if (GRACEFUL_SHUTDOWN_PING == payload) {
1✔
949
          triggerGracefulSecondGoaway();
1✔
950
          return;
1✔
951
        }
952
        log.log(Level.INFO, "Received unexpected ping ack: " + payload);
×
953
      }
954
    }
1✔
955

956
    @Override
957
    public void ackSettings() {}
1✔
958

959
    @Override
960
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
961
      frameLogger.logGoAway(
1✔
962
          OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
963
      String description = String.format("Received GOAWAY: %s '%s'", errorCode, debugData.utf8());
1✔
964
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
965
          .withDescription(description);
1✔
966
      if (!ErrorCode.NO_ERROR.equals(errorCode)) {
1✔
967
        log.log(
×
968
            Level.WARNING, "Received GOAWAY: {0} {1}", new Object[] {errorCode, debugData.utf8()});
×
969
      }
970
      synchronized (lock) {
1✔
971
        goAwayStatus = status;
1✔
972
      }
1✔
973
    }
1✔
974

975
    @Override
976
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
977
        throws IOException {
978
      frameLogger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
979
          streamId, promisedStreamId, requestHeaders);
980
      // streamId == 0 checking is in HTTP/2 decoder.
981
      // The server doesn't use PUSH_PROMISE, so all even streams are IDLE, and odd streams are not
982
      // peer-initiated.
983
      connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
984
          "PUSH_PROMISE only allowed on peer-initiated streams. RFC7540 section 6.6");
985
    }
1✔
986

987
    @Override
988
    public void windowUpdate(int streamId, long delta) {
989
      frameLogger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
990
      // delta == 0 checking is in HTTP/2 decoder. And it isn't quite right, as it will always cause
991
      // a GOAWAY. RFC7540 section 6.9 says to use RST_STREAM if the stream id isn't 0. Doesn't
992
      // matter much though.
993
      synchronized (lock) {
1✔
994
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
995
          outboundFlow.windowUpdate(null, (int) delta);
1✔
996
        } else {
997
          StreamState stream = streams.get(streamId);
1✔
998
          if (stream != null) {
1✔
999
            outboundFlow.windowUpdate(stream.getOutboundFlowState(), (int) delta);
1✔
1000
          }
1001
        }
1002
      }
1✔
1003
    }
1✔
1004

1005
    @Override
1006
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1007
      frameLogger.logPriority(
1✔
1008
          OkHttpFrameLogger.Direction.INBOUND, streamId, streamDependency, weight, exclusive);
1009
      // streamId == 0 checking is in HTTP/2 decoder.
1010
      // Ignore priority change.
1011
    }
1✔
1012

1013
    @Override
1014
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1015
        int port, long maxAge) {}
×
1016

1017
    /**
1018
     * Send GOAWAY to the server, then finish all streams and close the transport. RFC7540 §5.4.1.
1019
     */
1020
    private void connectionError(ErrorCode errorCode, String moreDetail) {
1021
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1022
          .withDescription(String.format("HTTP2 connection error: %s '%s'", errorCode, moreDetail));
1✔
1023
      abruptShutdown(errorCode, moreDetail, status, false);
1✔
1024
    }
1✔
1025

1026
    /**
1027
     * Respond with RST_STREAM, making sure to kill the associated stream if it exists. Reason will
1028
     * rarely be seen. RFC7540 §5.4.2.
1029
     */
1030
    private void streamError(int streamId, ErrorCode errorCode, String reason) {
1031
      if (errorCode == ErrorCode.PROTOCOL_ERROR) {
1✔
1032
        log.log(
1✔
1033
            Level.FINE, "Responding with RST_STREAM {0}: {1}", new Object[] {errorCode, reason});
1034
      }
1035
      synchronized (lock) {
1✔
1036
        frameWriter.rstStream(streamId, errorCode);
1✔
1037
        frameWriter.flush();
1✔
1038
        StreamState stream = streams.get(streamId);
1✔
1039
        if (stream != null) {
1✔
1040
          stream.transportReportStatus(
1✔
1041
              Status.INTERNAL.withDescription(
1✔
1042
                  String.format("Responded with RST_STREAM %s: %s", errorCode, reason)));
1✔
1043
          streamClosed(streamId, /*flush=*/ false);
1✔
1044
        }
1045
      }
1✔
1046
    }
1✔
1047

1048
    private void respondWithHttpError(
1049
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg) {
1050
      Metadata metadata = new Metadata();
1✔
1051
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1052
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1053
      List<Header> headers =
1✔
1054
          Headers.createHttpResponseHeaders(httpCode, "text/plain; charset=utf-8", metadata);
1✔
1055
      Buffer data = new Buffer().writeUtf8(msg);
1✔
1056

1057
      synchronized (lock) {
1✔
1058
        Http2ErrorStreamState stream =
1✔
1059
            new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
1✔
1060
        if (streams.isEmpty()) {
1✔
1061
          keepAliveEnforcer.onTransportActive();
1✔
1062
          if (maxConnectionIdleManager != null) {
1✔
1063
            maxConnectionIdleManager.onTransportActive();
1✔
1064
          }
1065
        }
1066
        streams.put(streamId, stream);
1✔
1067
        if (inFinished) {
1✔
1068
          stream.inboundDataReceived(new Buffer(), 0, 0, true);
×
1069
        }
1070
        frameWriter.headers(streamId, headers);
1✔
1071
        outboundFlow.data(
1✔
1072
            /*outFinished=*/true, stream.getOutboundFlowState(), data, /*flush=*/true);
1✔
1073
        outboundFlow.notifyWhenNoPendingData(
1✔
1074
            stream.getOutboundFlowState(), () -> rstOkAtEndOfHttpError(stream));
1✔
1075
      }
1✔
1076
    }
1✔
1077

1078
    private void rstOkAtEndOfHttpError(Http2ErrorStreamState stream) {
1079
      synchronized (lock) {
1✔
1080
        if (!stream.hasReceivedEndOfStream()) {
1✔
1081
          frameWriter.rstStream(stream.streamId, ErrorCode.NO_ERROR);
1✔
1082
        }
1083
        streamClosed(stream.streamId, /*flush=*/ true);
1✔
1084
      }
1✔
1085
    }
1✔
1086

1087
    private void respondWithGrpcError(
1088
        int streamId, boolean inFinished, Status.Code statusCode, String msg) {
1089
      Metadata metadata = new Metadata();
1✔
1090
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1091
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1092
      List<Header> headers = Headers.createResponseTrailers(metadata, false);
1✔
1093

1094
      synchronized (lock) {
1✔
1095
        frameWriter.synReply(true, streamId, headers);
1✔
1096
        if (!inFinished) {
1✔
1097
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
1098
        }
1099
        frameWriter.flush();
1✔
1100
      }
1✔
1101
    }
1✔
1102
  }
1103

1104
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1✔
1105
    @Override
1106
    public void ping() {
1107
      synchronized (lock) {
×
1108
        frameWriter.ping(false, 0, KEEPALIVE_PING);
×
1109
        frameWriter.flush();
×
1110
      }
×
1111
      tracer.reportKeepAliveSent();
×
1112
    }
×
1113

1114
    @Override
1115
    public void onPingTimeout() {
1116
      synchronized (lock) {
×
1117
        goAwayStatus = Status.UNAVAILABLE
×
1118
            .withDescription("Keepalive failed. Considering connection dead");
×
1119
        GrpcUtil.closeQuietly(socket);
×
1120
      }
×
1121
    }
×
1122
  }
1123

1124
  interface StreamState {
1125
    /** Must be holding 'lock' when calling. */
1126
    void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
1127

1128
    /** Must be holding 'lock' when calling. */
1129
    boolean hasReceivedEndOfStream();
1130

1131
    /** Must be holding 'lock' when calling. */
1132
    int inboundWindowAvailable();
1133

1134
    /** Must be holding 'lock' when calling. */
1135
    void transportReportStatus(Status status);
1136

1137
    /** Must be holding 'lock' when calling. */
1138
    void inboundRstReceived(Status status);
1139

1140
    OutboundFlowController.StreamState getOutboundFlowState();
1141
  }
1142

1143
  static class Http2ErrorStreamState implements StreamState, OutboundFlowController.Stream {
1144
    private final int streamId;
1145
    private final Object lock;
1146
    private final OutboundFlowController.StreamState outboundFlowState;
1147
    @GuardedBy("lock")
1148
    private int window;
1149
    @GuardedBy("lock")
1150
    private boolean receivedEndOfStream;
1151

1152
    Http2ErrorStreamState(
1153
        int streamId, Object lock, OutboundFlowController outboundFlow, int initialWindowSize) {
1✔
1154
      this.streamId = streamId;
1✔
1155
      this.lock = lock;
1✔
1156
      this.outboundFlowState = outboundFlow.createState(this, streamId);
1✔
1157
      this.window = initialWindowSize;
1✔
1158
    }
1✔
1159

1160
    @Override public void onSentBytes(int frameBytes) {}
1✔
1161

1162
    @Override public void inboundDataReceived(
1163
        Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
1164
      synchronized (lock) {
×
1165
        if (endOfStream) {
×
1166
          receivedEndOfStream = true;
×
1167
        }
1168
        window -= dataLength + paddingLength;
×
1169
        try {
1170
          frame.skip(frame.size()); // Recycle segments
×
1171
        } catch (IOException ex) {
×
1172
          throw new AssertionError(ex);
×
1173
        }
×
1174
      }
×
1175
    }
×
1176

1177
    @Override public boolean hasReceivedEndOfStream() {
1178
      synchronized (lock) {
1✔
1179
        return receivedEndOfStream;
1✔
1180
      }
1181
    }
1182

1183
    @Override public int inboundWindowAvailable() {
1184
      synchronized (lock) {
×
1185
        return window;
×
1186
      }
1187
    }
1188

1189
    @Override public void transportReportStatus(Status status) {}
×
1190

1191
    @Override public void inboundRstReceived(Status status) {}
×
1192

1193
    @Override public OutboundFlowController.StreamState getOutboundFlowState() {
1194
      synchronized (lock) {
1✔
1195
        return outboundFlowState;
1✔
1196
      }
1197
    }
1198
  }
1199
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc