• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19691

14 Feb 2025 06:23PM CUT coverage: 88.603% (-0.02%) from 88.626%
#19691

push

github

web-flow
xds:Cleanup to reduce test flakiness (#11895)

* don't process resourceDoesNotExist for watchers that have been cancelled.

* Change test to use an ArgumentMatcher instead of expecting that only the final result will be sent since depending on timing there may be configs sent for clusters being removed with their entries as errors.

34261 of 38668 relevant lines covered (88.6%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.47
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
20
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
21

22
import com.google.common.base.Preconditions;
23
import com.google.common.util.concurrent.Futures;
24
import com.google.common.util.concurrent.ListenableFuture;
25
import com.google.errorprone.annotations.concurrent.GuardedBy;
26
import io.grpc.Attributes;
27
import io.grpc.InternalChannelz;
28
import io.grpc.InternalLogId;
29
import io.grpc.InternalStatus;
30
import io.grpc.Metadata;
31
import io.grpc.ServerStreamTracer;
32
import io.grpc.Status;
33
import io.grpc.internal.GrpcUtil;
34
import io.grpc.internal.KeepAliveEnforcer;
35
import io.grpc.internal.KeepAliveManager;
36
import io.grpc.internal.LogExceptionRunnable;
37
import io.grpc.internal.MaxConnectionIdleManager;
38
import io.grpc.internal.ObjectPool;
39
import io.grpc.internal.SerializingExecutor;
40
import io.grpc.internal.ServerTransport;
41
import io.grpc.internal.ServerTransportListener;
42
import io.grpc.internal.StatsTraceContext;
43
import io.grpc.internal.TransportTracer;
44
import io.grpc.okhttp.internal.framed.ErrorCode;
45
import io.grpc.okhttp.internal.framed.FrameReader;
46
import io.grpc.okhttp.internal.framed.FrameWriter;
47
import io.grpc.okhttp.internal.framed.Header;
48
import io.grpc.okhttp.internal.framed.HeadersMode;
49
import io.grpc.okhttp.internal.framed.Http2;
50
import io.grpc.okhttp.internal.framed.Settings;
51
import io.grpc.okhttp.internal.framed.Variant;
52
import java.io.IOException;
53
import java.net.Socket;
54
import java.net.SocketException;
55
import java.util.List;
56
import java.util.Locale;
57
import java.util.Map;
58
import java.util.TreeMap;
59
import java.util.concurrent.Executor;
60
import java.util.concurrent.ScheduledExecutorService;
61
import java.util.concurrent.ScheduledFuture;
62
import java.util.concurrent.TimeUnit;
63
import java.util.logging.Level;
64
import java.util.logging.Logger;
65
import javax.annotation.Nullable;
66
import okio.Buffer;
67
import okio.BufferedSource;
68
import okio.ByteString;
69
import okio.Okio;
70

71
/**
72
 * OkHttp-based server transport.
73
 */
74
final class OkHttpServerTransport implements ServerTransport,
75
      ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport {
76
  private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName());
1✔
77
  private static final int GRACEFUL_SHUTDOWN_PING = 0x1111;
78

79
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);
1✔
80

81
  private static final int KEEPALIVE_PING = 0xDEAD;
82
  private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method");
1✔
83
  private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT");
1✔
84
  private static final ByteString POST_METHOD = ByteString.encodeUtf8("POST");
1✔
85
  private static final ByteString SCHEME = ByteString.encodeUtf8(":scheme");
1✔
86
  private static final ByteString PATH = ByteString.encodeUtf8(":path");
1✔
87
  private static final ByteString AUTHORITY = ByteString.encodeUtf8(":authority");
1✔
88
  private static final ByteString CONNECTION = ByteString.encodeUtf8("connection");
1✔
89
  private static final ByteString HOST = ByteString.encodeUtf8("host");
1✔
90
  private static final ByteString TE = ByteString.encodeUtf8("te");
1✔
91
  private static final ByteString TE_TRAILERS = ByteString.encodeUtf8("trailers");
1✔
92
  private static final ByteString CONTENT_TYPE = ByteString.encodeUtf8("content-type");
1✔
93
  private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
1✔
94

95
  private final Config config;
96
  private final Variant variant = new Http2();
1✔
97
  private final TransportTracer tracer;
98
  private final InternalLogId logId;
99
  private Socket socket;
100
  private ServerTransportListener listener;
101
  private Executor transportExecutor;
102
  private ScheduledExecutorService scheduledExecutorService;
103
  private Attributes attributes;
104
  private KeepAliveManager keepAliveManager;
105
  private MaxConnectionIdleManager maxConnectionIdleManager;
106
  private ScheduledFuture<?> maxConnectionAgeMonitor;
107
  private final KeepAliveEnforcer keepAliveEnforcer;
108

109
  private final Object lock = new Object();
1✔
110
  @GuardedBy("lock")
111
  private boolean abruptShutdown;
112
  @GuardedBy("lock")
113
  private boolean gracefulShutdown;
114
  @GuardedBy("lock")
115
  private boolean handshakeShutdown;
116
  @GuardedBy("lock")
117
  private InternalChannelz.Security securityInfo;
118
  @GuardedBy("lock")
119
  private ExceptionHandlingFrameWriter frameWriter;
120
  @GuardedBy("lock")
121
  private OutboundFlowController outboundFlow;
122
  @GuardedBy("lock")
1✔
123
  private final Map<Integer, StreamState> streams = new TreeMap<>();
124
  @GuardedBy("lock")
125
  private int lastStreamId;
126
  @GuardedBy("lock")
1✔
127
  private int goAwayStreamId = Integer.MAX_VALUE;
128
  /**
129
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
130
   * streams may continue.
131
   */
132
  @GuardedBy("lock")
133
  private Status goAwayStatus;
134
  /** Non-{@code null} when gracefully shutting down and have not yet sent second GOAWAY. */
135
  @GuardedBy("lock")
136
  private ScheduledFuture<?> secondGoawayTimer;
137
  /** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
138
  @GuardedBy("lock")
139
  private ScheduledFuture<?> forcefulCloseTimer;
140
  @GuardedBy("lock")
1✔
141
  private Long gracefulShutdownPeriod = null;
142

143
  public OkHttpServerTransport(Config config, Socket bareSocket) {
1✔
144
    this.config = Preconditions.checkNotNull(config, "config");
1✔
145
    this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
1✔
146

147
    tracer = config.transportTracerFactory.create();
1✔
148
    tracer.setFlowControlWindowReader(this::readFlowControlWindow);
1✔
149
    logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
1✔
150
    transportExecutor = config.transportExecutorPool.getObject();
1✔
151
    scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
1✔
152
    keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
1✔
153
        config.permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
154
  }
1✔
155

156
  public void start(ServerTransportListener listener) {
157
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
158

159
    SerializingExecutor serializingExecutor = new SerializingExecutor(transportExecutor);
1✔
160
    serializingExecutor.execute(() -> startIo(serializingExecutor));
1✔
161
  }
1✔
162

163
  private void startIo(SerializingExecutor serializingExecutor) {
164
    try {
165
      // The socket implementation is lazily initialized, but had broken thread-safety 
166
      // for that laziness https://bugs.openjdk.org/browse/JDK-8278326. 
167
      // As a workaround, we lock to synchronize initialization with shutdown().
168
      synchronized (lock) {
1✔
169
        socket.setTcpNoDelay(true);
1✔
170
      }
1✔
171
      HandshakerSocketFactory.HandshakeResult result =
1✔
172
          config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
1✔
173
      synchronized (lock) {
1✔
174
        if (socket.isClosed()) {
1✔
175
          // The wrapped socket may not handle the underlying socket being closed by shutdown(). In
176
          // particular, SSLSocket hangs future reads if the underlying socket is already closed at
177
          // this point, even if you call sslSocket.close() later.
178
          result.socket.close();
×
179
          throw new SocketException("Socket close raced with handshake");
×
180
        }
181
        this.socket = result.socket;
1✔
182
      }
1✔
183
      this.attributes = result.attributes;
1✔
184

185
      int maxQueuedControlFrames = 10000;
1✔
186
      AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
187
      asyncSink.becomeConnected(Okio.sink(socket), socket);
1✔
188
      FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
189
          variant.newWriter(Okio.buffer(asyncSink), false));
1✔
190
      FrameWriter writeMonitoringFrameWriter = new ForwardingFrameWriter(rawFrameWriter) {
1✔
191
        @Override
192
        public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
193
            throws IOException {
194
          keepAliveEnforcer.resetCounters();
1✔
195
          super.synReply(outFinished, streamId, headerBlock);
1✔
196
        }
1✔
197

198
        @Override
199
        public void headers(int streamId, List<Header> headerBlock) throws IOException {
200
          keepAliveEnforcer.resetCounters();
1✔
201
          super.headers(streamId, headerBlock);
1✔
202
        }
1✔
203

204
        @Override
205
        public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
206
            throws IOException {
207
          keepAliveEnforcer.resetCounters();
1✔
208
          super.data(outFinished, streamId, source, byteCount);
1✔
209
        }
1✔
210
      };
211
      synchronized (lock) {
1✔
212
        this.securityInfo = result.securityInfo;
1✔
213

214
        // Handle FrameWriter exceptions centrally, since there are many callers. Note that
215
        // errors coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink
216
        // does not propagate syscall errors through the FrameWriter. But we handle the
217
        // AsyncSink failures with the same TransportExceptionHandler instance so it is all
218
        // mixed back together.
219
        frameWriter = new ExceptionHandlingFrameWriter(this, writeMonitoringFrameWriter);
1✔
220
        outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
221

222
        // These writes will be queued in the serializingExecutor waiting for this function to
223
        // return.
224
        frameWriter.connectionPreface();
1✔
225
        Settings settings = new Settings();
1✔
226
        OkHttpSettingsUtil.set(settings,
1✔
227
            OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
228
        OkHttpSettingsUtil.set(settings,
1✔
229
            OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
230
        if (config.maxConcurrentStreams != Integer.MAX_VALUE) {
1✔
231
          OkHttpSettingsUtil.set(settings,
1✔
232
              OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams);
233
        }
234
        frameWriter.settings(settings);
1✔
235
        if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
1✔
236
          frameWriter.windowUpdate(
1✔
237
              Utils.CONNECTION_STREAM_ID, config.flowControlWindow - Utils.DEFAULT_WINDOW_SIZE);
238
        }
239
        frameWriter.flush();
1✔
240
      }
1✔
241

242
      if (config.keepAliveTimeNanos != GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
243
        keepAliveManager = new KeepAliveManager(
1✔
244
            new KeepAlivePinger(), scheduledExecutorService, config.keepAliveTimeNanos,
245
            config.keepAliveTimeoutNanos, true);
246
        keepAliveManager.onTransportStarted();
1✔
247
      }
248

249
      if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
250
        maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
1✔
251
        maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
1✔
252
      }
253

254
      if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
255
        long maxConnectionAgeInNanos =
1✔
256
            (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
1✔
257
        maxConnectionAgeMonitor = scheduledExecutorService.schedule(
1✔
258
            new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
1✔
259
            maxConnectionAgeInNanos,
260
            TimeUnit.NANOSECONDS);
261
      }
262

263
      transportExecutor.execute(new FrameHandler(
1✔
264
          variant.newReader(Okio.buffer(Okio.source(socket)), false)));
1✔
265
    } catch (Error | IOException | RuntimeException ex) {
1✔
266
      synchronized (lock) {
1✔
267
        if (!handshakeShutdown) {
1✔
268
          log.log(Level.INFO, "Socket failed to handshake", ex);
1✔
269
        }
270
      }
1✔
271
      GrpcUtil.closeQuietly(socket);
1✔
272
      terminated();
1✔
273
    }
1✔
274
  }
1✔
275

276
  @Override
277
  public void shutdown() {
278
    shutdown(null);
1✔
279
  }
1✔
280

281
  private void shutdown(@Nullable Long gracefulShutdownPeriod) {
282
    synchronized (lock) {
1✔
283
      if (gracefulShutdown || abruptShutdown) {
1✔
284
        return;
1✔
285
      }
286
      gracefulShutdown = true;
1✔
287
      this.gracefulShutdownPeriod = gracefulShutdownPeriod;
1✔
288
      if (frameWriter == null) {
1✔
289
        handshakeShutdown = true;
1✔
290
        GrpcUtil.closeQuietly(socket);
1✔
291
      } else {
292
        // RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
293
        // we also set a timer to limit the upper bound in case the PING is excessively stalled or
294
        // the client is malicious.
295
        secondGoawayTimer = scheduledExecutorService.schedule(
1✔
296
            this::triggerGracefulSecondGoaway,
297
            GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
298
        frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
1✔
299
        frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
1✔
300
        frameWriter.flush();
1✔
301
      }
302
    }
1✔
303
  }
1✔
304

305
  private void triggerGracefulSecondGoaway() {
306
    synchronized (lock) {
1✔
307
      if (secondGoawayTimer == null) {
1✔
308
        return;
×
309
      }
310
      secondGoawayTimer.cancel(false);
1✔
311
      secondGoawayTimer = null;
1✔
312
      frameWriter.goAway(lastStreamId, ErrorCode.NO_ERROR, new byte[0]);
1✔
313
      goAwayStreamId = lastStreamId;
1✔
314
      if (streams.isEmpty()) {
1✔
315
        frameWriter.close();
1✔
316
      } else {
317
        frameWriter.flush();
1✔
318
      }
319
      if (gracefulShutdownPeriod != null) {
1✔
320
        forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
321
            this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
1✔
322
      }
323
    }
1✔
324
  }
1✔
325

326
  @Override
327
  public void shutdownNow(Status reason) {
328
    synchronized (lock) {
1✔
329
      if (frameWriter == null) {
1✔
330
        handshakeShutdown = true;
1✔
331
        GrpcUtil.closeQuietly(socket);
1✔
332
        return;
1✔
333
      }
334
    }
1✔
335
    abruptShutdown(ErrorCode.NO_ERROR, "", reason, true);
1✔
336
  }
1✔
337

338
  /**
339
   * Finish all active streams due to an IOException, then close the transport.
340
   */
341
  @Override
342
  public void onException(Throwable failureCause) {
343
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
344
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
345
    abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
346
  }
1✔
347

348
  private void abruptShutdown(
349
      ErrorCode errorCode, String moreDetail, Status reason, boolean rstStreams) {
350
    synchronized (lock) {
1✔
351
      if (abruptShutdown) {
1✔
352
        return;
1✔
353
      }
354
      abruptShutdown = true;
1✔
355
      goAwayStatus = reason;
1✔
356

357
      if (secondGoawayTimer != null) {
1✔
358
        secondGoawayTimer.cancel(false);
1✔
359
        secondGoawayTimer = null;
1✔
360
      }
361
      for (Map.Entry<Integer, StreamState> entry : streams.entrySet()) {
1✔
362
        if (rstStreams) {
1✔
363
          frameWriter.rstStream(entry.getKey(), ErrorCode.CANCEL);
1✔
364
        }
365
        entry.getValue().transportReportStatus(reason);
1✔
366
      }
1✔
367
      streams.clear();
1✔
368

369
      // RFC7540 §5.4.1. Attempt to inform the client what went wrong. We try to write the GOAWAY
370
      // _and then_ close our side of the connection. But place an upper-bound for how long we wait
371
      // for I/O with a timer, which forcefully closes the socket.
372
      frameWriter.goAway(lastStreamId, errorCode, moreDetail.getBytes(GrpcUtil.US_ASCII));
1✔
373
      goAwayStreamId = lastStreamId;
1✔
374
      frameWriter.close();
1✔
375
      forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
376
          this::triggerForcefulClose, 1, TimeUnit.SECONDS);
377
    }
1✔
378
  }
1✔
379

380
  private void triggerForcefulClose() {
381
    // Safe to do unconditionally; no need to check if timer cancellation raced
382
    GrpcUtil.closeQuietly(socket);
1✔
383
  }
1✔
384

385
  private void terminated() {
386
    synchronized (lock) {
1✔
387
      if (forcefulCloseTimer != null) {
1✔
388
        forcefulCloseTimer.cancel(false);
1✔
389
        forcefulCloseTimer = null;
1✔
390
      }
391
    }
1✔
392
    if (keepAliveManager != null) {
1✔
393
      keepAliveManager.onTransportTermination();
1✔
394
    }
395
    if (maxConnectionIdleManager != null) {
1✔
396
      maxConnectionIdleManager.onTransportTermination();
1✔
397
    }
398

399
    if (maxConnectionAgeMonitor != null) {
1✔
400
      maxConnectionAgeMonitor.cancel(false);
1✔
401
    }
402
    transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
1✔
403
    scheduledExecutorService =
1✔
404
        config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
1✔
405
    listener.transportTerminated();
1✔
406
  }
1✔
407

408
  @Override
409
  public ScheduledExecutorService getScheduledExecutorService() {
410
    return scheduledExecutorService;
1✔
411
  }
412

413
  @Override
414
  public ListenableFuture<InternalChannelz.SocketStats> getStats() {
415
    synchronized (lock) {
1✔
416
      return Futures.immediateFuture(new InternalChannelz.SocketStats(
1✔
417
          tracer.getStats(),
1✔
418
          socket.getLocalSocketAddress(),
1✔
419
          socket.getRemoteSocketAddress(),
1✔
420
          Utils.getSocketOptions(socket),
1✔
421
          securityInfo));
422
    }
423
  }
424

425
  private TransportTracer.FlowControlWindows readFlowControlWindow() {
426
    synchronized (lock) {
1✔
427
      long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
428
      // connectionUnacknowledgedBytesRead is only readable by FrameHandler, so we provide a lower
429
      // bound.
430
      long remote = (long) (config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO);
1✔
431
      return new TransportTracer.FlowControlWindows(local, remote);
1✔
432
    }
433
  }
434

435
  @Override
436
  public InternalLogId getLogId() {
437
    return logId;
1✔
438
  }
439

440
  @Override
441
  public OutboundFlowController.StreamState[] getActiveStreams() {
442
    synchronized (lock) {
1✔
443
      OutboundFlowController.StreamState[] flowStreams =
1✔
444
          new OutboundFlowController.StreamState[streams.size()];
1✔
445
      int i = 0;
1✔
446
      for (StreamState stream : streams.values()) {
1✔
447
        flowStreams[i++] = stream.getOutboundFlowState();
1✔
448
      }
1✔
449
      return flowStreams;
1✔
450
    }
451
  }
452

453
  /**
454
   * Notify the transport that the stream was closed. Any frames for the stream must be enqueued
455
   * before calling.
456
   */
457
  void streamClosed(int streamId, boolean flush) {
458
    synchronized (lock) {
1✔
459
      streams.remove(streamId);
1✔
460
      if (streams.isEmpty()) {
1✔
461
        keepAliveEnforcer.onTransportIdle();
1✔
462
        if (maxConnectionIdleManager != null) {
1✔
463
          maxConnectionIdleManager.onTransportIdle();
1✔
464
        }
465
      }
466
      if (gracefulShutdown && streams.isEmpty()) {
1✔
467
        frameWriter.close();
1✔
468
      } else {
469
        if (flush) {
1✔
470
          frameWriter.flush();
1✔
471
        }
472
      }
473
    }
1✔
474
  }
1✔
475

476
  private static String asciiString(ByteString value) {
477
    // utf8() string is cached in ByteString, so we prefer it when the contents are ASCII. This
478
    // provides benefit if the header was reused via HPACK.
479
    for (int i = 0; i < value.size(); i++) {
1✔
480
      if (value.getByte(i) < 0) {
1✔
481
        return value.string(GrpcUtil.US_ASCII);
×
482
      }
483
    }
484
    return value.utf8();
1✔
485
  }
486

487
  private static int headerFind(List<Header> header, ByteString key, int startIndex) {
488
    for (int i = startIndex; i < header.size(); i++) {
1✔
489
      if (header.get(i).name.equals(key)) {
1✔
490
        return i;
1✔
491
      }
492
    }
493
    return -1;
1✔
494
  }
495

496
  private static boolean headerContains(List<Header> header, ByteString key) {
497
    return headerFind(header, key, 0) != -1;
1✔
498
  }
499

500
  private static void headerRemove(List<Header> header, ByteString key) {
501
    int i = 0;
1✔
502
    while ((i = headerFind(header, key, i)) != -1) {
1✔
503
      header.remove(i);
1✔
504
    }
505
  }
1✔
506

507
  /** Assumes that caller requires this field, so duplicates are treated as missing. */
508
  private static ByteString headerGetRequiredSingle(List<Header> header, ByteString key) {
509
    int i = headerFind(header, key, 0);
1✔
510
    if (i == -1) {
1✔
511
      return null;
1✔
512
    }
513
    if (headerFind(header, key, i + 1) != -1) {
1✔
514
      return null;
1✔
515
    }
516
    return header.get(i).value;
1✔
517
  }
518

519
  static final class Config {
520
    final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
521
    final ObjectPool<Executor> transportExecutorPool;
522
    final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
523
    final TransportTracer.Factory transportTracerFactory;
524
    final HandshakerSocketFactory handshakerSocketFactory;
525
    final long keepAliveTimeNanos;
526
    final long keepAliveTimeoutNanos;
527
    final int flowControlWindow;
528
    final int maxInboundMessageSize;
529
    final int maxInboundMetadataSize;
530
    final long maxConnectionIdleNanos;
531
    final boolean permitKeepAliveWithoutCalls;
532
    final long permitKeepAliveTimeInNanos;
533
    final long maxConnectionAgeInNanos;
534
    final long maxConnectionAgeGraceInNanos;
535
    final int maxConcurrentStreams;
536

537
    public Config(
538
        OkHttpServerBuilder builder,
539
        List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
1✔
540
      this.streamTracerFactories = Preconditions.checkNotNull(
1✔
541
          streamTracerFactories, "streamTracerFactories");
542
      transportExecutorPool = Preconditions.checkNotNull(
1✔
543
          builder.transportExecutorPool, "transportExecutorPool");
544
      scheduledExecutorServicePool = Preconditions.checkNotNull(
1✔
545
          builder.scheduledExecutorServicePool, "scheduledExecutorServicePool");
546
      transportTracerFactory = Preconditions.checkNotNull(
1✔
547
          builder.transportTracerFactory, "transportTracerFactory");
548
      handshakerSocketFactory = Preconditions.checkNotNull(
1✔
549
          builder.handshakerSocketFactory, "handshakerSocketFactory");
550
      keepAliveTimeNanos = builder.keepAliveTimeNanos;
1✔
551
      keepAliveTimeoutNanos = builder.keepAliveTimeoutNanos;
1✔
552
      flowControlWindow = builder.flowControlWindow;
1✔
553
      maxInboundMessageSize = builder.maxInboundMessageSize;
1✔
554
      maxInboundMetadataSize = builder.maxInboundMetadataSize;
1✔
555
      maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
1✔
556
      permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
1✔
557
      permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
1✔
558
      maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
1✔
559
      maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
1✔
560
      maxConcurrentStreams = builder.maxConcurrentCallsPerConnection;
1✔
561
    }
1✔
562
  }
563

564
  /**
565
   * Runnable which reads frames and dispatches them to in flight calls.
566
   */
567
  class FrameHandler implements FrameReader.Handler, Runnable {
568
    private final OkHttpFrameLogger frameLogger =
1✔
569
        new OkHttpFrameLogger(Level.FINE, OkHttpServerTransport.class);
570
    private final FrameReader frameReader;
571
    private boolean receivedSettings;
572
    private int connectionUnacknowledgedBytesRead;
573

574
    public FrameHandler(FrameReader frameReader) {
1✔
575
      this.frameReader = frameReader;
1✔
576
    }
1✔
577

578
    @Override
579
    public void run() {
580
      String threadName = Thread.currentThread().getName();
1✔
581
      Thread.currentThread().setName("OkHttpServerTransport");
1✔
582
      try {
583
        frameReader.readConnectionPreface();
1✔
584
        if (!frameReader.nextFrame(this)) {
1✔
585
          connectionError(ErrorCode.INTERNAL_ERROR, "Failed to read initial SETTINGS");
1✔
586
          return;
1✔
587
        }
588
        if (!receivedSettings) {
1✔
589
          connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
590
              "First HTTP/2 frame must be SETTINGS. RFC7540 section 3.5");
591
          return;
1✔
592
        }
593
        // Read until the underlying socket closes.
594
        while (frameReader.nextFrame(this)) {
1✔
595
          if (keepAliveManager != null) {
1✔
596
            keepAliveManager.onDataReceived();
1✔
597
          }
598
        }
599
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
600
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
601
        // nothing, otherwise, we finish all streams since it's a real IO issue.
602
        Status status;
603
        synchronized (lock) {
1✔
604
          status = goAwayStatus;
1✔
605
        }
1✔
606
        if (status == null) {
1✔
607
          status = Status.UNAVAILABLE.withDescription("TCP connection closed or IOException");
1✔
608
        }
609
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
610
      } catch (Throwable t) {
1✔
611
        log.log(Level.WARNING, "Error decoding HTTP/2 frames", t);
1✔
612
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "Error in frame decoder",
1✔
613
            Status.INTERNAL.withDescription("Error decoding HTTP/2 frames").withCause(t), false);
1✔
614
      } finally {
615
        // Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
616
        try {
617
          GrpcUtil.exhaust(socket.getInputStream());
1✔
618
        } catch (IOException ex) {
1✔
619
          // Unable to wait, so just proceed to tear-down. The socket is probably already closed so
620
          // the GOAWAY can't be sent anyway.
621
        }
1✔
622
        GrpcUtil.closeQuietly(socket);
1✔
623
        terminated();
1✔
624
        Thread.currentThread().setName(threadName);
1✔
625
      }
626
    }
1✔
627

628
    /**
629
     * Handle HTTP2 HEADER and CONTINUATION frames.
630
     */
631
    @Override
632
    public void headers(boolean outFinished,
633
        boolean inFinished,
634
        int streamId,
635
        int associatedStreamId,
636
        List<Header> headerBlock,
637
        HeadersMode headersMode) {
638
      frameLogger.logHeaders(
1✔
639
          OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
640
      // streamId == 0 checking is in HTTP/2 decoder
641
      if ((streamId & 1) == 0) {
1✔
642
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
643
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
644
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
645
        return;
1✔
646
      }
647
      boolean newStream;
648
      synchronized (lock) {
1✔
649
        if (streamId > goAwayStreamId) {
1✔
650
          return;
×
651
        }
652
        newStream = streamId > lastStreamId;
1✔
653
        if (newStream) {
1✔
654
          lastStreamId = streamId;
1✔
655
          if (config.maxConcurrentStreams <= streams.size()) {
1✔
656
            streamError(streamId, ErrorCode.REFUSED_STREAM,
1✔
657
                "Max concurrent stream reached. RFC7540 section 5.1.2");
658
            return;
1✔
659
          }
660
        }
661
      }
1✔
662

663
      int metadataSize = headerBlockSize(headerBlock);
1✔
664
      if (metadataSize > config.maxInboundMetadataSize) {
1✔
665
        respondWithHttpError(streamId, inFinished, 431, Status.Code.RESOURCE_EXHAUSTED,
1✔
666
            String.format(
1✔
667
                Locale.US,
668
                "Request metadata larger than %d: %d",
669
                config.maxInboundMetadataSize,
1✔
670
                metadataSize));
1✔
671
        return;
1✔
672
      }
673

674
      headerRemove(headerBlock, ByteString.EMPTY);
1✔
675

676
      ByteString httpMethod = null;
1✔
677
      ByteString scheme = null;
1✔
678
      ByteString path = null;
1✔
679
      ByteString authority = null;
1✔
680
      while (headerBlock.size() > 0 && headerBlock.get(0).name.getByte(0) == ':') {
1✔
681
        Header header = headerBlock.remove(0);
1✔
682
        if (HTTP_METHOD.equals(header.name) && httpMethod == null) {
1✔
683
          httpMethod = header.value;
1✔
684
        } else if (SCHEME.equals(header.name) && scheme == null) {
1✔
685
          scheme = header.value;
1✔
686
        } else if (PATH.equals(header.name) && path == null) {
1✔
687
          path = header.value;
1✔
688
        } else if (AUTHORITY.equals(header.name) && authority == null) {
1✔
689
          authority = header.value;
1✔
690
        } else {
691
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
692
              "Unexpected pseudo header. RFC7540 section 8.1.2.1");
693
          return;
1✔
694
        }
695
      }
1✔
696
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
697
        if (headerBlock.get(i).name.getByte(0) == ':') {
1✔
698
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
699
              "Pseudo header not before regular headers. RFC7540 section 8.1.2.1");
700
          return;
1✔
701
        }
702
      }
703
      if (!CONNECT_METHOD.equals(httpMethod)
1✔
704
          && newStream
705
          && (httpMethod == null || scheme == null || path == null)) {
706
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
707
            "Missing required pseudo header. RFC7540 section 8.1.2.3");
708
        return;
1✔
709
      }
710
      if (headerContains(headerBlock, CONNECTION)) {
1✔
711
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
712
            "Connection-specific headers not permitted. RFC7540 section 8.1.2.2");
713
        return;
1✔
714
      }
715

716
      if (!newStream) {
1✔
717
        if (inFinished) {
1✔
718
          synchronized (lock) {
1✔
719
            StreamState stream = streams.get(streamId);
1✔
720
            if (stream == null) {
1✔
721
              streamError(streamId, ErrorCode.STREAM_CLOSED, "Received headers for closed stream");
×
722
              return;
×
723
            }
724
            if (stream.hasReceivedEndOfStream()) {
1✔
725
              streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
726
                  "Received HEADERS for half-closed (remote) stream. RFC7540 section 5.1");
727
              return;
1✔
728
            }
729
            // Ignore the trailers, but still half-close the stream
730
            stream.inboundDataReceived(new Buffer(), 0, 0, true);
1✔
731
            return;
1✔
732
          }
733
        } else {
734
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
735
              "Headers disallowed in the middle of the stream. RFC7540 section 8.1");
736
          return;
1✔
737
        }
738
      }
739

740
      if (authority == null) {
1✔
741
        int i = headerFind(headerBlock, HOST, 0);
1✔
742
        if (i != -1) {
1✔
743
          if (headerFind(headerBlock, HOST, i + 1) != -1) {
1✔
744
            respondWithHttpError(streamId, inFinished, 400, Status.Code.INTERNAL,
1✔
745
                "Multiple host headers disallowed. RFC7230 section 5.4");
746
            return;
1✔
747
          }
748
          authority = headerBlock.get(i).value;
1✔
749
        }
750
      }
751
      headerRemove(headerBlock, HOST);
1✔
752

753
      // Remove the leading slash of the path and get the fully qualified method name
754
      if (path.size() == 0 || path.getByte(0) != '/') {
1✔
755
        respondWithHttpError(streamId, inFinished, 404, Status.Code.UNIMPLEMENTED,
1✔
756
            "Expected path to start with /: " + asciiString(path));
1✔
757
        return;
1✔
758
      }
759
      String method = asciiString(path).substring(1);
1✔
760

761
      ByteString contentType = headerGetRequiredSingle(headerBlock, CONTENT_TYPE);
1✔
762
      if (contentType == null) {
1✔
763
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
764
            "Content-Type is missing or duplicated");
765
        return;
1✔
766
      }
767
      String contentTypeString = asciiString(contentType);
1✔
768
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
769
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
770
            "Content-Type is not supported: " + contentTypeString);
771
        return;
1✔
772
      }
773

774
      if (!POST_METHOD.equals(httpMethod)) {
1✔
775
        respondWithHttpError(streamId, inFinished, 405, Status.Code.INTERNAL,
1✔
776
            "HTTP Method is not supported: " + asciiString(httpMethod));
1✔
777
        return;
1✔
778
      }
779

780
      ByteString te = headerGetRequiredSingle(headerBlock, TE);
1✔
781
      if (!TE_TRAILERS.equals(te)) {
1✔
782
        respondWithGrpcError(streamId, inFinished, Status.Code.INTERNAL,
1✔
783
            String.format("Expected header TE: %s, but %s is received. "
1✔
784
              + "Some intermediate proxy may not support trailers",
785
              asciiString(TE_TRAILERS), te == null ? "<missing>" : asciiString(te)));
1✔
786
        return;
1✔
787
      }
788
      headerRemove(headerBlock, CONTENT_LENGTH);
1✔
789

790
      Metadata metadata = Utils.convertHeaders(headerBlock);
1✔
791
      StatsTraceContext statsTraceCtx =
1✔
792
          StatsTraceContext.newServerContext(config.streamTracerFactories, method, metadata);
1✔
793
      synchronized (lock) {
1✔
794
        OkHttpServerStream.TransportState stream = new OkHttpServerStream.TransportState(
1✔
795
            OkHttpServerTransport.this,
796
            streamId,
797
            config.maxInboundMessageSize,
1✔
798
            statsTraceCtx,
799
            lock,
1✔
800
            frameWriter,
1✔
801
            outboundFlow,
1✔
802
            config.flowControlWindow,
1✔
803
            tracer,
1✔
804
            method);
805
        OkHttpServerStream streamForApp = new OkHttpServerStream(
1✔
806
            stream,
807
            attributes,
1✔
808
            authority == null ? null : asciiString(authority),
1✔
809
            statsTraceCtx,
810
            tracer);
1✔
811
        if (streams.isEmpty()) {
1✔
812
          keepAliveEnforcer.onTransportActive();
1✔
813
          if (maxConnectionIdleManager != null) {
1✔
814
            maxConnectionIdleManager.onTransportActive();
1✔
815
          }
816
        }
817
        streams.put(streamId, stream);
1✔
818
        listener.streamCreated(streamForApp, method, metadata);
1✔
819
        stream.onStreamAllocated();
1✔
820
        if (inFinished) {
1✔
821
          stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
1✔
822
        }
823
      }
1✔
824
    }
1✔
825

826
    private int headerBlockSize(List<Header> headerBlock) {
827
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
828
      long size = 0;
1✔
829
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
830
        Header header = headerBlock.get(i);
1✔
831
        size += 32 + header.name.size() + header.value.size();
1✔
832
      }
833
      size = Math.min(size, Integer.MAX_VALUE);
1✔
834
      return (int) size;
1✔
835
    }
836

837
    /**
838
     * Handle an HTTP2 DATA frame.
839
     */
840
    @Override
841
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
842
                     int paddedLength)
843
        throws IOException {
844
      frameLogger.logData(
1✔
845
          OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
1✔
846
      if (streamId == 0) {
1✔
847
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
848
            "Stream 0 is reserved for control messages. RFC7540 section 5.1.1");
849
        return;
1✔
850
      }
851
      if ((streamId & 1) == 0) {
1✔
852
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
853
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
854
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
855
        return;
1✔
856
      }
857

858
      // Wait until the frame is complete. We only support 16 KiB frames, and the max permitted in
859
      // HTTP/2 is 16 MiB. This is verified in OkHttp's Http2 deframer, so we don't need to be
860
      // concerned with the window being exceeded at this point.
861
      in.require(length);
1✔
862

863
      synchronized (lock) {
1✔
864
        StreamState stream = streams.get(streamId);
1✔
865
        if (stream == null) {
1✔
866
          in.skip(length);
1✔
867
          streamError(streamId, ErrorCode.STREAM_CLOSED, "Received data for closed stream");
1✔
868
          return;
1✔
869
        }
870
        if (stream.hasReceivedEndOfStream()) {
1✔
871
          in.skip(length);
1✔
872
          streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
873
              "Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
874
          return;
1✔
875
        }
876
        if (stream.inboundWindowAvailable() < paddedLength) {
1✔
877
          in.skip(length);
1✔
878
          streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
1✔
879
              "Received DATA size exceeded window size. RFC7540 section 6.9");
880
          return;
1✔
881
        }
882
        Buffer buf = new Buffer();
1✔
883
        buf.write(in.getBuffer(), length);
1✔
884
        stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
1✔
885
      }
1✔
886

887
      // connection window update
888
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
889
      if (connectionUnacknowledgedBytesRead
1✔
890
          >= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
891
        synchronized (lock) {
1✔
892
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
893
          frameWriter.flush();
1✔
894
        }
1✔
895
        connectionUnacknowledgedBytesRead = 0;
1✔
896
      }
897
    }
1✔
898

899
    @Override
900
    public void rstStream(int streamId, ErrorCode errorCode) {
901
      frameLogger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
902
      // streamId == 0 checking is in HTTP/2 decoder
903

904
      if (!(ErrorCode.NO_ERROR.equals(errorCode)
1✔
905
            || ErrorCode.CANCEL.equals(errorCode)
1✔
906
            || ErrorCode.STREAM_CLOSED.equals(errorCode))) {
1✔
907
        log.log(Level.INFO, "Received RST_STREAM: " + errorCode);
×
908
      }
909
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
910
          .withDescription("RST_STREAM");
1✔
911
      synchronized (lock) {
1✔
912
        StreamState stream = streams.get(streamId);
1✔
913
        if (stream != null) {
1✔
914
          stream.inboundRstReceived(status);
1✔
915
          streamClosed(streamId, /*flush=*/ false);
1✔
916
        }
917
      }
1✔
918
    }
1✔
919

920
    @Override
921
    public void settings(boolean clearPrevious, Settings settings) {
922
      frameLogger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
923
      synchronized (lock) {
1✔
924
        boolean outboundWindowSizeIncreased = false;
1✔
925
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
926
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
927
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
928
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
929
        }
930

931
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
932
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
933
        // otherwise it will cause a stream error (RST_STREAM).
934
        frameWriter.ackSettings(settings);
1✔
935
        frameWriter.flush();
1✔
936
        if (!receivedSettings) {
1✔
937
          receivedSettings = true;
1✔
938
          attributes = listener.transportReady(attributes);
1✔
939
        }
940

941
        // send any pending bytes / streams
942
        if (outboundWindowSizeIncreased) {
1✔
943
          outboundFlow.writeStreams();
1✔
944
        }
945
      }
1✔
946
    }
1✔
947

948
    @Override
949
    public void ping(boolean ack, int payload1, int payload2) {
950
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
951
        abruptShutdown(ErrorCode.ENHANCE_YOUR_CALM, "too_many_pings",
1✔
952
            Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"), false);
1✔
953
        return;
1✔
954
      }
955
      long payload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
956
      if (!ack) {
1✔
957
        frameLogger.logPing(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
958
        synchronized (lock) {
1✔
959
          frameWriter.ping(true, payload1, payload2);
1✔
960
          frameWriter.flush();
1✔
961
        }
1✔
962
      } else {
963
        frameLogger.logPingAck(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
964
        if (KEEPALIVE_PING == payload) {
1✔
965
          return;
×
966
        }
967
        if (GRACEFUL_SHUTDOWN_PING == payload) {
1✔
968
          triggerGracefulSecondGoaway();
1✔
969
          return;
1✔
970
        }
971
        log.log(Level.INFO, "Received unexpected ping ack: " + payload);
×
972
      }
973
    }
1✔
974

975
    @Override
976
    public void ackSettings() {}
1✔
977

978
    @Override
979
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
980
      frameLogger.logGoAway(
1✔
981
          OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
982
      String description = String.format("Received GOAWAY: %s '%s'", errorCode, debugData.utf8());
1✔
983
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
984
          .withDescription(description);
1✔
985
      if (!ErrorCode.NO_ERROR.equals(errorCode)) {
1✔
986
        log.log(
×
987
            Level.WARNING, "Received GOAWAY: {0} {1}", new Object[] {errorCode, debugData.utf8()});
×
988
      }
989
      synchronized (lock) {
1✔
990
        goAwayStatus = status;
1✔
991
      }
1✔
992
    }
1✔
993

994
    @Override
995
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
996
        throws IOException {
997
      frameLogger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
998
          streamId, promisedStreamId, requestHeaders);
999
      // streamId == 0 checking is in HTTP/2 decoder.
1000
      // The server doesn't use PUSH_PROMISE, so all even streams are IDLE, and odd streams are not
1001
      // peer-initiated.
1002
      connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
1003
          "PUSH_PROMISE only allowed on peer-initiated streams. RFC7540 section 6.6");
1004
    }
1✔
1005

1006
    @Override
1007
    public void windowUpdate(int streamId, long delta) {
1008
      frameLogger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1009
      // delta == 0 checking is in HTTP/2 decoder. And it isn't quite right, as it will always cause
1010
      // a GOAWAY. RFC7540 section 6.9 says to use RST_STREAM if the stream id isn't 0. Doesn't
1011
      // matter much though.
1012
      synchronized (lock) {
1✔
1013
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1014
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1015
        } else {
1016
          StreamState stream = streams.get(streamId);
1✔
1017
          if (stream != null) {
1✔
1018
            outboundFlow.windowUpdate(stream.getOutboundFlowState(), (int) delta);
1✔
1019
          }
1020
        }
1021
      }
1✔
1022
    }
1✔
1023

1024
    @Override
1025
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1026
      frameLogger.logPriority(
1✔
1027
          OkHttpFrameLogger.Direction.INBOUND, streamId, streamDependency, weight, exclusive);
1028
      // streamId == 0 checking is in HTTP/2 decoder.
1029
      // Ignore priority change.
1030
    }
1✔
1031

1032
    @Override
1033
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1034
        int port, long maxAge) {}
×
1035

1036
    /**
1037
     * Send GOAWAY to the server, then finish all streams and close the transport. RFC7540 §5.4.1.
1038
     */
1039
    private void connectionError(ErrorCode errorCode, String moreDetail) {
1040
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1041
          .withDescription(String.format("HTTP2 connection error: %s '%s'", errorCode, moreDetail));
1✔
1042
      abruptShutdown(errorCode, moreDetail, status, false);
1✔
1043
    }
1✔
1044

1045
    /**
1046
     * Respond with RST_STREAM, making sure to kill the associated stream if it exists. Reason will
1047
     * rarely be seen. RFC7540 §5.4.2.
1048
     */
1049
    private void streamError(int streamId, ErrorCode errorCode, String reason) {
1050
      if (errorCode == ErrorCode.PROTOCOL_ERROR) {
1✔
1051
        log.log(
1✔
1052
            Level.FINE, "Responding with RST_STREAM {0}: {1}", new Object[] {errorCode, reason});
1053
      }
1054
      synchronized (lock) {
1✔
1055
        frameWriter.rstStream(streamId, errorCode);
1✔
1056
        frameWriter.flush();
1✔
1057
        StreamState stream = streams.get(streamId);
1✔
1058
        if (stream != null) {
1✔
1059
          stream.transportReportStatus(
1✔
1060
              Status.INTERNAL.withDescription(
1✔
1061
                  String.format("Responded with RST_STREAM %s: %s", errorCode, reason)));
1✔
1062
          streamClosed(streamId, /*flush=*/ false);
1✔
1063
        }
1064
      }
1✔
1065
    }
1✔
1066

1067
    private void respondWithHttpError(
1068
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg) {
1069
      Metadata metadata = new Metadata();
1✔
1070
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1071
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1072
      List<Header> headers =
1✔
1073
          Headers.createHttpResponseHeaders(httpCode, "text/plain; charset=utf-8", metadata);
1✔
1074
      Buffer data = new Buffer().writeUtf8(msg);
1✔
1075

1076
      synchronized (lock) {
1✔
1077
        Http2ErrorStreamState stream =
1✔
1078
            new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
1✔
1079
        if (streams.isEmpty()) {
1✔
1080
          keepAliveEnforcer.onTransportActive();
1✔
1081
          if (maxConnectionIdleManager != null) {
1✔
1082
            maxConnectionIdleManager.onTransportActive();
1✔
1083
          }
1084
        }
1085
        streams.put(streamId, stream);
1✔
1086
        if (inFinished) {
1✔
1087
          stream.inboundDataReceived(new Buffer(), 0, 0, true);
×
1088
        }
1089
        frameWriter.headers(streamId, headers);
1✔
1090
        outboundFlow.data(
1✔
1091
            /*outFinished=*/true, stream.getOutboundFlowState(), data, /*flush=*/true);
1✔
1092
        outboundFlow.notifyWhenNoPendingData(
1✔
1093
            stream.getOutboundFlowState(), () -> rstOkAtEndOfHttpError(stream));
1✔
1094
      }
1✔
1095
    }
1✔
1096

1097
    private void rstOkAtEndOfHttpError(Http2ErrorStreamState stream) {
1098
      synchronized (lock) {
1✔
1099
        if (!stream.hasReceivedEndOfStream()) {
1✔
1100
          frameWriter.rstStream(stream.streamId, ErrorCode.NO_ERROR);
1✔
1101
        }
1102
        streamClosed(stream.streamId, /*flush=*/ true);
1✔
1103
      }
1✔
1104
    }
1✔
1105

1106
    private void respondWithGrpcError(
1107
        int streamId, boolean inFinished, Status.Code statusCode, String msg) {
1108
      Metadata metadata = new Metadata();
1✔
1109
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1110
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1111
      List<Header> headers = Headers.createResponseTrailers(metadata, false);
1✔
1112

1113
      synchronized (lock) {
1✔
1114
        frameWriter.synReply(true, streamId, headers);
1✔
1115
        if (!inFinished) {
1✔
1116
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
1117
        }
1118
        frameWriter.flush();
1✔
1119
      }
1✔
1120
    }
1✔
1121
  }
1122

1123
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1✔
1124
    @Override
1125
    public void ping() {
1126
      synchronized (lock) {
×
1127
        frameWriter.ping(false, 0, KEEPALIVE_PING);
×
1128
        frameWriter.flush();
×
1129
      }
×
1130
      tracer.reportKeepAliveSent();
×
1131
    }
×
1132

1133
    @Override
1134
    public void onPingTimeout() {
1135
      synchronized (lock) {
×
1136
        goAwayStatus = Status.UNAVAILABLE
×
1137
            .withDescription("Keepalive failed. Considering connection dead");
×
1138
        GrpcUtil.closeQuietly(socket);
×
1139
      }
×
1140
    }
×
1141
  }
1142

1143
  interface StreamState {
1144
    /** Must be holding 'lock' when calling. */
1145
    void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
1146

1147
    /** Must be holding 'lock' when calling. */
1148
    boolean hasReceivedEndOfStream();
1149

1150
    /** Must be holding 'lock' when calling. */
1151
    int inboundWindowAvailable();
1152

1153
    /** Must be holding 'lock' when calling. */
1154
    void transportReportStatus(Status status);
1155

1156
    /** Must be holding 'lock' when calling. */
1157
    void inboundRstReceived(Status status);
1158

1159
    OutboundFlowController.StreamState getOutboundFlowState();
1160
  }
1161

1162
  static class Http2ErrorStreamState implements StreamState, OutboundFlowController.Stream {
1163
    private final int streamId;
1164
    private final Object lock;
1165
    private final OutboundFlowController.StreamState outboundFlowState;
1166
    @GuardedBy("lock")
1167
    private int window;
1168
    @GuardedBy("lock")
1169
    private boolean receivedEndOfStream;
1170

1171
    Http2ErrorStreamState(
1172
        int streamId, Object lock, OutboundFlowController outboundFlow, int initialWindowSize) {
1✔
1173
      this.streamId = streamId;
1✔
1174
      this.lock = lock;
1✔
1175
      this.outboundFlowState = outboundFlow.createState(this, streamId);
1✔
1176
      this.window = initialWindowSize;
1✔
1177
    }
1✔
1178

1179
    @Override public void onSentBytes(int frameBytes) {}
1✔
1180

1181
    @Override public void inboundDataReceived(
1182
        Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
1183
      synchronized (lock) {
×
1184
        if (endOfStream) {
×
1185
          receivedEndOfStream = true;
×
1186
        }
1187
        window -= dataLength + paddingLength;
×
1188
        try {
1189
          frame.skip(frame.size()); // Recycle segments
×
1190
        } catch (IOException ex) {
×
1191
          throw new AssertionError(ex);
×
1192
        }
×
1193
      }
×
1194
    }
×
1195

1196
    @Override public boolean hasReceivedEndOfStream() {
1197
      synchronized (lock) {
1✔
1198
        return receivedEndOfStream;
1✔
1199
      }
1200
    }
1201

1202
    @Override public int inboundWindowAvailable() {
1203
      synchronized (lock) {
×
1204
        return window;
×
1205
      }
1206
    }
1207

1208
    @Override public void transportReportStatus(Status status) {}
×
1209

1210
    @Override public void inboundRstReceived(Status status) {}
×
1211

1212
    @Override public OutboundFlowController.StreamState getOutboundFlowState() {
1213
      synchronized (lock) {
1✔
1214
        return outboundFlowState;
1✔
1215
      }
1216
    }
1217
  }
1218
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc