• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19195

06 May 2024 06:38PM UTC coverage: 88.377% (+0.03%) from 88.343%
#19195

push

github

web-flow
Add gauge metric API and Otel implementation

This is needed by gRFC A78 for xds metrics, and for RLS metrics. Since
gauges need to acquire a lock (or other synchronization) in the
callback, the callback allows batching multiple gauges together to avoid
acquiring-and-requiring such locks.

Unlike other metrics, gauges are reported on-demand to the MetricSink.
This means not all sinks will receive the same data, as the sinks will
ask for the gauges at different times.

31571 of 35723 relevant lines covered (88.38%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.77
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
20
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
21

22
import com.google.common.base.Preconditions;
23
import com.google.common.util.concurrent.Futures;
24
import com.google.common.util.concurrent.ListenableFuture;
25
import io.grpc.Attributes;
26
import io.grpc.InternalChannelz;
27
import io.grpc.InternalLogId;
28
import io.grpc.InternalStatus;
29
import io.grpc.Metadata;
30
import io.grpc.ServerStreamTracer;
31
import io.grpc.Status;
32
import io.grpc.internal.GrpcUtil;
33
import io.grpc.internal.KeepAliveEnforcer;
34
import io.grpc.internal.KeepAliveManager;
35
import io.grpc.internal.LogExceptionRunnable;
36
import io.grpc.internal.MaxConnectionIdleManager;
37
import io.grpc.internal.ObjectPool;
38
import io.grpc.internal.SerializingExecutor;
39
import io.grpc.internal.ServerTransport;
40
import io.grpc.internal.ServerTransportListener;
41
import io.grpc.internal.StatsTraceContext;
42
import io.grpc.internal.TransportTracer;
43
import io.grpc.okhttp.internal.framed.ErrorCode;
44
import io.grpc.okhttp.internal.framed.FrameReader;
45
import io.grpc.okhttp.internal.framed.FrameWriter;
46
import io.grpc.okhttp.internal.framed.Header;
47
import io.grpc.okhttp.internal.framed.HeadersMode;
48
import io.grpc.okhttp.internal.framed.Http2;
49
import io.grpc.okhttp.internal.framed.Settings;
50
import io.grpc.okhttp.internal.framed.Variant;
51
import java.io.IOException;
52
import java.net.Socket;
53
import java.util.List;
54
import java.util.Locale;
55
import java.util.Map;
56
import java.util.TreeMap;
57
import java.util.concurrent.Executor;
58
import java.util.concurrent.ScheduledExecutorService;
59
import java.util.concurrent.ScheduledFuture;
60
import java.util.concurrent.TimeUnit;
61
import java.util.logging.Level;
62
import java.util.logging.Logger;
63
import javax.annotation.Nullable;
64
import javax.annotation.concurrent.GuardedBy;
65
import okio.Buffer;
66
import okio.BufferedSource;
67
import okio.ByteString;
68
import okio.Okio;
69

70
/**
71
 * OkHttp-based server transport.
72
 */
73
final class OkHttpServerTransport implements ServerTransport,
74
      ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport {
75
  private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName());
1✔
76
  private static final int GRACEFUL_SHUTDOWN_PING = 0x1111;
77

78
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);
1✔
79

80
  private static final int KEEPALIVE_PING = 0xDEAD;
81
  private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method");
1✔
82
  private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT");
1✔
83
  private static final ByteString POST_METHOD = ByteString.encodeUtf8("POST");
1✔
84
  private static final ByteString SCHEME = ByteString.encodeUtf8(":scheme");
1✔
85
  private static final ByteString PATH = ByteString.encodeUtf8(":path");
1✔
86
  private static final ByteString AUTHORITY = ByteString.encodeUtf8(":authority");
1✔
87
  private static final ByteString CONNECTION = ByteString.encodeUtf8("connection");
1✔
88
  private static final ByteString HOST = ByteString.encodeUtf8("host");
1✔
89
  private static final ByteString TE = ByteString.encodeUtf8("te");
1✔
90
  private static final ByteString TE_TRAILERS = ByteString.encodeUtf8("trailers");
1✔
91
  private static final ByteString CONTENT_TYPE = ByteString.encodeUtf8("content-type");
1✔
92
  private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
1✔
93

94
  private final Config config;
95
  private final Variant variant = new Http2();
1✔
96
  private final TransportTracer tracer;
97
  private final InternalLogId logId;
98
  private Socket socket;
99
  private ServerTransportListener listener;
100
  private Executor transportExecutor;
101
  private ScheduledExecutorService scheduledExecutorService;
102
  private Attributes attributes;
103
  private KeepAliveManager keepAliveManager;
104
  private MaxConnectionIdleManager maxConnectionIdleManager;
105
  private ScheduledFuture<?> maxConnectionAgeMonitor;
106
  private final KeepAliveEnforcer keepAliveEnforcer;
107

108
  private final Object lock = new Object();
1✔
109
  @GuardedBy("lock")
110
  private boolean abruptShutdown;
111
  @GuardedBy("lock")
112
  private boolean gracefulShutdown;
113
  @GuardedBy("lock")
114
  private boolean handshakeShutdown;
115
  @GuardedBy("lock")
116
  private InternalChannelz.Security securityInfo;
117
  @GuardedBy("lock")
118
  private ExceptionHandlingFrameWriter frameWriter;
119
  @GuardedBy("lock")
120
  private OutboundFlowController outboundFlow;
121
  @GuardedBy("lock")
1✔
122
  private final Map<Integer, StreamState> streams = new TreeMap<>();
123
  @GuardedBy("lock")
124
  private int lastStreamId;
125
  @GuardedBy("lock")
1✔
126
  private int goAwayStreamId = Integer.MAX_VALUE;
127
  /**
128
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
129
   * streams may continue.
130
   */
131
  @GuardedBy("lock")
132
  private Status goAwayStatus;
133
  /** Non-{@code null} when gracefully shutting down and have not yet sent second GOAWAY. */
134
  @GuardedBy("lock")
135
  private ScheduledFuture<?> secondGoawayTimer;
136
  /** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
137
  @GuardedBy("lock")
138
  private ScheduledFuture<?> forcefulCloseTimer;
139
  @GuardedBy("lock")
1✔
140
  private Long gracefulShutdownPeriod = null;
141

142
  public OkHttpServerTransport(Config config, Socket bareSocket) {
1✔
143
    this.config = Preconditions.checkNotNull(config, "config");
1✔
144
    this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
1✔
145

146
    tracer = config.transportTracerFactory.create();
1✔
147
    tracer.setFlowControlWindowReader(this::readFlowControlWindow);
1✔
148
    logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
1✔
149
    transportExecutor = config.transportExecutorPool.getObject();
1✔
150
    scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
1✔
151
    keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
1✔
152
        config.permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
153
  }
1✔
154

155
  public void start(ServerTransportListener listener) {
156
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
157

158
    SerializingExecutor serializingExecutor = new SerializingExecutor(transportExecutor);
1✔
159
    serializingExecutor.execute(() -> startIo(serializingExecutor));
1✔
160
  }
1✔
161

162
  private void startIo(SerializingExecutor serializingExecutor) {
163
    try {
164
      // The socket implementation is lazily initialized, but had broken thread-safety 
165
      // for that laziness https://bugs.openjdk.org/browse/JDK-8278326. 
166
      // As a workaround, we lock to synchronize initialization with shutdown().
167
      synchronized (lock) {
1✔
168
        socket.setTcpNoDelay(true);
1✔
169
      }
1✔
170
      HandshakerSocketFactory.HandshakeResult result =
1✔
171
          config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
1✔
172
      synchronized (lock) {
1✔
173
        this.socket = result.socket;
1✔
174
      }
1✔
175
      this.attributes = result.attributes;
1✔
176

177
      int maxQueuedControlFrames = 10000;
1✔
178
      AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
179
      asyncSink.becomeConnected(Okio.sink(socket), socket);
1✔
180
      FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
181
          variant.newWriter(Okio.buffer(asyncSink), false));
1✔
182
      FrameWriter writeMonitoringFrameWriter = new ForwardingFrameWriter(rawFrameWriter) {
1✔
183
        @Override
184
        public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
185
            throws IOException {
186
          keepAliveEnforcer.resetCounters();
1✔
187
          super.synReply(outFinished, streamId, headerBlock);
1✔
188
        }
1✔
189

190
        @Override
191
        public void headers(int streamId, List<Header> headerBlock) throws IOException {
192
          keepAliveEnforcer.resetCounters();
1✔
193
          super.headers(streamId, headerBlock);
1✔
194
        }
1✔
195

196
        @Override
197
        public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
198
            throws IOException {
199
          keepAliveEnforcer.resetCounters();
1✔
200
          super.data(outFinished, streamId, source, byteCount);
1✔
201
        }
1✔
202
      };
203
      synchronized (lock) {
1✔
204
        this.securityInfo = result.securityInfo;
1✔
205

206
        // Handle FrameWriter exceptions centrally, since there are many callers. Note that
207
        // errors coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink
208
        // does not propagate syscall errors through the FrameWriter. But we handle the
209
        // AsyncSink failures with the same TransportExceptionHandler instance so it is all
210
        // mixed back together.
211
        frameWriter = new ExceptionHandlingFrameWriter(this, writeMonitoringFrameWriter);
1✔
212
        outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
213

214
        // These writes will be queued in the serializingExecutor waiting for this function to
215
        // return.
216
        frameWriter.connectionPreface();
1✔
217
        Settings settings = new Settings();
1✔
218
        OkHttpSettingsUtil.set(settings,
1✔
219
            OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
220
        OkHttpSettingsUtil.set(settings,
1✔
221
            OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
222
        if (config.maxConcurrentStreams != Integer.MAX_VALUE) {
1✔
223
          OkHttpSettingsUtil.set(settings,
1✔
224
              OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams);
225
        }
226
        frameWriter.settings(settings);
1✔
227
        if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
1✔
228
          frameWriter.windowUpdate(
1✔
229
              Utils.CONNECTION_STREAM_ID, config.flowControlWindow - Utils.DEFAULT_WINDOW_SIZE);
230
        }
231
        frameWriter.flush();
1✔
232
      }
1✔
233

234
      if (config.keepAliveTimeNanos != GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
235
        keepAliveManager = new KeepAliveManager(
1✔
236
            new KeepAlivePinger(), scheduledExecutorService, config.keepAliveTimeNanos,
237
            config.keepAliveTimeoutNanos, true);
238
        keepAliveManager.onTransportStarted();
1✔
239
      }
240

241
      if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
242
        maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
1✔
243
        maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
1✔
244
      }
245

246
      if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
247
        long maxConnectionAgeInNanos =
1✔
248
            (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
1✔
249
        maxConnectionAgeMonitor = scheduledExecutorService.schedule(
1✔
250
            new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
1✔
251
            maxConnectionAgeInNanos,
252
            TimeUnit.NANOSECONDS);
253
      }
254

255
      transportExecutor.execute(new FrameHandler(
1✔
256
          variant.newReader(Okio.buffer(Okio.source(socket)), false)));
1✔
257
    } catch (Error | IOException | RuntimeException ex) {
1✔
258
      synchronized (lock) {
1✔
259
        if (!handshakeShutdown) {
1✔
260
          log.log(Level.INFO, "Socket failed to handshake", ex);
1✔
261
        }
262
      }
1✔
263
      GrpcUtil.closeQuietly(socket);
1✔
264
      terminated();
1✔
265
    }
1✔
266
  }
1✔
267

268
  @Override
269
  public void shutdown() {
270
    shutdown(null);
1✔
271
  }
1✔
272

273
  private void shutdown(@Nullable Long gracefulShutdownPeriod) {
274
    synchronized (lock) {
1✔
275
      if (gracefulShutdown || abruptShutdown) {
1✔
276
        return;
1✔
277
      }
278
      gracefulShutdown = true;
1✔
279
      this.gracefulShutdownPeriod = gracefulShutdownPeriod;
1✔
280
      if (frameWriter == null) {
1✔
281
        handshakeShutdown = true;
1✔
282
        GrpcUtil.closeQuietly(socket);
1✔
283
      } else {
284
        // RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
285
        // we also set a timer to limit the upper bound in case the PING is excessively stalled or
286
        // the client is malicious.
287
        secondGoawayTimer = scheduledExecutorService.schedule(
1✔
288
            this::triggerGracefulSecondGoaway,
289
            GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
290
        frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
1✔
291
        frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
1✔
292
        frameWriter.flush();
1✔
293
      }
294
    }
1✔
295
  }
1✔
296

297
  private void triggerGracefulSecondGoaway() {
298
    synchronized (lock) {
1✔
299
      if (secondGoawayTimer == null) {
1✔
300
        return;
×
301
      }
302
      secondGoawayTimer.cancel(false);
1✔
303
      secondGoawayTimer = null;
1✔
304
      frameWriter.goAway(lastStreamId, ErrorCode.NO_ERROR, new byte[0]);
1✔
305
      goAwayStreamId = lastStreamId;
1✔
306
      if (streams.isEmpty()) {
1✔
307
        frameWriter.close();
1✔
308
      } else {
309
        frameWriter.flush();
1✔
310
      }
311
      if (gracefulShutdownPeriod != null) {
1✔
312
        forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
313
            this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
1✔
314
      }
315
    }
1✔
316
  }
1✔
317

318
  @Override
319
  public void shutdownNow(Status reason) {
320
    synchronized (lock) {
1✔
321
      if (frameWriter == null) {
1✔
322
        handshakeShutdown = true;
1✔
323
        GrpcUtil.closeQuietly(socket);
1✔
324
        return;
1✔
325
      }
326
    }
1✔
327
    abruptShutdown(ErrorCode.NO_ERROR, "", reason, true);
1✔
328
  }
1✔
329

330
  /**
331
   * Finish all active streams due to an IOException, then close the transport.
332
   */
333
  @Override
334
  public void onException(Throwable failureCause) {
335
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
336
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
337
    abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
338
  }
1✔
339

340
  private void abruptShutdown(
341
      ErrorCode errorCode, String moreDetail, Status reason, boolean rstStreams) {
342
    synchronized (lock) {
1✔
343
      if (abruptShutdown) {
1✔
344
        return;
1✔
345
      }
346
      abruptShutdown = true;
1✔
347
      goAwayStatus = reason;
1✔
348

349
      if (secondGoawayTimer != null) {
1✔
350
        secondGoawayTimer.cancel(false);
1✔
351
        secondGoawayTimer = null;
1✔
352
      }
353
      for (Map.Entry<Integer, StreamState> entry : streams.entrySet()) {
1✔
354
        if (rstStreams) {
1✔
355
          frameWriter.rstStream(entry.getKey(), ErrorCode.CANCEL);
1✔
356
        }
357
        entry.getValue().transportReportStatus(reason);
1✔
358
      }
1✔
359
      streams.clear();
1✔
360

361
      // RFC7540 §5.4.1. Attempt to inform the client what went wrong. We try to write the GOAWAY
362
      // _and then_ close our side of the connection. But place an upper-bound for how long we wait
363
      // for I/O with a timer, which forcefully closes the socket.
364
      frameWriter.goAway(lastStreamId, errorCode, moreDetail.getBytes(GrpcUtil.US_ASCII));
1✔
365
      goAwayStreamId = lastStreamId;
1✔
366
      frameWriter.close();
1✔
367
      forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
368
          this::triggerForcefulClose, 1, TimeUnit.SECONDS);
369
    }
1✔
370
  }
1✔
371

372
  private void triggerForcefulClose() {
373
    // Safe to do unconditionally; no need to check if timer cancellation raced
374
    GrpcUtil.closeQuietly(socket);
1✔
375
  }
1✔
376

377
  private void terminated() {
378
    synchronized (lock) {
1✔
379
      if (forcefulCloseTimer != null) {
1✔
380
        forcefulCloseTimer.cancel(false);
1✔
381
        forcefulCloseTimer = null;
1✔
382
      }
383
    }
1✔
384
    if (keepAliveManager != null) {
1✔
385
      keepAliveManager.onTransportTermination();
1✔
386
    }
387
    if (maxConnectionIdleManager != null) {
1✔
388
      maxConnectionIdleManager.onTransportTermination();
1✔
389
    }
390

391
    if (maxConnectionAgeMonitor != null) {
1✔
392
      maxConnectionAgeMonitor.cancel(false);
1✔
393
    }
394
    transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
1✔
395
    scheduledExecutorService =
1✔
396
        config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
1✔
397
    listener.transportTerminated();
1✔
398
  }
1✔
399

400
  @Override
401
  public ScheduledExecutorService getScheduledExecutorService() {
402
    return scheduledExecutorService;
1✔
403
  }
404

405
  @Override
406
  public ListenableFuture<InternalChannelz.SocketStats> getStats() {
407
    synchronized (lock) {
1✔
408
      return Futures.immediateFuture(new InternalChannelz.SocketStats(
1✔
409
          tracer.getStats(),
1✔
410
          socket.getLocalSocketAddress(),
1✔
411
          socket.getRemoteSocketAddress(),
1✔
412
          Utils.getSocketOptions(socket),
1✔
413
          securityInfo));
414
    }
415
  }
416

417
  private TransportTracer.FlowControlWindows readFlowControlWindow() {
418
    synchronized (lock) {
1✔
419
      long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
420
      // connectionUnacknowledgedBytesRead is only readable by FrameHandler, so we provide a lower
421
      // bound.
422
      long remote = (long) (config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO);
1✔
423
      return new TransportTracer.FlowControlWindows(local, remote);
1✔
424
    }
425
  }
426

427
  @Override
428
  public InternalLogId getLogId() {
429
    return logId;
1✔
430
  }
431

432
  @Override
433
  public OutboundFlowController.StreamState[] getActiveStreams() {
434
    synchronized (lock) {
1✔
435
      OutboundFlowController.StreamState[] flowStreams =
1✔
436
          new OutboundFlowController.StreamState[streams.size()];
1✔
437
      int i = 0;
1✔
438
      for (StreamState stream : streams.values()) {
1✔
439
        flowStreams[i++] = stream.getOutboundFlowState();
1✔
440
      }
1✔
441
      return flowStreams;
1✔
442
    }
443
  }
444

445
  /**
446
   * Notify the transport that the stream was closed. Any frames for the stream must be enqueued
447
   * before calling.
448
   */
449
  void streamClosed(int streamId, boolean flush) {
450
    synchronized (lock) {
1✔
451
      streams.remove(streamId);
1✔
452
      if (streams.isEmpty()) {
1✔
453
        keepAliveEnforcer.onTransportIdle();
1✔
454
        if (maxConnectionIdleManager != null) {
1✔
455
          maxConnectionIdleManager.onTransportIdle();
1✔
456
        }
457
      }
458
      if (gracefulShutdown && streams.isEmpty()) {
1✔
459
        frameWriter.close();
1✔
460
      } else {
461
        if (flush) {
1✔
462
          frameWriter.flush();
1✔
463
        }
464
      }
465
    }
1✔
466
  }
1✔
467

468
  private static String asciiString(ByteString value) {
469
    // utf8() string is cached in ByteString, so we prefer it when the contents are ASCII. This
470
    // provides benefit if the header was reused via HPACK.
471
    for (int i = 0; i < value.size(); i++) {
1✔
472
      if (value.getByte(i) < 0) {
1✔
473
        return value.string(GrpcUtil.US_ASCII);
×
474
      }
475
    }
476
    return value.utf8();
1✔
477
  }
478

479
  private static int headerFind(List<Header> header, ByteString key, int startIndex) {
480
    for (int i = startIndex; i < header.size(); i++) {
1✔
481
      if (header.get(i).name.equals(key)) {
1✔
482
        return i;
1✔
483
      }
484
    }
485
    return -1;
1✔
486
  }
487

488
  private static boolean headerContains(List<Header> header, ByteString key) {
489
    return headerFind(header, key, 0) != -1;
1✔
490
  }
491

492
  private static void headerRemove(List<Header> header, ByteString key) {
493
    int i = 0;
1✔
494
    while ((i = headerFind(header, key, i)) != -1) {
1✔
495
      header.remove(i);
1✔
496
    }
497
  }
1✔
498

499
  /** Assumes that caller requires this field, so duplicates are treated as missing. */
500
  private static ByteString headerGetRequiredSingle(List<Header> header, ByteString key) {
501
    int i = headerFind(header, key, 0);
1✔
502
    if (i == -1) {
1✔
503
      return null;
1✔
504
    }
505
    if (headerFind(header, key, i + 1) != -1) {
1✔
506
      return null;
1✔
507
    }
508
    return header.get(i).value;
1✔
509
  }
510

511
  static final class Config {
512
    final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
513
    final ObjectPool<Executor> transportExecutorPool;
514
    final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
515
    final TransportTracer.Factory transportTracerFactory;
516
    final HandshakerSocketFactory handshakerSocketFactory;
517
    final long keepAliveTimeNanos;
518
    final long keepAliveTimeoutNanos;
519
    final int flowControlWindow;
520
    final int maxInboundMessageSize;
521
    final int maxInboundMetadataSize;
522
    final long maxConnectionIdleNanos;
523
    final boolean permitKeepAliveWithoutCalls;
524
    final long permitKeepAliveTimeInNanos;
525
    final long maxConnectionAgeInNanos;
526
    final long maxConnectionAgeGraceInNanos;
527
    final int maxConcurrentStreams;
528

529
    public Config(
530
        OkHttpServerBuilder builder,
531
        List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
1✔
532
      this.streamTracerFactories = Preconditions.checkNotNull(
1✔
533
          streamTracerFactories, "streamTracerFactories");
534
      transportExecutorPool = Preconditions.checkNotNull(
1✔
535
          builder.transportExecutorPool, "transportExecutorPool");
536
      scheduledExecutorServicePool = Preconditions.checkNotNull(
1✔
537
          builder.scheduledExecutorServicePool, "scheduledExecutorServicePool");
538
      transportTracerFactory = Preconditions.checkNotNull(
1✔
539
          builder.transportTracerFactory, "transportTracerFactory");
540
      handshakerSocketFactory = Preconditions.checkNotNull(
1✔
541
          builder.handshakerSocketFactory, "handshakerSocketFactory");
542
      keepAliveTimeNanos = builder.keepAliveTimeNanos;
1✔
543
      keepAliveTimeoutNanos = builder.keepAliveTimeoutNanos;
1✔
544
      flowControlWindow = builder.flowControlWindow;
1✔
545
      maxInboundMessageSize = builder.maxInboundMessageSize;
1✔
546
      maxInboundMetadataSize = builder.maxInboundMetadataSize;
1✔
547
      maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
1✔
548
      permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
1✔
549
      permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
1✔
550
      maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
1✔
551
      maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
1✔
552
      maxConcurrentStreams = builder.maxConcurrentCallsPerConnection;
1✔
553
    }
1✔
554
  }
555

556
  /**
557
   * Runnable which reads frames and dispatches them to in flight calls.
558
   */
559
  class FrameHandler implements FrameReader.Handler, Runnable {
560
    private final OkHttpFrameLogger frameLogger =
1✔
561
        new OkHttpFrameLogger(Level.FINE, OkHttpServerTransport.class);
562
    private final FrameReader frameReader;
563
    private boolean receivedSettings;
564
    private int connectionUnacknowledgedBytesRead;
565

566
    public FrameHandler(FrameReader frameReader) {
1✔
567
      this.frameReader = frameReader;
1✔
568
    }
1✔
569

570
    @Override
571
    public void run() {
572
      String threadName = Thread.currentThread().getName();
1✔
573
      Thread.currentThread().setName("OkHttpServerTransport");
1✔
574
      try {
575
        frameReader.readConnectionPreface();
1✔
576
        if (!frameReader.nextFrame(this)) {
1✔
577
          connectionError(ErrorCode.INTERNAL_ERROR, "Failed to read initial SETTINGS");
1✔
578
          return;
1✔
579
        }
580
        if (!receivedSettings) {
1✔
581
          connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
582
              "First HTTP/2 frame must be SETTINGS. RFC7540 section 3.5");
583
          return;
1✔
584
        }
585
        // Read until the underlying socket closes.
586
        while (frameReader.nextFrame(this)) {
1✔
587
          if (keepAliveManager != null) {
1✔
588
            keepAliveManager.onDataReceived();
1✔
589
          }
590
        }
591
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
592
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
593
        // nothing, otherwise, we finish all streams since it's a real IO issue.
594
        Status status;
595
        synchronized (lock) {
1✔
596
          status = goAwayStatus;
1✔
597
        }
1✔
598
        if (status == null) {
1✔
599
          status = Status.UNAVAILABLE.withDescription("TCP connection closed or IOException");
1✔
600
        }
601
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
602
      } catch (Throwable t) {
1✔
603
        log.log(Level.WARNING, "Error decoding HTTP/2 frames", t);
1✔
604
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "Error in frame decoder",
1✔
605
            Status.INTERNAL.withDescription("Error decoding HTTP/2 frames").withCause(t), false);
1✔
606
      } finally {
607
        // Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
608
        try {
609
          GrpcUtil.exhaust(socket.getInputStream());
1✔
610
        } catch (IOException ex) {
1✔
611
          // Unable to wait, so just proceed to tear-down. The socket is probably already closed so
612
          // the GOAWAY can't be sent anyway.
613
        }
1✔
614
        GrpcUtil.closeQuietly(socket);
1✔
615
        terminated();
1✔
616
        Thread.currentThread().setName(threadName);
1✔
617
      }
618
    }
1✔
619

620
    /**
621
     * Handle HTTP2 HEADER and CONTINUATION frames.
622
     */
623
    @Override
624
    public void headers(boolean outFinished,
625
        boolean inFinished,
626
        int streamId,
627
        int associatedStreamId,
628
        List<Header> headerBlock,
629
        HeadersMode headersMode) {
630
      frameLogger.logHeaders(
1✔
631
          OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
632
      // streamId == 0 checking is in HTTP/2 decoder
633
      if ((streamId & 1) == 0) {
1✔
634
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
635
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
636
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
637
        return;
1✔
638
      }
639
      boolean newStream;
640
      synchronized (lock) {
1✔
641
        if (streamId > goAwayStreamId) {
1✔
642
          return;
×
643
        }
644
        newStream = streamId > lastStreamId;
1✔
645
        if (newStream) {
1✔
646
          lastStreamId = streamId;
1✔
647
          if (config.maxConcurrentStreams <= streams.size()) {
1✔
648
            streamError(streamId, ErrorCode.REFUSED_STREAM,
1✔
649
                "Max concurrent stream reached. RFC7540 section 5.1.2");
650
            return;
1✔
651
          }
652
        }
653
      }
1✔
654

655
      int metadataSize = headerBlockSize(headerBlock);
1✔
656
      if (metadataSize > config.maxInboundMetadataSize) {
1✔
657
        respondWithHttpError(streamId, inFinished, 431, Status.Code.RESOURCE_EXHAUSTED,
1✔
658
            String.format(
1✔
659
                Locale.US,
660
                "Request metadata larger than %d: %d",
661
                config.maxInboundMetadataSize,
1✔
662
                metadataSize));
1✔
663
        return;
1✔
664
      }
665

666
      headerRemove(headerBlock, ByteString.EMPTY);
1✔
667

668
      ByteString httpMethod = null;
1✔
669
      ByteString scheme = null;
1✔
670
      ByteString path = null;
1✔
671
      ByteString authority = null;
1✔
672
      while (headerBlock.size() > 0 && headerBlock.get(0).name.getByte(0) == ':') {
1✔
673
        Header header = headerBlock.remove(0);
1✔
674
        if (HTTP_METHOD.equals(header.name) && httpMethod == null) {
1✔
675
          httpMethod = header.value;
1✔
676
        } else if (SCHEME.equals(header.name) && scheme == null) {
1✔
677
          scheme = header.value;
1✔
678
        } else if (PATH.equals(header.name) && path == null) {
1✔
679
          path = header.value;
1✔
680
        } else if (AUTHORITY.equals(header.name) && authority == null) {
1✔
681
          authority = header.value;
1✔
682
        } else {
683
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
684
              "Unexpected pseudo header. RFC7540 section 8.1.2.1");
685
          return;
1✔
686
        }
687
      }
1✔
688
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
689
        if (headerBlock.get(i).name.getByte(0) == ':') {
1✔
690
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
691
              "Pseudo header not before regular headers. RFC7540 section 8.1.2.1");
692
          return;
1✔
693
        }
694
      }
695
      if (!CONNECT_METHOD.equals(httpMethod)
1✔
696
          && newStream
697
          && (httpMethod == null || scheme == null || path == null)) {
698
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
699
            "Missing required pseudo header. RFC7540 section 8.1.2.3");
700
        return;
1✔
701
      }
702
      if (headerContains(headerBlock, CONNECTION)) {
1✔
703
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
704
            "Connection-specific headers not permitted. RFC7540 section 8.1.2.2");
705
        return;
1✔
706
      }
707

708
      if (!newStream) {
1✔
709
        if (inFinished) {
1✔
710
          synchronized (lock) {
1✔
711
            StreamState stream = streams.get(streamId);
1✔
712
            if (stream == null) {
1✔
713
              streamError(streamId, ErrorCode.STREAM_CLOSED, "Received headers for closed stream");
×
714
              return;
×
715
            }
716
            if (stream.hasReceivedEndOfStream()) {
1✔
717
              streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
718
                  "Received HEADERS for half-closed (remote) stream. RFC7540 section 5.1");
719
              return;
1✔
720
            }
721
            // Ignore the trailers, but still half-close the stream
722
            stream.inboundDataReceived(new Buffer(), 0, 0, true);
1✔
723
            return;
1✔
724
          }
725
        } else {
726
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
727
              "Headers disallowed in the middle of the stream. RFC7540 section 8.1");
728
          return;
1✔
729
        }
730
      }
731

732
      if (authority == null) {
1✔
733
        int i = headerFind(headerBlock, HOST, 0);
1✔
734
        if (i != -1) {
1✔
735
          if (headerFind(headerBlock, HOST, i + 1) != -1) {
1✔
736
            respondWithHttpError(streamId, inFinished, 400, Status.Code.INTERNAL,
1✔
737
                "Multiple host headers disallowed. RFC7230 section 5.4");
738
            return;
1✔
739
          }
740
          authority = headerBlock.get(i).value;
1✔
741
        }
742
      }
743
      headerRemove(headerBlock, HOST);
1✔
744

745
      // Remove the leading slash of the path and get the fully qualified method name
746
      if (path.size() == 0 || path.getByte(0) != '/') {
1✔
747
        respondWithHttpError(streamId, inFinished, 404, Status.Code.UNIMPLEMENTED,
1✔
748
            "Expected path to start with /: " + asciiString(path));
1✔
749
        return;
1✔
750
      }
751
      String method = asciiString(path).substring(1);
1✔
752

753
      ByteString contentType = headerGetRequiredSingle(headerBlock, CONTENT_TYPE);
1✔
754
      if (contentType == null) {
1✔
755
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
756
            "Content-Type is missing or duplicated");
757
        return;
1✔
758
      }
759
      String contentTypeString = asciiString(contentType);
1✔
760
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
761
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
762
            "Content-Type is not supported: " + contentTypeString);
763
        return;
1✔
764
      }
765

766
      if (!POST_METHOD.equals(httpMethod)) {
1✔
767
        respondWithHttpError(streamId, inFinished, 405, Status.Code.INTERNAL,
1✔
768
            "HTTP Method is not supported: " + asciiString(httpMethod));
1✔
769
        return;
1✔
770
      }
771

772
      ByteString te = headerGetRequiredSingle(headerBlock, TE);
1✔
773
      if (!TE_TRAILERS.equals(te)) {
1✔
774
        respondWithGrpcError(streamId, inFinished, Status.Code.INTERNAL,
1✔
775
            String.format("Expected header TE: %s, but %s is received. "
1✔
776
              + "Some intermediate proxy may not support trailers",
777
              asciiString(TE_TRAILERS), te == null ? "<missing>" : asciiString(te)));
1✔
778
        return;
1✔
779
      }
780
      headerRemove(headerBlock, CONTENT_LENGTH);
1✔
781

782
      Metadata metadata = Utils.convertHeaders(headerBlock);
1✔
783
      StatsTraceContext statsTraceCtx =
1✔
784
          StatsTraceContext.newServerContext(config.streamTracerFactories, method, metadata);
1✔
785
      synchronized (lock) {
1✔
786
        OkHttpServerStream.TransportState stream = new OkHttpServerStream.TransportState(
1✔
787
            OkHttpServerTransport.this,
788
            streamId,
789
            config.maxInboundMessageSize,
1✔
790
            statsTraceCtx,
791
            lock,
1✔
792
            frameWriter,
1✔
793
            outboundFlow,
1✔
794
            config.flowControlWindow,
1✔
795
            tracer,
1✔
796
            method);
797
        OkHttpServerStream streamForApp = new OkHttpServerStream(
1✔
798
            stream,
799
            attributes,
1✔
800
            authority == null ? null : asciiString(authority),
1✔
801
            statsTraceCtx,
802
            tracer);
1✔
803
        if (streams.isEmpty()) {
1✔
804
          keepAliveEnforcer.onTransportActive();
1✔
805
          if (maxConnectionIdleManager != null) {
1✔
806
            maxConnectionIdleManager.onTransportActive();
1✔
807
          }
808
        }
809
        streams.put(streamId, stream);
1✔
810
        listener.streamCreated(streamForApp, method, metadata);
1✔
811
        stream.onStreamAllocated();
1✔
812
        if (inFinished) {
1✔
813
          stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
1✔
814
        }
815
      }
1✔
816
    }
1✔
817

818
    private int headerBlockSize(List<Header> headerBlock) {
819
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
820
      long size = 0;
1✔
821
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
822
        Header header = headerBlock.get(i);
1✔
823
        size += 32 + header.name.size() + header.value.size();
1✔
824
      }
825
      size = Math.min(size, Integer.MAX_VALUE);
1✔
826
      return (int) size;
1✔
827
    }
828

829
    /**
830
     * Handle an HTTP2 DATA frame.
831
     */
832
    @Override
833
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
834
                     int paddedLength)
835
        throws IOException {
836
      frameLogger.logData(
1✔
837
          OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
1✔
838
      if (streamId == 0) {
1✔
839
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
840
            "Stream 0 is reserved for control messages. RFC7540 section 5.1.1");
841
        return;
1✔
842
      }
843
      if ((streamId & 1) == 0) {
1✔
844
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
845
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
846
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
847
        return;
1✔
848
      }
849

850
      // Wait until the frame is complete. We only support 16 KiB frames, and the max permitted in
851
      // HTTP/2 is 16 MiB. This is verified in OkHttp's Http2 deframer, so we don't need to be
852
      // concerned with the window being exceeded at this point.
853
      in.require(length);
1✔
854

855
      synchronized (lock) {
1✔
856
        StreamState stream = streams.get(streamId);
1✔
857
        if (stream == null) {
1✔
858
          in.skip(length);
1✔
859
          streamError(streamId, ErrorCode.STREAM_CLOSED, "Received data for closed stream");
1✔
860
          return;
1✔
861
        }
862
        if (stream.hasReceivedEndOfStream()) {
1✔
863
          in.skip(length);
1✔
864
          streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
865
              "Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
866
          return;
1✔
867
        }
868
        if (stream.inboundWindowAvailable() < paddedLength) {
1✔
869
          in.skip(length);
1✔
870
          streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
1✔
871
              "Received DATA size exceeded window size. RFC7540 section 6.9");
872
          return;
1✔
873
        }
874
        Buffer buf = new Buffer();
1✔
875
        buf.write(in.getBuffer(), length);
1✔
876
        stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
1✔
877
      }
1✔
878

879
      // connection window update
880
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
881
      if (connectionUnacknowledgedBytesRead
1✔
882
          >= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
883
        synchronized (lock) {
1✔
884
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
885
          frameWriter.flush();
1✔
886
        }
1✔
887
        connectionUnacknowledgedBytesRead = 0;
1✔
888
      }
889
    }
1✔
890

891
    @Override
892
    public void rstStream(int streamId, ErrorCode errorCode) {
893
      frameLogger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
894
      // streamId == 0 checking is in HTTP/2 decoder
895

896
      if (!(ErrorCode.NO_ERROR.equals(errorCode)
1✔
897
            || ErrorCode.CANCEL.equals(errorCode)
1✔
898
            || ErrorCode.STREAM_CLOSED.equals(errorCode))) {
1✔
899
        log.log(Level.INFO, "Received RST_STREAM: " + errorCode);
×
900
      }
901
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
902
          .withDescription("RST_STREAM");
1✔
903
      synchronized (lock) {
1✔
904
        StreamState stream = streams.get(streamId);
1✔
905
        if (stream != null) {
1✔
906
          stream.inboundRstReceived(status);
1✔
907
          streamClosed(streamId, /*flush=*/ false);
1✔
908
        }
909
      }
1✔
910
    }
1✔
911

912
    @Override
913
    public void settings(boolean clearPrevious, Settings settings) {
914
      frameLogger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
915
      synchronized (lock) {
1✔
916
        boolean outboundWindowSizeIncreased = false;
1✔
917
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
918
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
919
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
920
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
921
        }
922

923
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
924
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
925
        // otherwise it will cause a stream error (RST_STREAM).
926
        frameWriter.ackSettings(settings);
1✔
927
        frameWriter.flush();
1✔
928
        if (!receivedSettings) {
1✔
929
          receivedSettings = true;
1✔
930
          attributes = listener.transportReady(attributes);
1✔
931
        }
932

933
        // send any pending bytes / streams
934
        if (outboundWindowSizeIncreased) {
1✔
935
          outboundFlow.writeStreams();
1✔
936
        }
937
      }
1✔
938
    }
1✔
939

940
    @Override
941
    public void ping(boolean ack, int payload1, int payload2) {
942
      if (!keepAliveEnforcer.pingAcceptable()) {
1✔
943
        abruptShutdown(ErrorCode.ENHANCE_YOUR_CALM, "too_many_pings",
1✔
944
            Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"), false);
1✔
945
        return;
1✔
946
      }
947
      long payload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
948
      if (!ack) {
1✔
949
        frameLogger.logPing(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
950
        synchronized (lock) {
1✔
951
          frameWriter.ping(true, payload1, payload2);
1✔
952
          frameWriter.flush();
1✔
953
        }
1✔
954
      } else {
955
        frameLogger.logPingAck(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
956
        if (KEEPALIVE_PING == payload) {
1✔
957
          return;
×
958
        }
959
        if (GRACEFUL_SHUTDOWN_PING == payload) {
1✔
960
          triggerGracefulSecondGoaway();
1✔
961
          return;
1✔
962
        }
963
        log.log(Level.INFO, "Received unexpected ping ack: " + payload);
×
964
      }
965
    }
1✔
966

967
    @Override
968
    public void ackSettings() {}
1✔
969

970
    @Override
971
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
972
      frameLogger.logGoAway(
1✔
973
          OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
974
      String description = String.format("Received GOAWAY: %s '%s'", errorCode, debugData.utf8());
1✔
975
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
976
          .withDescription(description);
1✔
977
      if (!ErrorCode.NO_ERROR.equals(errorCode)) {
1✔
978
        log.log(
×
979
            Level.WARNING, "Received GOAWAY: {0} {1}", new Object[] {errorCode, debugData.utf8()});
×
980
      }
981
      synchronized (lock) {
1✔
982
        goAwayStatus = status;
1✔
983
      }
1✔
984
    }
1✔
985

986
    @Override
987
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
988
        throws IOException {
989
      frameLogger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
990
          streamId, promisedStreamId, requestHeaders);
991
      // streamId == 0 checking is in HTTP/2 decoder.
992
      // The server doesn't use PUSH_PROMISE, so all even streams are IDLE, and odd streams are not
993
      // peer-initiated.
994
      connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
995
          "PUSH_PROMISE only allowed on peer-initiated streams. RFC7540 section 6.6");
996
    }
1✔
997

998
    @Override
999
    public void windowUpdate(int streamId, long delta) {
1000
      frameLogger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1001
      // delta == 0 checking is in HTTP/2 decoder. And it isn't quite right, as it will always cause
1002
      // a GOAWAY. RFC7540 section 6.9 says to use RST_STREAM if the stream id isn't 0. Doesn't
1003
      // matter much though.
1004
      synchronized (lock) {
1✔
1005
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1006
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1007
        } else {
1008
          StreamState stream = streams.get(streamId);
1✔
1009
          if (stream != null) {
1✔
1010
            outboundFlow.windowUpdate(stream.getOutboundFlowState(), (int) delta);
1✔
1011
          }
1012
        }
1013
      }
1✔
1014
    }
1✔
1015

1016
    @Override
1017
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1018
      frameLogger.logPriority(
1✔
1019
          OkHttpFrameLogger.Direction.INBOUND, streamId, streamDependency, weight, exclusive);
1020
      // streamId == 0 checking is in HTTP/2 decoder.
1021
      // Ignore priority change.
1022
    }
1✔
1023

1024
    @Override
1025
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1026
        int port, long maxAge) {}
×
1027

1028
    /**
1029
     * Send GOAWAY to the server, then finish all streams and close the transport. RFC7540 §5.4.1.
1030
     */
1031
    private void connectionError(ErrorCode errorCode, String moreDetail) {
1032
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1033
          .withDescription(String.format("HTTP2 connection error: %s '%s'", errorCode, moreDetail));
1✔
1034
      abruptShutdown(errorCode, moreDetail, status, false);
1✔
1035
    }
1✔
1036

1037
    /**
1038
     * Respond with RST_STREAM, making sure to kill the associated stream if it exists. Reason will
1039
     * rarely be seen. RFC7540 §5.4.2.
1040
     */
1041
    private void streamError(int streamId, ErrorCode errorCode, String reason) {
1042
      if (errorCode == ErrorCode.PROTOCOL_ERROR) {
1✔
1043
        log.log(
1✔
1044
            Level.FINE, "Responding with RST_STREAM {0}: {1}", new Object[] {errorCode, reason});
1045
      }
1046
      synchronized (lock) {
1✔
1047
        frameWriter.rstStream(streamId, errorCode);
1✔
1048
        frameWriter.flush();
1✔
1049
        StreamState stream = streams.get(streamId);
1✔
1050
        if (stream != null) {
1✔
1051
          stream.transportReportStatus(
1✔
1052
              Status.INTERNAL.withDescription(
1✔
1053
                  String.format("Responded with RST_STREAM %s: %s", errorCode, reason)));
1✔
1054
          streamClosed(streamId, /*flush=*/ false);
1✔
1055
        }
1056
      }
1✔
1057
    }
1✔
1058

1059
    private void respondWithHttpError(
1060
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg) {
1061
      Metadata metadata = new Metadata();
1✔
1062
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1063
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1064
      List<Header> headers =
1✔
1065
          Headers.createHttpResponseHeaders(httpCode, "text/plain; charset=utf-8", metadata);
1✔
1066
      Buffer data = new Buffer().writeUtf8(msg);
1✔
1067

1068
      synchronized (lock) {
1✔
1069
        Http2ErrorStreamState stream =
1✔
1070
            new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
1✔
1071
        if (streams.isEmpty()) {
1✔
1072
          keepAliveEnforcer.onTransportActive();
1✔
1073
          if (maxConnectionIdleManager != null) {
1✔
1074
            maxConnectionIdleManager.onTransportActive();
1✔
1075
          }
1076
        }
1077
        streams.put(streamId, stream);
1✔
1078
        if (inFinished) {
1✔
1079
          stream.inboundDataReceived(new Buffer(), 0, 0, true);
×
1080
        }
1081
        frameWriter.headers(streamId, headers);
1✔
1082
        outboundFlow.data(
1✔
1083
            /*outFinished=*/true, stream.getOutboundFlowState(), data, /*flush=*/true);
1✔
1084
        outboundFlow.notifyWhenNoPendingData(
1✔
1085
            stream.getOutboundFlowState(), () -> rstOkAtEndOfHttpError(stream));
1✔
1086
      }
1✔
1087
    }
1✔
1088

1089
    private void rstOkAtEndOfHttpError(Http2ErrorStreamState stream) {
1090
      synchronized (lock) {
1✔
1091
        if (!stream.hasReceivedEndOfStream()) {
1✔
1092
          frameWriter.rstStream(stream.streamId, ErrorCode.NO_ERROR);
1✔
1093
        }
1094
        streamClosed(stream.streamId, /*flush=*/ true);
1✔
1095
      }
1✔
1096
    }
1✔
1097

1098
    private void respondWithGrpcError(
1099
        int streamId, boolean inFinished, Status.Code statusCode, String msg) {
1100
      Metadata metadata = new Metadata();
1✔
1101
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1102
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1103
      List<Header> headers = Headers.createResponseTrailers(metadata, false);
1✔
1104

1105
      synchronized (lock) {
1✔
1106
        frameWriter.synReply(true, streamId, headers);
1✔
1107
        if (!inFinished) {
1✔
1108
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
1109
        }
1110
        frameWriter.flush();
1✔
1111
      }
1✔
1112
    }
1✔
1113
  }
1114

1115
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1✔
1116
    @Override
1117
    public void ping() {
1118
      synchronized (lock) {
×
1119
        frameWriter.ping(false, 0, KEEPALIVE_PING);
×
1120
        frameWriter.flush();
×
1121
      }
×
1122
      tracer.reportKeepAliveSent();
×
1123
    }
×
1124

1125
    @Override
1126
    public void onPingTimeout() {
1127
      synchronized (lock) {
×
1128
        goAwayStatus = Status.UNAVAILABLE
×
1129
            .withDescription("Keepalive failed. Considering connection dead");
×
1130
        GrpcUtil.closeQuietly(socket);
×
1131
      }
×
1132
    }
×
1133
  }
1134

1135
  interface StreamState {
1136
    /** Must be holding 'lock' when calling. */
1137
    void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
1138

1139
    /** Must be holding 'lock' when calling. */
1140
    boolean hasReceivedEndOfStream();
1141

1142
    /** Must be holding 'lock' when calling. */
1143
    int inboundWindowAvailable();
1144

1145
    /** Must be holding 'lock' when calling. */
1146
    void transportReportStatus(Status status);
1147

1148
    /** Must be holding 'lock' when calling. */
1149
    void inboundRstReceived(Status status);
1150

1151
    OutboundFlowController.StreamState getOutboundFlowState();
1152
  }
1153

1154
  static class Http2ErrorStreamState implements StreamState, OutboundFlowController.Stream {
1155
    private final int streamId;
1156
    private final Object lock;
1157
    private final OutboundFlowController.StreamState outboundFlowState;
1158
    @GuardedBy("lock")
1159
    private int window;
1160
    @GuardedBy("lock")
1161
    private boolean receivedEndOfStream;
1162

1163
    Http2ErrorStreamState(
1164
        int streamId, Object lock, OutboundFlowController outboundFlow, int initialWindowSize) {
1✔
1165
      this.streamId = streamId;
1✔
1166
      this.lock = lock;
1✔
1167
      this.outboundFlowState = outboundFlow.createState(this, streamId);
1✔
1168
      this.window = initialWindowSize;
1✔
1169
    }
1✔
1170

1171
    @Override public void onSentBytes(int frameBytes) {}
1✔
1172

1173
    @Override public void inboundDataReceived(
1174
        Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
1175
      synchronized (lock) {
×
1176
        if (endOfStream) {
×
1177
          receivedEndOfStream = true;
×
1178
        }
1179
        window -= dataLength + paddingLength;
×
1180
        try {
1181
          frame.skip(frame.size()); // Recycle segments
×
1182
        } catch (IOException ex) {
×
1183
          throw new AssertionError(ex);
×
1184
        }
×
1185
      }
×
1186
    }
×
1187

1188
    @Override public boolean hasReceivedEndOfStream() {
1189
      synchronized (lock) {
1✔
1190
        return receivedEndOfStream;
1✔
1191
      }
1192
    }
1193

1194
    @Override public int inboundWindowAvailable() {
1195
      synchronized (lock) {
×
1196
        return window;
×
1197
      }
1198
    }
1199

1200
    @Override public void transportReportStatus(Status status) {}
×
1201

1202
    @Override public void inboundRstReceived(Status status) {}
×
1203

1204
    @Override public OutboundFlowController.StreamState getOutboundFlowState() {
1205
      synchronized (lock) {
1✔
1206
        return outboundFlowState;
1✔
1207
      }
1208
    }
1209
  }
1210
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc