• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19676

04 Feb 2025 06:51PM CUT coverage: 88.592% (+0.006%) from 88.586%
#19676

push

github

web-flow
Replace Kokoro ARM build with GitHub Actions

The Kokoro aarch64 build runs on x86 with an emulator, and has always
been flaky due to the slow execution speed. At present it is continually
failing due to deadline exceededs. GitHub Actions is running on aarch64
hardware, so is much faster (4 minutes vs 30 minutes, without including
the speedup from GitHub Action's caching).

33764 of 38112 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.11
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import com.google.errorprone.annotations.concurrent.GuardedBy;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.ClientStreamTracer;
34
import io.grpc.Grpc;
35
import io.grpc.HttpConnectProxiedSocketAddress;
36
import io.grpc.InternalChannelz;
37
import io.grpc.InternalChannelz.SocketStats;
38
import io.grpc.InternalLogId;
39
import io.grpc.Metadata;
40
import io.grpc.MethodDescriptor;
41
import io.grpc.MethodDescriptor.MethodType;
42
import io.grpc.SecurityLevel;
43
import io.grpc.Status;
44
import io.grpc.Status.Code;
45
import io.grpc.StatusException;
46
import io.grpc.internal.ClientStreamListener.RpcProgress;
47
import io.grpc.internal.ConnectionClientTransport;
48
import io.grpc.internal.GrpcAttributes;
49
import io.grpc.internal.GrpcUtil;
50
import io.grpc.internal.Http2Ping;
51
import io.grpc.internal.InUseStateAggregator;
52
import io.grpc.internal.KeepAliveManager;
53
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
54
import io.grpc.internal.SerializingExecutor;
55
import io.grpc.internal.StatsTraceContext;
56
import io.grpc.internal.TransportTracer;
57
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
58
import io.grpc.okhttp.internal.ConnectionSpec;
59
import io.grpc.okhttp.internal.Credentials;
60
import io.grpc.okhttp.internal.StatusLine;
61
import io.grpc.okhttp.internal.framed.ErrorCode;
62
import io.grpc.okhttp.internal.framed.FrameReader;
63
import io.grpc.okhttp.internal.framed.FrameWriter;
64
import io.grpc.okhttp.internal.framed.Header;
65
import io.grpc.okhttp.internal.framed.HeadersMode;
66
import io.grpc.okhttp.internal.framed.Http2;
67
import io.grpc.okhttp.internal.framed.Settings;
68
import io.grpc.okhttp.internal.framed.Variant;
69
import io.grpc.okhttp.internal.proxy.HttpUrl;
70
import io.grpc.okhttp.internal.proxy.Request;
71
import io.perfmark.PerfMark;
72
import java.io.EOFException;
73
import java.io.IOException;
74
import java.net.InetSocketAddress;
75
import java.net.Socket;
76
import java.net.URI;
77
import java.util.Collections;
78
import java.util.Deque;
79
import java.util.EnumMap;
80
import java.util.HashMap;
81
import java.util.Iterator;
82
import java.util.LinkedList;
83
import java.util.List;
84
import java.util.Locale;
85
import java.util.Map;
86
import java.util.Random;
87
import java.util.concurrent.BrokenBarrierException;
88
import java.util.concurrent.CountDownLatch;
89
import java.util.concurrent.CyclicBarrier;
90
import java.util.concurrent.Executor;
91
import java.util.concurrent.ScheduledExecutorService;
92
import java.util.concurrent.TimeUnit;
93
import java.util.concurrent.TimeoutException;
94
import java.util.logging.Level;
95
import java.util.logging.Logger;
96
import javax.annotation.Nullable;
97
import javax.net.SocketFactory;
98
import javax.net.ssl.HostnameVerifier;
99
import javax.net.ssl.SSLSession;
100
import javax.net.ssl.SSLSocket;
101
import javax.net.ssl.SSLSocketFactory;
102
import okio.Buffer;
103
import okio.BufferedSink;
104
import okio.BufferedSource;
105
import okio.ByteString;
106
import okio.Okio;
107
import okio.Source;
108
import okio.Timeout;
109

110
/**
111
 * A okhttp-based {@link ConnectionClientTransport} implementation.
112
 */
113
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
114
      OutboundFlowController.Transport {
115
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
116
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
117

118
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
119
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
120
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
121
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
122
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
123
        Status.INTERNAL.withDescription("Protocol error"));
1✔
124
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
125
        Status.INTERNAL.withDescription("Internal error"));
1✔
126
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
127
        Status.INTERNAL.withDescription("Flow control error"));
1✔
128
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
129
        Status.INTERNAL.withDescription("Stream closed"));
1✔
130
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
131
        Status.INTERNAL.withDescription("Frame too large"));
1✔
132
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
133
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
134
    errorToStatus.put(ErrorCode.CANCEL,
1✔
135
        Status.CANCELLED.withDescription("Cancelled"));
1✔
136
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
137
        Status.INTERNAL.withDescription("Compression error"));
1✔
138
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
139
        Status.INTERNAL.withDescription("Connect error"));
1✔
140
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
141
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
142
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
143
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
144
    return Collections.unmodifiableMap(errorToStatus);
1✔
145
  }
146

147
  private final InetSocketAddress address;
148
  private final String defaultAuthority;
149
  private final String userAgent;
150
  private final Random random = new Random();
1✔
151
  // Returns new unstarted stopwatches
152
  private final Supplier<Stopwatch> stopwatchFactory;
153
  private final int initialWindowSize;
154
  private final Variant variant;
155
  private Listener listener;
156
  @GuardedBy("lock")
157
  private ExceptionHandlingFrameWriter frameWriter;
158
  private OutboundFlowController outboundFlow;
159
  private final Object lock = new Object();
1✔
160
  private final InternalLogId logId;
161
  @GuardedBy("lock")
162
  private int nextStreamId;
163
  @GuardedBy("lock")
1✔
164
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
165
  private final Executor executor;
166
  // Wrap on executor, to guarantee some operations be executed serially.
167
  private final SerializingExecutor serializingExecutor;
168
  private final ScheduledExecutorService scheduler;
169
  private final int maxMessageSize;
170
  private int connectionUnacknowledgedBytesRead;
171
  private ClientFrameHandler clientFrameHandler;
172
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
173
  private Attributes attributes;
174
  /**
175
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
176
   * streams may continue.
177
   */
178
  @GuardedBy("lock")
179
  private Status goAwayStatus;
180
  @GuardedBy("lock")
181
  private boolean goAwaySent;
182
  @GuardedBy("lock")
183
  private Http2Ping ping;
184
  @GuardedBy("lock")
185
  private boolean stopped;
186
  @GuardedBy("lock")
187
  private boolean hasStream;
188
  private final SocketFactory socketFactory;
189
  private SSLSocketFactory sslSocketFactory;
190
  private HostnameVerifier hostnameVerifier;
191
  private Socket socket;
192
  @GuardedBy("lock")
1✔
193
  private int maxConcurrentStreams = 0;
194
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
195
  @GuardedBy("lock")
196
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
197
  private final ConnectionSpec connectionSpec;
198
  private KeepAliveManager keepAliveManager;
199
  private boolean enableKeepAlive;
200
  private long keepAliveTimeNanos;
201
  private long keepAliveTimeoutNanos;
202
  private boolean keepAliveWithoutCalls;
203
  private final Runnable tooManyPingsRunnable;
204
  private final int maxInboundMetadataSize;
205
  private final boolean useGetForSafeMethods;
206
  @GuardedBy("lock")
207
  private final TransportTracer transportTracer;
208
  @GuardedBy("lock")
1✔
209
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
210
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
211
        @Override
212
        protected void handleInUse() {
213
          listener.transportInUse(true);
1✔
214
        }
1✔
215

216
        @Override
217
        protected void handleNotInUse() {
218
          listener.transportInUse(false);
1✔
219
        }
1✔
220
      };
221
  @GuardedBy("lock")
222
  private InternalChannelz.Security securityInfo;
223

224
  @VisibleForTesting
225
  @Nullable
226
  final HttpConnectProxiedSocketAddress proxiedAddr;
227

228
  @VisibleForTesting
1✔
229
  int proxySocketTimeout = 30000;
230

231
  // The following fields should only be used for test.
232
  Runnable connectingCallback;
233
  SettableFuture<Void> connectedFuture;
234

235
  public OkHttpClientTransport(
236
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
237
      InetSocketAddress address,
238
      String authority,
239
      @Nullable String userAgent,
240
      Attributes eagAttrs,
241
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
242
      Runnable tooManyPingsRunnable) {
243
    this(
1✔
244
        transportFactory,
245
        address,
246
        authority,
247
        userAgent,
248
        eagAttrs,
249
        GrpcUtil.STOPWATCH_SUPPLIER,
250
        new Http2(),
251
        proxiedAddr,
252
        tooManyPingsRunnable);
253
  }
1✔
254

255
  private OkHttpClientTransport(
256
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
257
      InetSocketAddress address,
258
      String authority,
259
      @Nullable String userAgent,
260
      Attributes eagAttrs,
261
      Supplier<Stopwatch> stopwatchFactory,
262
      Variant variant,
263
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
264
      Runnable tooManyPingsRunnable) {
1✔
265
    this.address = Preconditions.checkNotNull(address, "address");
1✔
266
    this.defaultAuthority = authority;
1✔
267
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
268
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
269
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
270
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
271
    this.scheduler = Preconditions.checkNotNull(
1✔
272
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
273
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
274
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
275
    nextStreamId = 3;
1✔
276
    this.socketFactory = transportFactory.socketFactory == null
1✔
277
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
278
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
279
    this.hostnameVerifier = transportFactory.hostnameVerifier;
1✔
280
    this.connectionSpec = Preconditions.checkNotNull(
1✔
281
        transportFactory.connectionSpec, "connectionSpec");
282
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
283
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
284
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
285
    this.proxiedAddr = proxiedAddr;
1✔
286
    this.tooManyPingsRunnable =
1✔
287
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
288
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
289
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
290
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
291
    this.attributes = Attributes.newBuilder()
1✔
292
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
293
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
294
    initTransportTracer();
1✔
295
  }
1✔
296

297
  /**
298
   * Create a transport connected to a fake peer for test.
299
   */
300
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
301
  @VisibleForTesting
302
  OkHttpClientTransport(
303
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
304
      String userAgent,
305
      Supplier<Stopwatch> stopwatchFactory,
306
      Variant variant,
307
      @Nullable Runnable connectingCallback,
308
      SettableFuture<Void> connectedFuture,
309
      Runnable tooManyPingsRunnable) {
310
    this(
1✔
311
        transportFactory,
312
        new InetSocketAddress("127.0.0.1", 80),
313
        "notarealauthority:80",
314
        userAgent,
315
        Attributes.EMPTY,
316
        stopwatchFactory,
317
        variant,
318
        null,
319
        tooManyPingsRunnable);
320
    this.connectingCallback = connectingCallback;
1✔
321
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
1✔
322
  }
1✔
323

324
  // sslSocketFactory is set to null when use plaintext.
325
  boolean isUsingPlaintext() {
326
    return sslSocketFactory == null;
1✔
327
  }
328

329
  private void initTransportTracer() {
330
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
331
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
332
        @Override
333
        public TransportTracer.FlowControlWindows read() {
334
          synchronized (lock) {
1✔
335
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
336
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
337
            // provide a lower bound.
338
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
339
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
340
          }
341
        }
342
      });
343
    }
1✔
344
  }
1✔
345

346
  /**
347
   * Enable keepalive with custom delay and timeout.
348
   */
349
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
350
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
351
    enableKeepAlive = enable;
×
352
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
353
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
354
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
355
  }
×
356

357
  @Override
358
  public void ping(final PingCallback callback, Executor executor) {
359
    long data = 0;
1✔
360
    Http2Ping p;
361
    boolean writePing;
362
    synchronized (lock) {
1✔
363
      checkState(frameWriter != null);
1✔
364
      if (stopped) {
1✔
365
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
366
        return;
1✔
367
      }
368
      if (ping != null) {
1✔
369
        // we only allow one outstanding ping at a time, so just add the callback to
370
        // any outstanding operation
371
        p = ping;
1✔
372
        writePing = false;
1✔
373
      } else {
374
        // set outstanding operation and then write the ping after releasing lock
375
        data = random.nextLong();
1✔
376
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
377
        stopwatch.start();
1✔
378
        p = ping = new Http2Ping(data, stopwatch);
1✔
379
        writePing = true;
1✔
380
        transportTracer.reportKeepAliveSent();
1✔
381
      }
382
      if (writePing) {
1✔
383
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
384
      }
385
    }
1✔
386
    // If transport concurrently failed/stopped since we released the lock above, this could
387
    // immediately invoke callback (which we shouldn't do while holding a lock)
388
    p.addCallback(callback, executor);
1✔
389
  }
1✔
390

391
  @Override
392
  public OkHttpClientStream newStream(
393
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
394
      ClientStreamTracer[] tracers) {
395
    Preconditions.checkNotNull(method, "method");
1✔
396
    Preconditions.checkNotNull(headers, "headers");
1✔
397
    StatsTraceContext statsTraceContext =
1✔
398
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
399
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
400
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
401
      return new OkHttpClientStream(
1✔
402
          method,
403
          headers,
404
          frameWriter,
405
          OkHttpClientTransport.this,
406
          outboundFlow,
407
          lock,
408
          maxMessageSize,
409
          initialWindowSize,
410
          defaultAuthority,
411
          userAgent,
412
          statsTraceContext,
413
          transportTracer,
414
          callOptions,
415
          useGetForSafeMethods);
416
    }
417
  }
418

419
  @GuardedBy("lock")
420
  void streamReadyToStart(OkHttpClientStream clientStream) {
421
    if (goAwayStatus != null) {
1✔
422
      clientStream.transportState().transportReportStatus(
1✔
423
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
424
    } else if (streams.size() >= maxConcurrentStreams) {
1✔
425
      pendingStreams.add(clientStream);
1✔
426
      setInUse(clientStream);
1✔
427
    } else {
428
      startStream(clientStream);
1✔
429
    }
430
  }
1✔
431

432
  @SuppressWarnings("GuardedBy")
433
  @GuardedBy("lock")
434
  private void startStream(OkHttpClientStream stream) {
435
    Preconditions.checkState(
1✔
436
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
437
    streams.put(nextStreamId, stream);
1✔
438
    setInUse(stream);
1✔
439
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
440
    // found: 'this.lock'
441
    stream.transportState().start(nextStreamId);
1✔
442
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
443
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
444
        || stream.useGet()) {
1✔
445
      frameWriter.flush();
1✔
446
    }
447
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
448
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
449
      // correctly.
450
      nextStreamId = Integer.MAX_VALUE;
1✔
451
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
452
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
453
    } else {
454
      nextStreamId += 2;
1✔
455
    }
456
  }
1✔
457

458
  /**
459
   * Starts pending streams, returns true if at least one pending stream is started.
460
   */
461
  @GuardedBy("lock")
462
  private boolean startPendingStreams() {
463
    boolean hasStreamStarted = false;
1✔
464
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
465
      OkHttpClientStream stream = pendingStreams.poll();
1✔
466
      startStream(stream);
1✔
467
      hasStreamStarted = true;
1✔
468
    }
1✔
469
    return hasStreamStarted;
1✔
470
  }
471

472
  /**
473
   * Removes given pending stream, used when a pending stream is cancelled.
474
   */
475
  @GuardedBy("lock")
476
  void removePendingStream(OkHttpClientStream pendingStream) {
477
    pendingStreams.remove(pendingStream);
1✔
478
    maybeClearInUse(pendingStream);
1✔
479
  }
1✔
480

481
  @Override
482
  public Runnable start(Listener listener) {
483
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
484

485
    if (enableKeepAlive) {
1✔
486
      keepAliveManager = new KeepAliveManager(
×
487
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
488
          keepAliveWithoutCalls);
489
      keepAliveManager.onTransportStarted();
×
490
    }
491

492
    int maxQueuedControlFrames = 10000;
1✔
493
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
494
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
495
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
496

497
    synchronized (lock) {
1✔
498
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
499
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
500
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
501
      // the same TransportExceptionHandler instance so it is all mixed back together.
502
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
503
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
504
    }
1✔
505
    final CountDownLatch latch = new CountDownLatch(1);
1✔
506
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
507
    // The transport needs up to two threads to function once started,
508
    // but only needs one during handshaking. Start another thread during handshaking
509
    // to make sure there's still a free thread available. If the number of threads is exhausted,
510
    // it is better to kill the transport than for all the transports to hang unable to send.
511
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
512
    // Connecting in the serializingExecutor, so that some stream operations like synStream
513
    // will be executed after connected.
514

515
    serializingExecutor.execute(new Runnable() {
1✔
516
      @Override
517
      public void run() {
518
        // Use closed source on failure so that the reader immediately shuts down.
519
        BufferedSource source = Okio.buffer(new Source() {
1✔
520
          @Override
521
          public long read(Buffer sink, long byteCount) {
522
            return -1;
1✔
523
          }
524

525
          @Override
526
          public Timeout timeout() {
527
            return Timeout.NONE;
×
528
          }
529

530
          @Override
531
          public void close() {
532
          }
1✔
533
        });
534
        Socket sock;
535
        SSLSession sslSession = null;
1✔
536
        try {
537
          // This is a hack to make sure the connection preface and initial settings to be sent out
538
          // without blocking the start. By doing this essentially prevents potential deadlock when
539
          // network is not available during startup while another thread holding lock to send the
540
          // initial preface.
541
          try {
542
            latch.await();
1✔
543
            barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
544
          } catch (InterruptedException e) {
×
545
            Thread.currentThread().interrupt();
×
546
          } catch (TimeoutException | BrokenBarrierException e) {
1✔
547
            startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
548
                .withDescription("Timed out waiting for second handshake thread. "
1✔
549
                    + "The transport executor pool may have run out of threads"));
550
            return;
1✔
551
          }
1✔
552

553
          if (proxiedAddr == null) {
1✔
554
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
555
          } else {
556
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
557
              sock = createHttpProxySocket(
1✔
558
                  proxiedAddr.getTargetAddress(),
1✔
559
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
560
                  proxiedAddr.getUsername(),
1✔
561
                  proxiedAddr.getPassword()
1✔
562
              );
563
            } else {
564
              throw Status.INTERNAL.withDescription(
×
565
                  "Unsupported SocketAddress implementation "
566
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
567
            }
568
          }
569
          if (sslSocketFactory != null) {
1✔
570
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
571
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
572
                connectionSpec);
1✔
573
            sslSession = sslSocket.getSession();
1✔
574
            sock = sslSocket;
1✔
575
          }
576
          sock.setTcpNoDelay(true);
1✔
577
          source = Okio.buffer(Okio.source(sock));
1✔
578
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
579

580
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
581
          attributes = attributes.toBuilder()
1✔
582
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
583
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
584
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
585
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
586
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
587
              .build();
1✔
588
        } catch (StatusException e) {
1✔
589
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
590
          return;
1✔
591
        } catch (Exception e) {
1✔
592
          onException(e);
1✔
593
          return;
1✔
594
        } finally {
595
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
596
          latchForExtraThread.countDown();
1✔
597
        }
598
        synchronized (lock) {
1✔
599
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
600
          if (sslSession != null) {
1✔
601
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
602
          }
603
        }
1✔
604
      }
1✔
605
    });
606

607
    executor.execute(new Runnable() {
1✔
608
      @Override
609
      public void run() {
610
        try {
611
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
612
          latchForExtraThread.await();
1✔
613
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
614
          // Something bad happened, maybe too few threads available!
615
          // This will be handled in the handshake thread.
616
        } catch (InterruptedException e) {
×
617
          Thread.currentThread().interrupt();
×
618
        }
1✔
619
      }
1✔
620
    });
621
    // Schedule to send connection preface & settings before any other write.
622
    try {
623
      sendConnectionPrefaceAndSettings();
1✔
624
    } finally {
625
      latch.countDown();
1✔
626
    }
627

628
    serializingExecutor.execute(new Runnable() {
1✔
629
      @Override
630
      public void run() {
631
        if (connectingCallback != null) {
1✔
632
          connectingCallback.run();
1✔
633
        }
634
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
635
        // may send goAway immediately.
636
        executor.execute(clientFrameHandler);
1✔
637
        synchronized (lock) {
1✔
638
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
639
          startPendingStreams();
1✔
640
        }
1✔
641
        if (connectedFuture != null) {
1✔
642
          connectedFuture.set(null);
1✔
643
        }
644
      }
1✔
645
    });
646
    return null;
1✔
647
  }
648

649
  /**
650
   * Should only be called once when the transport is first established.
651
   */
652
  private void sendConnectionPrefaceAndSettings() {
653
    synchronized (lock) {
1✔
654
      frameWriter.connectionPreface();
1✔
655
      Settings settings = new Settings();
1✔
656
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
657
      frameWriter.settings(settings);
1✔
658
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
659
        frameWriter.windowUpdate(
1✔
660
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
661
      }
662
    }
1✔
663
  }
1✔
664

665
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
666
      String proxyUsername, String proxyPassword) throws StatusException {
667
    Socket sock = null;
1✔
668
    try {
669
      // The proxy address may not be resolved
670
      if (proxyAddress.getAddress() != null) {
1✔
671
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
672
      } else {
673
        sock =
×
674
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
675
      }
676
      sock.setTcpNoDelay(true);
1✔
677
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
678
      // can cause reading from the socket to hang.
679
      sock.setSoTimeout(proxySocketTimeout);
1✔
680

681
      Source source = Okio.source(sock);
1✔
682
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
683

684
      // Prepare headers and request method line
685
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
686
      HttpUrl url = proxyRequest.httpUrl();
1✔
687
      String requestLine =
1✔
688
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
689

690
      // Write request to socket
691
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
692
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
693
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
694
            .writeUtf8(": ")
1✔
695
            .writeUtf8(proxyRequest.headers().value(i))
1✔
696
            .writeUtf8("\r\n");
1✔
697
      }
698
      sink.writeUtf8("\r\n");
1✔
699
      // Flush buffer (flushes socket and sends request)
700
      sink.flush();
1✔
701

702
      // Read status line, check if 2xx was returned
703
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
704
      // Drain rest of headers
705
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
706
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
707
        Buffer body = new Buffer();
1✔
708
        try {
709
          sock.shutdownOutput();
1✔
710
          source.read(body, 1024);
1✔
711
        } catch (IOException ex) {
×
712
          body.writeUtf8("Unable to read body: " + ex.toString());
×
713
        }
1✔
714
        try {
715
          sock.close();
1✔
716
        } catch (IOException ignored) {
×
717
          // ignored
718
        }
1✔
719
        String message = String.format(
1✔
720
            Locale.US,
721
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
722
              + "Response body:\n%s",
723
            statusLine.code, statusLine.message, body.readUtf8());
1✔
724
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
725
      }
726
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
727
      sock.setSoTimeout(0);
1✔
728
      return sock;
1✔
729
    } catch (IOException e) {
1✔
730
      if (sock != null) {
1✔
731
        GrpcUtil.closeQuietly(sock);
1✔
732
      }
733
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
734
          .asException();
1✔
735
    }
736
  }
737

738
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
739
                                         String proxyPassword) {
740
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
741
        .scheme("https")
1✔
742
        .host(address.getHostName())
1✔
743
        .port(address.getPort())
1✔
744
        .build();
1✔
745

746
    Request.Builder request = new Request.Builder()
1✔
747
        .url(tunnelUrl)
1✔
748
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
749
        .header("User-Agent", userAgent);
1✔
750

751
    // If we have proxy credentials, set them right away
752
    if (proxyUsername != null && proxyPassword != null) {
1✔
753
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
754
    }
755
    return request.build();
1✔
756
  }
757

758
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
759
    Buffer buffer = new Buffer();
1✔
760
    while (true) {
761
      if (source.read(buffer, 1) == -1) {
1✔
762
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
763
      }
764
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
765
        return buffer.readUtf8LineStrict();
1✔
766
      }
767
    }
768
  }
769

770
  @Override
771
  public String toString() {
772
    return MoreObjects.toStringHelper(this)
1✔
773
        .add("logId", logId.getId())
1✔
774
        .add("address", address)
1✔
775
        .toString();
1✔
776
  }
777

778
  @Override
779
  public InternalLogId getLogId() {
780
    return logId;
1✔
781
  }
782

783
  /**
784
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
785
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
786
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
787
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
788
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
789
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
790
   * connection to the port, independent of this function.
791
   *
792
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
793
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
794
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
795
   */
796
  @VisibleForTesting
797
  String getOverridenHost() {
798
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
799
    if (uri.getHost() != null) {
1✔
800
      return uri.getHost();
1✔
801
    }
802

803
    return defaultAuthority;
1✔
804
  }
805

806
  @VisibleForTesting
807
  int getOverridenPort() {
808
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
809
    if (uri.getPort() != -1) {
1✔
810
      return uri.getPort();
×
811
    }
812

813
    return address.getPort();
1✔
814
  }
815

816
  @Override
817
  public void shutdown(Status reason) {
818
    synchronized (lock) {
1✔
819
      if (goAwayStatus != null) {
1✔
820
        return;
1✔
821
      }
822

823
      goAwayStatus = reason;
1✔
824
      listener.transportShutdown(goAwayStatus);
1✔
825
      stopIfNecessary();
1✔
826
    }
1✔
827
  }
1✔
828

829
  @Override
830
  public void shutdownNow(Status reason) {
831
    shutdown(reason);
1✔
832
    synchronized (lock) {
1✔
833
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
834
      while (it.hasNext()) {
1✔
835
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
836
        it.remove();
1✔
837
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
838
        maybeClearInUse(entry.getValue());
1✔
839
      }
1✔
840

841
      for (OkHttpClientStream stream : pendingStreams) {
1✔
842
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
843
        // chance to retry and be routed to another connection.
844
        stream.transportState().transportReportStatus(
1✔
845
            reason, RpcProgress.MISCARRIED, true, new Metadata());
846
        maybeClearInUse(stream);
1✔
847
      }
1✔
848
      pendingStreams.clear();
1✔
849

850
      stopIfNecessary();
1✔
851
    }
1✔
852
  }
1✔
853

854
  @Override
855
  public Attributes getAttributes() {
856
    return attributes;
1✔
857
  }
858

859
  /**
860
   * Gets all active streams as an array.
861
   */
862
  @Override
863
  public OutboundFlowController.StreamState[] getActiveStreams() {
864
    synchronized (lock) {
1✔
865
      OutboundFlowController.StreamState[] flowStreams =
1✔
866
          new OutboundFlowController.StreamState[streams.size()];
1✔
867
      int i = 0;
1✔
868
      for (OkHttpClientStream stream : streams.values()) {
1✔
869
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
870
      }
1✔
871
      return flowStreams;
1✔
872
    }
873
  }
874

875
  @VisibleForTesting
876
  ClientFrameHandler getHandler() {
877
    return clientFrameHandler;
1✔
878
  }
879

880
  @VisibleForTesting
881
  SocketFactory getSocketFactory() {
882
    return socketFactory;
1✔
883
  }
884

885
  @VisibleForTesting
886
  int getPendingStreamSize() {
887
    synchronized (lock) {
1✔
888
      return pendingStreams.size();
1✔
889
    }
890
  }
891

892
  @VisibleForTesting
893
  void setNextStreamId(int nextStreamId) {
894
    synchronized (lock) {
1✔
895
      this.nextStreamId = nextStreamId;
1✔
896
    }
1✔
897
  }
1✔
898

899
  /**
900
   * Finish all active streams due to an IOException, then close the transport.
901
   */
902
  @Override
903
  public void onException(Throwable failureCause) {
904
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
905
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
906
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
907
  }
1✔
908

909
  /**
910
   * Send GOAWAY to the server, then finish all active streams and close the transport.
911
   */
912
  private void onError(ErrorCode errorCode, String moreDetail) {
913
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
914
  }
1✔
915

916
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
917
    synchronized (lock) {
1✔
918
      if (goAwayStatus == null) {
1✔
919
        goAwayStatus = status;
1✔
920
        listener.transportShutdown(status);
1✔
921
      }
922
      if (errorCode != null && !goAwaySent) {
1✔
923
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
924
        // streams. The GOAWAY is part of graceful shutdown.
925
        goAwaySent = true;
1✔
926
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
927
      }
928

929
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
930
      while (it.hasNext()) {
1✔
931
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
932
        if (entry.getKey() > lastKnownStreamId) {
1✔
933
          it.remove();
1✔
934
          entry.getValue().transportState().transportReportStatus(
1✔
935
              status, RpcProgress.REFUSED, false, new Metadata());
936
          maybeClearInUse(entry.getValue());
1✔
937
        }
938
      }
1✔
939

940
      for (OkHttpClientStream stream : pendingStreams) {
1✔
941
        stream.transportState().transportReportStatus(
1✔
942
            status, RpcProgress.MISCARRIED, true, new Metadata());
943
        maybeClearInUse(stream);
1✔
944
      }
1✔
945
      pendingStreams.clear();
1✔
946

947
      stopIfNecessary();
1✔
948
    }
1✔
949
  }
1✔
950

951
  /**
952
   * Called when a stream is closed. We do things like:
953
   * <ul>
954
   * <li>Removing the stream from the map.
955
   * <li>Optionally reporting the status.
956
   * <li>Starting pending streams if we can.
957
   * <li>Stopping the transport if this is the last live stream under a go-away status.
958
   * </ul>
959
   *
960
   * @param streamId the Id of the stream.
961
   * @param status the final status of this stream, null means no need to report.
962
   * @param stopDelivery interrupt queued messages in the deframer
963
   * @param errorCode reset the stream with this ErrorCode if not null.
964
   * @param trailers the trailers received if not null
965
   */
966
  void finishStream(
967
      int streamId,
968
      @Nullable Status status,
969
      RpcProgress rpcProgress,
970
      boolean stopDelivery,
971
      @Nullable ErrorCode errorCode,
972
      @Nullable Metadata trailers) {
973
    synchronized (lock) {
1✔
974
      OkHttpClientStream stream = streams.remove(streamId);
1✔
975
      if (stream != null) {
1✔
976
        if (errorCode != null) {
1✔
977
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
978
        }
979
        if (status != null) {
1✔
980
          stream
1✔
981
              .transportState()
1✔
982
              .transportReportStatus(
1✔
983
                  status,
984
                  rpcProgress,
985
                  stopDelivery,
986
                  trailers != null ? trailers : new Metadata());
1✔
987
        }
988
        if (!startPendingStreams()) {
1✔
989
          stopIfNecessary();
1✔
990
        }
991
        maybeClearInUse(stream);
1✔
992
      }
993
    }
1✔
994
  }
1✔
995

996
  /**
997
   * When the transport is in goAway state, we should stop it once all active streams finish.
998
   */
999
  @GuardedBy("lock")
1000
  private void stopIfNecessary() {
1001
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1002
      return;
1✔
1003
    }
1004
    if (stopped) {
1✔
1005
      return;
1✔
1006
    }
1007
    stopped = true;
1✔
1008

1009
    if (keepAliveManager != null) {
1✔
1010
      keepAliveManager.onTransportTermination();
×
1011
    }
1012

1013
    if (ping != null) {
1✔
1014
      ping.failed(getPingFailure());
1✔
1015
      ping = null;
1✔
1016
    }
1017

1018
    if (!goAwaySent) {
1✔
1019
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1020
      // streams. The GOAWAY is part of graceful shutdown.
1021
      goAwaySent = true;
1✔
1022
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1023
    }
1024

1025
    // We will close the underlying socket in the writing thread to break out the reader
1026
    // thread, which will close the frameReader and notify the listener.
1027
    frameWriter.close();
1✔
1028
  }
1✔
1029

1030
  @GuardedBy("lock")
1031
  private void maybeClearInUse(OkHttpClientStream stream) {
1032
    if (hasStream) {
1✔
1033
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1034
        hasStream = false;
1✔
1035
        if (keepAliveManager != null) {
1✔
1036
          // We don't have any active streams. No need to do keepalives any more.
1037
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1038
          // and onTransportActive.
1039
          keepAliveManager.onTransportIdle();
×
1040
        }
1041
      }
1042
    }
1043
    if (stream.shouldBeCountedForInUse()) {
1✔
1044
      inUseState.updateObjectInUse(stream, false);
1✔
1045
    }
1046
  }
1✔
1047

1048
  @GuardedBy("lock")
1049
  private void setInUse(OkHttpClientStream stream) {
1050
    if (!hasStream) {
1✔
1051
      hasStream = true;
1✔
1052
      if (keepAliveManager != null) {
1✔
1053
        // We have a new stream. We might need to do keepalives now.
1054
        // Note that we have to do this inside the lock to avoid calling
1055
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1056
        // order.
1057
        keepAliveManager.onTransportActive();
×
1058
      }
1059
    }
1060
    if (stream.shouldBeCountedForInUse()) {
1✔
1061
      inUseState.updateObjectInUse(stream, true);
1✔
1062
    }
1063
  }
1✔
1064

1065
  private Throwable getPingFailure() {
1066
    synchronized (lock) {
1✔
1067
      if (goAwayStatus != null) {
1✔
1068
        return goAwayStatus.asException();
1✔
1069
      } else {
1070
        return Status.UNAVAILABLE.withDescription("Connection closed").asException();
×
1071
      }
1072
    }
1073
  }
1074

1075
  boolean mayHaveCreatedStream(int streamId) {
1076
    synchronized (lock) {
1✔
1077
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1078
    }
1079
  }
1080

1081
  OkHttpClientStream getStream(int streamId) {
1082
    synchronized (lock) {
1✔
1083
      return streams.get(streamId);
1✔
1084
    }
1085
  }
1086

1087
  /**
1088
   * Returns a Grpc status corresponding to the given ErrorCode.
1089
   */
1090
  @VisibleForTesting
1091
  static Status toGrpcStatus(ErrorCode code) {
1092
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1093
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1094
        "Unknown http2 error code: " + code.httpCode);
1095
  }
1096

1097
  @Override
1098
  public ListenableFuture<SocketStats> getStats() {
1099
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1100
    synchronized (lock) {
1✔
1101
      if (socket == null) {
1✔
1102
        ret.set(new SocketStats(
×
1103
            transportTracer.getStats(),
×
1104
            /*local=*/ null,
1105
            /*remote=*/ null,
1106
            new InternalChannelz.SocketOptions.Builder().build(),
×
1107
            /*security=*/ null));
1108
      } else {
1109
        ret.set(new SocketStats(
1✔
1110
            transportTracer.getStats(),
1✔
1111
            socket.getLocalSocketAddress(),
1✔
1112
            socket.getRemoteSocketAddress(),
1✔
1113
            Utils.getSocketOptions(socket),
1✔
1114
            securityInfo));
1115
      }
1116
      return ret;
1✔
1117
    }
1118
  }
1119

1120
  /**
1121
   * Runnable which reads frames and dispatches them to in flight calls.
1122
   */
1123
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1124

1125
    private final OkHttpFrameLogger logger =
1✔
1126
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1127
    FrameReader frameReader;
1128
    boolean firstSettings = true;
1✔
1129

1130
    ClientFrameHandler(FrameReader frameReader) {
1✔
1131
      this.frameReader = frameReader;
1✔
1132
    }
1✔
1133

1134
    @Override
1135
    @SuppressWarnings("Finally")
1136
    public void run() {
1137
      String threadName = Thread.currentThread().getName();
1✔
1138
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1139
      try {
1140
        // Read until the underlying socket closes.
1141
        while (frameReader.nextFrame(this)) {
1✔
1142
          if (keepAliveManager != null) {
1✔
1143
            keepAliveManager.onDataReceived();
×
1144
          }
1145
        }
1146
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1147
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1148
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1149
        Status status;
1150
        synchronized (lock) {
1✔
1151
          status = goAwayStatus;
1✔
1152
        }
1✔
1153
        if (status == null) {
1✔
1154
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1155
        }
1156
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1157
      } catch (Throwable t) {
1✔
1158
        // TODO(madongfly): Send the exception message to the server.
1159
        startGoAway(
1✔
1160
            0,
1161
            ErrorCode.PROTOCOL_ERROR,
1162
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1163
      } finally {
1164
        try {
1165
          frameReader.close();
1✔
1166
        } catch (IOException ex) {
×
1167
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1168
        } catch (RuntimeException e) {
×
1169
          // This same check is done in okhttp proper:
1170
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1171

1172
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1173
          // https://issuetracker.google.com/issues/177450597
1174
          if (!"bio == null".equals(e.getMessage())) {
×
1175
            throw e;
×
1176
          }
1177
        }
1✔
1178
        listener.transportTerminated();
1✔
1179
        Thread.currentThread().setName(threadName);
1✔
1180
      }
1181
    }
1✔
1182

1183
    /**
1184
     * Handle an HTTP2 DATA frame.
1185
     */
1186
    @SuppressWarnings("GuardedBy")
1187
    @Override
1188
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1189
                     int paddedLength)
1190
        throws IOException {
1191
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1192
          streamId, in.getBuffer(), length, inFinished);
1✔
1193
      OkHttpClientStream stream = getStream(streamId);
1✔
1194
      if (stream == null) {
1✔
1195
        if (mayHaveCreatedStream(streamId)) {
1✔
1196
          synchronized (lock) {
1✔
1197
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1198
          }
1✔
1199
          in.skip(length);
1✔
1200
        } else {
1201
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1202
          return;
1✔
1203
        }
1204
      } else {
1205
        // Wait until the frame is complete.
1206
        in.require(length);
1✔
1207

1208
        Buffer buf = new Buffer();
1✔
1209
        buf.write(in.getBuffer(), length);
1✔
1210
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1211
            stream.transportState().tag());
1✔
1212
        synchronized (lock) {
1✔
1213
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1214
          // instead found: 'OkHttpClientTransport.this.lock'
1215
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1216
        }
1✔
1217
      }
1218

1219
      // connection window update
1220
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1221
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1222
        synchronized (lock) {
1✔
1223
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1224
        }
1✔
1225
        connectionUnacknowledgedBytesRead = 0;
1✔
1226
      }
1227
    }
1✔
1228

1229
    /**
1230
     * Handle HTTP2 HEADER and CONTINUATION frames.
1231
     */
1232
    @SuppressWarnings("GuardedBy")
1233
    @Override
1234
    public void headers(boolean outFinished,
1235
        boolean inFinished,
1236
        int streamId,
1237
        int associatedStreamId,
1238
        List<Header> headerBlock,
1239
        HeadersMode headersMode) {
1240
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1241
      boolean unknownStream = false;
1✔
1242
      Status failedStatus = null;
1✔
1243
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1244
        int metadataSize = headerBlockSize(headerBlock);
1✔
1245
        if (metadataSize > maxInboundMetadataSize) {
1✔
1246
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1247
              String.format(
1✔
1248
                  Locale.US,
1249
                  "Response %s metadata larger than %d: %d",
1250
                  inFinished ? "trailer" : "header",
1✔
1251
                  maxInboundMetadataSize,
1✔
1252
                  metadataSize));
1✔
1253
        }
1254
      }
1255
      synchronized (lock) {
1✔
1256
        OkHttpClientStream stream = streams.get(streamId);
1✔
1257
        if (stream == null) {
1✔
1258
          if (mayHaveCreatedStream(streamId)) {
1✔
1259
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1260
          } else {
1261
            unknownStream = true;
1✔
1262
          }
1263
        } else {
1264
          if (failedStatus == null) {
1✔
1265
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1266
                stream.transportState().tag());
1✔
1267
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1268
            // instead found: 'OkHttpClientTransport.this.lock'
1269
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1270
          } else {
1271
            if (!inFinished) {
1✔
1272
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1273
            }
1274
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1275
          }
1276
        }
1277
      }
1✔
1278
      if (unknownStream) {
1✔
1279
        // We don't expect any server-initiated streams.
1280
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1281
      }
1282
    }
1✔
1283

1284
    private int headerBlockSize(List<Header> headerBlock) {
1285
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 §6.5.2.
1286
      long size = 0;
1✔
1287
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1288
        Header header = headerBlock.get(i);
1✔
1289
        size += 32 + header.name.size() + header.value.size();
1✔
1290
      }
1291
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1292
      return (int) size;
1✔
1293
    }
1294

1295
    @Override
1296
    public void rstStream(int streamId, ErrorCode errorCode) {
1297
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1298
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1299
      boolean stopDelivery =
1✔
1300
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1301
      synchronized (lock) {
1✔
1302
        OkHttpClientStream stream = streams.get(streamId);
1✔
1303
        if (stream != null) {
1✔
1304
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1305
              stream.transportState().tag());
1✔
1306
          finishStream(
1✔
1307
              streamId, status,
1308
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1309
              stopDelivery, null, null);
1310
        }
1311
      }
1✔
1312
    }
1✔
1313

1314
    @Override
1315
    public void settings(boolean clearPrevious, Settings settings) {
1316
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1317
      boolean outboundWindowSizeIncreased = false;
1✔
1318
      synchronized (lock) {
1✔
1319
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1320
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1321
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1322
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1323
        }
1324

1325
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1326
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1327
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1328
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1329
        }
1330
        if (firstSettings) {
1✔
1331
          attributes = listener.filterTransport(attributes);
1✔
1332
          listener.transportReady();
1✔
1333
          firstSettings = false;
1✔
1334
        }
1335

1336
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1337
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1338
        // otherwise it will cause a stream error (RST_STREAM).
1339
        frameWriter.ackSettings(settings);
1✔
1340

1341
        // send any pending bytes / streams
1342
        if (outboundWindowSizeIncreased) {
1✔
1343
          outboundFlow.writeStreams();
1✔
1344
        }
1345
        startPendingStreams();
1✔
1346
      }
1✔
1347
    }
1✔
1348

1349
    @Override
1350
    public void ping(boolean ack, int payload1, int payload2) {
1351
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1352
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1353
      if (!ack) {
1✔
1354
        synchronized (lock) {
1✔
1355
          frameWriter.ping(true, payload1, payload2);
1✔
1356
        }
1✔
1357
      } else {
1358
        Http2Ping p = null;
1✔
1359
        synchronized (lock) {
1✔
1360
          if (ping != null) {
1✔
1361
            if (ping.payload() == ackPayload) {
1✔
1362
              p = ping;
1✔
1363
              ping = null;
1✔
1364
            } else {
1365
              log.log(Level.WARNING, String.format(
1✔
1366
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1367
                  ping.payload(), ackPayload));
1✔
1368
            }
1369
          } else {
1370
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1371
          }
1372
        }
1✔
1373
        // don't complete it while holding lock since callbacks could run immediately
1374
        if (p != null) {
1✔
1375
          p.complete();
1✔
1376
        }
1377
      }
1378
    }
1✔
1379

1380
    @Override
1381
    public void ackSettings() {
1382
      // Do nothing currently.
1383
    }
1✔
1384

1385
    @Override
1386
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1387
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1388
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1389
        String data = debugData.utf8();
1✔
1390
        log.log(Level.WARNING, String.format(
1✔
1391
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1392
        if ("too_many_pings".equals(data)) {
1✔
1393
          tooManyPingsRunnable.run();
1✔
1394
        }
1395
      }
1396
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1397
          .augmentDescription("Received Goaway");
1✔
1398
      if (debugData.size() > 0) {
1✔
1399
        // If a debug message was provided, use it.
1400
        status = status.augmentDescription(debugData.utf8());
1✔
1401
      }
1402
      startGoAway(lastGoodStreamId, null, status);
1✔
1403
    }
1✔
1404

1405
    @Override
1406
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1407
        throws IOException {
1408
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1409
          streamId, promisedStreamId, requestHeaders);
1410
      // We don't accept server initiated stream.
1411
      synchronized (lock) {
1✔
1412
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1413
      }
1✔
1414
    }
1✔
1415

1416
    @Override
1417
    public void windowUpdate(int streamId, long delta) {
1418
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1419
      if (delta == 0) {
1✔
1420
        String errorMsg = "Received 0 flow control window increment.";
×
1421
        if (streamId == 0) {
×
1422
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1423
        } else {
1424
          finishStream(
×
1425
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1426
              ErrorCode.PROTOCOL_ERROR, null);
1427
        }
1428
        return;
×
1429
      }
1430

1431
      boolean unknownStream = false;
1✔
1432
      synchronized (lock) {
1✔
1433
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1434
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1435
          return;
1✔
1436
        }
1437

1438
        OkHttpClientStream stream = streams.get(streamId);
1✔
1439
        if (stream != null) {
1✔
1440
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1441
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1442
          unknownStream = true;
1✔
1443
        }
1444
      }
1✔
1445
      if (unknownStream) {
1✔
1446
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1447
            "Received window_update for unknown stream: " + streamId);
1448
      }
1449
    }
1✔
1450

1451
    @Override
1452
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1453
      // Ignore priority change.
1454
      // TODO(madongfly): log
1455
    }
×
1456

1457
    @Override
1458
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1459
        int port, long maxAge) {
1460
      // TODO(madongfly): Deal with alternateService propagation
1461
    }
×
1462
  }
1463
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc