• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19449

05 Sep 2024 10:32PM UTC coverage: 84.488% (-0.01%) from 84.498%
#19449

push

github

ejona86
core: touch() buffer when detach()ing

Detachable lets a buffer outlive its original lifetime. The new lifetime
is application-controlled. If the application fails to read/close the
stream, then the leak detector wouldn't make clear what code was
responsible for the buffer's lifetime. With this touch, we'll be able to
see detach() was called and thus know the application needs debugging.

Realized when looking at b/364531464, although I think the issue is
unrelated.

33251 of 39356 relevant lines covered (84.49%)

0.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.26
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import io.grpc.Attributes;
31
import io.grpc.CallOptions;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.Grpc;
34
import io.grpc.HttpConnectProxiedSocketAddress;
35
import io.grpc.InternalChannelz;
36
import io.grpc.InternalChannelz.SocketStats;
37
import io.grpc.InternalLogId;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.MethodDescriptor.MethodType;
41
import io.grpc.SecurityLevel;
42
import io.grpc.Status;
43
import io.grpc.Status.Code;
44
import io.grpc.StatusException;
45
import io.grpc.internal.ClientStreamListener.RpcProgress;
46
import io.grpc.internal.ConnectionClientTransport;
47
import io.grpc.internal.GrpcAttributes;
48
import io.grpc.internal.GrpcUtil;
49
import io.grpc.internal.Http2Ping;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.KeepAliveManager;
52
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
53
import io.grpc.internal.SerializingExecutor;
54
import io.grpc.internal.StatsTraceContext;
55
import io.grpc.internal.TransportTracer;
56
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
57
import io.grpc.okhttp.internal.ConnectionSpec;
58
import io.grpc.okhttp.internal.Credentials;
59
import io.grpc.okhttp.internal.StatusLine;
60
import io.grpc.okhttp.internal.framed.ErrorCode;
61
import io.grpc.okhttp.internal.framed.FrameReader;
62
import io.grpc.okhttp.internal.framed.FrameWriter;
63
import io.grpc.okhttp.internal.framed.Header;
64
import io.grpc.okhttp.internal.framed.HeadersMode;
65
import io.grpc.okhttp.internal.framed.Http2;
66
import io.grpc.okhttp.internal.framed.Settings;
67
import io.grpc.okhttp.internal.framed.Variant;
68
import io.grpc.okhttp.internal.proxy.HttpUrl;
69
import io.grpc.okhttp.internal.proxy.Request;
70
import io.perfmark.PerfMark;
71
import java.io.EOFException;
72
import java.io.IOException;
73
import java.net.InetSocketAddress;
74
import java.net.Socket;
75
import java.net.URI;
76
import java.util.Collections;
77
import java.util.Deque;
78
import java.util.EnumMap;
79
import java.util.HashMap;
80
import java.util.Iterator;
81
import java.util.LinkedList;
82
import java.util.List;
83
import java.util.Locale;
84
import java.util.Map;
85
import java.util.Random;
86
import java.util.concurrent.CountDownLatch;
87
import java.util.concurrent.Executor;
88
import java.util.concurrent.ScheduledExecutorService;
89
import java.util.logging.Level;
90
import java.util.logging.Logger;
91
import javax.annotation.Nullable;
92
import javax.annotation.concurrent.GuardedBy;
93
import javax.net.SocketFactory;
94
import javax.net.ssl.HostnameVerifier;
95
import javax.net.ssl.SSLSession;
96
import javax.net.ssl.SSLSocket;
97
import javax.net.ssl.SSLSocketFactory;
98
import okio.Buffer;
99
import okio.BufferedSink;
100
import okio.BufferedSource;
101
import okio.ByteString;
102
import okio.Okio;
103
import okio.Source;
104
import okio.Timeout;
105

106
/**
107
 * An okhttp-based {@link ConnectionClientTransport} implementation.
108
 */
109
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
110
      OutboundFlowController.Transport {
111
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
112
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
113

114
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
115
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
116
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
117
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
118
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
119
        Status.INTERNAL.withDescription("Protocol error"));
1✔
120
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
121
        Status.INTERNAL.withDescription("Internal error"));
1✔
122
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
123
        Status.INTERNAL.withDescription("Flow control error"));
1✔
124
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
125
        Status.INTERNAL.withDescription("Stream closed"));
1✔
126
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
127
        Status.INTERNAL.withDescription("Frame too large"));
1✔
128
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
129
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
130
    errorToStatus.put(ErrorCode.CANCEL,
1✔
131
        Status.CANCELLED.withDescription("Cancelled"));
1✔
132
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
133
        Status.INTERNAL.withDescription("Compression error"));
1✔
134
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
135
        Status.INTERNAL.withDescription("Connect error"));
1✔
136
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
137
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
138
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
139
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
140
    return Collections.unmodifiableMap(errorToStatus);
1✔
141
  }
142

143
  private final InetSocketAddress address;
144
  private final String defaultAuthority;
145
  private final String userAgent;
146
  private final Random random = new Random();
1✔
147
  // Returns new unstarted stopwatches
148
  private final Supplier<Stopwatch> stopwatchFactory;
149
  private final int initialWindowSize;
150
  private final Variant variant;
151
  private Listener listener;
152
  @GuardedBy("lock")
153
  private ExceptionHandlingFrameWriter frameWriter;
154
  private OutboundFlowController outboundFlow;
155
  private final Object lock = new Object();
1✔
156
  private final InternalLogId logId;
157
  @GuardedBy("lock")
158
  private int nextStreamId;
159
  @GuardedBy("lock")
1✔
160
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
161
  private final Executor executor;
162
  // Wraps the executor to guarantee that some operations are executed serially.
163
  private final SerializingExecutor serializingExecutor;
164
  private final ScheduledExecutorService scheduler;
165
  private final int maxMessageSize;
166
  private int connectionUnacknowledgedBytesRead;
167
  private ClientFrameHandler clientFrameHandler;
168
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
169
  private Attributes attributes;
170
  /**
171
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
172
   * streams may continue.
173
   */
174
  @GuardedBy("lock")
175
  private Status goAwayStatus;
176
  @GuardedBy("lock")
177
  private boolean goAwaySent;
178
  @GuardedBy("lock")
179
  private Http2Ping ping;
180
  @GuardedBy("lock")
181
  private boolean stopped;
182
  @GuardedBy("lock")
183
  private boolean hasStream;
184
  private final SocketFactory socketFactory;
185
  private SSLSocketFactory sslSocketFactory;
186
  private HostnameVerifier hostnameVerifier;
187
  private Socket socket;
188
  @GuardedBy("lock")
1✔
189
  private int maxConcurrentStreams = 0;
190
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
191
  @GuardedBy("lock")
192
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
193
  private final ConnectionSpec connectionSpec;
194
  private KeepAliveManager keepAliveManager;
195
  private boolean enableKeepAlive;
196
  private long keepAliveTimeNanos;
197
  private long keepAliveTimeoutNanos;
198
  private boolean keepAliveWithoutCalls;
199
  private final Runnable tooManyPingsRunnable;
200
  private final int maxInboundMetadataSize;
201
  private final boolean useGetForSafeMethods;
202
  @GuardedBy("lock")
203
  private final TransportTracer transportTracer;
204
  @GuardedBy("lock")
1✔
205
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
206
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
207
        @Override
208
        protected void handleInUse() {
209
          listener.transportInUse(true);
1✔
210
        }
1✔
211

212
        @Override
213
        protected void handleNotInUse() {
214
          listener.transportInUse(false);
1✔
215
        }
1✔
216
      };
217
  @GuardedBy("lock")
218
  private InternalChannelz.Security securityInfo;
219

220
  @VisibleForTesting
221
  @Nullable
222
  final HttpConnectProxiedSocketAddress proxiedAddr;
223

224
  @VisibleForTesting
1✔
225
  int proxySocketTimeout = 30000;
226

227
  // The following fields should only be used for test.
228
  Runnable connectingCallback;
229
  SettableFuture<Void> connectedFuture;
230

231
  /**
   * Creates a transport for the given address, using {@link GrpcUtil#STOPWATCH_SUPPLIER} for
   * stopwatches and a fresh {@link Http2} variant for framing.
   *
   * @param transportFactory factory whose configuration this transport copies
   * @param address the address to connect to; must not be null
   * @param authority the default authority for streams created on this transport
   * @param userAgent the application user agent, or null
   * @param eagAttrs attributes stored under {@link GrpcAttributes#ATTR_CLIENT_EAG_ATTRS}
   * @param proxiedAddr the HTTP CONNECT proxy to tunnel through, or null to connect directly
   * @param tooManyPingsRunnable must not be null
   */
  public OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      InetSocketAddress address,
      String authority,
      @Nullable String userAgent,
      Attributes eagAttrs,
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory, address, authority, userAgent, eagAttrs,
        GrpcUtil.STOPWATCH_SUPPLIER, new Http2(), proxiedAddr, tooManyPingsRunnable);
  }
1✔
250

251
  private OkHttpClientTransport(
252
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
253
      InetSocketAddress address,
254
      String authority,
255
      @Nullable String userAgent,
256
      Attributes eagAttrs,
257
      Supplier<Stopwatch> stopwatchFactory,
258
      Variant variant,
259
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
260
      Runnable tooManyPingsRunnable) {
1✔
261
    this.address = Preconditions.checkNotNull(address, "address");
1✔
262
    this.defaultAuthority = authority;
1✔
263
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
264
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
265
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
266
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
267
    this.scheduler = Preconditions.checkNotNull(
1✔
268
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
269
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
270
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
271
    nextStreamId = 3;
1✔
272
    this.socketFactory = transportFactory.socketFactory == null
1✔
273
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
274
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
275
    this.hostnameVerifier = transportFactory.hostnameVerifier;
1✔
276
    this.connectionSpec = Preconditions.checkNotNull(
1✔
277
        transportFactory.connectionSpec, "connectionSpec");
278
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
279
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
280
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
281
    this.proxiedAddr = proxiedAddr;
1✔
282
    this.tooManyPingsRunnable =
1✔
283
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
284
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
285
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
286
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
287
    this.attributes = Attributes.newBuilder()
1✔
288
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
289
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
290
    initTransportTracer();
1✔
291
  }
1✔
292

293
  /**
   * Creates a transport aimed at a fake peer ({@code 127.0.0.1:80}, authority
   * {@code "notarealauthority:80"}, no proxy), for use in tests.
   *
   * @param connectingCallback stored in the test-only {@code connectingCallback} field; may be
   *     null
   * @param connectedFuture stored in the test-only {@code connectedFuture} field; must not be
   *     null
   */
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
  @VisibleForTesting
  OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      String userAgent,
      Supplier<Stopwatch> stopwatchFactory,
      Variant variant,
      @Nullable Runnable connectingCallback,
      SettableFuture<Void> connectedFuture,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory,
        new InetSocketAddress("127.0.0.1", 80),
        "notarealauthority:80",
        userAgent,
        Attributes.EMPTY,
        stopwatchFactory,
        variant,
        /* proxiedAddr= */ null,
        tooManyPingsRunnable);
    this.connectingCallback = connectingCallback;
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
  }
1✔
319

320
  // sslSocketFactory is set to null when use plaintext.
321
  boolean isUsingPlaintext() {
322
    return sslSocketFactory == null;
1✔
323
  }
324

325
  private void initTransportTracer() {
326
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
327
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
328
        @Override
329
        public TransportTracer.FlowControlWindows read() {
330
          synchronized (lock) {
1✔
331
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
332
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
333
            // provide a lower bound.
334
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
335
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
336
          }
337
        }
338
      });
339
    }
1✔
340
  }
1✔
341

342
  /**
343
   * Enable keepalive with custom delay and timeout.
344
   */
345
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
346
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
347
    enableKeepAlive = enable;
×
348
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
349
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
350
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
351
  }
×
352

353
  @Override
354
  public void ping(final PingCallback callback, Executor executor) {
355
    long data = 0;
1✔
356
    Http2Ping p;
357
    boolean writePing;
358
    synchronized (lock) {
1✔
359
      checkState(frameWriter != null);
1✔
360
      if (stopped) {
1✔
361
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
362
        return;
1✔
363
      }
364
      if (ping != null) {
1✔
365
        // we only allow one outstanding ping at a time, so just add the callback to
366
        // any outstanding operation
367
        p = ping;
1✔
368
        writePing = false;
1✔
369
      } else {
370
        // set outstanding operation and then write the ping after releasing lock
371
        data = random.nextLong();
1✔
372
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
373
        stopwatch.start();
1✔
374
        p = ping = new Http2Ping(data, stopwatch);
1✔
375
        writePing = true;
1✔
376
        transportTracer.reportKeepAliveSent();
1✔
377
      }
378
      if (writePing) {
1✔
379
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
380
      }
381
    }
1✔
382
    // If transport concurrently failed/stopped since we released the lock above, this could
383
    // immediately invoke callback (which we shouldn't do while holding a lock)
384
    p.addCallback(callback, executor);
1✔
385
  }
1✔
386

387
  @Override
388
  public OkHttpClientStream newStream(
389
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
390
      ClientStreamTracer[] tracers) {
391
    Preconditions.checkNotNull(method, "method");
1✔
392
    Preconditions.checkNotNull(headers, "headers");
1✔
393
    StatsTraceContext statsTraceContext =
1✔
394
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
395
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
396
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
397
      return new OkHttpClientStream(
1✔
398
          method,
399
          headers,
400
          frameWriter,
401
          OkHttpClientTransport.this,
402
          outboundFlow,
403
          lock,
404
          maxMessageSize,
405
          initialWindowSize,
406
          defaultAuthority,
407
          userAgent,
408
          statsTraceContext,
409
          transportTracer,
410
          callOptions,
411
          useGetForSafeMethods);
412
    }
413
  }
414

415
  @GuardedBy("lock")
416
  void streamReadyToStart(OkHttpClientStream clientStream) {
417
    if (goAwayStatus != null) {
1✔
418
      clientStream.transportState().transportReportStatus(
1✔
419
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
420
    } else if (streams.size() >= maxConcurrentStreams) {
1✔
421
      pendingStreams.add(clientStream);
1✔
422
      setInUse(clientStream);
1✔
423
    } else {
424
      startStream(clientStream);
1✔
425
    }
426
  }
1✔
427

428
  @SuppressWarnings("GuardedBy")
429
  @GuardedBy("lock")
430
  private void startStream(OkHttpClientStream stream) {
431
    Preconditions.checkState(
1✔
432
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
433
    streams.put(nextStreamId, stream);
1✔
434
    setInUse(stream);
1✔
435
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
436
    // found: 'this.lock'
437
    stream.transportState().start(nextStreamId);
1✔
438
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
439
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
440
        || stream.useGet()) {
1✔
441
      frameWriter.flush();
1✔
442
    }
443
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
444
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
445
      // correctly.
446
      nextStreamId = Integer.MAX_VALUE;
1✔
447
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
448
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
449
    } else {
450
      nextStreamId += 2;
1✔
451
    }
452
  }
1✔
453

454
  /**
455
   * Starts pending streams, returns true if at least one pending stream is started.
456
   */
457
  @GuardedBy("lock")
458
  private boolean startPendingStreams() {
459
    boolean hasStreamStarted = false;
1✔
460
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
461
      OkHttpClientStream stream = pendingStreams.poll();
1✔
462
      startStream(stream);
1✔
463
      hasStreamStarted = true;
1✔
464
    }
1✔
465
    return hasStreamStarted;
1✔
466
  }
467

468
  /**
469
   * Removes given pending stream, used when a pending stream is cancelled.
470
   */
471
  @GuardedBy("lock")
472
  void removePendingStream(OkHttpClientStream pendingStream) {
473
    pendingStreams.remove(pendingStream);
1✔
474
    maybeClearInUse(pendingStream);
1✔
475
  }
1✔
476

477
  @Override
478
  public Runnable start(Listener listener) {
479
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
480

481
    if (enableKeepAlive) {
1✔
482
      keepAliveManager = new KeepAliveManager(
×
483
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
484
          keepAliveWithoutCalls);
485
      keepAliveManager.onTransportStarted();
×
486
    }
487

488
    int maxQueuedControlFrames = 10000;
1✔
489
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
490
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
491
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
492

493
    synchronized (lock) {
1✔
494
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
495
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
496
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
497
      // the same TransportExceptionHandler instance so it is all mixed back together.
498
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
499
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
500
    }
1✔
501
    final CountDownLatch latch = new CountDownLatch(1);
1✔
502
    // Connecting in the serializingExecutor, so that some stream operations like synStream
503
    // will be executed after connected.
504
    serializingExecutor.execute(new Runnable() {
1✔
505
      @Override
506
      public void run() {
507
        // This is a hack to make sure the connection preface and initial settings to be sent out
508
        // without blocking the start. By doing this essentially prevents potential deadlock when
509
        // network is not available during startup while another thread holding lock to send the
510
        // initial preface.
511
        try {
512
          latch.await();
1✔
513
        } catch (InterruptedException e) {
×
514
          Thread.currentThread().interrupt();
×
515
        }
1✔
516
        // Use closed source on failure so that the reader immediately shuts down.
517
        BufferedSource source = Okio.buffer(new Source() {
1✔
518
          @Override
519
          public long read(Buffer sink, long byteCount) {
520
            return -1;
1✔
521
          }
522

523
          @Override
524
          public Timeout timeout() {
525
            return Timeout.NONE;
×
526
          }
527

528
          @Override
529
          public void close() {
530
          }
1✔
531
        });
532
        Socket sock;
533
        SSLSession sslSession = null;
1✔
534
        try {
535
          if (proxiedAddr == null) {
1✔
536
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
537
          } else {
538
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
539
              sock = createHttpProxySocket(
1✔
540
                  proxiedAddr.getTargetAddress(),
1✔
541
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
542
                  proxiedAddr.getUsername(),
1✔
543
                  proxiedAddr.getPassword()
1✔
544
              );
545
            } else {
546
              throw Status.INTERNAL.withDescription(
×
547
                  "Unsupported SocketAddress implementation "
548
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
549
            }
550
          }
551
          if (sslSocketFactory != null) {
1✔
552
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
553
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
554
                connectionSpec);
1✔
555
            sslSession = sslSocket.getSession();
1✔
556
            sock = sslSocket;
1✔
557
          }
558
          sock.setTcpNoDelay(true);
1✔
559
          source = Okio.buffer(Okio.source(sock));
1✔
560
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
561

562
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
563
          attributes = attributes.toBuilder()
1✔
564
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
565
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
566
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
567
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
568
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
569
              .build();
1✔
570
        } catch (StatusException e) {
1✔
571
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
572
          return;
1✔
573
        } catch (Exception e) {
1✔
574
          onException(e);
1✔
575
          return;
1✔
576
        } finally {
577
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
578
        }
579
        synchronized (lock) {
1✔
580
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
581
          if (sslSession != null) {
1✔
582
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
583
          }
584
        }
1✔
585
      }
1✔
586
    });
587
    // Schedule to send connection preface & settings before any other write.
588
    try {
589
      sendConnectionPrefaceAndSettings();
1✔
590
    } finally {
591
      latch.countDown();
1✔
592
    }
593

594
    serializingExecutor.execute(new Runnable() {
1✔
595
      @Override
596
      public void run() {
597
        if (connectingCallback != null) {
1✔
598
          connectingCallback.run();
1✔
599
        }
600
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
601
        // may send goAway immediately.
602
        executor.execute(clientFrameHandler);
1✔
603
        synchronized (lock) {
1✔
604
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
605
          startPendingStreams();
1✔
606
        }
1✔
607
        if (connectedFuture != null) {
1✔
608
          connectedFuture.set(null);
1✔
609
        }
610
      }
1✔
611
    });
612
    return null;
1✔
613
  }
614

615
  /**
616
   * Should only be called once when the transport is first established.
617
   */
618
  private void sendConnectionPrefaceAndSettings() {
619
    synchronized (lock) {
1✔
620
      frameWriter.connectionPreface();
1✔
621
      Settings settings = new Settings();
1✔
622
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
623
      frameWriter.settings(settings);
1✔
624
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
625
        frameWriter.windowUpdate(
1✔
626
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
627
      }
628
    }
1✔
629
  }
1✔
630

631
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
632
      String proxyUsername, String proxyPassword) throws StatusException {
633
    Socket sock = null;
1✔
634
    try {
635
      // The proxy address may not be resolved
636
      if (proxyAddress.getAddress() != null) {
1✔
637
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
638
      } else {
639
        sock =
×
640
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
641
      }
642
      sock.setTcpNoDelay(true);
1✔
643
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
644
      // can cause reading from the socket to hang.
645
      sock.setSoTimeout(proxySocketTimeout);
1✔
646

647
      Source source = Okio.source(sock);
1✔
648
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
649

650
      // Prepare headers and request method line
651
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
652
      HttpUrl url = proxyRequest.httpUrl();
1✔
653
      String requestLine =
1✔
654
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
655

656
      // Write request to socket
657
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
658
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
659
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
660
            .writeUtf8(": ")
1✔
661
            .writeUtf8(proxyRequest.headers().value(i))
1✔
662
            .writeUtf8("\r\n");
1✔
663
      }
664
      sink.writeUtf8("\r\n");
1✔
665
      // Flush buffer (flushes socket and sends request)
666
      sink.flush();
1✔
667

668
      // Read status line, check if 2xx was returned
669
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
670
      // Drain rest of headers
671
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
672
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
673
        Buffer body = new Buffer();
1✔
674
        try {
675
          sock.shutdownOutput();
1✔
676
          source.read(body, 1024);
1✔
677
        } catch (IOException ex) {
×
678
          body.writeUtf8("Unable to read body: " + ex.toString());
×
679
        }
1✔
680
        try {
681
          sock.close();
1✔
682
        } catch (IOException ignored) {
×
683
          // ignored
684
        }
1✔
685
        String message = String.format(
1✔
686
            Locale.US,
687
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
688
              + "Response body:\n%s",
689
            statusLine.code, statusLine.message, body.readUtf8());
1✔
690
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
691
      }
692
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
693
      sock.setSoTimeout(0);
1✔
694
      return sock;
1✔
695
    } catch (IOException e) {
1✔
696
      if (sock != null) {
1✔
697
        GrpcUtil.closeQuietly(sock);
1✔
698
      }
699
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
700
          .asException();
1✔
701
    }
702
  }
703

704
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
705
                                         String proxyPassword) {
706
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
707
        .scheme("https")
1✔
708
        .host(address.getHostName())
1✔
709
        .port(address.getPort())
1✔
710
        .build();
1✔
711

712
    Request.Builder request = new Request.Builder()
1✔
713
        .url(tunnelUrl)
1✔
714
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
715
        .header("User-Agent", userAgent);
1✔
716

717
    // If we have proxy credentials, set them right away
718
    if (proxyUsername != null && proxyPassword != null) {
1✔
719
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
720
    }
721
    return request.build();
1✔
722
  }
723

724
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
725
    Buffer buffer = new Buffer();
1✔
726
    while (true) {
727
      if (source.read(buffer, 1) == -1) {
1✔
728
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
729
      }
730
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
731
        return buffer.readUtf8LineStrict();
1✔
732
      }
733
    }
734
  }
735

736
  @Override
737
  public String toString() {
738
    return MoreObjects.toStringHelper(this)
1✔
739
        .add("logId", logId.getId())
1✔
740
        .add("address", address)
1✔
741
        .toString();
1✔
742
  }
743

744
  @Override
745
  public InternalLogId getLogId() {
746
    return logId;
1✔
747
  }
748

749
  /**
750
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
751
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
752
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
753
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
754
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
755
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
756
   * connection to the port, independent of this function.
757
   *
758
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
759
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
760
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
761
   */
762
  @VisibleForTesting
763
  String getOverridenHost() {
764
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
765
    if (uri.getHost() != null) {
1✔
766
      return uri.getHost();
1✔
767
    }
768

769
    return defaultAuthority;
1✔
770
  }
771

772
  @VisibleForTesting
773
  int getOverridenPort() {
774
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
775
    if (uri.getPort() != -1) {
1✔
776
      return uri.getPort();
×
777
    }
778

779
    return address.getPort();
1✔
780
  }
781

782
  @Override
783
  public void shutdown(Status reason) {
784
    synchronized (lock) {
1✔
785
      if (goAwayStatus != null) {
1✔
786
        return;
1✔
787
      }
788

789
      goAwayStatus = reason;
1✔
790
      listener.transportShutdown(goAwayStatus);
1✔
791
      stopIfNecessary();
1✔
792
    }
1✔
793
  }
1✔
794

795
  @Override
796
  public void shutdownNow(Status reason) {
797
    shutdown(reason);
1✔
798
    synchronized (lock) {
1✔
799
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
800
      while (it.hasNext()) {
1✔
801
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
802
        it.remove();
1✔
803
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
804
        maybeClearInUse(entry.getValue());
1✔
805
      }
1✔
806

807
      for (OkHttpClientStream stream : pendingStreams) {
1✔
808
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
809
        // chance to retry and be routed to another connection.
810
        stream.transportState().transportReportStatus(
1✔
811
            reason, RpcProgress.MISCARRIED, true, new Metadata());
812
        maybeClearInUse(stream);
1✔
813
      }
1✔
814
      pendingStreams.clear();
1✔
815

816
      stopIfNecessary();
1✔
817
    }
1✔
818
  }
1✔
819

820
  @Override
821
  public Attributes getAttributes() {
822
    return attributes;
1✔
823
  }
824

825
  /**
826
   * Gets all active streams as an array.
827
   */
828
  @Override
829
  public OutboundFlowController.StreamState[] getActiveStreams() {
830
    synchronized (lock) {
1✔
831
      OutboundFlowController.StreamState[] flowStreams =
1✔
832
          new OutboundFlowController.StreamState[streams.size()];
1✔
833
      int i = 0;
1✔
834
      for (OkHttpClientStream stream : streams.values()) {
1✔
835
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
836
      }
1✔
837
      return flowStreams;
1✔
838
    }
839
  }
840

841
  @VisibleForTesting
842
  ClientFrameHandler getHandler() {
843
    return clientFrameHandler;
1✔
844
  }
845

846
  @VisibleForTesting
847
  SocketFactory getSocketFactory() {
848
    return socketFactory;
1✔
849
  }
850

851
  @VisibleForTesting
852
  int getPendingStreamSize() {
853
    synchronized (lock) {
1✔
854
      return pendingStreams.size();
1✔
855
    }
856
  }
857

858
  @VisibleForTesting
859
  void setNextStreamId(int nextStreamId) {
860
    synchronized (lock) {
1✔
861
      this.nextStreamId = nextStreamId;
1✔
862
    }
1✔
863
  }
1✔
864

865
  /**
866
   * Finish all active streams due to an IOException, then close the transport.
867
   */
868
  @Override
869
  public void onException(Throwable failureCause) {
870
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
871
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
872
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
873
  }
1✔
874

875
  /**
876
   * Send GOAWAY to the server, then finish all active streams and close the transport.
877
   */
878
  private void onError(ErrorCode errorCode, String moreDetail) {
879
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
880
  }
1✔
881

882
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
883
    synchronized (lock) {
1✔
884
      if (goAwayStatus == null) {
1✔
885
        goAwayStatus = status;
1✔
886
        listener.transportShutdown(status);
1✔
887
      }
888
      if (errorCode != null && !goAwaySent) {
1✔
889
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
890
        // streams. The GOAWAY is part of graceful shutdown.
891
        goAwaySent = true;
1✔
892
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
893
      }
894

895
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
896
      while (it.hasNext()) {
1✔
897
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
898
        if (entry.getKey() > lastKnownStreamId) {
1✔
899
          it.remove();
1✔
900
          entry.getValue().transportState().transportReportStatus(
1✔
901
              status, RpcProgress.REFUSED, false, new Metadata());
902
          maybeClearInUse(entry.getValue());
1✔
903
        }
904
      }
1✔
905

906
      for (OkHttpClientStream stream : pendingStreams) {
1✔
907
        stream.transportState().transportReportStatus(
1✔
908
            status, RpcProgress.MISCARRIED, true, new Metadata());
909
        maybeClearInUse(stream);
1✔
910
      }
1✔
911
      pendingStreams.clear();
1✔
912

913
      stopIfNecessary();
1✔
914
    }
1✔
915
  }
1✔
916

917
  /**
918
   * Called when a stream is closed. We do things like:
919
   * <ul>
920
   * <li>Removing the stream from the map.
921
   * <li>Optionally reporting the status.
922
   * <li>Starting pending streams if we can.
923
   * <li>Stopping the transport if this is the last live stream under a go-away status.
924
   * </ul>
925
   *
926
   * @param streamId the Id of the stream.
927
   * @param status the final status of this stream, null means no need to report.
928
   * @param stopDelivery interrupt queued messages in the deframer
929
   * @param errorCode reset the stream with this ErrorCode if not null.
930
   * @param trailers the trailers received if not null
931
   */
932
  void finishStream(
933
      int streamId,
934
      @Nullable Status status,
935
      RpcProgress rpcProgress,
936
      boolean stopDelivery,
937
      @Nullable ErrorCode errorCode,
938
      @Nullable Metadata trailers) {
939
    synchronized (lock) {
1✔
940
      OkHttpClientStream stream = streams.remove(streamId);
1✔
941
      if (stream != null) {
1✔
942
        if (errorCode != null) {
1✔
943
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
944
        }
945
        if (status != null) {
1✔
946
          stream
1✔
947
              .transportState()
1✔
948
              .transportReportStatus(
1✔
949
                  status,
950
                  rpcProgress,
951
                  stopDelivery,
952
                  trailers != null ? trailers : new Metadata());
1✔
953
        }
954
        if (!startPendingStreams()) {
1✔
955
          stopIfNecessary();
1✔
956
        }
957
        maybeClearInUse(stream);
1✔
958
      }
959
    }
1✔
960
  }
1✔
961

962
  /**
963
   * When the transport is in goAway state, we should stop it once all active streams finish.
964
   */
965
  @GuardedBy("lock")
966
  private void stopIfNecessary() {
967
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
968
      return;
1✔
969
    }
970
    if (stopped) {
1✔
971
      return;
1✔
972
    }
973
    stopped = true;
1✔
974

975
    if (keepAliveManager != null) {
1✔
976
      keepAliveManager.onTransportTermination();
×
977
    }
978

979
    if (ping != null) {
1✔
980
      ping.failed(getPingFailure());
1✔
981
      ping = null;
1✔
982
    }
983

984
    if (!goAwaySent) {
1✔
985
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
986
      // streams. The GOAWAY is part of graceful shutdown.
987
      goAwaySent = true;
1✔
988
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
989
    }
990

991
    // We will close the underlying socket in the writing thread to break out the reader
992
    // thread, which will close the frameReader and notify the listener.
993
    frameWriter.close();
1✔
994
  }
1✔
995

996
  @GuardedBy("lock")
997
  private void maybeClearInUse(OkHttpClientStream stream) {
998
    if (hasStream) {
1✔
999
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1000
        hasStream = false;
1✔
1001
        if (keepAliveManager != null) {
1✔
1002
          // We don't have any active streams. No need to do keepalives any more.
1003
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1004
          // and onTransportActive.
1005
          keepAliveManager.onTransportIdle();
×
1006
        }
1007
      }
1008
    }
1009
    if (stream.shouldBeCountedForInUse()) {
1✔
1010
      inUseState.updateObjectInUse(stream, false);
1✔
1011
    }
1012
  }
1✔
1013

1014
  @GuardedBy("lock")
1015
  private void setInUse(OkHttpClientStream stream) {
1016
    if (!hasStream) {
1✔
1017
      hasStream = true;
1✔
1018
      if (keepAliveManager != null) {
1✔
1019
        // We have a new stream. We might need to do keepalives now.
1020
        // Note that we have to do this inside the lock to avoid calling
1021
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1022
        // order.
1023
        keepAliveManager.onTransportActive();
×
1024
      }
1025
    }
1026
    if (stream.shouldBeCountedForInUse()) {
1✔
1027
      inUseState.updateObjectInUse(stream, true);
1✔
1028
    }
1029
  }
1✔
1030

1031
  private Throwable getPingFailure() {
1032
    synchronized (lock) {
1✔
1033
      if (goAwayStatus != null) {
1✔
1034
        return goAwayStatus.asException();
1✔
1035
      } else {
1036
        return Status.UNAVAILABLE.withDescription("Connection closed").asException();
×
1037
      }
1038
    }
1039
  }
1040

1041
  boolean mayHaveCreatedStream(int streamId) {
1042
    synchronized (lock) {
1✔
1043
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1044
    }
1045
  }
1046

1047
  OkHttpClientStream getStream(int streamId) {
1048
    synchronized (lock) {
1✔
1049
      return streams.get(streamId);
1✔
1050
    }
1051
  }
1052

1053
  /**
1054
   * Returns a Grpc status corresponding to the given ErrorCode.
1055
   */
1056
  @VisibleForTesting
1057
  static Status toGrpcStatus(ErrorCode code) {
1058
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1059
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1060
        "Unknown http2 error code: " + code.httpCode);
1061
  }
1062

1063
  @Override
1064
  public ListenableFuture<SocketStats> getStats() {
1065
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1066
    synchronized (lock) {
1✔
1067
      if (socket == null) {
1✔
1068
        ret.set(new SocketStats(
×
1069
            transportTracer.getStats(),
×
1070
            /*local=*/ null,
1071
            /*remote=*/ null,
1072
            new InternalChannelz.SocketOptions.Builder().build(),
×
1073
            /*security=*/ null));
1074
      } else {
1075
        ret.set(new SocketStats(
1✔
1076
            transportTracer.getStats(),
1✔
1077
            socket.getLocalSocketAddress(),
1✔
1078
            socket.getRemoteSocketAddress(),
1✔
1079
            Utils.getSocketOptions(socket),
1✔
1080
            securityInfo));
1081
      }
1082
      return ret;
1✔
1083
    }
1084
  }
1085

1086
  /**
1087
   * Runnable which reads frames and dispatches them to in flight calls.
1088
   */
1089
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1090

1091
    private final OkHttpFrameLogger logger =
1✔
1092
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1093
    FrameReader frameReader;
1094
    boolean firstSettings = true;
1✔
1095

1096
    /** Creates a handler that reads frames from {@code frameReader}. */
    ClientFrameHandler(FrameReader frameReader) {
      this.frameReader = frameReader;
    }
1✔
1099

1100
    @Override
1101
    @SuppressWarnings("Finally")
1102
    public void run() {
1103
      String threadName = Thread.currentThread().getName();
1✔
1104
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1105
      try {
1106
        // Read until the underlying socket closes.
1107
        while (frameReader.nextFrame(this)) {
1✔
1108
          if (keepAliveManager != null) {
1✔
1109
            keepAliveManager.onDataReceived();
×
1110
          }
1111
        }
1112
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1113
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1114
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1115
        Status status;
1116
        synchronized (lock) {
1✔
1117
          status = goAwayStatus;
1✔
1118
        }
1✔
1119
        if (status == null) {
1✔
1120
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1121
        }
1122
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1123
      } catch (Throwable t) {
1✔
1124
        // TODO(madongfly): Send the exception message to the server.
1125
        startGoAway(
1✔
1126
            0,
1127
            ErrorCode.PROTOCOL_ERROR,
1128
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1129
      } finally {
1130
        try {
1131
          frameReader.close();
1✔
1132
        } catch (IOException ex) {
×
1133
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1134
        } catch (RuntimeException e) {
×
1135
          // This same check is done in okhttp proper:
1136
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1137

1138
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1139
          // https://issuetracker.google.com/issues/177450597
1140
          if (!"bio == null".equals(e.getMessage())) {
×
1141
            throw e;
×
1142
          }
1143
        }
1✔
1144
        listener.transportTerminated();
1✔
1145
        Thread.currentThread().setName(threadName);
1✔
1146
      }
1147
    }
1✔
1148

1149
    /**
1150
     * Handle an HTTP2 DATA frame.
1151
     */
1152
    @SuppressWarnings("GuardedBy")
1153
    @Override
1154
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1155
                     int paddedLength)
1156
        throws IOException {
1157
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1158
          streamId, in.getBuffer(), length, inFinished);
1✔
1159
      OkHttpClientStream stream = getStream(streamId);
1✔
1160
      if (stream == null) {
1✔
1161
        if (mayHaveCreatedStream(streamId)) {
1✔
1162
          synchronized (lock) {
1✔
1163
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1164
          }
1✔
1165
          in.skip(length);
1✔
1166
        } else {
1167
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1168
          return;
1✔
1169
        }
1170
      } else {
1171
        // Wait until the frame is complete.
1172
        in.require(length);
1✔
1173

1174
        Buffer buf = new Buffer();
1✔
1175
        buf.write(in.getBuffer(), length);
1✔
1176
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1177
            stream.transportState().tag());
1✔
1178
        synchronized (lock) {
1✔
1179
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1180
          // instead found: 'OkHttpClientTransport.this.lock'
1181
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1182
        }
1✔
1183
      }
1184

1185
      // connection window update
1186
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1187
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1188
        synchronized (lock) {
1✔
1189
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1190
        }
1✔
1191
        connectionUnacknowledgedBytesRead = 0;
1✔
1192
      }
1193
    }
1✔
1194

1195
    /**
1196
     * Handle HTTP2 HEADER and CONTINUATION frames.
1197
     */
1198
    @SuppressWarnings("GuardedBy")
1199
    @Override
1200
    public void headers(boolean outFinished,
1201
        boolean inFinished,
1202
        int streamId,
1203
        int associatedStreamId,
1204
        List<Header> headerBlock,
1205
        HeadersMode headersMode) {
1206
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1207
      boolean unknownStream = false;
1✔
1208
      Status failedStatus = null;
1✔
1209
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1210
        int metadataSize = headerBlockSize(headerBlock);
1✔
1211
        if (metadataSize > maxInboundMetadataSize) {
1✔
1212
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1213
              String.format(
1✔
1214
                  Locale.US,
1215
                  "Response %s metadata larger than %d: %d",
1216
                  inFinished ? "trailer" : "header",
1✔
1217
                  maxInboundMetadataSize,
1✔
1218
                  metadataSize));
1✔
1219
        }
1220
      }
1221
      synchronized (lock) {
1✔
1222
        OkHttpClientStream stream = streams.get(streamId);
1✔
1223
        if (stream == null) {
1✔
1224
          if (mayHaveCreatedStream(streamId)) {
1✔
1225
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1226
          } else {
1227
            unknownStream = true;
1✔
1228
          }
1229
        } else {
1230
          if (failedStatus == null) {
1✔
1231
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1232
                stream.transportState().tag());
1✔
1233
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1234
            // instead found: 'OkHttpClientTransport.this.lock'
1235
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1236
          } else {
1237
            if (!inFinished) {
1✔
1238
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1239
            }
1240
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1241
          }
1242
        }
1243
      }
1✔
1244
      if (unknownStream) {
1✔
1245
        // We don't expect any server-initiated streams.
1246
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1247
      }
1248
    }
1✔
1249

1250
    private int headerBlockSize(List<Header> headerBlock) {
1251
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1252
      long size = 0;
1✔
1253
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1254
        Header header = headerBlock.get(i);
1✔
1255
        size += 32 + header.name.size() + header.value.size();
1✔
1256
      }
1257
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1258
      return (int) size;
1✔
1259
    }
1260

1261
    @Override
1262
    public void rstStream(int streamId, ErrorCode errorCode) {
1263
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1264
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1265
      boolean stopDelivery =
1✔
1266
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1267
      synchronized (lock) {
1✔
1268
        OkHttpClientStream stream = streams.get(streamId);
1✔
1269
        if (stream != null) {
1✔
1270
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1271
              stream.transportState().tag());
1✔
1272
          finishStream(
1✔
1273
              streamId, status,
1274
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1275
              stopDelivery, null, null);
1276
        }
1277
      }
1✔
1278
    }
1✔
1279

1280
    @Override
1281
    public void settings(boolean clearPrevious, Settings settings) {
1282
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1283
      boolean outboundWindowSizeIncreased = false;
1✔
1284
      synchronized (lock) {
1✔
1285
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1286
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1287
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1288
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1289
        }
1290

1291
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1292
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1293
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1294
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1295
        }
1296
        if (firstSettings) {
1✔
1297
          attributes = listener.filterTransport(attributes);
1✔
1298
          listener.transportReady();
1✔
1299
          firstSettings = false;
1✔
1300
        }
1301

1302
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1303
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1304
        // otherwise it will cause a stream error (RST_STREAM).
1305
        frameWriter.ackSettings(settings);
1✔
1306

1307
        // send any pending bytes / streams
1308
        if (outboundWindowSizeIncreased) {
1✔
1309
          outboundFlow.writeStreams();
1✔
1310
        }
1311
        startPendingStreams();
1✔
1312
      }
1✔
1313
    }
1✔
1314

1315
    @Override
1316
    public void ping(boolean ack, int payload1, int payload2) {
1317
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1318
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1319
      if (!ack) {
1✔
1320
        synchronized (lock) {
1✔
1321
          frameWriter.ping(true, payload1, payload2);
1✔
1322
        }
1✔
1323
      } else {
1324
        Http2Ping p = null;
1✔
1325
        synchronized (lock) {
1✔
1326
          if (ping != null) {
1✔
1327
            if (ping.payload() == ackPayload) {
1✔
1328
              p = ping;
1✔
1329
              ping = null;
1✔
1330
            } else {
1331
              log.log(Level.WARNING, String.format(
1✔
1332
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1333
                  ping.payload(), ackPayload));
1✔
1334
            }
1335
          } else {
1336
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1337
          }
1338
        }
1✔
1339
        // don't complete it while holding lock since callbacks could run immediately
1340
        if (p != null) {
1✔
1341
          p.complete();
1✔
1342
        }
1343
      }
1344
    }
1✔
1345

1346
    @Override
1347
    public void ackSettings() {
1348
      // Do nothing currently.
1349
    }
1✔
1350

1351
    @Override
1352
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1353
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1354
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1355
        String data = debugData.utf8();
1✔
1356
        log.log(Level.WARNING, String.format(
1✔
1357
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1358
        if ("too_many_pings".equals(data)) {
1✔
1359
          tooManyPingsRunnable.run();
1✔
1360
        }
1361
      }
1362
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1363
          .augmentDescription("Received Goaway");
1✔
1364
      if (debugData.size() > 0) {
1✔
1365
        // If a debug message was provided, use it.
1366
        status = status.augmentDescription(debugData.utf8());
1✔
1367
      }
1368
      startGoAway(lastGoodStreamId, null, status);
1✔
1369
    }
1✔
1370

1371
    @Override
1372
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1373
        throws IOException {
1374
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1375
          streamId, promisedStreamId, requestHeaders);
1376
      // We don't accept server initiated stream.
1377
      synchronized (lock) {
1✔
1378
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1379
      }
1✔
1380
    }
1✔
1381

1382
    @Override
1383
    public void windowUpdate(int streamId, long delta) {
1384
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1385
      if (delta == 0) {
1✔
1386
        String errorMsg = "Received 0 flow control window increment.";
×
1387
        if (streamId == 0) {
×
1388
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1389
        } else {
1390
          finishStream(
×
1391
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1392
              ErrorCode.PROTOCOL_ERROR, null);
1393
        }
1394
        return;
×
1395
      }
1396

1397
      boolean unknownStream = false;
1✔
1398
      synchronized (lock) {
1✔
1399
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1400
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1401
          return;
1✔
1402
        }
1403

1404
        OkHttpClientStream stream = streams.get(streamId);
1✔
1405
        if (stream != null) {
1✔
1406
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1407
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1408
          unknownStream = true;
1✔
1409
        }
1410
      }
1✔
1411
      if (unknownStream) {
1✔
1412
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1413
            "Received window_update for unknown stream: " + streamId);
1414
      }
1415
    }
1✔
1416

1417
    @Override
1418
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1419
      // Ignore priority change.
1420
      // TODO(madongfly): log
1421
    }
×
1422

1423
    @Override
1424
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1425
        int port, long maxAge) {
1426
      // TODO(madongfly): Deal with alternateService propagation
1427
    }
×
1428
  }
1429
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc