• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20125

23 Dec 2025 04:32AM UTC coverage: 88.706% (-0.01%) from 88.72%
#20125

push

github

web-flow
grpclb: pick_first delegation (#12568)

**Summary of Changes**
This pull request refactors the grpclb load balancer's PICK_FIRST mode
to delegate its logic to a standard pick_first load balancing policy.

The key changes are as follows:
1. **`grpclb/build.gradle`**

Added dependency on `grpc-util` module to access
`ForwardingLoadBalancerHelper`

2. **`grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java`**
- New imports:
    - LoadBalancer, LoadBalancerProvider, LoadBalancerRegistry,
      ResolvedAddresses, FixedResultPicker, ForwardingLoadBalancerHelper
- New fields for PICK_FIRST delegation:
    - pickFirstLbProvider - Provider for creating child pick_first LB
    - pickFirstLb - The child LoadBalancer instance
    - pickFirstLbState / pickFirstLbPicker - Track child LB's state and
      picker
    - currentPickFirstLoadRecorder - Load recorder for token attachment
- Key behavioral changes:
    - updateServerList() PICK_FIRST case: Instead of creating a single
      subchannel, it now:
        - Creates the child pick_first LB once and then updates it with new
          addresses on subsequent updates.
        - Passes addresses to child LB via acceptResolvedAddresses()
    - maybeUpdatePicker() PICK_FIRST case: Uses child LB's state and picker
      wrapped with ChildLbPickerEntry
    - RoundRobinEntry.picked() signature change: Changed from
      picked(Metadata) to picked(PickSubchannelArgs) to allow child picker
      delegation
    - New ChildLbPickerEntry class: Wraps child LB's picker and attaches
      TokenAttachingTracerFactory for token propagation
    - New PickFirstLbHelper class: Forwarding helper that intercepts
      updateBalancingState() to store child state and trigger grpclb picker
      updates
    - Updated shutdown(), requestConnection(), maybeUseFallbackBackends():
      Handle the new child LB delegation model

3. **`grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java`**

- Updated tests to reflect the new delegation behavior:
- Initial state is now CONNECTING (not IDLE) since standard pick_first
eagerly connects
- Tests ... (continued)

35463 of 39978 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.38
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
1
/*
2
 * Copyright 2022 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
20
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;
21

22
import com.google.common.base.Preconditions;
23
import com.google.common.collect.Lists;
24
import com.google.common.util.concurrent.Futures;
25
import com.google.common.util.concurrent.ListenableFuture;
26
import com.google.errorprone.annotations.concurrent.GuardedBy;
27
import io.grpc.Attributes;
28
import io.grpc.InternalChannelz;
29
import io.grpc.InternalLogId;
30
import io.grpc.InternalStatus;
31
import io.grpc.Metadata;
32
import io.grpc.ServerStreamTracer;
33
import io.grpc.Status;
34
import io.grpc.internal.GrpcUtil;
35
import io.grpc.internal.KeepAliveEnforcer;
36
import io.grpc.internal.KeepAliveManager;
37
import io.grpc.internal.LogExceptionRunnable;
38
import io.grpc.internal.MaxConnectionIdleManager;
39
import io.grpc.internal.ObjectPool;
40
import io.grpc.internal.SerializingExecutor;
41
import io.grpc.internal.ServerTransport;
42
import io.grpc.internal.ServerTransportListener;
43
import io.grpc.internal.StatsTraceContext;
44
import io.grpc.internal.TransportTracer;
45
import io.grpc.okhttp.internal.framed.ErrorCode;
46
import io.grpc.okhttp.internal.framed.FrameReader;
47
import io.grpc.okhttp.internal.framed.FrameWriter;
48
import io.grpc.okhttp.internal.framed.Header;
49
import io.grpc.okhttp.internal.framed.HeadersMode;
50
import io.grpc.okhttp.internal.framed.Http2;
51
import io.grpc.okhttp.internal.framed.Settings;
52
import io.grpc.okhttp.internal.framed.Variant;
53
import java.io.IOException;
54
import java.net.Socket;
55
import java.net.SocketException;
56
import java.util.Collections;
57
import java.util.List;
58
import java.util.Locale;
59
import java.util.Map;
60
import java.util.TreeMap;
61
import java.util.concurrent.Executor;
62
import java.util.concurrent.ScheduledExecutorService;
63
import java.util.concurrent.ScheduledFuture;
64
import java.util.concurrent.TimeUnit;
65
import java.util.logging.Level;
66
import java.util.logging.Logger;
67
import javax.annotation.Nullable;
68
import okio.Buffer;
69
import okio.BufferedSource;
70
import okio.ByteString;
71
import okio.Okio;
72

73
/**
74
 * OkHttp-based server transport.
75
 */
76
final class OkHttpServerTransport implements ServerTransport,
77
      ExceptionHandlingFrameWriter.TransportExceptionHandler, OutboundFlowController.Transport {
78
  private static final Logger log = Logger.getLogger(OkHttpServerTransport.class.getName());
1✔
79
  private static final int GRACEFUL_SHUTDOWN_PING = 0x1111;
80

81
  private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(1);
1✔
82

83
  private static final int KEEPALIVE_PING = 0xDEAD;
84
  private static final ByteString HTTP_METHOD = ByteString.encodeUtf8(":method");
1✔
85
  private static final ByteString CONNECT_METHOD = ByteString.encodeUtf8("CONNECT");
1✔
86
  private static final ByteString POST_METHOD = ByteString.encodeUtf8("POST");
1✔
87
  private static final ByteString SCHEME = ByteString.encodeUtf8(":scheme");
1✔
88
  private static final ByteString PATH = ByteString.encodeUtf8(":path");
1✔
89
  private static final ByteString AUTHORITY = ByteString.encodeUtf8(":authority");
1✔
90
  private static final ByteString CONNECTION = ByteString.encodeUtf8("connection");
1✔
91
  private static final ByteString HOST = ByteString.encodeUtf8("host");
1✔
92
  private static final ByteString TE = ByteString.encodeUtf8("te");
1✔
93
  private static final ByteString TE_TRAILERS = ByteString.encodeUtf8("trailers");
1✔
94
  private static final ByteString CONTENT_TYPE = ByteString.encodeUtf8("content-type");
1✔
95
  private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
1✔
96
  private static final ByteString ALLOW = ByteString.encodeUtf8("allow");
1✔
97

98
  private final Config config;
99
  private final Variant variant = new Http2();
1✔
100
  private final TransportTracer tracer;
101
  private final InternalLogId logId;
102
  private Socket socket;
103
  private ServerTransportListener listener;
104
  private Executor transportExecutor;
105
  private ScheduledExecutorService scheduledExecutorService;
106
  private Attributes attributes;
107
  private KeepAliveManager keepAliveManager;
108
  private MaxConnectionIdleManager maxConnectionIdleManager;
109
  private ScheduledFuture<?> maxConnectionAgeMonitor;
110
  private final KeepAliveEnforcer keepAliveEnforcer;
111

112
  private final Object lock = new Object();
1✔
113
  @GuardedBy("lock")
114
  private boolean abruptShutdown;
115
  @GuardedBy("lock")
116
  private boolean gracefulShutdown;
117
  @GuardedBy("lock")
118
  private boolean handshakeShutdown;
119
  @GuardedBy("lock")
120
  private InternalChannelz.Security securityInfo;
121
  @GuardedBy("lock")
122
  private ExceptionHandlingFrameWriter frameWriter;
123
  @GuardedBy("lock")
124
  private OutboundFlowController outboundFlow;
125
  @GuardedBy("lock")
1✔
126
  private final Map<Integer, StreamState> streams = new TreeMap<>();
127
  @GuardedBy("lock")
128
  private int lastStreamId;
129
  @GuardedBy("lock")
1✔
130
  private int goAwayStreamId = Integer.MAX_VALUE;
131
  /**
132
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
133
   * streams may continue.
134
   */
135
  @GuardedBy("lock")
136
  private Status goAwayStatus;
137
  /** Non-{@code null} when gracefully shutting down and have not yet sent second GOAWAY. */
138
  @GuardedBy("lock")
139
  private ScheduledFuture<?> secondGoawayTimer;
140
  /** Non-{@code null} when waiting for forceful close GOAWAY to be sent. */
141
  @GuardedBy("lock")
142
  private ScheduledFuture<?> forcefulCloseTimer;
143
  @GuardedBy("lock")
1✔
144
  private Long gracefulShutdownPeriod = null;
145

146
  public OkHttpServerTransport(Config config, Socket bareSocket) {
1✔
147
    this.config = Preconditions.checkNotNull(config, "config");
1✔
148
    this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
1✔
149

150
    tracer = config.transportTracerFactory.create();
1✔
151
    tracer.setFlowControlWindowReader(this::readFlowControlWindow);
1✔
152
    logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
1✔
153
    transportExecutor = config.transportExecutorPool.getObject();
1✔
154
    scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
1✔
155
    keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
1✔
156
        config.permitKeepAliveTimeInNanos, TimeUnit.NANOSECONDS);
157
  }
1✔
158

159
  public void start(ServerTransportListener listener) {
160
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
161

162
    SerializingExecutor serializingExecutor = new SerializingExecutor(transportExecutor);
1✔
163
    serializingExecutor.execute(() -> startIo(serializingExecutor));
1✔
164
  }
1✔
165

166
  private void startIo(SerializingExecutor serializingExecutor) {
167
    try {
168
      // The socket implementation is lazily initialized, but had broken thread-safety 
169
      // for that laziness https://bugs.openjdk.org/browse/JDK-8278326. 
170
      // As a workaround, we lock to synchronize initialization with shutdown().
171
      synchronized (lock) {
1✔
172
        socket.setTcpNoDelay(true);
1✔
173
      }
1✔
174
      HandshakerSocketFactory.HandshakeResult result =
1✔
175
          config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
1✔
176
      synchronized (lock) {
1✔
177
        if (socket.isClosed()) {
1✔
178
          // The wrapped socket may not handle the underlying socket being closed by shutdown(). In
179
          // particular, SSLSocket hangs future reads if the underlying socket is already closed at
180
          // this point, even if you call sslSocket.close() later.
181
          result.socket.close();
×
182
          throw new SocketException("Socket close raced with handshake");
×
183
        }
184
        this.socket = result.socket;
1✔
185
      }
1✔
186
      this.attributes = result.attributes;
1✔
187

188
      int maxQueuedControlFrames = 10000;
1✔
189
      AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
190
      asyncSink.becomeConnected(Okio.sink(socket), socket);
1✔
191
      FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
192
          variant.newWriter(Okio.buffer(asyncSink), false));
1✔
193
      FrameWriter writeMonitoringFrameWriter = new ForwardingFrameWriter(rawFrameWriter) {
1✔
194
        @Override
195
        public void synReply(boolean outFinished, int streamId, List<Header> headerBlock)
196
            throws IOException {
197
          keepAliveEnforcer.resetCounters();
1✔
198
          super.synReply(outFinished, streamId, headerBlock);
1✔
199
        }
1✔
200

201
        @Override
202
        public void headers(int streamId, List<Header> headerBlock) throws IOException {
203
          keepAliveEnforcer.resetCounters();
1✔
204
          super.headers(streamId, headerBlock);
1✔
205
        }
1✔
206

207
        @Override
208
        public void data(boolean outFinished, int streamId, Buffer source, int byteCount)
209
            throws IOException {
210
          keepAliveEnforcer.resetCounters();
1✔
211
          super.data(outFinished, streamId, source, byteCount);
1✔
212
        }
1✔
213
      };
214
      synchronized (lock) {
1✔
215
        this.securityInfo = result.securityInfo;
1✔
216

217
        // Handle FrameWriter exceptions centrally, since there are many callers. Note that
218
        // errors coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink
219
        // does not propagate syscall errors through the FrameWriter. But we handle the
220
        // AsyncSink failures with the same TransportExceptionHandler instance so it is all
221
        // mixed back together.
222
        frameWriter = new ExceptionHandlingFrameWriter(this, writeMonitoringFrameWriter);
1✔
223
        outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
224

225
        // These writes will be queued in the serializingExecutor waiting for this function to
226
        // return.
227
        frameWriter.connectionPreface();
1✔
228
        Settings settings = new Settings();
1✔
229
        OkHttpSettingsUtil.set(settings,
1✔
230
            OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, config.flowControlWindow);
231
        OkHttpSettingsUtil.set(settings,
1✔
232
            OkHttpSettingsUtil.MAX_HEADER_LIST_SIZE, config.maxInboundMetadataSize);
233
        if (config.maxConcurrentStreams != Integer.MAX_VALUE) {
1✔
234
          OkHttpSettingsUtil.set(settings,
1✔
235
              OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS, config.maxConcurrentStreams);
236
        }
237
        frameWriter.settings(settings);
1✔
238
        if (config.flowControlWindow > Utils.DEFAULT_WINDOW_SIZE) {
1✔
239
          frameWriter.windowUpdate(
1✔
240
              Utils.CONNECTION_STREAM_ID, config.flowControlWindow - Utils.DEFAULT_WINDOW_SIZE);
241
        }
242
        frameWriter.flush();
1✔
243
      }
1✔
244

245
      if (config.keepAliveTimeNanos != GrpcUtil.KEEPALIVE_TIME_NANOS_DISABLED) {
1✔
246
        keepAliveManager = new KeepAliveManager(
1✔
247
            new KeepAlivePinger(), scheduledExecutorService, config.keepAliveTimeNanos,
248
            config.keepAliveTimeoutNanos, true);
249
        keepAliveManager.onTransportStarted();
1✔
250
      }
251

252
      if (config.maxConnectionIdleNanos != MAX_CONNECTION_IDLE_NANOS_DISABLED) {
1✔
253
        maxConnectionIdleManager = new MaxConnectionIdleManager(config.maxConnectionIdleNanos);
1✔
254
        maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
1✔
255
      }
256

257
      if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
1✔
258
        long maxConnectionAgeInNanos =
1✔
259
            (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
1✔
260
        maxConnectionAgeMonitor = scheduledExecutorService.schedule(
1✔
261
            new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
1✔
262
            maxConnectionAgeInNanos,
263
            TimeUnit.NANOSECONDS);
264
      }
265

266
      transportExecutor.execute(new FrameHandler(
1✔
267
          variant.newReader(Okio.buffer(Okio.source(socket)), false)));
1✔
268
    } catch (Error | IOException | RuntimeException ex) {
1✔
269
      synchronized (lock) {
1✔
270
        if (!handshakeShutdown) {
1✔
271
          log.log(Level.INFO, "Socket failed to handshake", ex);
1✔
272
        }
273
      }
1✔
274
      GrpcUtil.closeQuietly(socket);
1✔
275
      terminated();
1✔
276
    }
1✔
277
  }
1✔
278

279
  @Override
280
  public void shutdown() {
281
    shutdown(null);
1✔
282
  }
1✔
283

284
  private void shutdown(@Nullable Long gracefulShutdownPeriod) {
285
    synchronized (lock) {
1✔
286
      if (gracefulShutdown || abruptShutdown) {
1✔
287
        return;
1✔
288
      }
289
      gracefulShutdown = true;
1✔
290
      this.gracefulShutdownPeriod = gracefulShutdownPeriod;
1✔
291
      if (frameWriter == null) {
1✔
292
        handshakeShutdown = true;
1✔
293
        GrpcUtil.closeQuietly(socket);
1✔
294
      } else {
295
        // RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
296
        // we also set a timer to limit the upper bound in case the PING is excessively stalled or
297
        // the client is malicious.
298
        secondGoawayTimer = scheduledExecutorService.schedule(
1✔
299
            this::triggerGracefulSecondGoaway,
300
            GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, TimeUnit.NANOSECONDS);
301
        frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
1✔
302
        frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
1✔
303
        frameWriter.flush();
1✔
304
      }
305
    }
1✔
306
  }
1✔
307

308
  private void triggerGracefulSecondGoaway() {
309
    synchronized (lock) {
1✔
310
      if (secondGoawayTimer == null) {
1✔
311
        return;
×
312
      }
313
      secondGoawayTimer.cancel(false);
1✔
314
      secondGoawayTimer = null;
1✔
315
      frameWriter.goAway(lastStreamId, ErrorCode.NO_ERROR, new byte[0]);
1✔
316
      goAwayStreamId = lastStreamId;
1✔
317
      if (streams.isEmpty()) {
1✔
318
        frameWriter.close();
1✔
319
      } else {
320
        frameWriter.flush();
1✔
321
      }
322
      if (gracefulShutdownPeriod != null) {
1✔
323
        forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
324
            this::triggerForcefulClose, gracefulShutdownPeriod, TimeUnit.NANOSECONDS);
1✔
325
      }
326
    }
1✔
327
  }
1✔
328

329
  @Override
330
  public void shutdownNow(Status reason) {
331
    synchronized (lock) {
1✔
332
      if (frameWriter == null) {
1✔
333
        handshakeShutdown = true;
1✔
334
        GrpcUtil.closeQuietly(socket);
1✔
335
        return;
1✔
336
      }
337
    }
1✔
338
    abruptShutdown(ErrorCode.NO_ERROR, "", reason, true);
1✔
339
  }
1✔
340

341
  /**
342
   * Finish all active streams due to an IOException, then close the transport.
343
   */
344
  @Override
345
  public void onException(Throwable failureCause) {
346
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
347
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
348
    abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
349
  }
1✔
350

351
  private void abruptShutdown(
352
      ErrorCode errorCode, String moreDetail, Status reason, boolean rstStreams) {
353
    synchronized (lock) {
1✔
354
      if (abruptShutdown) {
1✔
355
        return;
1✔
356
      }
357
      abruptShutdown = true;
1✔
358
      goAwayStatus = reason;
1✔
359

360
      if (secondGoawayTimer != null) {
1✔
361
        secondGoawayTimer.cancel(false);
1✔
362
        secondGoawayTimer = null;
1✔
363
      }
364
      for (Map.Entry<Integer, StreamState> entry : streams.entrySet()) {
1✔
365
        if (rstStreams) {
1✔
366
          frameWriter.rstStream(entry.getKey(), ErrorCode.CANCEL);
1✔
367
        }
368
        entry.getValue().transportReportStatus(reason);
1✔
369
      }
1✔
370
      streams.clear();
1✔
371

372
      // RFC7540 §5.4.1. Attempt to inform the client what went wrong. We try to write the GOAWAY
373
      // _and then_ close our side of the connection. But place an upper-bound for how long we wait
374
      // for I/O with a timer, which forcefully closes the socket.
375
      frameWriter.goAway(lastStreamId, errorCode, moreDetail.getBytes(GrpcUtil.US_ASCII));
1✔
376
      goAwayStreamId = lastStreamId;
1✔
377
      frameWriter.close();
1✔
378
      forcefulCloseTimer = scheduledExecutorService.schedule(
1✔
379
          this::triggerForcefulClose, 1, TimeUnit.SECONDS);
380
    }
1✔
381
  }
1✔
382

383
  private void triggerForcefulClose() {
384
    // Safe to do unconditionally; no need to check if timer cancellation raced
385
    GrpcUtil.closeQuietly(socket);
1✔
386
  }
1✔
387

388
  private void terminated() {
389
    synchronized (lock) {
1✔
390
      if (forcefulCloseTimer != null) {
1✔
391
        forcefulCloseTimer.cancel(false);
1✔
392
        forcefulCloseTimer = null;
1✔
393
      }
394
    }
1✔
395
    if (keepAliveManager != null) {
1✔
396
      keepAliveManager.onTransportTermination();
1✔
397
    }
398
    if (maxConnectionIdleManager != null) {
1✔
399
      maxConnectionIdleManager.onTransportTermination();
1✔
400
    }
401

402
    if (maxConnectionAgeMonitor != null) {
1✔
403
      maxConnectionAgeMonitor.cancel(false);
1✔
404
    }
405
    transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
1✔
406
    scheduledExecutorService =
1✔
407
        config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
1✔
408
    listener.transportTerminated();
1✔
409
  }
1✔
410

411
  @Override
412
  public ScheduledExecutorService getScheduledExecutorService() {
413
    return scheduledExecutorService;
1✔
414
  }
415

416
  @Override
417
  public ListenableFuture<InternalChannelz.SocketStats> getStats() {
418
    synchronized (lock) {
1✔
419
      return Futures.immediateFuture(new InternalChannelz.SocketStats(
1✔
420
          tracer.getStats(),
1✔
421
          socket.getLocalSocketAddress(),
1✔
422
          socket.getRemoteSocketAddress(),
1✔
423
          Utils.getSocketOptions(socket),
1✔
424
          securityInfo));
425
    }
426
  }
427

428
  private TransportTracer.FlowControlWindows readFlowControlWindow() {
429
    synchronized (lock) {
1✔
430
      long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
431
      // connectionUnacknowledgedBytesRead is only readable by FrameHandler, so we provide a lower
432
      // bound.
433
      long remote = (long) (config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO);
1✔
434
      return new TransportTracer.FlowControlWindows(local, remote);
1✔
435
    }
436
  }
437

438
  @Override
439
  public InternalLogId getLogId() {
440
    return logId;
1✔
441
  }
442

443
  @Override
444
  public OutboundFlowController.StreamState[] getActiveStreams() {
445
    synchronized (lock) {
1✔
446
      OutboundFlowController.StreamState[] flowStreams =
1✔
447
          new OutboundFlowController.StreamState[streams.size()];
1✔
448
      int i = 0;
1✔
449
      for (StreamState stream : streams.values()) {
1✔
450
        flowStreams[i++] = stream.getOutboundFlowState();
1✔
451
      }
1✔
452
      return flowStreams;
1✔
453
    }
454
  }
455

456
  /**
457
   * Notify the transport that the stream was closed. Any frames for the stream must be enqueued
458
   * before calling.
459
   */
460
  void streamClosed(int streamId, boolean flush) {
461
    synchronized (lock) {
1✔
462
      streams.remove(streamId);
1✔
463
      if (streams.isEmpty()) {
1✔
464
        keepAliveEnforcer.onTransportIdle();
1✔
465
        if (maxConnectionIdleManager != null) {
1✔
466
          maxConnectionIdleManager.onTransportIdle();
1✔
467
        }
468
      }
469
      if (gracefulShutdown && streams.isEmpty()) {
1✔
470
        frameWriter.close();
1✔
471
      } else {
472
        if (flush) {
1✔
473
          frameWriter.flush();
1✔
474
        }
475
      }
476
    }
1✔
477
  }
1✔
478

479
  private static String asciiString(ByteString value) {
480
    // utf8() string is cached in ByteString, so we prefer it when the contents are ASCII. This
481
    // provides benefit if the header was reused via HPACK.
482
    for (int i = 0; i < value.size(); i++) {
1✔
483
      if (value.getByte(i) < 0) {
1✔
484
        return value.string(GrpcUtil.US_ASCII);
×
485
      }
486
    }
487
    return value.utf8();
1✔
488
  }
489

490
  private static int headerFind(List<Header> header, ByteString key, int startIndex) {
491
    for (int i = startIndex; i < header.size(); i++) {
1✔
492
      if (header.get(i).name.equals(key)) {
1✔
493
        return i;
1✔
494
      }
495
    }
496
    return -1;
1✔
497
  }
498

499
  private static boolean headerContains(List<Header> header, ByteString key) {
500
    return headerFind(header, key, 0) != -1;
1✔
501
  }
502

503
  private static void headerRemove(List<Header> header, ByteString key) {
504
    int i = 0;
1✔
505
    while ((i = headerFind(header, key, i)) != -1) {
1✔
506
      header.remove(i);
1✔
507
    }
508
  }
1✔
509

510
  /** Assumes that caller requires this field, so duplicates are treated as missing. */
511
  private static ByteString headerGetRequiredSingle(List<Header> header, ByteString key) {
512
    int i = headerFind(header, key, 0);
1✔
513
    if (i == -1) {
1✔
514
      return null;
1✔
515
    }
516
    if (headerFind(header, key, i + 1) != -1) {
1✔
517
      return null;
1✔
518
    }
519
    return header.get(i).value;
1✔
520
  }
521

522
  static final class Config {
523
    final List<? extends ServerStreamTracer.Factory> streamTracerFactories;
524
    final ObjectPool<Executor> transportExecutorPool;
525
    final ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool;
526
    final TransportTracer.Factory transportTracerFactory;
527
    final HandshakerSocketFactory handshakerSocketFactory;
528
    final long keepAliveTimeNanos;
529
    final long keepAliveTimeoutNanos;
530
    final int flowControlWindow;
531
    final int maxInboundMessageSize;
532
    final int maxInboundMetadataSize;
533
    final long maxConnectionIdleNanos;
534
    final boolean permitKeepAliveWithoutCalls;
535
    final long permitKeepAliveTimeInNanos;
536
    final long maxConnectionAgeInNanos;
537
    final long maxConnectionAgeGraceInNanos;
538
    final int maxConcurrentStreams;
539

540
    public Config(
541
        OkHttpServerBuilder builder,
542
        List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
1✔
543
      this.streamTracerFactories = Preconditions.checkNotNull(
1✔
544
          streamTracerFactories, "streamTracerFactories");
545
      transportExecutorPool = Preconditions.checkNotNull(
1✔
546
          builder.transportExecutorPool, "transportExecutorPool");
547
      scheduledExecutorServicePool = Preconditions.checkNotNull(
1✔
548
          builder.scheduledExecutorServicePool, "scheduledExecutorServicePool");
549
      transportTracerFactory = Preconditions.checkNotNull(
1✔
550
          builder.transportTracerFactory, "transportTracerFactory");
551
      handshakerSocketFactory = Preconditions.checkNotNull(
1✔
552
          builder.handshakerSocketFactory, "handshakerSocketFactory");
553
      keepAliveTimeNanos = builder.keepAliveTimeNanos;
1✔
554
      keepAliveTimeoutNanos = builder.keepAliveTimeoutNanos;
1✔
555
      flowControlWindow = builder.flowControlWindow;
1✔
556
      maxInboundMessageSize = builder.maxInboundMessageSize;
1✔
557
      maxInboundMetadataSize = builder.maxInboundMetadataSize;
1✔
558
      maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
1✔
559
      permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
1✔
560
      permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
1✔
561
      maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
1✔
562
      maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
1✔
563
      maxConcurrentStreams = builder.maxConcurrentCallsPerConnection;
1✔
564
    }
1✔
565
  }
566

567
  /**
568
   * Runnable which reads frames and dispatches them to in flight calls.
569
   */
570
  class FrameHandler implements FrameReader.Handler, Runnable {
571
    private final OkHttpFrameLogger frameLogger =
1✔
572
        new OkHttpFrameLogger(Level.FINE, OkHttpServerTransport.class);
573
    private final FrameReader frameReader;
574
    private boolean receivedSettings;
575
    private int connectionUnacknowledgedBytesRead;
576

577
    /**
     * Creates a handler that pulls frames from the given reader.
     *
     * @param frameReader source of inbound HTTP/2 frames
     */
    public FrameHandler(FrameReader frameReader) {
      super();
      this.frameReader = frameReader;
    }
1✔
580

581
    @Override
582
    public void run() {
583
      String threadName = Thread.currentThread().getName();
1✔
584
      Thread.currentThread().setName("OkHttpServerTransport");
1✔
585
      try {
586
        frameReader.readConnectionPreface();
1✔
587
        if (!frameReader.nextFrame(this)) {
1✔
588
          connectionError(ErrorCode.INTERNAL_ERROR, "Failed to read initial SETTINGS");
1✔
589
          return;
1✔
590
        }
591
        if (!receivedSettings) {
1✔
592
          connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
593
              "First HTTP/2 frame must be SETTINGS. RFC7540 section 3.5");
594
          return;
1✔
595
        }
596
        // Read until the underlying socket closes.
597
        while (frameReader.nextFrame(this)) {
1✔
598
          if (keepAliveManager != null) {
1✔
599
            keepAliveManager.onDataReceived();
1✔
600
          }
601
        }
602
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
603
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
604
        // nothing, otherwise, we finish all streams since it's a real IO issue.
605
        Status status;
606
        synchronized (lock) {
1✔
607
          status = goAwayStatus;
1✔
608
        }
1✔
609
        if (status == null) {
1✔
610
          status = Status.UNAVAILABLE.withDescription("TCP connection closed or IOException");
1✔
611
        }
612
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "I/O failure", status, false);
1✔
613
      } catch (Throwable t) {
1✔
614
        log.log(Level.WARNING, "Error decoding HTTP/2 frames", t);
1✔
615
        abruptShutdown(ErrorCode.INTERNAL_ERROR, "Error in frame decoder",
1✔
616
            Status.INTERNAL.withDescription("Error decoding HTTP/2 frames").withCause(t), false);
1✔
617
      } finally {
618
        // Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
619
        try {
620
          GrpcUtil.exhaust(socket.getInputStream());
1✔
621
        } catch (IOException ex) {
1✔
622
          // Unable to wait, so just proceed to tear-down. The socket is probably already closed so
623
          // the GOAWAY can't be sent anyway.
624
        }
1✔
625
        GrpcUtil.closeQuietly(socket);
1✔
626
        terminated();
1✔
627
        Thread.currentThread().setName(threadName);
1✔
628
      }
629
    }
1✔
630

631
    /**
632
     * Handle HTTP2 HEADER and CONTINUATION frames.
633
     */
634
    @Override
635
    public void headers(boolean outFinished,
636
        boolean inFinished,
637
        int streamId,
638
        int associatedStreamId,
639
        List<Header> headerBlock,
640
        HeadersMode headersMode) {
641
      frameLogger.logHeaders(
1✔
642
          OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
643
      // streamId == 0 checking is in HTTP/2 decoder
644
      if ((streamId & 1) == 0) {
1✔
645
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
646
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
647
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
648
        return;
1✔
649
      }
650
      boolean newStream;
651
      synchronized (lock) {
1✔
652
        if (streamId > goAwayStreamId) {
1✔
653
          return;
×
654
        }
655
        newStream = streamId > lastStreamId;
1✔
656
        if (newStream) {
1✔
657
          lastStreamId = streamId;
1✔
658
          if (config.maxConcurrentStreams <= streams.size()) {
1✔
659
            streamError(streamId, ErrorCode.REFUSED_STREAM,
1✔
660
                "Max concurrent stream reached. RFC7540 section 5.1.2");
661
            return;
1✔
662
          }
663
        }
664
      }
1✔
665

666
      int metadataSize = headerBlockSize(headerBlock);
1✔
667
      if (metadataSize > config.maxInboundMetadataSize) {
1✔
668
        respondWithHttpError(streamId, inFinished, 431, Status.Code.RESOURCE_EXHAUSTED,
1✔
669
            String.format(
1✔
670
                Locale.US,
671
                "Request metadata larger than %d: %d",
672
                config.maxInboundMetadataSize,
1✔
673
                metadataSize));
1✔
674
        return;
1✔
675
      }
676

677
      headerRemove(headerBlock, ByteString.EMPTY);
1✔
678

679
      ByteString httpMethod = null;
1✔
680
      ByteString scheme = null;
1✔
681
      ByteString path = null;
1✔
682
      ByteString authority = null;
1✔
683
      while (headerBlock.size() > 0 && headerBlock.get(0).name.getByte(0) == ':') {
1✔
684
        Header header = headerBlock.remove(0);
1✔
685
        if (HTTP_METHOD.equals(header.name) && httpMethod == null) {
1✔
686
          httpMethod = header.value;
1✔
687
        } else if (SCHEME.equals(header.name) && scheme == null) {
1✔
688
          scheme = header.value;
1✔
689
        } else if (PATH.equals(header.name) && path == null) {
1✔
690
          path = header.value;
1✔
691
        } else if (AUTHORITY.equals(header.name) && authority == null) {
1✔
692
          authority = header.value;
1✔
693
        } else {
694
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
695
              "Unexpected pseudo header. RFC7540 section 8.1.2.1");
696
          return;
1✔
697
        }
698
      }
1✔
699
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
700
        if (headerBlock.get(i).name.getByte(0) == ':') {
1✔
701
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
702
              "Pseudo header not before regular headers. RFC7540 section 8.1.2.1");
703
          return;
1✔
704
        }
705
      }
706
      if (!CONNECT_METHOD.equals(httpMethod)
1✔
707
          && newStream
708
          && (httpMethod == null || scheme == null || path == null)) {
709
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
710
            "Missing required pseudo header. RFC7540 section 8.1.2.3");
711
        return;
1✔
712
      }
713
      if (headerContains(headerBlock, CONNECTION)) {
1✔
714
        streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
715
            "Connection-specific headers not permitted. RFC7540 section 8.1.2.2");
716
        return;
1✔
717
      }
718

719
      if (!newStream) {
1✔
720
        if (inFinished) {
1✔
721
          synchronized (lock) {
1✔
722
            StreamState stream = streams.get(streamId);
1✔
723
            if (stream == null) {
1✔
724
              streamError(streamId, ErrorCode.STREAM_CLOSED, "Received headers for closed stream");
×
725
              return;
×
726
            }
727
            if (stream.hasReceivedEndOfStream()) {
1✔
728
              streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
729
                  "Received HEADERS for half-closed (remote) stream. RFC7540 section 5.1");
730
              return;
1✔
731
            }
732
            // Ignore the trailers, but still half-close the stream
733
            stream.inboundDataReceived(new Buffer(), 0, 0, true);
1✔
734
            return;
1✔
735
          }
736
        } else {
737
          streamError(streamId, ErrorCode.PROTOCOL_ERROR,
1✔
738
              "Headers disallowed in the middle of the stream. RFC7540 section 8.1");
739
          return;
1✔
740
        }
741
      }
742

743
      if (authority == null) {
1✔
744
        int i = headerFind(headerBlock, HOST, 0);
1✔
745
        if (i != -1) {
1✔
746
          if (headerFind(headerBlock, HOST, i + 1) != -1) {
1✔
747
            respondWithHttpError(streamId, inFinished, 400, Status.Code.INTERNAL,
1✔
748
                "Multiple host headers disallowed. RFC7230 section 5.4");
749
            return;
1✔
750
          }
751
          authority = headerBlock.get(i).value;
1✔
752
        }
753
      }
754
      headerRemove(headerBlock, HOST);
1✔
755

756
      // Remove the leading slash of the path and get the fully qualified method name
757
      if (path.size() == 0 || path.getByte(0) != '/') {
1✔
758
        respondWithHttpError(streamId, inFinished, 404, Status.Code.UNIMPLEMENTED,
1✔
759
            "Expected path to start with /: " + asciiString(path));
1✔
760
        return;
1✔
761
      }
762
      String method = asciiString(path).substring(1);
1✔
763

764
      ByteString contentType = headerGetRequiredSingle(headerBlock, CONTENT_TYPE);
1✔
765
      if (contentType == null) {
1✔
766
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
767
            "Content-Type is missing or duplicated");
768
        return;
1✔
769
      }
770
      String contentTypeString = asciiString(contentType);
1✔
771
      if (!GrpcUtil.isGrpcContentType(contentTypeString)) {
1✔
772
        respondWithHttpError(streamId, inFinished, 415, Status.Code.INTERNAL,
1✔
773
            "Content-Type is not supported: " + contentTypeString);
774
        return;
1✔
775
      }
776

777
      if (!POST_METHOD.equals(httpMethod)) {
1✔
778
        List<Header> extraHeaders = Lists.newArrayList(new Header(ALLOW, POST_METHOD));
1✔
779
        respondWithHttpError(streamId, inFinished, 405, Status.Code.INTERNAL,
1✔
780
            "HTTP Method is not supported: " + asciiString(httpMethod), extraHeaders);
1✔
781
        return;
1✔
782
      }
783

784
      ByteString te = headerGetRequiredSingle(headerBlock, TE);
1✔
785
      if (!TE_TRAILERS.equals(te)) {
1✔
786
        respondWithGrpcError(streamId, inFinished, Status.Code.INTERNAL,
1✔
787
            String.format("Expected header TE: %s, but %s is received. "
1✔
788
              + "Some intermediate proxy may not support trailers",
789
              asciiString(TE_TRAILERS), te == null ? "<missing>" : asciiString(te)));
1✔
790
        return;
1✔
791
      }
792
      headerRemove(headerBlock, CONTENT_LENGTH);
1✔
793

794
      Metadata metadata = Utils.convertHeaders(headerBlock);
1✔
795
      StatsTraceContext statsTraceCtx =
1✔
796
          StatsTraceContext.newServerContext(config.streamTracerFactories, method, metadata);
1✔
797
      synchronized (lock) {
1✔
798
        OkHttpServerStream.TransportState stream = new OkHttpServerStream.TransportState(
1✔
799
            OkHttpServerTransport.this,
800
            streamId,
801
            config.maxInboundMessageSize,
1✔
802
            statsTraceCtx,
803
            lock,
1✔
804
            frameWriter,
1✔
805
            outboundFlow,
1✔
806
            config.flowControlWindow,
1✔
807
            tracer,
1✔
808
            method);
809
        OkHttpServerStream streamForApp = new OkHttpServerStream(
1✔
810
            stream,
811
            attributes,
1✔
812
            authority == null ? null : asciiString(authority),
1✔
813
            statsTraceCtx,
814
            tracer);
1✔
815
        if (streams.isEmpty()) {
1✔
816
          keepAliveEnforcer.onTransportActive();
1✔
817
          if (maxConnectionIdleManager != null) {
1✔
818
            maxConnectionIdleManager.onTransportActive();
1✔
819
          }
820
        }
821
        streams.put(streamId, stream);
1✔
822
        listener.streamCreated(streamForApp, method, metadata);
1✔
823
        stream.onStreamAllocated();
1✔
824
        if (inFinished) {
1✔
825
          stream.inboundDataReceived(new Buffer(), 0, 0, inFinished);
1✔
826
        }
827
      }
1✔
828
    }
1✔
829

830
    private int headerBlockSize(List<Header> headerBlock) {
831
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
832
      long size = 0;
1✔
833
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
834
        Header header = headerBlock.get(i);
1✔
835
        size += 32 + header.name.size() + header.value.size();
1✔
836
      }
837
      size = Math.min(size, Integer.MAX_VALUE);
1✔
838
      return (int) size;
1✔
839
    }
840

841
    /**
842
     * Handle an HTTP2 DATA frame.
843
     */
844
    @Override
845
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
846
                     int paddedLength)
847
        throws IOException {
848
      frameLogger.logData(
1✔
849
          OkHttpFrameLogger.Direction.INBOUND, streamId, in.getBuffer(), length, inFinished);
1✔
850
      if (streamId == 0) {
1✔
851
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
852
            "Stream 0 is reserved for control messages. RFC7540 section 5.1.1");
853
        return;
1✔
854
      }
855
      if ((streamId & 1) == 0) {
1✔
856
        // The server doesn't use PUSH_PROMISE, so all even streams are IDLE
857
        connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
858
            "Clients cannot open even numbered streams. RFC7540 section 5.1.1");
859
        return;
1✔
860
      }
861

862
      // Wait until the frame is complete. We only support 16 KiB frames, and the max permitted in
863
      // HTTP/2 is 16 MiB. This is verified in OkHttp's Http2 deframer, so we don't need to be
864
      // concerned with the window being exceeded at this point.
865
      in.require(length);
1✔
866

867
      synchronized (lock) {
1✔
868
        StreamState stream = streams.get(streamId);
1✔
869
        if (stream == null) {
1✔
870
          in.skip(length);
1✔
871
          streamError(streamId, ErrorCode.STREAM_CLOSED, "Received data for closed stream");
1✔
872
          return;
1✔
873
        }
874
        if (stream.hasReceivedEndOfStream()) {
1✔
875
          in.skip(length);
1✔
876
          streamError(streamId, ErrorCode.STREAM_CLOSED,
1✔
877
              "Received DATA for half-closed (remote) stream. RFC7540 section 5.1");
878
          return;
1✔
879
        }
880
        if (stream.inboundWindowAvailable() < paddedLength) {
1✔
881
          in.skip(length);
1✔
882
          streamError(streamId, ErrorCode.FLOW_CONTROL_ERROR,
1✔
883
              "Received DATA size exceeded window size. RFC7540 section 6.9");
884
          return;
1✔
885
        }
886
        Buffer buf = new Buffer();
1✔
887
        buf.write(in.getBuffer(), length);
1✔
888
        stream.inboundDataReceived(buf, length, paddedLength - length, inFinished);
1✔
889
      }
1✔
890

891
      // connection window update
892
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
893
      if (connectionUnacknowledgedBytesRead
1✔
894
          >= config.flowControlWindow * Utils.DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
895
        synchronized (lock) {
1✔
896
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
897
          frameWriter.flush();
1✔
898
        }
1✔
899
        connectionUnacknowledgedBytesRead = 0;
1✔
900
      }
901
    }
1✔
902

903
    @Override
904
    public void rstStream(int streamId, ErrorCode errorCode) {
905
      frameLogger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
906
      // streamId == 0 checking is in HTTP/2 decoder
907

908
      if (!(ErrorCode.NO_ERROR.equals(errorCode)
1✔
909
            || ErrorCode.CANCEL.equals(errorCode)
1✔
910
            || ErrorCode.STREAM_CLOSED.equals(errorCode))) {
1✔
911
        log.log(Level.INFO, "Received RST_STREAM: " + errorCode);
×
912
      }
913
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
914
          .withDescription("RST_STREAM");
1✔
915
      synchronized (lock) {
1✔
916
        StreamState stream = streams.get(streamId);
1✔
917
        if (stream != null) {
1✔
918
          stream.inboundRstReceived(status);
1✔
919
          streamClosed(streamId, /*flush=*/ false);
1✔
920
        }
921
      }
1✔
922
    }
1✔
923

924
    @Override
925
    public void settings(boolean clearPrevious, Settings settings) {
926
      frameLogger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
927
      synchronized (lock) {
1✔
928
        boolean outboundWindowSizeIncreased = false;
1✔
929
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
930
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
931
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
932
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
933
        }
934

935
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
936
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
937
        // otherwise it will cause a stream error (RST_STREAM).
938
        frameWriter.ackSettings(settings);
1✔
939
        frameWriter.flush();
1✔
940
        if (!receivedSettings) {
1✔
941
          receivedSettings = true;
1✔
942
          attributes = listener.transportReady(attributes);
1✔
943
        }
944

945
        // send any pending bytes / streams
946
        if (outboundWindowSizeIncreased) {
1✔
947
          outboundFlow.writeStreams();
1✔
948
        }
949
      }
1✔
950
    }
1✔
951

952
    @Override
953
    public void ping(boolean ack, int payload1, int payload2) {
954
      long payload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
955
      if (!ack) {
1✔
956
        if (!keepAliveEnforcer.pingAcceptable()) {
1✔
957
          abruptShutdown(ErrorCode.ENHANCE_YOUR_CALM, "too_many_pings",
1✔
958
              Status.RESOURCE_EXHAUSTED.withDescription("Too many pings from client"), false);
1✔
959
          return;
1✔
960
        }
961
        frameLogger.logPing(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
962
        synchronized (lock) {
1✔
963
          frameWriter.ping(true, payload1, payload2);
1✔
964
          frameWriter.flush();
1✔
965
        }
1✔
966
      } else {
967
        frameLogger.logPingAck(OkHttpFrameLogger.Direction.INBOUND, payload);
1✔
968
        if (KEEPALIVE_PING == payload) {
1✔
969
          return;
×
970
        }
971
        if (GRACEFUL_SHUTDOWN_PING == payload) {
1✔
972
          triggerGracefulSecondGoaway();
1✔
973
          return;
1✔
974
        }
975
        log.log(Level.INFO, "Received unexpected ping ack: " + payload);
1✔
976
      }
977
    }
1✔
978

979
    @Override
980
    public void ackSettings() {}
1✔
981

982
    @Override
983
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
984
      frameLogger.logGoAway(
1✔
985
          OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
986
      String description = String.format("Received GOAWAY: %s '%s'", errorCode, debugData.utf8());
1✔
987
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
988
          .withDescription(description);
1✔
989
      if (!ErrorCode.NO_ERROR.equals(errorCode)) {
1✔
990
        log.log(
×
991
            Level.WARNING, "Received GOAWAY: {0} {1}", new Object[] {errorCode, debugData.utf8()});
×
992
      }
993
      synchronized (lock) {
1✔
994
        goAwayStatus = status;
1✔
995
      }
1✔
996
    }
1✔
997

998
    @Override
999
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1000
        throws IOException {
1001
      frameLogger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1002
          streamId, promisedStreamId, requestHeaders);
1003
      // streamId == 0 checking is in HTTP/2 decoder.
1004
      // The server doesn't use PUSH_PROMISE, so all even streams are IDLE, and odd streams are not
1005
      // peer-initiated.
1006
      connectionError(ErrorCode.PROTOCOL_ERROR,
1✔
1007
          "PUSH_PROMISE only allowed on peer-initiated streams. RFC7540 section 6.6");
1008
    }
1✔
1009

1010
    @Override
1011
    public void windowUpdate(int streamId, long delta) {
1012
      frameLogger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1013
      // delta == 0 checking is in HTTP/2 decoder. And it isn't quite right, as it will always cause
1014
      // a GOAWAY. RFC7540 section 6.9 says to use RST_STREAM if the stream id isn't 0. Doesn't
1015
      // matter much though.
1016
      synchronized (lock) {
1✔
1017
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1018
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1019
        } else {
1020
          StreamState stream = streams.get(streamId);
1✔
1021
          if (stream != null) {
1✔
1022
            outboundFlow.windowUpdate(stream.getOutboundFlowState(), (int) delta);
1✔
1023
          }
1024
        }
1025
      }
1✔
1026
    }
1✔
1027

1028
    @Override
1029
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1030
      frameLogger.logPriority(
×
1031
          OkHttpFrameLogger.Direction.INBOUND, streamId, streamDependency, weight, exclusive);
1032
      // streamId == 0 checking is in HTTP/2 decoder.
1033
      // Ignore priority change.
1034
    }
×
1035

1036
    @Override
1037
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1038
        int port, long maxAge) {}
×
1039

1040
    /**
1041
     * Send GOAWAY to the server, then finish all streams and close the transport. RFC7540 §5.4.1.
1042
     */
1043
    private void connectionError(ErrorCode errorCode, String moreDetail) {
1044
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1045
          .withDescription(String.format("HTTP2 connection error: %s '%s'", errorCode, moreDetail));
1✔
1046
      abruptShutdown(errorCode, moreDetail, status, false);
1✔
1047
    }
1✔
1048

1049
    /**
1050
     * Respond with RST_STREAM, making sure to kill the associated stream if it exists. Reason will
1051
     * rarely be seen. RFC7540 §5.4.2.
1052
     */
1053
    private void streamError(int streamId, ErrorCode errorCode, String reason) {
1054
      if (errorCode == ErrorCode.PROTOCOL_ERROR) {
1✔
1055
        log.log(
1✔
1056
            Level.FINE, "Responding with RST_STREAM {0}: {1}", new Object[] {errorCode, reason});
1057
      }
1058
      synchronized (lock) {
1✔
1059
        frameWriter.rstStream(streamId, errorCode);
1✔
1060
        frameWriter.flush();
1✔
1061
        StreamState stream = streams.get(streamId);
1✔
1062
        if (stream != null) {
1✔
1063
          stream.transportReportStatus(
1✔
1064
              Status.INTERNAL.withDescription(
1✔
1065
                  String.format("Responded with RST_STREAM %s: %s", errorCode, reason)));
1✔
1066
          streamClosed(streamId, /*flush=*/ false);
1✔
1067
        }
1068
      }
1✔
1069
    }
1✔
1070

1071
    private void respondWithHttpError(
1072
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg) {
1073
      respondWithHttpError(streamId, inFinished, httpCode, statusCode, msg,
1✔
1074
              Collections.emptyList());
1✔
1075
    }
1✔
1076

1077
    private void respondWithHttpError(
1078
        int streamId, boolean inFinished, int httpCode, Status.Code statusCode, String msg,
1079
        List<Header> extraHeaders) {
1080
      Metadata metadata = new Metadata();
1✔
1081
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1082
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1083
      List<Header> headers =
1✔
1084
          Headers.createHttpResponseHeaders(httpCode, "text/plain; charset=utf-8", metadata);
1✔
1085
      headers.addAll(extraHeaders);
1✔
1086
      Buffer data = new Buffer().writeUtf8(msg);
1✔
1087

1088
      synchronized (lock) {
1✔
1089
        Http2ErrorStreamState stream =
1✔
1090
            new Http2ErrorStreamState(streamId, lock, outboundFlow, config.flowControlWindow);
1✔
1091
        if (streams.isEmpty()) {
1✔
1092
          keepAliveEnforcer.onTransportActive();
1✔
1093
          if (maxConnectionIdleManager != null) {
1✔
1094
            maxConnectionIdleManager.onTransportActive();
1✔
1095
          }
1096
        }
1097
        streams.put(streamId, stream);
1✔
1098
        if (inFinished) {
1✔
1099
          stream.inboundDataReceived(new Buffer(), 0, 0, true);
×
1100
        }
1101
        frameWriter.headers(streamId, headers);
1✔
1102
        outboundFlow.data(
1✔
1103
            /*outFinished=*/true, stream.getOutboundFlowState(), data, /*flush=*/true);
1✔
1104
        outboundFlow.notifyWhenNoPendingData(
1✔
1105
            stream.getOutboundFlowState(), () -> rstOkAtEndOfHttpError(stream));
1✔
1106
      }
1✔
1107
    }
1✔
1108

1109
    private void rstOkAtEndOfHttpError(Http2ErrorStreamState stream) {
1110
      synchronized (lock) {
1✔
1111
        if (!stream.hasReceivedEndOfStream()) {
1✔
1112
          frameWriter.rstStream(stream.streamId, ErrorCode.NO_ERROR);
1✔
1113
        }
1114
        streamClosed(stream.streamId, /*flush=*/ true);
1✔
1115
      }
1✔
1116
    }
1✔
1117

1118
    private void respondWithGrpcError(
1119
        int streamId, boolean inFinished, Status.Code statusCode, String msg) {
1120
      Metadata metadata = new Metadata();
1✔
1121
      metadata.put(InternalStatus.CODE_KEY, statusCode.toStatus());
1✔
1122
      metadata.put(InternalStatus.MESSAGE_KEY, msg);
1✔
1123
      List<Header> headers = Headers.createResponseTrailers(metadata, false);
1✔
1124

1125
      synchronized (lock) {
1✔
1126
        frameWriter.synReply(true, streamId, headers);
1✔
1127
        if (!inFinished) {
1✔
1128
          frameWriter.rstStream(streamId, ErrorCode.NO_ERROR);
1✔
1129
        }
1130
        frameWriter.flush();
1✔
1131
      }
1✔
1132
    }
1✔
1133
  }
1134

1135
  private final class KeepAlivePinger implements KeepAliveManager.KeepAlivePinger {
1✔
1136
    @Override
1137
    public void ping() {
1138
      synchronized (lock) {
×
1139
        frameWriter.ping(false, 0, KEEPALIVE_PING);
×
1140
        frameWriter.flush();
×
1141
      }
×
1142
      tracer.reportKeepAliveSent();
×
1143
    }
×
1144

1145
    @Override
1146
    public void onPingTimeout() {
1147
      synchronized (lock) {
×
1148
        goAwayStatus = Status.UNAVAILABLE
×
1149
            .withDescription("Keepalive failed. Considering connection dead");
×
1150
        GrpcUtil.closeQuietly(socket);
×
1151
      }
×
1152
    }
×
1153
  }
1154

1155
  interface StreamState {
1156
    /** Must be holding 'lock' when calling. */
1157
    void inboundDataReceived(Buffer frame, int dataLength, int paddingLength, boolean endOfStream);
1158

1159
    /** Must be holding 'lock' when calling. */
1160
    boolean hasReceivedEndOfStream();
1161

1162
    /** Must be holding 'lock' when calling. */
1163
    int inboundWindowAvailable();
1164

1165
    /** Must be holding 'lock' when calling. */
1166
    void transportReportStatus(Status status);
1167

1168
    /** Must be holding 'lock' when calling. */
1169
    void inboundRstReceived(Status status);
1170

1171
    OutboundFlowController.StreamState getOutboundFlowState();
1172
  }
1173

1174
  static class Http2ErrorStreamState implements StreamState, OutboundFlowController.Stream {
1175
    private final int streamId;
1176
    private final Object lock;
1177
    private final OutboundFlowController.StreamState outboundFlowState;
1178
    @GuardedBy("lock")
1179
    private int window;
1180
    @GuardedBy("lock")
1181
    private boolean receivedEndOfStream;
1182

1183
    Http2ErrorStreamState(
1184
        int streamId, Object lock, OutboundFlowController outboundFlow, int initialWindowSize) {
1✔
1185
      this.streamId = streamId;
1✔
1186
      this.lock = lock;
1✔
1187
      this.outboundFlowState = outboundFlow.createState(this, streamId);
1✔
1188
      this.window = initialWindowSize;
1✔
1189
    }
1✔
1190

1191
    @Override public void onSentBytes(int frameBytes) {}
1✔
1192

1193
    @Override public void inboundDataReceived(
1194
        Buffer frame, int dataLength, int paddingLength, boolean endOfStream) {
1195
      synchronized (lock) {
×
1196
        if (endOfStream) {
×
1197
          receivedEndOfStream = true;
×
1198
        }
1199
        window -= dataLength + paddingLength;
×
1200
        try {
1201
          frame.skip(frame.size()); // Recycle segments
×
1202
        } catch (IOException ex) {
×
1203
          throw new AssertionError(ex);
×
1204
        }
×
1205
      }
×
1206
    }
×
1207

1208
    @Override public boolean hasReceivedEndOfStream() {
1209
      synchronized (lock) {
1✔
1210
        return receivedEndOfStream;
1✔
1211
      }
1212
    }
1213

1214
    @Override public int inboundWindowAvailable() {
1215
      synchronized (lock) {
×
1216
        return window;
×
1217
      }
1218
    }
1219

1220
    @Override public void transportReportStatus(Status status) {}
×
1221

1222
    @Override public void inboundRstReceived(Status status) {}
×
1223

1224
    @Override public OutboundFlowController.StreamState getOutboundFlowState() {
1225
      synchronized (lock) {
1✔
1226
        return outboundFlowState;
1✔
1227
      }
1228
    }
1229
  }
1230
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc