• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18905

17 Nov 2023 11:10PM UTC coverage: 88.211%. Remained the same
#18905

push

github

web-flow
netty: Add option to limit RST_STREAM rate

The behavior purposefully mirrors that of Netty's
AbstractHttp2ConnectionHandlerBuilder.decoderEnforceMaxRstFramesPerWindow().
That API is not available to our code as we extend the
Http2ConnectionHandler, but we want our API to be able to delegate to
Netty's in the future if that ever becomes possible.

30365 of 34423 relevant lines covered (88.21%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.86
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import io.grpc.Attributes;
31
import io.grpc.CallOptions;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.Grpc;
34
import io.grpc.HttpConnectProxiedSocketAddress;
35
import io.grpc.InternalChannelz;
36
import io.grpc.InternalChannelz.SocketStats;
37
import io.grpc.InternalLogId;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.MethodDescriptor.MethodType;
41
import io.grpc.SecurityLevel;
42
import io.grpc.Status;
43
import io.grpc.Status.Code;
44
import io.grpc.StatusException;
45
import io.grpc.internal.ClientStreamListener.RpcProgress;
46
import io.grpc.internal.ConnectionClientTransport;
47
import io.grpc.internal.GrpcAttributes;
48
import io.grpc.internal.GrpcUtil;
49
import io.grpc.internal.Http2Ping;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.KeepAliveManager;
52
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
53
import io.grpc.internal.SerializingExecutor;
54
import io.grpc.internal.StatsTraceContext;
55
import io.grpc.internal.TransportTracer;
56
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
57
import io.grpc.okhttp.internal.ConnectionSpec;
58
import io.grpc.okhttp.internal.Credentials;
59
import io.grpc.okhttp.internal.StatusLine;
60
import io.grpc.okhttp.internal.framed.ErrorCode;
61
import io.grpc.okhttp.internal.framed.FrameReader;
62
import io.grpc.okhttp.internal.framed.FrameWriter;
63
import io.grpc.okhttp.internal.framed.Header;
64
import io.grpc.okhttp.internal.framed.HeadersMode;
65
import io.grpc.okhttp.internal.framed.Http2;
66
import io.grpc.okhttp.internal.framed.Settings;
67
import io.grpc.okhttp.internal.framed.Variant;
68
import io.grpc.okhttp.internal.proxy.HttpUrl;
69
import io.grpc.okhttp.internal.proxy.Request;
70
import io.perfmark.PerfMark;
71
import java.io.EOFException;
72
import java.io.IOException;
73
import java.net.InetSocketAddress;
74
import java.net.Socket;
75
import java.net.URI;
76
import java.util.Collections;
77
import java.util.Deque;
78
import java.util.EnumMap;
79
import java.util.HashMap;
80
import java.util.Iterator;
81
import java.util.LinkedList;
82
import java.util.List;
83
import java.util.Locale;
84
import java.util.Map;
85
import java.util.Random;
86
import java.util.concurrent.CountDownLatch;
87
import java.util.concurrent.Executor;
88
import java.util.concurrent.ScheduledExecutorService;
89
import java.util.logging.Level;
90
import java.util.logging.Logger;
91
import javax.annotation.Nullable;
92
import javax.annotation.concurrent.GuardedBy;
93
import javax.net.SocketFactory;
94
import javax.net.ssl.HostnameVerifier;
95
import javax.net.ssl.SSLSession;
96
import javax.net.ssl.SSLSocket;
97
import javax.net.ssl.SSLSocketFactory;
98
import okio.Buffer;
99
import okio.BufferedSink;
100
import okio.BufferedSource;
101
import okio.ByteString;
102
import okio.Okio;
103
import okio.Source;
104
import okio.Timeout;
105

106
/**
107
 * An OkHttp-based {@link ConnectionClientTransport} implementation.
108
 */
109
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
110
      OutboundFlowController.Transport {
111
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
112
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
113

114
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
115
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
116
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
117
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
118
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
119
        Status.INTERNAL.withDescription("Protocol error"));
1✔
120
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
121
        Status.INTERNAL.withDescription("Internal error"));
1✔
122
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
123
        Status.INTERNAL.withDescription("Flow control error"));
1✔
124
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
125
        Status.INTERNAL.withDescription("Stream closed"));
1✔
126
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
127
        Status.INTERNAL.withDescription("Frame too large"));
1✔
128
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
129
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
130
    errorToStatus.put(ErrorCode.CANCEL,
1✔
131
        Status.CANCELLED.withDescription("Cancelled"));
1✔
132
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
133
        Status.INTERNAL.withDescription("Compression error"));
1✔
134
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
135
        Status.INTERNAL.withDescription("Connect error"));
1✔
136
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
137
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
138
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
139
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
140
    return Collections.unmodifiableMap(errorToStatus);
1✔
141
  }
142

143
  private final InetSocketAddress address;
144
  private final String defaultAuthority;
145
  private final String userAgent;
146
  private final Random random = new Random();
1✔
147
  // Returns new unstarted stopwatches
148
  private final Supplier<Stopwatch> stopwatchFactory;
149
  private final int initialWindowSize;
150
  private final Variant variant;
151
  private Listener listener;
152
  @GuardedBy("lock")
153
  private ExceptionHandlingFrameWriter frameWriter;
154
  private OutboundFlowController outboundFlow;
155
  private final Object lock = new Object();
1✔
156
  private final InternalLogId logId;
157
  @GuardedBy("lock")
158
  private int nextStreamId;
159
  @GuardedBy("lock")
1✔
160
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
161
  private final Executor executor;
162
  // Wraps the executor to guarantee that some operations are executed serially.
163
  private final SerializingExecutor serializingExecutor;
164
  private final ScheduledExecutorService scheduler;
165
  private final int maxMessageSize;
166
  private int connectionUnacknowledgedBytesRead;
167
  private ClientFrameHandler clientFrameHandler;
168
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
169
  private Attributes attributes;
170
  /**
171
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
172
   * streams may continue.
173
   */
174
  @GuardedBy("lock")
175
  private Status goAwayStatus;
176
  @GuardedBy("lock")
177
  private boolean goAwaySent;
178
  @GuardedBy("lock")
179
  private Http2Ping ping;
180
  @GuardedBy("lock")
181
  private boolean stopped;
182
  @GuardedBy("lock")
183
  private boolean hasStream;
184
  private final SocketFactory socketFactory;
185
  private SSLSocketFactory sslSocketFactory;
186
  private HostnameVerifier hostnameVerifier;
187
  private Socket socket;
188
  @GuardedBy("lock")
1✔
189
  private int maxConcurrentStreams = 0;
190
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
191
  @GuardedBy("lock")
192
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
193
  private final ConnectionSpec connectionSpec;
194
  private KeepAliveManager keepAliveManager;
195
  private boolean enableKeepAlive;
196
  private long keepAliveTimeNanos;
197
  private long keepAliveTimeoutNanos;
198
  private boolean keepAliveWithoutCalls;
199
  private final Runnable tooManyPingsRunnable;
200
  private final int maxInboundMetadataSize;
201
  private final boolean useGetForSafeMethods;
202
  @GuardedBy("lock")
203
  private final TransportTracer transportTracer;
204
  @GuardedBy("lock")
1✔
205
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
206
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
207
        @Override
208
        protected void handleInUse() {
209
          listener.transportInUse(true);
1✔
210
        }
1✔
211

212
        @Override
213
        protected void handleNotInUse() {
214
          listener.transportInUse(false);
1✔
215
        }
1✔
216
      };
217
  @GuardedBy("lock")
218
  private InternalChannelz.Security securityInfo;
219

220
  @VisibleForTesting
221
  @Nullable
222
  final HttpConnectProxiedSocketAddress proxiedAddr;
223

224
  @VisibleForTesting
1✔
225
  int proxySocketTimeout = 30000;
226

227
  // The following fields should only be used for testing.
228
  Runnable connectingCallback;
229
  SettableFuture<Void> connectedFuture;
230

231
  /**
   * Creates a transport for {@code address}. Delegates to the private constructor, supplying
   * {@link GrpcUtil#STOPWATCH_SUPPLIER} as the stopwatch factory and a fresh {@link Http2}
   * instance as the framing variant.
   */
  public OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      InetSocketAddress address,
      String authority,
      @Nullable String userAgent,
      Attributes eagAttrs,
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory, address, authority, userAgent, eagAttrs,
        GrpcUtil.STOPWATCH_SUPPLIER, new Http2(),
        proxiedAddr, tooManyPingsRunnable);
  }
1✔
250

251
  /**
   * Shared constructor: copies the configuration carried by {@code transportFactory} into this
   * transport, validates the mandatory collaborators, and registers the flow-control reader on
   * the transport tracer.
   */
  private OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      InetSocketAddress address,
      String authority,
      @Nullable String userAgent,
      Attributes eagAttrs,
      Supplier<Stopwatch> stopwatchFactory,
      Variant variant,
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
      Runnable tooManyPingsRunnable) {
    // Addressing and sizing.
    this.address = Preconditions.checkNotNull(address, "address");
    this.defaultAuthority = authority;
    this.maxMessageSize = transportFactory.maxMessageSize;
    this.initialWindowSize = transportFactory.flowControlWindow;

    // Executors.
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
    this.serializingExecutor = new SerializingExecutor(transportFactory.executor);
    this.scheduler = Preconditions.checkNotNull(
        transportFactory.scheduledExecutorService, "scheduledExecutorService");

    // Client initiated streams are odd, server initiated ones are even. Server should not need to
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
    this.nextStreamId = 3;

    // Socket and TLS configuration; fall back to the platform default socket factory.
    if (transportFactory.socketFactory == null) {
      this.socketFactory = SocketFactory.getDefault();
    } else {
      this.socketFactory = transportFactory.socketFactory;
    }
    this.sslSocketFactory = transportFactory.sslSocketFactory;
    this.hostnameVerifier = transportFactory.hostnameVerifier;
    this.connectionSpec = Preconditions.checkNotNull(
        transportFactory.connectionSpec, "connectionSpec");

    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
    this.variant = Preconditions.checkNotNull(variant, "variant");
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
    this.proxiedAddr = proxiedAddr;
    this.tooManyPingsRunnable =
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
    this.transportTracer = transportFactory.transportTracerFactory.create();
    this.logId = InternalLogId.allocate(getClass(), address.toString());
    this.attributes = Attributes.newBuilder()
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs)
        .build();
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;

    initTransportTracer();
  }
1✔
292

293
  /**
   * Test-only constructor: builds a transport aimed at a fake peer ({@code 127.0.0.1:80} with a
   * placeholder authority) and records the hooks the tests use to observe connection progress.
   *
   * @param connectingCallback optional hook stored for use while connecting
   * @param connectedFuture future stored for completion once connected; must not be null
   */
  @VisibleForTesting
  OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      String userAgent,
      Supplier<Stopwatch> stopwatchFactory,
      Variant variant,
      @Nullable Runnable connectingCallback,
      SettableFuture<Void> connectedFuture,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory,
        new InetSocketAddress("127.0.0.1", 80),
        "notarealauthority:80",
        userAgent,
        Attributes.EMPTY,
        stopwatchFactory,
        variant,
        /* proxiedAddr= */ null,
        tooManyPingsRunnable);
    this.connectingCallback = connectingCallback;
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
  }
1✔
318

319
  // sslSocketFactory is set to null when use plaintext.
320
  boolean isUsingPlaintext() {
321
    return sslSocketFactory == null;
1✔
322
  }
323

324
  private void initTransportTracer() {
325
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
326
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
327
        @Override
328
        public TransportTracer.FlowControlWindows read() {
329
          synchronized (lock) {
1✔
330
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
331
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
332
            // provide a lower bound.
333
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
334
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
335
          }
336
        }
337
      });
338
    }
1✔
339
  }
1✔
340

341
  /**
342
   * Enable keepalive with custom delay and timeout.
343
   */
344
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
345
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
346
    enableKeepAlive = enable;
×
347
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
348
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
349
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
350
  }
×
351

352
  @Override
353
  public void ping(final PingCallback callback, Executor executor) {
354
    long data = 0;
1✔
355
    Http2Ping p;
356
    boolean writePing;
357
    synchronized (lock) {
1✔
358
      checkState(frameWriter != null);
1✔
359
      if (stopped) {
1✔
360
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
361
        return;
1✔
362
      }
363
      if (ping != null) {
1✔
364
        // we only allow one outstanding ping at a time, so just add the callback to
365
        // any outstanding operation
366
        p = ping;
1✔
367
        writePing = false;
1✔
368
      } else {
369
        // set outstanding operation and then write the ping after releasing lock
370
        data = random.nextLong();
1✔
371
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
372
        stopwatch.start();
1✔
373
        p = ping = new Http2Ping(data, stopwatch);
1✔
374
        writePing = true;
1✔
375
        transportTracer.reportKeepAliveSent();
1✔
376
      }
377
      if (writePing) {
1✔
378
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
379
      }
380
    }
1✔
381
    // If transport concurrently failed/stopped since we released the lock above, this could
382
    // immediately invoke callback (which we shouldn't do while holding a lock)
383
    p.addCallback(callback, executor);
1✔
384
  }
1✔
385

386
  @Override
387
  public OkHttpClientStream newStream(
388
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
389
      ClientStreamTracer[] tracers) {
390
    Preconditions.checkNotNull(method, "method");
1✔
391
    Preconditions.checkNotNull(headers, "headers");
1✔
392
    StatsTraceContext statsTraceContext =
1✔
393
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
394
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
395
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
396
      return new OkHttpClientStream(
1✔
397
          method,
398
          headers,
399
          frameWriter,
400
          OkHttpClientTransport.this,
401
          outboundFlow,
402
          lock,
403
          maxMessageSize,
404
          initialWindowSize,
405
          defaultAuthority,
406
          userAgent,
407
          statsTraceContext,
408
          transportTracer,
409
          callOptions,
410
          useGetForSafeMethods);
411
    }
412
  }
413

414
  @GuardedBy("lock")
415
  void streamReadyToStart(OkHttpClientStream clientStream) {
416
    if (goAwayStatus != null) {
1✔
417
      clientStream.transportState().transportReportStatus(
1✔
418
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
419
    } else if (streams.size() >= maxConcurrentStreams) {
1✔
420
      pendingStreams.add(clientStream);
1✔
421
      setInUse(clientStream);
1✔
422
    } else {
423
      startStream(clientStream);
1✔
424
    }
425
  }
1✔
426

427
  @SuppressWarnings("GuardedBy")
428
  @GuardedBy("lock")
429
  private void startStream(OkHttpClientStream stream) {
430
    Preconditions.checkState(
1✔
431
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
432
    streams.put(nextStreamId, stream);
1✔
433
    setInUse(stream);
1✔
434
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
435
    // found: 'this.lock'
436
    stream.transportState().start(nextStreamId);
1✔
437
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
438
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
439
        || stream.useGet()) {
1✔
440
      frameWriter.flush();
1✔
441
    }
442
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
443
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
444
      // correctly.
445
      nextStreamId = Integer.MAX_VALUE;
1✔
446
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
447
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
448
    } else {
449
      nextStreamId += 2;
1✔
450
    }
451
  }
1✔
452

453
  /**
454
   * Starts pending streams, returns true if at least one pending stream is started.
455
   */
456
  @GuardedBy("lock")
457
  private boolean startPendingStreams() {
458
    boolean hasStreamStarted = false;
1✔
459
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
460
      OkHttpClientStream stream = pendingStreams.poll();
1✔
461
      startStream(stream);
1✔
462
      hasStreamStarted = true;
1✔
463
    }
1✔
464
    return hasStreamStarted;
1✔
465
  }
466

467
  /**
468
   * Removes given pending stream, used when a pending stream is cancelled.
469
   */
470
  @GuardedBy("lock")
471
  void removePendingStream(OkHttpClientStream pendingStream) {
472
    pendingStreams.remove(pendingStream);
1✔
473
    maybeClearInUse(pendingStream);
1✔
474
  }
1✔
475

476
  @Override
477
  public Runnable start(Listener listener) {
478
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
479

480
    if (enableKeepAlive) {
1✔
481
      keepAliveManager = new KeepAliveManager(
×
482
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
483
          keepAliveWithoutCalls);
484
      keepAliveManager.onTransportStarted();
×
485
    }
486

487
    int maxQueuedControlFrames = 10000;
1✔
488
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
489
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
490
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
491

492
    synchronized (lock) {
1✔
493
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
494
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
495
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
496
      // the same TransportExceptionHandler instance so it is all mixed back together.
497
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
498
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
499
    }
1✔
500
    final CountDownLatch latch = new CountDownLatch(1);
1✔
501
    // Connecting in the serializingExecutor, so that some stream operations like synStream
502
    // will be executed after connected.
503
    serializingExecutor.execute(new Runnable() {
1✔
504
      @Override
505
      public void run() {
506
        // This is a hack to make sure the connection preface and initial settings to be sent out
507
        // without blocking the start. By doing this essentially prevents potential deadlock when
508
        // network is not available during startup while another thread holding lock to send the
509
        // initial preface.
510
        try {
511
          latch.await();
1✔
512
        } catch (InterruptedException e) {
×
513
          Thread.currentThread().interrupt();
×
514
        }
1✔
515
        // Use closed source on failure so that the reader immediately shuts down.
516
        BufferedSource source = Okio.buffer(new Source() {
1✔
517
          @Override
518
          public long read(Buffer sink, long byteCount) {
519
            return -1;
1✔
520
          }
521

522
          @Override
523
          public Timeout timeout() {
524
            return Timeout.NONE;
×
525
          }
526

527
          @Override
528
          public void close() {
529
          }
1✔
530
        });
531
        Socket sock;
532
        SSLSession sslSession = null;
1✔
533
        try {
534
          if (proxiedAddr == null) {
1✔
535
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
536
          } else {
537
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
538
              sock = createHttpProxySocket(
1✔
539
                  proxiedAddr.getTargetAddress(),
1✔
540
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
541
                  proxiedAddr.getUsername(),
1✔
542
                  proxiedAddr.getPassword()
1✔
543
              );
544
            } else {
545
              throw Status.INTERNAL.withDescription(
×
546
                  "Unsupported SocketAddress implementation "
547
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
548
            }
549
          }
550
          if (sslSocketFactory != null) {
1✔
551
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
552
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
553
                connectionSpec);
1✔
554
            sslSession = sslSocket.getSession();
1✔
555
            sock = sslSocket;
1✔
556
          }
557
          sock.setTcpNoDelay(true);
1✔
558
          source = Okio.buffer(Okio.source(sock));
1✔
559
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
560

561
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
562
          attributes = attributes.toBuilder()
1✔
563
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
564
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
565
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
566
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
567
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
568
              .build();
1✔
569
        } catch (StatusException e) {
1✔
570
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
571
          return;
1✔
572
        } catch (Exception e) {
1✔
573
          onException(e);
1✔
574
          return;
1✔
575
        } finally {
576
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
577
        }
578
        synchronized (lock) {
1✔
579
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
580
          if (sslSession != null) {
1✔
581
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
582
          }
583
        }
1✔
584
      }
1✔
585
    });
586
    // Schedule to send connection preface & settings before any other write.
587
    try {
588
      sendConnectionPrefaceAndSettings();
1✔
589
    } finally {
590
      latch.countDown();
1✔
591
    }
592

593
    serializingExecutor.execute(new Runnable() {
1✔
594
      @Override
595
      public void run() {
596
        if (connectingCallback != null) {
1✔
597
          connectingCallback.run();
1✔
598
        }
599
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
600
        // may send goAway immediately.
601
        executor.execute(clientFrameHandler);
1✔
602
        synchronized (lock) {
1✔
603
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
604
          startPendingStreams();
1✔
605
        }
1✔
606
        if (connectedFuture != null) {
1✔
607
          connectedFuture.set(null);
1✔
608
        }
609
      }
1✔
610
    });
611
    return null;
1✔
612
  }
613

614
  /**
615
   * Should only be called once when the transport is first established.
616
   */
617
  private void sendConnectionPrefaceAndSettings() {
618
    synchronized (lock) {
1✔
619
      frameWriter.connectionPreface();
1✔
620
      Settings settings = new Settings();
1✔
621
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
622
      frameWriter.settings(settings);
1✔
623
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
624
        frameWriter.windowUpdate(
1✔
625
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
626
      }
627
    }
1✔
628
  }
1✔
629

630
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
631
      String proxyUsername, String proxyPassword) throws StatusException {
632
    Socket sock = null;
1✔
633
    try {
634
      // The proxy address may not be resolved
635
      if (proxyAddress.getAddress() != null) {
1✔
636
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
637
      } else {
638
        sock =
×
639
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
640
      }
641
      sock.setTcpNoDelay(true);
1✔
642
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
643
      // can cause reading from the socket to hang.
644
      sock.setSoTimeout(proxySocketTimeout);
1✔
645

646
      Source source = Okio.source(sock);
1✔
647
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
648

649
      // Prepare headers and request method line
650
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
651
      HttpUrl url = proxyRequest.httpUrl();
1✔
652
      String requestLine =
1✔
653
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
654

655
      // Write request to socket
656
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
657
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
658
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
659
            .writeUtf8(": ")
1✔
660
            .writeUtf8(proxyRequest.headers().value(i))
1✔
661
            .writeUtf8("\r\n");
1✔
662
      }
663
      sink.writeUtf8("\r\n");
1✔
664
      // Flush buffer (flushes socket and sends request)
665
      sink.flush();
1✔
666

667
      // Read status line, check if 2xx was returned
668
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
669
      // Drain rest of headers
670
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
671
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
672
        Buffer body = new Buffer();
1✔
673
        try {
674
          sock.shutdownOutput();
1✔
675
          source.read(body, 1024);
1✔
676
        } catch (IOException ex) {
×
677
          body.writeUtf8("Unable to read body: " + ex.toString());
×
678
        }
1✔
679
        try {
680
          sock.close();
1✔
681
        } catch (IOException ignored) {
×
682
          // ignored
683
        }
1✔
684
        String message = String.format(
1✔
685
            Locale.US,
686
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
687
              + "Response body:\n%s",
688
            statusLine.code, statusLine.message, body.readUtf8());
1✔
689
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
690
      }
691
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
692
      sock.setSoTimeout(0);
1✔
693
      return sock;
1✔
694
    } catch (IOException e) {
1✔
695
      if (sock != null) {
1✔
696
        GrpcUtil.closeQuietly(sock);
1✔
697
      }
698
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
699
          .asException();
1✔
700
    }
701
  }
702

703
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
704
                                         String proxyPassword) {
705
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
706
        .scheme("https")
1✔
707
        .host(address.getHostName())
1✔
708
        .port(address.getPort())
1✔
709
        .build();
1✔
710

711
    Request.Builder request = new Request.Builder()
1✔
712
        .url(tunnelUrl)
1✔
713
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
714
        .header("User-Agent", userAgent);
1✔
715

716
    // If we have proxy credentials, set them right away
717
    if (proxyUsername != null && proxyPassword != null) {
1✔
718
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
719
    }
720
    return request.build();
1✔
721
  }
722

723
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
724
    Buffer buffer = new Buffer();
1✔
725
    while (true) {
726
      if (source.read(buffer, 1) == -1) {
1✔
727
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
728
      }
729
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
730
        return buffer.readUtf8LineStrict();
1✔
731
      }
732
    }
733
  }
734

735
  @Override
736
  public String toString() {
737
    return MoreObjects.toStringHelper(this)
1✔
738
        .add("logId", logId.getId())
1✔
739
        .add("address", address)
1✔
740
        .toString();
1✔
741
  }
742

743
  @Override
744
  public InternalLogId getLogId() {
745
    return logId;
1✔
746
  }
747

748
  /**
749
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
750
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
751
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
752
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
753
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
754
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
755
   * connection to the port, independent of this function.
756
   *
757
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
758
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
759
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
760
   */
761
  @VisibleForTesting
762
  String getOverridenHost() {
763
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
764
    if (uri.getHost() != null) {
1✔
765
      return uri.getHost();
1✔
766
    }
767

768
    return defaultAuthority;
1✔
769
  }
770

771
  @VisibleForTesting
772
  int getOverridenPort() {
773
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
774
    if (uri.getPort() != -1) {
1✔
775
      return uri.getPort();
1✔
776
    }
777

778
    return address.getPort();
1✔
779
  }
780

781
  @Override
782
  public void shutdown(Status reason) {
783
    synchronized (lock) {
1✔
784
      if (goAwayStatus != null) {
1✔
785
        return;
1✔
786
      }
787

788
      goAwayStatus = reason;
1✔
789
      listener.transportShutdown(goAwayStatus);
1✔
790
      stopIfNecessary();
1✔
791
    }
1✔
792
  }
1✔
793

794
  @Override
795
  public void shutdownNow(Status reason) {
796
    shutdown(reason);
1✔
797
    synchronized (lock) {
1✔
798
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
799
      while (it.hasNext()) {
1✔
800
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
801
        it.remove();
1✔
802
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
803
        maybeClearInUse(entry.getValue());
1✔
804
      }
1✔
805

806
      for (OkHttpClientStream stream : pendingStreams) {
1✔
807
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
808
        // chance to retry and be routed to another connection.
809
        stream.transportState().transportReportStatus(
1✔
810
            reason, RpcProgress.MISCARRIED, true, new Metadata());
811
        maybeClearInUse(stream);
1✔
812
      }
1✔
813
      pendingStreams.clear();
1✔
814

815
      stopIfNecessary();
1✔
816
    }
1✔
817
  }
1✔
818

819
  @Override
820
  public Attributes getAttributes() {
821
    return attributes;
1✔
822
  }
823

824
  /**
825
   * Gets all active streams as an array.
826
   */
827
  @Override
828
  public OutboundFlowController.StreamState[] getActiveStreams() {
829
    synchronized (lock) {
1✔
830
      OutboundFlowController.StreamState[] flowStreams =
1✔
831
          new OutboundFlowController.StreamState[streams.size()];
1✔
832
      int i = 0;
1✔
833
      for (OkHttpClientStream stream : streams.values()) {
1✔
834
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
835
      }
1✔
836
      return flowStreams;
1✔
837
    }
838
  }
839

840
  @VisibleForTesting
841
  ClientFrameHandler getHandler() {
842
    return clientFrameHandler;
1✔
843
  }
844

845
  @VisibleForTesting
846
  SocketFactory getSocketFactory() {
847
    return socketFactory;
1✔
848
  }
849

850
  @VisibleForTesting
851
  int getPendingStreamSize() {
852
    synchronized (lock) {
1✔
853
      return pendingStreams.size();
1✔
854
    }
855
  }
856

857
  @VisibleForTesting
858
  void setNextStreamId(int nextStreamId) {
859
    synchronized (lock) {
1✔
860
      this.nextStreamId = nextStreamId;
1✔
861
    }
1✔
862
  }
1✔
863

864
  /**
865
   * Finish all active streams due to an IOException, then close the transport.
866
   */
867
  @Override
868
  public void onException(Throwable failureCause) {
869
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
870
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
871
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
872
  }
1✔
873

874
  /**
875
   * Send GOAWAY to the server, then finish all active streams and close the transport.
876
   */
877
  private void onError(ErrorCode errorCode, String moreDetail) {
878
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
879
  }
1✔
880

881
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
882
    synchronized (lock) {
1✔
883
      if (goAwayStatus == null) {
1✔
884
        goAwayStatus = status;
1✔
885
        listener.transportShutdown(status);
1✔
886
      }
887
      if (errorCode != null && !goAwaySent) {
1✔
888
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
889
        // streams. The GOAWAY is part of graceful shutdown.
890
        goAwaySent = true;
1✔
891
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
892
      }
893

894
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
895
      while (it.hasNext()) {
1✔
896
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
897
        if (entry.getKey() > lastKnownStreamId) {
1✔
898
          it.remove();
1✔
899
          entry.getValue().transportState().transportReportStatus(
1✔
900
              status, RpcProgress.REFUSED, false, new Metadata());
901
          maybeClearInUse(entry.getValue());
1✔
902
        }
903
      }
1✔
904

905
      for (OkHttpClientStream stream : pendingStreams) {
1✔
906
        stream.transportState().transportReportStatus(
1✔
907
            status, RpcProgress.MISCARRIED, true, new Metadata());
908
        maybeClearInUse(stream);
1✔
909
      }
1✔
910
      pendingStreams.clear();
1✔
911

912
      stopIfNecessary();
1✔
913
    }
1✔
914
  }
1✔
915

916
  /**
917
   * Called when a stream is closed. We do things like:
918
   * <ul>
919
   * <li>Removing the stream from the map.
920
   * <li>Optionally reporting the status.
921
   * <li>Starting pending streams if we can.
922
   * <li>Stopping the transport if this is the last live stream under a go-away status.
923
   * </ul>
924
   *
925
   * @param streamId the Id of the stream.
926
   * @param status the final status of this stream, null means no need to report.
927
   * @param stopDelivery interrupt queued messages in the deframer
928
   * @param errorCode reset the stream with this ErrorCode if not null.
929
   * @param trailers the trailers received if not null
930
   */
931
  void finishStream(
932
      int streamId,
933
      @Nullable Status status,
934
      RpcProgress rpcProgress,
935
      boolean stopDelivery,
936
      @Nullable ErrorCode errorCode,
937
      @Nullable Metadata trailers) {
938
    synchronized (lock) {
1✔
939
      OkHttpClientStream stream = streams.remove(streamId);
1✔
940
      if (stream != null) {
1✔
941
        if (errorCode != null) {
1✔
942
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
943
        }
944
        if (status != null) {
1✔
945
          stream
1✔
946
              .transportState()
1✔
947
              .transportReportStatus(
1✔
948
                  status,
949
                  rpcProgress,
950
                  stopDelivery,
951
                  trailers != null ? trailers : new Metadata());
1✔
952
        }
953
        if (!startPendingStreams()) {
1✔
954
          stopIfNecessary();
1✔
955
          maybeClearInUse(stream);
1✔
956
        }
957
      }
958
    }
1✔
959
  }
1✔
960

961
  /**
962
   * When the transport is in goAway state, we should stop it once all active streams finish.
963
   */
964
  @GuardedBy("lock")
965
  private void stopIfNecessary() {
966
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
967
      return;
1✔
968
    }
969
    if (stopped) {
1✔
970
      return;
1✔
971
    }
972
    stopped = true;
1✔
973

974
    if (keepAliveManager != null) {
1✔
975
      keepAliveManager.onTransportTermination();
×
976
    }
977

978
    if (ping != null) {
1✔
979
      ping.failed(getPingFailure());
1✔
980
      ping = null;
1✔
981
    }
982

983
    if (!goAwaySent) {
1✔
984
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
985
      // streams. The GOAWAY is part of graceful shutdown.
986
      goAwaySent = true;
1✔
987
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
988
    }
989

990
    // We will close the underlying socket in the writing thread to break out the reader
991
    // thread, which will close the frameReader and notify the listener.
992
    frameWriter.close();
1✔
993
  }
1✔
994

995
  @GuardedBy("lock")
996
  private void maybeClearInUse(OkHttpClientStream stream) {
997
    if (hasStream) {
1✔
998
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
999
        hasStream = false;
1✔
1000
        if (keepAliveManager != null) {
1✔
1001
          // We don't have any active streams. No need to do keepalives any more.
1002
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1003
          // and onTransportActive.
1004
          keepAliveManager.onTransportIdle();
×
1005
        }
1006
      }
1007
    }
1008
    if (stream.shouldBeCountedForInUse()) {
1✔
1009
      inUseState.updateObjectInUse(stream, false);
1✔
1010
    }
1011
  }
1✔
1012

1013
  @GuardedBy("lock")
1014
  private void setInUse(OkHttpClientStream stream) {
1015
    if (!hasStream) {
1✔
1016
      hasStream = true;
1✔
1017
      if (keepAliveManager != null) {
1✔
1018
        // We have a new stream. We might need to do keepalives now.
1019
        // Note that we have to do this inside the lock to avoid calling
1020
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1021
        // order.
1022
        keepAliveManager.onTransportActive();
×
1023
      }
1024
    }
1025
    if (stream.shouldBeCountedForInUse()) {
1✔
1026
      inUseState.updateObjectInUse(stream, true);
1✔
1027
    }
1028
  }
1✔
1029

1030
  private Throwable getPingFailure() {
1031
    synchronized (lock) {
1✔
1032
      if (goAwayStatus != null) {
1✔
1033
        return goAwayStatus.asException();
1✔
1034
      } else {
1035
        return Status.UNAVAILABLE.withDescription("Connection closed").asException();
×
1036
      }
1037
    }
1038
  }
1039

1040
  boolean mayHaveCreatedStream(int streamId) {
1041
    synchronized (lock) {
1✔
1042
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1043
    }
1044
  }
1045

1046
  OkHttpClientStream getStream(int streamId) {
1047
    synchronized (lock) {
1✔
1048
      return streams.get(streamId);
1✔
1049
    }
1050
  }
1051

1052
  /**
1053
   * Returns a Grpc status corresponding to the given ErrorCode.
1054
   */
1055
  @VisibleForTesting
1056
  static Status toGrpcStatus(ErrorCode code) {
1057
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1058
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1059
        "Unknown http2 error code: " + code.httpCode);
1060
  }
1061

1062
  @Override
1063
  public ListenableFuture<SocketStats> getStats() {
1064
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1065
    synchronized (lock) {
1✔
1066
      if (socket == null) {
1✔
1067
        ret.set(new SocketStats(
×
1068
            transportTracer.getStats(),
×
1069
            /*local=*/ null,
1070
            /*remote=*/ null,
1071
            new InternalChannelz.SocketOptions.Builder().build(),
×
1072
            /*security=*/ null));
1073
      } else {
1074
        ret.set(new SocketStats(
1✔
1075
            transportTracer.getStats(),
1✔
1076
            socket.getLocalSocketAddress(),
1✔
1077
            socket.getRemoteSocketAddress(),
1✔
1078
            Utils.getSocketOptions(socket),
1✔
1079
            securityInfo));
1080
      }
1081
      return ret;
1✔
1082
    }
1083
  }
1084

1085
  /**
1086
   * Runnable which reads frames and dispatches them to in flight calls.
1087
   */
1088
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1089

1090
    private final OkHttpFrameLogger logger =
1✔
1091
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1092
    FrameReader frameReader;
1093
    boolean firstSettings = true;
1✔
1094

1095
    /** Creates a handler that pulls its frames from {@code frameReader}. */
    ClientFrameHandler(FrameReader frameReader) {
      super();
      this.frameReader = frameReader;
    }
1✔
1098

1099
    @Override
1100
    public void run() {
1101
      String threadName = Thread.currentThread().getName();
1✔
1102
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1103
      try {
1104
        // Read until the underlying socket closes.
1105
        while (frameReader.nextFrame(this)) {
1✔
1106
          if (keepAliveManager != null) {
1✔
1107
            keepAliveManager.onDataReceived();
×
1108
          }
1109
        }
1110
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1111
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1112
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1113
        Status status;
1114
        synchronized (lock) {
1✔
1115
          status = goAwayStatus;
1✔
1116
        }
1✔
1117
        if (status == null) {
1✔
1118
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1119
        }
1120
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1121
      } catch (Throwable t) {
1✔
1122
        // TODO(madongfly): Send the exception message to the server.
1123
        startGoAway(
1✔
1124
            0,
1125
            ErrorCode.PROTOCOL_ERROR,
1126
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1127
      } finally {
1128
        try {
1129
          frameReader.close();
1✔
1130
        } catch (IOException ex) {
×
1131
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1132
        }
1✔
1133
        listener.transportTerminated();
1✔
1134
        Thread.currentThread().setName(threadName);
1✔
1135
      }
1136
    }
1✔
1137

1138
    /**
1139
     * Handle an HTTP2 DATA frame.
1140
     */
1141
    @SuppressWarnings("GuardedBy")
1142
    @Override
1143
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1144
                     int paddedLength)
1145
        throws IOException {
1146
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1147
          streamId, in.getBuffer(), length, inFinished);
1✔
1148
      OkHttpClientStream stream = getStream(streamId);
1✔
1149
      if (stream == null) {
1✔
1150
        if (mayHaveCreatedStream(streamId)) {
1✔
1151
          synchronized (lock) {
1✔
1152
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1153
          }
1✔
1154
          in.skip(length);
1✔
1155
        } else {
1156
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1157
          return;
1✔
1158
        }
1159
      } else {
1160
        // Wait until the frame is complete.
1161
        in.require(length);
1✔
1162

1163
        Buffer buf = new Buffer();
1✔
1164
        buf.write(in.getBuffer(), length);
1✔
1165
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1166
            stream.transportState().tag());
1✔
1167
        synchronized (lock) {
1✔
1168
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1169
          // instead found: 'OkHttpClientTransport.this.lock'
1170
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1171
        }
1✔
1172
      }
1173

1174
      // connection window update
1175
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1176
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1177
        synchronized (lock) {
1✔
1178
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1179
        }
1✔
1180
        connectionUnacknowledgedBytesRead = 0;
1✔
1181
      }
1182
    }
1✔
1183

1184
    /**
1185
     * Handle HTTP2 HEADER and CONTINUATION frames.
1186
     */
1187
    @SuppressWarnings("GuardedBy")
1188
    @Override
1189
    public void headers(boolean outFinished,
1190
        boolean inFinished,
1191
        int streamId,
1192
        int associatedStreamId,
1193
        List<Header> headerBlock,
1194
        HeadersMode headersMode) {
1195
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1196
      boolean unknownStream = false;
1✔
1197
      Status failedStatus = null;
1✔
1198
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1199
        int metadataSize = headerBlockSize(headerBlock);
1✔
1200
        if (metadataSize > maxInboundMetadataSize) {
1✔
1201
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1202
              String.format(
1✔
1203
                  Locale.US,
1204
                  "Response %s metadata larger than %d: %d",
1205
                  inFinished ? "trailer" : "header",
1✔
1206
                  maxInboundMetadataSize,
1✔
1207
                  metadataSize));
1✔
1208
        }
1209
      }
1210
      synchronized (lock) {
1✔
1211
        OkHttpClientStream stream = streams.get(streamId);
1✔
1212
        if (stream == null) {
1✔
1213
          if (mayHaveCreatedStream(streamId)) {
1✔
1214
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1215
          } else {
1216
            unknownStream = true;
1✔
1217
          }
1218
        } else {
1219
          if (failedStatus == null) {
1✔
1220
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1221
                stream.transportState().tag());
1✔
1222
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1223
            // instead found: 'OkHttpClientTransport.this.lock'
1224
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1225
          } else {
1226
            if (!inFinished) {
1✔
1227
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1228
            }
1229
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1230
          }
1231
        }
1232
      }
1✔
1233
      if (unknownStream) {
1✔
1234
        // We don't expect any server-initiated streams.
1235
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1236
      }
1237
    }
1✔
1238

1239
    private int headerBlockSize(List<Header> headerBlock) {
1240
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1241
      long size = 0;
1✔
1242
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1243
        Header header = headerBlock.get(i);
1✔
1244
        size += 32 + header.name.size() + header.value.size();
1✔
1245
      }
1246
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1247
      return (int) size;
1✔
1248
    }
1249

1250
    @Override
1251
    public void rstStream(int streamId, ErrorCode errorCode) {
1252
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1253
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1254
      boolean stopDelivery =
1✔
1255
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1256
      synchronized (lock) {
1✔
1257
        OkHttpClientStream stream = streams.get(streamId);
1✔
1258
        if (stream != null) {
1✔
1259
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1260
              stream.transportState().tag());
1✔
1261
          finishStream(
1✔
1262
              streamId, status,
1263
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1264
              stopDelivery, null, null);
1265
        }
1266
      }
1✔
1267
    }
1✔
1268

1269
    @Override
1270
    public void settings(boolean clearPrevious, Settings settings) {
1271
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1272
      boolean outboundWindowSizeIncreased = false;
1✔
1273
      synchronized (lock) {
1✔
1274
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1275
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1276
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1277
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1278
        }
1279

1280
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1281
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1282
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1283
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1284
        }
1285
        if (firstSettings) {
1✔
1286
          listener.transportReady();
1✔
1287
          firstSettings = false;
1✔
1288
        }
1289

1290
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1291
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1292
        // otherwise it will cause a stream error (RST_STREAM).
1293
        frameWriter.ackSettings(settings);
1✔
1294

1295
        // send any pending bytes / streams
1296
        if (outboundWindowSizeIncreased) {
1✔
1297
          outboundFlow.writeStreams();
1✔
1298
        }
1299
        startPendingStreams();
1✔
1300
      }
1✔
1301
    }
1✔
1302

1303
    @Override
1304
    public void ping(boolean ack, int payload1, int payload2) {
1305
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1306
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1307
      if (!ack) {
1✔
1308
        synchronized (lock) {
1✔
1309
          frameWriter.ping(true, payload1, payload2);
1✔
1310
        }
1✔
1311
      } else {
1312
        Http2Ping p = null;
1✔
1313
        synchronized (lock) {
1✔
1314
          if (ping != null) {
1✔
1315
            if (ping.payload() == ackPayload) {
1✔
1316
              p = ping;
1✔
1317
              ping = null;
1✔
1318
            } else {
1319
              log.log(Level.WARNING, String.format(
1✔
1320
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1321
                  ping.payload(), ackPayload));
1✔
1322
            }
1323
          } else {
1324
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1325
          }
1326
        }
1✔
1327
        // don't complete it while holding lock since callbacks could run immediately
1328
        if (p != null) {
1✔
1329
          p.complete();
1✔
1330
        }
1331
      }
1332
    }
1✔
1333

1334
    @Override
1335
    public void ackSettings() {
1336
      // Do nothing currently.
1337
    }
1✔
1338

1339
    @Override
1340
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1341
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1342
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1343
        String data = debugData.utf8();
1✔
1344
        log.log(Level.WARNING, String.format(
1✔
1345
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1346
        if ("too_many_pings".equals(data)) {
1✔
1347
          tooManyPingsRunnable.run();
1✔
1348
        }
1349
      }
1350
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1351
          .augmentDescription("Received Goaway");
1✔
1352
      if (debugData.size() > 0) {
1✔
1353
        // If a debug message was provided, use it.
1354
        status = status.augmentDescription(debugData.utf8());
1✔
1355
      }
1356
      startGoAway(lastGoodStreamId, null, status);
1✔
1357
    }
1✔
1358

1359
    @Override
1360
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1361
        throws IOException {
1362
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1363
          streamId, promisedStreamId, requestHeaders);
1364
      // We don't accept server initiated stream.
1365
      synchronized (lock) {
1✔
1366
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1367
      }
1✔
1368
    }
1✔
1369

1370
    @Override
1371
    public void windowUpdate(int streamId, long delta) {
1372
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1373
      if (delta == 0) {
1✔
1374
        String errorMsg = "Received 0 flow control window increment.";
×
1375
        if (streamId == 0) {
×
1376
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1377
        } else {
1378
          finishStream(
×
1379
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1380
              ErrorCode.PROTOCOL_ERROR, null);
1381
        }
1382
        return;
×
1383
      }
1384

1385
      boolean unknownStream = false;
1✔
1386
      synchronized (lock) {
1✔
1387
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1388
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1389
          return;
1✔
1390
        }
1391

1392
        OkHttpClientStream stream = streams.get(streamId);
1✔
1393
        if (stream != null) {
1✔
1394
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1395
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1396
          unknownStream = true;
1✔
1397
        }
1398
      }
1✔
1399
      if (unknownStream) {
1✔
1400
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1401
            "Received window_update for unknown stream: " + streamId);
1402
      }
1403
    }
1✔
1404

1405
    @Override
1406
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1407
      // Ignore priority change.
1408
      // TODO(madongfly): log
1409
    }
×
1410

1411
    @Override
1412
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1413
        int port, long maxAge) {
1414
      // TODO(madongfly): Deal with alternateService propagation
1415
    }
×
1416
  }
1417
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc