• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19688

13 Feb 2025 08:41PM CUT coverage: 88.613% (+0.008%) from 88.605%
#19688

push

github

ejona86
Upgrade netty-tcnative to 2.0.70

34248 of 38649 relevant lines covered (88.61%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.43
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import com.google.errorprone.annotations.concurrent.GuardedBy;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.ClientStreamTracer;
34
import io.grpc.Grpc;
35
import io.grpc.HttpConnectProxiedSocketAddress;
36
import io.grpc.InternalChannelz;
37
import io.grpc.InternalChannelz.SocketStats;
38
import io.grpc.InternalLogId;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.MethodDescriptor.MethodType;
42
import io.grpc.SecurityLevel;
43
import io.grpc.Status;
44
import io.grpc.Status.Code;
45
import io.grpc.StatusException;
46
import io.grpc.internal.ClientStreamListener.RpcProgress;
47
import io.grpc.internal.ConnectionClientTransport;
48
import io.grpc.internal.GrpcAttributes;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.Http2Ping;
51
import io.grpc.internal.InUseStateAggregator;
52
import io.grpc.internal.KeepAliveManager;
53
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
54
import io.grpc.internal.SerializingExecutor;
55
import io.grpc.internal.StatsTraceContext;
56
import io.grpc.internal.TransportTracer;
57
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
58
import io.grpc.okhttp.internal.ConnectionSpec;
59
import io.grpc.okhttp.internal.Credentials;
60
import io.grpc.okhttp.internal.StatusLine;
61
import io.grpc.okhttp.internal.framed.ErrorCode;
62
import io.grpc.okhttp.internal.framed.FrameReader;
63
import io.grpc.okhttp.internal.framed.FrameWriter;
64
import io.grpc.okhttp.internal.framed.Header;
65
import io.grpc.okhttp.internal.framed.HeadersMode;
66
import io.grpc.okhttp.internal.framed.Http2;
67
import io.grpc.okhttp.internal.framed.Settings;
68
import io.grpc.okhttp.internal.framed.Variant;
69
import io.grpc.okhttp.internal.proxy.HttpUrl;
70
import io.grpc.okhttp.internal.proxy.Request;
71
import io.perfmark.PerfMark;
72
import java.io.EOFException;
73
import java.io.IOException;
74
import java.net.InetSocketAddress;
75
import java.net.Socket;
76
import java.net.URI;
77
import java.util.Collections;
78
import java.util.Deque;
79
import java.util.EnumMap;
80
import java.util.HashMap;
81
import java.util.Iterator;
82
import java.util.LinkedList;
83
import java.util.List;
84
import java.util.Locale;
85
import java.util.Map;
86
import java.util.Random;
87
import java.util.concurrent.BrokenBarrierException;
88
import java.util.concurrent.CountDownLatch;
89
import java.util.concurrent.CyclicBarrier;
90
import java.util.concurrent.Executor;
91
import java.util.concurrent.ScheduledExecutorService;
92
import java.util.concurrent.TimeUnit;
93
import java.util.concurrent.TimeoutException;
94
import java.util.logging.Level;
95
import java.util.logging.Logger;
96
import javax.annotation.Nullable;
97
import javax.net.SocketFactory;
98
import javax.net.ssl.HostnameVerifier;
99
import javax.net.ssl.SSLSession;
100
import javax.net.ssl.SSLSocket;
101
import javax.net.ssl.SSLSocketFactory;
102
import okio.Buffer;
103
import okio.BufferedSink;
104
import okio.BufferedSource;
105
import okio.ByteString;
106
import okio.Okio;
107
import okio.Source;
108
import okio.Timeout;
109

110
/**
111
 * An okhttp-based {@link ConnectionClientTransport} implementation.
112
 */
113
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
114
      OutboundFlowController.Transport {
115
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
116
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
117

118
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
119
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
120
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
121
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
122
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
123
        Status.INTERNAL.withDescription("Protocol error"));
1✔
124
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
125
        Status.INTERNAL.withDescription("Internal error"));
1✔
126
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
127
        Status.INTERNAL.withDescription("Flow control error"));
1✔
128
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
129
        Status.INTERNAL.withDescription("Stream closed"));
1✔
130
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
131
        Status.INTERNAL.withDescription("Frame too large"));
1✔
132
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
133
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
134
    errorToStatus.put(ErrorCode.CANCEL,
1✔
135
        Status.CANCELLED.withDescription("Cancelled"));
1✔
136
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
137
        Status.INTERNAL.withDescription("Compression error"));
1✔
138
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
139
        Status.INTERNAL.withDescription("Connect error"));
1✔
140
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
141
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
142
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
143
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
144
    return Collections.unmodifiableMap(errorToStatus);
1✔
145
  }
146

147
  private final InetSocketAddress address;
148
  private final String defaultAuthority;
149
  private final String userAgent;
150
  private final Random random = new Random();
1✔
151
  // Returns new unstarted stopwatches
152
  private final Supplier<Stopwatch> stopwatchFactory;
153
  private final int initialWindowSize;
154
  private final Variant variant;
155
  private Listener listener;
156
  @GuardedBy("lock")
157
  private ExceptionHandlingFrameWriter frameWriter;
158
  private OutboundFlowController outboundFlow;
159
  private final Object lock = new Object();
1✔
160
  private final InternalLogId logId;
161
  @GuardedBy("lock")
162
  private int nextStreamId;
163
  @GuardedBy("lock")
1✔
164
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
165
  private final Executor executor;
166
  // Wrap on executor, to guarantee some operations be executed serially.
167
  private final SerializingExecutor serializingExecutor;
168
  private final ScheduledExecutorService scheduler;
169
  private final int maxMessageSize;
170
  private int connectionUnacknowledgedBytesRead;
171
  private ClientFrameHandler clientFrameHandler;
172
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
173
  private Attributes attributes;
174
  /**
175
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
176
   * streams may continue.
177
   */
178
  @GuardedBy("lock")
179
  private Status goAwayStatus;
180
  @GuardedBy("lock")
181
  private boolean goAwaySent;
182
  @GuardedBy("lock")
183
  private Http2Ping ping;
184
  @GuardedBy("lock")
185
  private boolean stopped;
186
  @GuardedBy("lock")
187
  private boolean hasStream;
188
  private final SocketFactory socketFactory;
189
  private SSLSocketFactory sslSocketFactory;
190
  private HostnameVerifier hostnameVerifier;
191
  private Socket socket;
192
  @GuardedBy("lock")
1✔
193
  private int maxConcurrentStreams = 0;
194
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
195
  @GuardedBy("lock")
196
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
197
  private final ConnectionSpec connectionSpec;
198
  private KeepAliveManager keepAliveManager;
199
  private boolean enableKeepAlive;
200
  private long keepAliveTimeNanos;
201
  private long keepAliveTimeoutNanos;
202
  private boolean keepAliveWithoutCalls;
203
  private final Runnable tooManyPingsRunnable;
204
  private final int maxInboundMetadataSize;
205
  private final boolean useGetForSafeMethods;
206
  @GuardedBy("lock")
207
  private final TransportTracer transportTracer;
208
  @GuardedBy("lock")
1✔
209
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
210
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
211
        @Override
212
        protected void handleInUse() {
213
          listener.transportInUse(true);
1✔
214
        }
1✔
215

216
        @Override
217
        protected void handleNotInUse() {
218
          listener.transportInUse(false);
1✔
219
        }
1✔
220
      };
221
  @GuardedBy("lock")
222
  private InternalChannelz.Security securityInfo;
223

224
  @VisibleForTesting
225
  @Nullable
226
  final HttpConnectProxiedSocketAddress proxiedAddr;
227

228
  @VisibleForTesting
1✔
229
  int proxySocketTimeout = 30000;
230

231
  // The following fields should only be used for test.
232
  Runnable connectingCallback;
233
  SettableFuture<Void> connectedFuture;
234

235
  /**
   * Creates a transport using the default stopwatch supplier and a new {@link Http2} variant.
   * All other arguments are forwarded unchanged to the private constructor.
   */
  public OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      InetSocketAddress address,
      String authority,
      @Nullable String userAgent,
      Attributes eagAttrs,
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory, address, authority, userAgent, eagAttrs,
        GrpcUtil.STOPWATCH_SUPPLIER, new Http2(),
        proxiedAddr, tooManyPingsRunnable);
  }
254

255
  private OkHttpClientTransport(
256
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
257
      InetSocketAddress address,
258
      String authority,
259
      @Nullable String userAgent,
260
      Attributes eagAttrs,
261
      Supplier<Stopwatch> stopwatchFactory,
262
      Variant variant,
263
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
264
      Runnable tooManyPingsRunnable) {
1✔
265
    this.address = Preconditions.checkNotNull(address, "address");
1✔
266
    this.defaultAuthority = authority;
1✔
267
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
268
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
269
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
270
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
271
    this.scheduler = Preconditions.checkNotNull(
1✔
272
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
273
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
274
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
275
    nextStreamId = 3;
1✔
276
    this.socketFactory = transportFactory.socketFactory == null
1✔
277
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
278
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
279
    this.hostnameVerifier = transportFactory.hostnameVerifier;
1✔
280
    this.connectionSpec = Preconditions.checkNotNull(
1✔
281
        transportFactory.connectionSpec, "connectionSpec");
282
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
283
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
284
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
285
    this.proxiedAddr = proxiedAddr;
1✔
286
    this.tooManyPingsRunnable =
1✔
287
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
288
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
289
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
290
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
291
    this.attributes = Attributes.newBuilder()
1✔
292
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
293
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
294
    initTransportTracer();
1✔
295
  }
1✔
296

297
  /**
   * Test-only constructor: builds a transport aimed at a fake peer on {@code 127.0.0.1:80} with a
   * placeholder authority and no proxy, and records the test hooks used while connecting.
   */
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
  @VisibleForTesting
  OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      String userAgent,
      Supplier<Stopwatch> stopwatchFactory,
      Variant variant,
      @Nullable Runnable connectingCallback,
      SettableFuture<Void> connectedFuture,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory,
        new InetSocketAddress("127.0.0.1", 80),
        "notarealauthority:80",
        userAgent,
        Attributes.EMPTY,
        stopwatchFactory,
        variant,
        /* proxiedAddr= */ null,
        tooManyPingsRunnable);
    // The callback is optional; the future is mandatory.
    this.connectingCallback = connectingCallback;
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
  }
323

324
  // sslSocketFactory is set to null when use plaintext.
325
  boolean isUsingPlaintext() {
326
    return sslSocketFactory == null;
1✔
327
  }
328

329
  private void initTransportTracer() {
330
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
331
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
332
        @Override
333
        public TransportTracer.FlowControlWindows read() {
334
          synchronized (lock) {
1✔
335
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
336
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
337
            // provide a lower bound.
338
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
339
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
340
          }
341
        }
342
      });
343
    }
1✔
344
  }
1✔
345

346
  /**
347
   * Enable keepalive with custom delay and timeout.
348
   */
349
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
350
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
351
    enableKeepAlive = enable;
×
352
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
353
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
354
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
355
  }
×
356

357
  @Override
358
  public void ping(final PingCallback callback, Executor executor) {
359
    long data = 0;
1✔
360
    Http2Ping p;
361
    boolean writePing;
362
    synchronized (lock) {
1✔
363
      checkState(frameWriter != null);
1✔
364
      if (stopped) {
1✔
365
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
366
        return;
1✔
367
      }
368
      if (ping != null) {
1✔
369
        // we only allow one outstanding ping at a time, so just add the callback to
370
        // any outstanding operation
371
        p = ping;
1✔
372
        writePing = false;
1✔
373
      } else {
374
        // set outstanding operation and then write the ping after releasing lock
375
        data = random.nextLong();
1✔
376
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
377
        stopwatch.start();
1✔
378
        p = ping = new Http2Ping(data, stopwatch);
1✔
379
        writePing = true;
1✔
380
        transportTracer.reportKeepAliveSent();
1✔
381
      }
382
      if (writePing) {
1✔
383
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
384
      }
385
    }
1✔
386
    // If transport concurrently failed/stopped since we released the lock above, this could
387
    // immediately invoke callback (which we shouldn't do while holding a lock)
388
    p.addCallback(callback, executor);
1✔
389
  }
1✔
390

391
  @Override
392
  public OkHttpClientStream newStream(
393
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
394
      ClientStreamTracer[] tracers) {
395
    Preconditions.checkNotNull(method, "method");
1✔
396
    Preconditions.checkNotNull(headers, "headers");
1✔
397
    StatsTraceContext statsTraceContext =
1✔
398
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
399
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
400
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
401
      return new OkHttpClientStream(
1✔
402
          method,
403
          headers,
404
          frameWriter,
405
          OkHttpClientTransport.this,
406
          outboundFlow,
407
          lock,
408
          maxMessageSize,
409
          initialWindowSize,
410
          defaultAuthority,
411
          userAgent,
412
          statsTraceContext,
413
          transportTracer,
414
          callOptions,
415
          useGetForSafeMethods);
416
    }
417
  }
418

419
  @GuardedBy("lock")
420
  void streamReadyToStart(OkHttpClientStream clientStream) {
421
    if (goAwayStatus != null) {
1✔
422
      clientStream.transportState().transportReportStatus(
1✔
423
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
424
    } else if (streams.size() >= maxConcurrentStreams) {
1✔
425
      pendingStreams.add(clientStream);
1✔
426
      setInUse(clientStream);
1✔
427
    } else {
428
      startStream(clientStream);
1✔
429
    }
430
  }
1✔
431

432
  @SuppressWarnings("GuardedBy")
433
  @GuardedBy("lock")
434
  private void startStream(OkHttpClientStream stream) {
435
    Preconditions.checkState(
1✔
436
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
437
    streams.put(nextStreamId, stream);
1✔
438
    setInUse(stream);
1✔
439
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
440
    // found: 'this.lock'
441
    stream.transportState().start(nextStreamId);
1✔
442
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
443
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
444
        || stream.useGet()) {
1✔
445
      frameWriter.flush();
1✔
446
    }
447
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
448
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
449
      // correctly.
450
      nextStreamId = Integer.MAX_VALUE;
1✔
451
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
452
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
453
    } else {
454
      nextStreamId += 2;
1✔
455
    }
456
  }
1✔
457

458
  /**
459
   * Starts pending streams, returns true if at least one pending stream is started.
460
   */
461
  @GuardedBy("lock")
462
  private boolean startPendingStreams() {
463
    boolean hasStreamStarted = false;
1✔
464
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
465
      OkHttpClientStream stream = pendingStreams.poll();
1✔
466
      startStream(stream);
1✔
467
      hasStreamStarted = true;
1✔
468
    }
1✔
469
    return hasStreamStarted;
1✔
470
  }
471

472
  /**
473
   * Removes given pending stream, used when a pending stream is cancelled.
474
   */
475
  @GuardedBy("lock")
476
  void removePendingStream(OkHttpClientStream pendingStream) {
477
    pendingStreams.remove(pendingStream);
1✔
478
    maybeClearInUse(pendingStream);
1✔
479
  }
1✔
480

481
  @Override
482
  public Runnable start(Listener listener) {
483
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
484

485
    if (enableKeepAlive) {
1✔
486
      keepAliveManager = new KeepAliveManager(
×
487
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
488
          keepAliveWithoutCalls);
489
      keepAliveManager.onTransportStarted();
×
490
    }
491

492
    int maxQueuedControlFrames = 10000;
1✔
493
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
494
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
495
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
496

497
    synchronized (lock) {
1✔
498
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
499
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
500
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
501
      // the same TransportExceptionHandler instance so it is all mixed back together.
502
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
503
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
504
    }
1✔
505
    final CountDownLatch latch = new CountDownLatch(1);
1✔
506
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
507
    // The transport needs up to two threads to function once started,
508
    // but only needs one during handshaking. Start another thread during handshaking
509
    // to make sure there's still a free thread available. If the number of threads is exhausted,
510
    // it is better to kill the transport than for all the transports to hang unable to send.
511
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
512
    // Connecting in the serializingExecutor, so that some stream operations like synStream
513
    // will be executed after connected.
514

515
    serializingExecutor.execute(new Runnable() {
1✔
516
      @Override
517
      public void run() {
518
        // Use closed source on failure so that the reader immediately shuts down.
519
        BufferedSource source = Okio.buffer(new Source() {
1✔
520
          @Override
521
          public long read(Buffer sink, long byteCount) {
522
            return -1;
1✔
523
          }
524

525
          @Override
526
          public Timeout timeout() {
527
            return Timeout.NONE;
×
528
          }
529

530
          @Override
531
          public void close() {
532
          }
1✔
533
        });
534
        Socket sock;
535
        SSLSession sslSession = null;
1✔
536
        try {
537
          // This is a hack to make sure the connection preface and initial settings to be sent out
538
          // without blocking the start. By doing this essentially prevents potential deadlock when
539
          // network is not available during startup while another thread holding lock to send the
540
          // initial preface.
541
          try {
542
            latch.await();
1✔
543
            barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
544
          } catch (InterruptedException e) {
×
545
            Thread.currentThread().interrupt();
×
546
          } catch (TimeoutException | BrokenBarrierException e) {
1✔
547
            startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
548
                .withDescription("Timed out waiting for second handshake thread. "
1✔
549
                    + "The transport executor pool may have run out of threads"));
550
            return;
1✔
551
          }
1✔
552

553
          if (proxiedAddr == null) {
1✔
554
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
555
          } else {
556
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
557
              sock = createHttpProxySocket(
1✔
558
                  proxiedAddr.getTargetAddress(),
1✔
559
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
560
                  proxiedAddr.getUsername(),
1✔
561
                  proxiedAddr.getPassword()
1✔
562
              );
563
            } else {
564
              throw Status.INTERNAL.withDescription(
×
565
                  "Unsupported SocketAddress implementation "
566
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
567
            }
568
          }
569
          if (sslSocketFactory != null) {
1✔
570
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
571
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
572
                connectionSpec);
1✔
573
            sslSession = sslSocket.getSession();
1✔
574
            sock = sslSocket;
1✔
575
          }
576
          sock.setTcpNoDelay(true);
1✔
577
          source = Okio.buffer(Okio.source(sock));
1✔
578
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
579

580
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
581
          attributes = attributes.toBuilder()
1✔
582
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
583
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
584
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
585
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
586
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
587
              .build();
1✔
588
        } catch (StatusException e) {
1✔
589
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
590
          return;
1✔
591
        } catch (Exception e) {
1✔
592
          onException(e);
1✔
593
          return;
1✔
594
        } finally {
595
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
596
          latchForExtraThread.countDown();
1✔
597
        }
598
        synchronized (lock) {
1✔
599
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
600
          if (sslSession != null) {
1✔
601
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
602
          }
603
        }
1✔
604
      }
1✔
605
    });
606

607
    executor.execute(new Runnable() {
1✔
608
      @Override
609
      public void run() {
610
        try {
611
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
612
          latchForExtraThread.await();
1✔
613
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
614
          // Something bad happened, maybe too few threads available!
615
          // This will be handled in the handshake thread.
616
        } catch (InterruptedException e) {
1✔
617
          Thread.currentThread().interrupt();
1✔
618
        }
1✔
619
      }
1✔
620
    });
621
    // Schedule to send connection preface & settings before any other write.
622
    try {
623
      sendConnectionPrefaceAndSettings();
1✔
624
    } finally {
625
      latch.countDown();
1✔
626
    }
627

628
    serializingExecutor.execute(new Runnable() {
1✔
629
      @Override
630
      public void run() {
631
        if (connectingCallback != null) {
1✔
632
          connectingCallback.run();
1✔
633
        }
634
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
635
        // may send goAway immediately.
636
        executor.execute(clientFrameHandler);
1✔
637
        synchronized (lock) {
1✔
638
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
639
          startPendingStreams();
1✔
640
        }
1✔
641
        if (connectedFuture != null) {
1✔
642
          connectedFuture.set(null);
1✔
643
        }
644
      }
1✔
645
    });
646
    return null;
1✔
647
  }
648

649
  /**
650
   * Should only be called once when the transport is first established.
651
   */
652
  private void sendConnectionPrefaceAndSettings() {
653
    synchronized (lock) {
1✔
654
      frameWriter.connectionPreface();
1✔
655
      Settings settings = new Settings();
1✔
656
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
657
      frameWriter.settings(settings);
1✔
658
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
659
        frameWriter.windowUpdate(
1✔
660
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
661
      }
662
    }
1✔
663
  }
1✔
664

665
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
666
      String proxyUsername, String proxyPassword) throws StatusException {
667
    Socket sock = null;
1✔
668
    try {
669
      // The proxy address may not be resolved
670
      if (proxyAddress.getAddress() != null) {
1✔
671
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
672
      } else {
673
        sock =
×
674
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
675
      }
676
      sock.setTcpNoDelay(true);
1✔
677
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
678
      // can cause reading from the socket to hang.
679
      sock.setSoTimeout(proxySocketTimeout);
1✔
680

681
      Source source = Okio.source(sock);
1✔
682
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
683

684
      // Prepare headers and request method line
685
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
686
      HttpUrl url = proxyRequest.httpUrl();
1✔
687
      String requestLine =
1✔
688
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
689

690
      // Write request to socket
691
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
692
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
693
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
694
            .writeUtf8(": ")
1✔
695
            .writeUtf8(proxyRequest.headers().value(i))
1✔
696
            .writeUtf8("\r\n");
1✔
697
      }
698
      sink.writeUtf8("\r\n");
1✔
699
      // Flush buffer (flushes socket and sends request)
700
      sink.flush();
1✔
701

702
      // Read status line, check if 2xx was returned
703
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
704
      // Drain rest of headers
705
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
706
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
707
        Buffer body = new Buffer();
1✔
708
        try {
709
          sock.shutdownOutput();
1✔
710
          source.read(body, 1024);
1✔
711
        } catch (IOException ex) {
×
712
          body.writeUtf8("Unable to read body: " + ex.toString());
×
713
        }
1✔
714
        try {
715
          sock.close();
1✔
716
        } catch (IOException ignored) {
×
717
          // ignored
718
        }
1✔
719
        String message = String.format(
1✔
720
            Locale.US,
721
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
722
              + "Response body:\n%s",
723
            statusLine.code, statusLine.message, body.readUtf8());
1✔
724
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
725
      }
726
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
727
      sock.setSoTimeout(0);
1✔
728
      return sock;
1✔
729
    } catch (IOException e) {
1✔
730
      if (sock != null) {
1✔
731
        GrpcUtil.closeQuietly(sock);
1✔
732
      }
733
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
734
          .asException();
1✔
735
    }
736
  }
737

738
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
739
                                         String proxyPassword) {
740
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
741
        .scheme("https")
1✔
742
        .host(address.getHostName())
1✔
743
        .port(address.getPort())
1✔
744
        .build();
1✔
745

746
    Request.Builder request = new Request.Builder()
1✔
747
        .url(tunnelUrl)
1✔
748
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
749
        .header("User-Agent", userAgent);
1✔
750

751
    // If we have proxy credentials, set them right away
752
    if (proxyUsername != null && proxyPassword != null) {
1✔
753
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
754
    }
755
    return request.build();
1✔
756
  }
757

758
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
759
    Buffer buffer = new Buffer();
1✔
760
    while (true) {
761
      if (source.read(buffer, 1) == -1) {
1✔
762
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
763
      }
764
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
765
        return buffer.readUtf8LineStrict();
1✔
766
      }
767
    }
768
  }
769

770
  @Override
771
  public String toString() {
772
    return MoreObjects.toStringHelper(this)
1✔
773
        .add("logId", logId.getId())
1✔
774
        .add("address", address)
1✔
775
        .toString();
1✔
776
  }
777

778
  @Override
779
  public InternalLogId getLogId() {
780
    return logId;
1✔
781
  }
782

783
  /**
784
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
785
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
786
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
787
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
788
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
789
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
790
   * connection to the port, independent of this function.
791
   *
792
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
793
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
794
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
795
   */
796
  @VisibleForTesting
797
  String getOverridenHost() {
798
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
799
    if (uri.getHost() != null) {
1✔
800
      return uri.getHost();
1✔
801
    }
802

803
    return defaultAuthority;
1✔
804
  }
805

806
  @VisibleForTesting
807
  int getOverridenPort() {
808
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
809
    if (uri.getPort() != -1) {
1✔
810
      return uri.getPort();
×
811
    }
812

813
    return address.getPort();
1✔
814
  }
815

816
  @Override
817
  public void shutdown(Status reason) {
818
    synchronized (lock) {
1✔
819
      if (goAwayStatus != null) {
1✔
820
        return;
1✔
821
      }
822

823
      goAwayStatus = reason;
1✔
824
      listener.transportShutdown(goAwayStatus);
1✔
825
      stopIfNecessary();
1✔
826
    }
1✔
827
  }
1✔
828

829
  @Override
830
  public void shutdownNow(Status reason) {
831
    shutdown(reason);
1✔
832
    synchronized (lock) {
1✔
833
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
834
      while (it.hasNext()) {
1✔
835
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
836
        it.remove();
1✔
837
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
838
        maybeClearInUse(entry.getValue());
1✔
839
      }
1✔
840

841
      for (OkHttpClientStream stream : pendingStreams) {
1✔
842
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
843
        // chance to retry and be routed to another connection.
844
        stream.transportState().transportReportStatus(
1✔
845
            reason, RpcProgress.MISCARRIED, true, new Metadata());
846
        maybeClearInUse(stream);
1✔
847
      }
1✔
848
      pendingStreams.clear();
1✔
849

850
      stopIfNecessary();
1✔
851
    }
1✔
852
  }
1✔
853

854
  @Override
855
  public Attributes getAttributes() {
856
    return attributes;
1✔
857
  }
858

859
  /**
860
   * Gets all active streams as an array.
861
   */
862
  @Override
863
  public OutboundFlowController.StreamState[] getActiveStreams() {
864
    synchronized (lock) {
1✔
865
      OutboundFlowController.StreamState[] flowStreams =
1✔
866
          new OutboundFlowController.StreamState[streams.size()];
1✔
867
      int i = 0;
1✔
868
      for (OkHttpClientStream stream : streams.values()) {
1✔
869
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
870
      }
1✔
871
      return flowStreams;
1✔
872
    }
873
  }
874

875
  @VisibleForTesting
876
  ClientFrameHandler getHandler() {
877
    return clientFrameHandler;
1✔
878
  }
879

880
  @VisibleForTesting
881
  SocketFactory getSocketFactory() {
882
    return socketFactory;
1✔
883
  }
884

885
  @VisibleForTesting
886
  int getPendingStreamSize() {
887
    synchronized (lock) {
1✔
888
      return pendingStreams.size();
1✔
889
    }
890
  }
891

892
  @VisibleForTesting
893
  void setNextStreamId(int nextStreamId) {
894
    synchronized (lock) {
1✔
895
      this.nextStreamId = nextStreamId;
1✔
896
    }
1✔
897
  }
1✔
898

899
  /**
900
   * Finish all active streams due to an IOException, then close the transport.
901
   */
902
  @Override
903
  public void onException(Throwable failureCause) {
904
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
905
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
906
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
907
  }
1✔
908

909
  /**
910
   * Send GOAWAY to the server, then finish all active streams and close the transport.
911
   */
912
  private void onError(ErrorCode errorCode, String moreDetail) {
913
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
914
  }
1✔
915

916
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
917
    synchronized (lock) {
1✔
918
      if (goAwayStatus == null) {
1✔
919
        goAwayStatus = status;
1✔
920
        listener.transportShutdown(status);
1✔
921
      }
922
      if (errorCode != null && !goAwaySent) {
1✔
923
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
924
        // streams. The GOAWAY is part of graceful shutdown.
925
        goAwaySent = true;
1✔
926
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
927
      }
928

929
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
930
      while (it.hasNext()) {
1✔
931
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
932
        if (entry.getKey() > lastKnownStreamId) {
1✔
933
          it.remove();
1✔
934
          entry.getValue().transportState().transportReportStatus(
1✔
935
              status, RpcProgress.REFUSED, false, new Metadata());
936
          maybeClearInUse(entry.getValue());
1✔
937
        }
938
      }
1✔
939

940
      for (OkHttpClientStream stream : pendingStreams) {
1✔
941
        stream.transportState().transportReportStatus(
1✔
942
            status, RpcProgress.MISCARRIED, true, new Metadata());
943
        maybeClearInUse(stream);
1✔
944
      }
1✔
945
      pendingStreams.clear();
1✔
946

947
      stopIfNecessary();
1✔
948
    }
1✔
949
  }
1✔
950

951
  /**
952
   * Called when a stream is closed. We do things like:
953
   * <ul>
954
   * <li>Removing the stream from the map.
955
   * <li>Optionally reporting the status.
956
   * <li>Starting pending streams if we can.
957
   * <li>Stopping the transport if this is the last live stream under a go-away status.
958
   * </ul>
959
   *
960
   * @param streamId the Id of the stream.
961
   * @param status the final status of this stream, null means no need to report.
962
   * @param stopDelivery interrupt queued messages in the deframer
963
   * @param errorCode reset the stream with this ErrorCode if not null.
964
   * @param trailers the trailers received if not null
965
   */
966
  void finishStream(
967
      int streamId,
968
      @Nullable Status status,
969
      RpcProgress rpcProgress,
970
      boolean stopDelivery,
971
      @Nullable ErrorCode errorCode,
972
      @Nullable Metadata trailers) {
973
    synchronized (lock) {
1✔
974
      OkHttpClientStream stream = streams.remove(streamId);
1✔
975
      if (stream != null) {
1✔
976
        if (errorCode != null) {
1✔
977
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
978
        }
979
        if (status != null) {
1✔
980
          stream
1✔
981
              .transportState()
1✔
982
              .transportReportStatus(
1✔
983
                  status,
984
                  rpcProgress,
985
                  stopDelivery,
986
                  trailers != null ? trailers : new Metadata());
1✔
987
        }
988
        if (!startPendingStreams()) {
1✔
989
          stopIfNecessary();
1✔
990
        }
991
        maybeClearInUse(stream);
1✔
992
      }
993
    }
1✔
994
  }
1✔
995

996
  /**
997
   * When the transport is in goAway state, we should stop it once all active streams finish.
998
   */
999
  @GuardedBy("lock")
1000
  private void stopIfNecessary() {
1001
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1002
      return;
1✔
1003
    }
1004
    if (stopped) {
1✔
1005
      return;
1✔
1006
    }
1007
    stopped = true;
1✔
1008

1009
    if (keepAliveManager != null) {
1✔
1010
      keepAliveManager.onTransportTermination();
×
1011
    }
1012

1013
    if (ping != null) {
1✔
1014
      ping.failed(getPingFailure());
1✔
1015
      ping = null;
1✔
1016
    }
1017

1018
    if (!goAwaySent) {
1✔
1019
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1020
      // streams. The GOAWAY is part of graceful shutdown.
1021
      goAwaySent = true;
1✔
1022
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1023
    }
1024

1025
    // We will close the underlying socket in the writing thread to break out the reader
1026
    // thread, which will close the frameReader and notify the listener.
1027
    frameWriter.close();
1✔
1028
  }
1✔
1029

1030
  @GuardedBy("lock")
1031
  private void maybeClearInUse(OkHttpClientStream stream) {
1032
    if (hasStream) {
1✔
1033
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1034
        hasStream = false;
1✔
1035
        if (keepAliveManager != null) {
1✔
1036
          // We don't have any active streams. No need to do keepalives any more.
1037
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1038
          // and onTransportActive.
1039
          keepAliveManager.onTransportIdle();
×
1040
        }
1041
      }
1042
    }
1043
    if (stream.shouldBeCountedForInUse()) {
1✔
1044
      inUseState.updateObjectInUse(stream, false);
1✔
1045
    }
1046
  }
1✔
1047

1048
  @GuardedBy("lock")
1049
  private void setInUse(OkHttpClientStream stream) {
1050
    if (!hasStream) {
1✔
1051
      hasStream = true;
1✔
1052
      if (keepAliveManager != null) {
1✔
1053
        // We have a new stream. We might need to do keepalives now.
1054
        // Note that we have to do this inside the lock to avoid calling
1055
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1056
        // order.
1057
        keepAliveManager.onTransportActive();
×
1058
      }
1059
    }
1060
    if (stream.shouldBeCountedForInUse()) {
1✔
1061
      inUseState.updateObjectInUse(stream, true);
1✔
1062
    }
1063
  }
1✔
1064

1065
  private Throwable getPingFailure() {
1066
    synchronized (lock) {
1✔
1067
      if (goAwayStatus != null) {
1✔
1068
        return goAwayStatus.asException();
1✔
1069
      } else {
1070
        return Status.UNAVAILABLE.withDescription("Connection closed").asException();
×
1071
      }
1072
    }
1073
  }
1074

1075
  boolean mayHaveCreatedStream(int streamId) {
1076
    synchronized (lock) {
1✔
1077
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1078
    }
1079
  }
1080

1081
  OkHttpClientStream getStream(int streamId) {
1082
    synchronized (lock) {
1✔
1083
      return streams.get(streamId);
1✔
1084
    }
1085
  }
1086

1087
  /**
1088
   * Returns a Grpc status corresponding to the given ErrorCode.
1089
   */
1090
  @VisibleForTesting
1091
  static Status toGrpcStatus(ErrorCode code) {
1092
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1093
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1094
        "Unknown http2 error code: " + code.httpCode);
1095
  }
1096

1097
  @Override
1098
  public ListenableFuture<SocketStats> getStats() {
1099
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1100
    synchronized (lock) {
1✔
1101
      if (socket == null) {
1✔
1102
        ret.set(new SocketStats(
×
1103
            transportTracer.getStats(),
×
1104
            /*local=*/ null,
1105
            /*remote=*/ null,
1106
            new InternalChannelz.SocketOptions.Builder().build(),
×
1107
            /*security=*/ null));
1108
      } else {
1109
        ret.set(new SocketStats(
1✔
1110
            transportTracer.getStats(),
1✔
1111
            socket.getLocalSocketAddress(),
1✔
1112
            socket.getRemoteSocketAddress(),
1✔
1113
            Utils.getSocketOptions(socket),
1✔
1114
            securityInfo));
1115
      }
1116
      return ret;
1✔
1117
    }
1118
  }
1119

1120
  /**
1121
   * Runnable which reads frames and dispatches them to in flight calls.
1122
   */
1123
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1124

1125
    // Frame logger used for every inbound frame handled below.
    private final OkHttpFrameLogger logger =
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
    // Source of inbound frames; closed when run() finishes.
    FrameReader frameReader;
    // True until the first SETTINGS frame has been handled.
    boolean firstSettings = true;

    /** Creates a handler that reads frames from {@code frameReader}. */
    ClientFrameHandler(FrameReader frameReader) {
      this.frameReader = frameReader;
    }
1✔
1133

1134
    @Override
1135
    @SuppressWarnings("Finally")
1136
    public void run() {
1137
      String threadName = Thread.currentThread().getName();
1✔
1138
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1139
      try {
1140
        // Read until the underlying socket closes.
1141
        while (frameReader.nextFrame(this)) {
1✔
1142
          if (keepAliveManager != null) {
1✔
1143
            keepAliveManager.onDataReceived();
×
1144
          }
1145
        }
1146
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1147
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1148
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1149
        Status status;
1150
        synchronized (lock) {
1✔
1151
          status = goAwayStatus;
1✔
1152
        }
1✔
1153
        if (status == null) {
1✔
1154
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1155
        }
1156
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1157
      } catch (Throwable t) {
1✔
1158
        // TODO(madongfly): Send the exception message to the server.
1159
        startGoAway(
1✔
1160
            0,
1161
            ErrorCode.PROTOCOL_ERROR,
1162
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1163
      } finally {
1164
        try {
1165
          frameReader.close();
1✔
1166
        } catch (IOException ex) {
×
1167
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1168
        } catch (RuntimeException e) {
×
1169
          // This same check is done in okhttp proper:
1170
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1171

1172
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1173
          // https://issuetracker.google.com/issues/177450597
1174
          if (!"bio == null".equals(e.getMessage())) {
×
1175
            throw e;
×
1176
          }
1177
        }
1✔
1178
        listener.transportTerminated();
1✔
1179
        Thread.currentThread().setName(threadName);
1✔
1180
      }
1181
    }
1✔
1182

1183
    /**
1184
     * Handle an HTTP2 DATA frame.
1185
     */
1186
    @SuppressWarnings("GuardedBy")
1187
    @Override
1188
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1189
                     int paddedLength)
1190
        throws IOException {
1191
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1192
          streamId, in.getBuffer(), length, inFinished);
1✔
1193
      OkHttpClientStream stream = getStream(streamId);
1✔
1194
      if (stream == null) {
1✔
1195
        if (mayHaveCreatedStream(streamId)) {
1✔
1196
          synchronized (lock) {
1✔
1197
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1198
          }
1✔
1199
          in.skip(length);
1✔
1200
        } else {
1201
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1202
          return;
1✔
1203
        }
1204
      } else {
1205
        // Wait until the frame is complete.
1206
        in.require(length);
1✔
1207

1208
        Buffer buf = new Buffer();
1✔
1209
        buf.write(in.getBuffer(), length);
1✔
1210
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1211
            stream.transportState().tag());
1✔
1212
        synchronized (lock) {
1✔
1213
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1214
          // instead found: 'OkHttpClientTransport.this.lock'
1215
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1216
        }
1✔
1217
      }
1218

1219
      // connection window update
1220
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1221
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1222
        synchronized (lock) {
1✔
1223
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1224
        }
1✔
1225
        connectionUnacknowledgedBytesRead = 0;
1✔
1226
      }
1227
    }
1✔
1228

1229
    /**
1230
     * Handle HTTP2 HEADER and CONTINUATION frames.
1231
     */
1232
    @SuppressWarnings("GuardedBy")
1233
    @Override
1234
    public void headers(boolean outFinished,
1235
        boolean inFinished,
1236
        int streamId,
1237
        int associatedStreamId,
1238
        List<Header> headerBlock,
1239
        HeadersMode headersMode) {
1240
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1241
      boolean unknownStream = false;
1✔
1242
      Status failedStatus = null;
1✔
1243
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1244
        int metadataSize = headerBlockSize(headerBlock);
1✔
1245
        if (metadataSize > maxInboundMetadataSize) {
1✔
1246
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1247
              String.format(
1✔
1248
                  Locale.US,
1249
                  "Response %s metadata larger than %d: %d",
1250
                  inFinished ? "trailer" : "header",
1✔
1251
                  maxInboundMetadataSize,
1✔
1252
                  metadataSize));
1✔
1253
        }
1254
      }
1255
      synchronized (lock) {
1✔
1256
        OkHttpClientStream stream = streams.get(streamId);
1✔
1257
        if (stream == null) {
1✔
1258
          if (mayHaveCreatedStream(streamId)) {
1✔
1259
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1260
          } else {
1261
            unknownStream = true;
1✔
1262
          }
1263
        } else {
1264
          if (failedStatus == null) {
1✔
1265
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1266
                stream.transportState().tag());
1✔
1267
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1268
            // instead found: 'OkHttpClientTransport.this.lock'
1269
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1270
          } else {
1271
            if (!inFinished) {
1✔
1272
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1273
            }
1274
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1275
          }
1276
        }
1277
      }
1✔
1278
      if (unknownStream) {
1✔
1279
        // We don't expect any server-initiated streams.
1280
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1281
      }
1282
    }
1✔
1283

1284
    private int headerBlockSize(List<Header> headerBlock) {
1285
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
1286
      long size = 0;
1✔
1287
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1288
        Header header = headerBlock.get(i);
1✔
1289
        size += 32 + header.name.size() + header.value.size();
1✔
1290
      }
1291
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1292
      return (int) size;
1✔
1293
    }
1294

1295
    @Override
1296
    public void rstStream(int streamId, ErrorCode errorCode) {
1297
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1298
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1299
      boolean stopDelivery =
1✔
1300
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1301
      synchronized (lock) {
1✔
1302
        OkHttpClientStream stream = streams.get(streamId);
1✔
1303
        if (stream != null) {
1✔
1304
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1305
              stream.transportState().tag());
1✔
1306
          finishStream(
1✔
1307
              streamId, status,
1308
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1309
              stopDelivery, null, null);
1310
        }
1311
      }
1✔
1312
    }
1✔
1313

1314
    @Override
1315
    public void settings(boolean clearPrevious, Settings settings) {
1316
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1317
      boolean outboundWindowSizeIncreased = false;
1✔
1318
      synchronized (lock) {
1✔
1319
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1320
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1321
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1322
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1323
        }
1324

1325
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1326
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1327
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1328
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1329
        }
1330
        if (firstSettings) {
1✔
1331
          attributes = listener.filterTransport(attributes);
1✔
1332
          listener.transportReady();
1✔
1333
          firstSettings = false;
1✔
1334
        }
1335

1336
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1337
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1338
        // otherwise it will cause a stream error (RST_STREAM).
1339
        frameWriter.ackSettings(settings);
1✔
1340

1341
        // send any pending bytes / streams
1342
        if (outboundWindowSizeIncreased) {
1✔
1343
          outboundFlow.writeStreams();
1✔
1344
        }
1345
        startPendingStreams();
1✔
1346
      }
1✔
1347
    }
1✔
1348

1349
    @Override
1350
    public void ping(boolean ack, int payload1, int payload2) {
1351
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1352
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1353
      if (!ack) {
1✔
1354
        synchronized (lock) {
1✔
1355
          frameWriter.ping(true, payload1, payload2);
1✔
1356
        }
1✔
1357
      } else {
1358
        Http2Ping p = null;
1✔
1359
        synchronized (lock) {
1✔
1360
          if (ping != null) {
1✔
1361
            if (ping.payload() == ackPayload) {
1✔
1362
              p = ping;
1✔
1363
              ping = null;
1✔
1364
            } else {
1365
              log.log(Level.WARNING, String.format(
1✔
1366
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1367
                  ping.payload(), ackPayload));
1✔
1368
            }
1369
          } else {
1370
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1371
          }
1372
        }
1✔
1373
        // don't complete it while holding lock since callbacks could run immediately
1374
        if (p != null) {
1✔
1375
          p.complete();
1✔
1376
        }
1377
      }
1378
    }
1✔
1379

1380
    @Override
1381
    public void ackSettings() {
1382
      // Do nothing currently.
1383
    }
1✔
1384

1385
    @Override
1386
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1387
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1388
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1389
        String data = debugData.utf8();
1✔
1390
        log.log(Level.WARNING, String.format(
1✔
1391
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1392
        if ("too_many_pings".equals(data)) {
1✔
1393
          tooManyPingsRunnable.run();
1✔
1394
        }
1395
      }
1396
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1397
          .augmentDescription("Received Goaway");
1✔
1398
      if (debugData.size() > 0) {
1✔
1399
        // If a debug message was provided, use it.
1400
        status = status.augmentDescription(debugData.utf8());
1✔
1401
      }
1402
      startGoAway(lastGoodStreamId, null, status);
1✔
1403
    }
1✔
1404

1405
    @Override
1406
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1407
        throws IOException {
1408
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1409
          streamId, promisedStreamId, requestHeaders);
1410
      // We don't accept server initiated stream.
1411
      synchronized (lock) {
1✔
1412
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1413
      }
1✔
1414
    }
1✔
1415

1416
    @Override
1417
    public void windowUpdate(int streamId, long delta) {
1418
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1419
      if (delta == 0) {
1✔
1420
        String errorMsg = "Received 0 flow control window increment.";
×
1421
        if (streamId == 0) {
×
1422
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1423
        } else {
1424
          finishStream(
×
1425
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1426
              ErrorCode.PROTOCOL_ERROR, null);
1427
        }
1428
        return;
×
1429
      }
1430

1431
      boolean unknownStream = false;
1✔
1432
      synchronized (lock) {
1✔
1433
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1434
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1435
          return;
1✔
1436
        }
1437

1438
        OkHttpClientStream stream = streams.get(streamId);
1✔
1439
        if (stream != null) {
1✔
1440
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1441
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1442
          unknownStream = true;
1✔
1443
        }
1444
      }
1✔
1445
      if (unknownStream) {
1✔
1446
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1447
            "Received window_update for unknown stream: " + streamId);
1448
      }
1449
    }
1✔
1450

1451
    @Override
1452
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1453
      // Ignore priority change.
1454
      // TODO(madongfly): log
1455
    }
×
1456

1457
    @Override
1458
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1459
        int port, long maxAge) {
1460
      // TODO(madongfly): Deal with alternateService propagation
1461
    }
×
1462
  }
1463
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc