• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19551

14 Nov 2024 04:34PM UTC coverage: 84.627% (+0.01%) from 84.614%
#19551

push

github

web-flow
api: When forwarding from Listener onAddresses to Listener2 continue to use onResult (#11666) (#11688)

When forwarding from Listener onAddresses to Listener2, continue to use onResult rather than onResult2: the latter must be called from within the synchronization context, which breaks existing code that did not need to do so when using the old Listener interface.

33855 of 40005 relevant lines covered (84.63%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.11
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import io.grpc.Attributes;
31
import io.grpc.CallOptions;
32
import io.grpc.ClientStreamTracer;
33
import io.grpc.Grpc;
34
import io.grpc.HttpConnectProxiedSocketAddress;
35
import io.grpc.InternalChannelz;
36
import io.grpc.InternalChannelz.SocketStats;
37
import io.grpc.InternalLogId;
38
import io.grpc.Metadata;
39
import io.grpc.MethodDescriptor;
40
import io.grpc.MethodDescriptor.MethodType;
41
import io.grpc.SecurityLevel;
42
import io.grpc.Status;
43
import io.grpc.Status.Code;
44
import io.grpc.StatusException;
45
import io.grpc.internal.ClientStreamListener.RpcProgress;
46
import io.grpc.internal.ConnectionClientTransport;
47
import io.grpc.internal.GrpcAttributes;
48
import io.grpc.internal.GrpcUtil;
49
import io.grpc.internal.Http2Ping;
50
import io.grpc.internal.InUseStateAggregator;
51
import io.grpc.internal.KeepAliveManager;
52
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
53
import io.grpc.internal.SerializingExecutor;
54
import io.grpc.internal.StatsTraceContext;
55
import io.grpc.internal.TransportTracer;
56
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
57
import io.grpc.okhttp.internal.ConnectionSpec;
58
import io.grpc.okhttp.internal.Credentials;
59
import io.grpc.okhttp.internal.StatusLine;
60
import io.grpc.okhttp.internal.framed.ErrorCode;
61
import io.grpc.okhttp.internal.framed.FrameReader;
62
import io.grpc.okhttp.internal.framed.FrameWriter;
63
import io.grpc.okhttp.internal.framed.Header;
64
import io.grpc.okhttp.internal.framed.HeadersMode;
65
import io.grpc.okhttp.internal.framed.Http2;
66
import io.grpc.okhttp.internal.framed.Settings;
67
import io.grpc.okhttp.internal.framed.Variant;
68
import io.grpc.okhttp.internal.proxy.HttpUrl;
69
import io.grpc.okhttp.internal.proxy.Request;
70
import io.perfmark.PerfMark;
71
import java.io.EOFException;
72
import java.io.IOException;
73
import java.net.InetSocketAddress;
74
import java.net.Socket;
75
import java.net.URI;
76
import java.util.Collections;
77
import java.util.Deque;
78
import java.util.EnumMap;
79
import java.util.HashMap;
80
import java.util.Iterator;
81
import java.util.LinkedList;
82
import java.util.List;
83
import java.util.Locale;
84
import java.util.Map;
85
import java.util.Random;
86
import java.util.concurrent.BrokenBarrierException;
87
import java.util.concurrent.CountDownLatch;
88
import java.util.concurrent.CyclicBarrier;
89
import java.util.concurrent.Executor;
90
import java.util.concurrent.ScheduledExecutorService;
91
import java.util.concurrent.TimeUnit;
92
import java.util.concurrent.TimeoutException;
93
import java.util.logging.Level;
94
import java.util.logging.Logger;
95
import javax.annotation.Nullable;
96
import javax.annotation.concurrent.GuardedBy;
97
import javax.net.SocketFactory;
98
import javax.net.ssl.HostnameVerifier;
99
import javax.net.ssl.SSLSession;
100
import javax.net.ssl.SSLSocket;
101
import javax.net.ssl.SSLSocketFactory;
102
import okio.Buffer;
103
import okio.BufferedSink;
104
import okio.BufferedSource;
105
import okio.ByteString;
106
import okio.Okio;
107
import okio.Source;
108
import okio.Timeout;
109

110
/**
111
 * An OkHttp-based {@link ConnectionClientTransport} implementation.
112
 */
113
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
114
      OutboundFlowController.Transport {
115
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
116
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
117

118
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
119
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
120
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
121
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
122
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
123
        Status.INTERNAL.withDescription("Protocol error"));
1✔
124
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
125
        Status.INTERNAL.withDescription("Internal error"));
1✔
126
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
127
        Status.INTERNAL.withDescription("Flow control error"));
1✔
128
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
129
        Status.INTERNAL.withDescription("Stream closed"));
1✔
130
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
131
        Status.INTERNAL.withDescription("Frame too large"));
1✔
132
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
133
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
134
    errorToStatus.put(ErrorCode.CANCEL,
1✔
135
        Status.CANCELLED.withDescription("Cancelled"));
1✔
136
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
137
        Status.INTERNAL.withDescription("Compression error"));
1✔
138
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
139
        Status.INTERNAL.withDescription("Connect error"));
1✔
140
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
141
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
142
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
143
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
144
    return Collections.unmodifiableMap(errorToStatus);
1✔
145
  }
146

147
  private final InetSocketAddress address;
148
  private final String defaultAuthority;
149
  private final String userAgent;
150
  private final Random random = new Random();
1✔
151
  // Returns new unstarted stopwatches
152
  private final Supplier<Stopwatch> stopwatchFactory;
153
  private final int initialWindowSize;
154
  private final Variant variant;
155
  private Listener listener;
156
  @GuardedBy("lock")
157
  private ExceptionHandlingFrameWriter frameWriter;
158
  private OutboundFlowController outboundFlow;
159
  private final Object lock = new Object();
1✔
160
  private final InternalLogId logId;
161
  @GuardedBy("lock")
162
  private int nextStreamId;
163
  @GuardedBy("lock")
1✔
164
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
165
  private final Executor executor;
166
  // Wrap on executor, to guarantee some operations be executed serially.
167
  private final SerializingExecutor serializingExecutor;
168
  private final ScheduledExecutorService scheduler;
169
  private final int maxMessageSize;
170
  private int connectionUnacknowledgedBytesRead;
171
  private ClientFrameHandler clientFrameHandler;
172
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
173
  private Attributes attributes;
174
  /**
175
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
176
   * streams may continue.
177
   */
178
  @GuardedBy("lock")
179
  private Status goAwayStatus;
180
  @GuardedBy("lock")
181
  private boolean goAwaySent;
182
  @GuardedBy("lock")
183
  private Http2Ping ping;
184
  @GuardedBy("lock")
185
  private boolean stopped;
186
  @GuardedBy("lock")
187
  private boolean hasStream;
188
  private final SocketFactory socketFactory;
189
  private SSLSocketFactory sslSocketFactory;
190
  private HostnameVerifier hostnameVerifier;
191
  private Socket socket;
192
  @GuardedBy("lock")
1✔
193
  private int maxConcurrentStreams = 0;
194
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
195
  @GuardedBy("lock")
196
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
197
  private final ConnectionSpec connectionSpec;
198
  private KeepAliveManager keepAliveManager;
199
  private boolean enableKeepAlive;
200
  private long keepAliveTimeNanos;
201
  private long keepAliveTimeoutNanos;
202
  private boolean keepAliveWithoutCalls;
203
  private final Runnable tooManyPingsRunnable;
204
  private final int maxInboundMetadataSize;
205
  private final boolean useGetForSafeMethods;
206
  @GuardedBy("lock")
207
  private final TransportTracer transportTracer;
208
  @GuardedBy("lock")
1✔
209
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
210
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
211
        @Override
212
        protected void handleInUse() {
213
          listener.transportInUse(true);
1✔
214
        }
1✔
215

216
        @Override
217
        protected void handleNotInUse() {
218
          listener.transportInUse(false);
1✔
219
        }
1✔
220
      };
221
  @GuardedBy("lock")
222
  private InternalChannelz.Security securityInfo;
223

224
  @VisibleForTesting
225
  @Nullable
226
  final HttpConnectProxiedSocketAddress proxiedAddr;
227

228
  @VisibleForTesting
1✔
229
  int proxySocketTimeout = 30000;
230

231
  // The following fields should only be used for test.
232
  Runnable connectingCallback;
233
  SettableFuture<Void> connectedFuture;
234

235
  /**
   * Creates a transport for the given address, using {@link GrpcUtil#STOPWATCH_SUPPLIER} and a
   * fresh {@link Http2} variant.
   *
   * @param transportFactory factory supplying executors, socket factories and limits
   * @param address address to connect to; must not be null
   * @param authority default authority for streams created on this transport
   * @param userAgent application user agent, may be null
   * @param eagAttrs attributes stored under {@code GrpcAttributes.ATTR_CLIENT_EAG_ATTRS}
   * @param proxiedAddr HTTP CONNECT proxy description, or null for a direct connection
   * @param tooManyPingsRunnable must not be null
   */
  public OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      InetSocketAddress address,
      String authority,
      @Nullable String userAgent,
      Attributes eagAttrs,
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory, address, authority, userAgent, eagAttrs,
        GrpcUtil.STOPWATCH_SUPPLIER, new Http2(),
        proxiedAddr, tooManyPingsRunnable);
  }
1✔
254

255
  private OkHttpClientTransport(
256
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
257
      InetSocketAddress address,
258
      String authority,
259
      @Nullable String userAgent,
260
      Attributes eagAttrs,
261
      Supplier<Stopwatch> stopwatchFactory,
262
      Variant variant,
263
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
264
      Runnable tooManyPingsRunnable) {
1✔
265
    this.address = Preconditions.checkNotNull(address, "address");
1✔
266
    this.defaultAuthority = authority;
1✔
267
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
268
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
269
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
270
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
271
    this.scheduler = Preconditions.checkNotNull(
1✔
272
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
273
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
274
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
275
    nextStreamId = 3;
1✔
276
    this.socketFactory = transportFactory.socketFactory == null
1✔
277
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
278
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
279
    this.hostnameVerifier = transportFactory.hostnameVerifier;
1✔
280
    this.connectionSpec = Preconditions.checkNotNull(
1✔
281
        transportFactory.connectionSpec, "connectionSpec");
282
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
283
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
284
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
285
    this.proxiedAddr = proxiedAddr;
1✔
286
    this.tooManyPingsRunnable =
1✔
287
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
288
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
289
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
290
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
291
    this.attributes = Attributes.newBuilder()
1✔
292
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
293
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
294
    initTransportTracer();
1✔
295
  }
1✔
296

297
  /**
   * Test-only constructor: builds a transport aimed at a fake peer ({@code 127.0.0.1:80} with
   * authority {@code notarealauthority:80}), without a proxy and with empty EAG attributes.
   *
   * @param connectingCallback optional hook run by {@link #start} before the frame handler is
   *     scheduled
   * @param connectedFuture completed by {@link #start} once pending streams have been started;
   *     must not be null
   */
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
  @VisibleForTesting
  OkHttpClientTransport(
      OkHttpChannelBuilder.OkHttpTransportFactory transportFactory,
      String userAgent,
      Supplier<Stopwatch> stopwatchFactory,
      Variant variant,
      @Nullable Runnable connectingCallback,
      SettableFuture<Void> connectedFuture,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory,
        new InetSocketAddress("127.0.0.1", 80),
        "notarealauthority:80",
        userAgent,
        Attributes.EMPTY,
        stopwatchFactory,
        variant,
        /* proxiedAddr= */ null,
        tooManyPingsRunnable);
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
    this.connectingCallback = connectingCallback;
  }
1✔
323

324
  // sslSocketFactory is set to null when use plaintext.
325
  boolean isUsingPlaintext() {
326
    return sslSocketFactory == null;
1✔
327
  }
328

329
  private void initTransportTracer() {
330
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
331
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
332
        @Override
333
        public TransportTracer.FlowControlWindows read() {
334
          synchronized (lock) {
1✔
335
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
336
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
337
            // provide a lower bound.
338
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
339
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
340
          }
341
        }
342
      });
343
    }
1✔
344
  }
1✔
345

346
  /**
347
   * Enable keepalive with custom delay and timeout.
348
   */
349
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
350
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
351
    enableKeepAlive = enable;
×
352
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
353
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
354
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
355
  }
×
356

357
  @Override
358
  public void ping(final PingCallback callback, Executor executor) {
359
    long data = 0;
1✔
360
    Http2Ping p;
361
    boolean writePing;
362
    synchronized (lock) {
1✔
363
      checkState(frameWriter != null);
1✔
364
      if (stopped) {
1✔
365
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
366
        return;
1✔
367
      }
368
      if (ping != null) {
1✔
369
        // we only allow one outstanding ping at a time, so just add the callback to
370
        // any outstanding operation
371
        p = ping;
1✔
372
        writePing = false;
1✔
373
      } else {
374
        // set outstanding operation and then write the ping after releasing lock
375
        data = random.nextLong();
1✔
376
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
377
        stopwatch.start();
1✔
378
        p = ping = new Http2Ping(data, stopwatch);
1✔
379
        writePing = true;
1✔
380
        transportTracer.reportKeepAliveSent();
1✔
381
      }
382
      if (writePing) {
1✔
383
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
384
      }
385
    }
1✔
386
    // If transport concurrently failed/stopped since we released the lock above, this could
387
    // immediately invoke callback (which we shouldn't do while holding a lock)
388
    p.addCallback(callback, executor);
1✔
389
  }
1✔
390

391
  @Override
392
  public OkHttpClientStream newStream(
393
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
394
      ClientStreamTracer[] tracers) {
395
    Preconditions.checkNotNull(method, "method");
1✔
396
    Preconditions.checkNotNull(headers, "headers");
1✔
397
    StatsTraceContext statsTraceContext =
1✔
398
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
399
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
400
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
401
      return new OkHttpClientStream(
1✔
402
          method,
403
          headers,
404
          frameWriter,
405
          OkHttpClientTransport.this,
406
          outboundFlow,
407
          lock,
408
          maxMessageSize,
409
          initialWindowSize,
410
          defaultAuthority,
411
          userAgent,
412
          statsTraceContext,
413
          transportTracer,
414
          callOptions,
415
          useGetForSafeMethods);
416
    }
417
  }
418

419
  @GuardedBy("lock")
420
  void streamReadyToStart(OkHttpClientStream clientStream) {
421
    if (goAwayStatus != null) {
1✔
422
      clientStream.transportState().transportReportStatus(
1✔
423
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
424
    } else if (streams.size() >= maxConcurrentStreams) {
1✔
425
      pendingStreams.add(clientStream);
1✔
426
      setInUse(clientStream);
1✔
427
    } else {
428
      startStream(clientStream);
1✔
429
    }
430
  }
1✔
431

432
  @SuppressWarnings("GuardedBy")
433
  @GuardedBy("lock")
434
  private void startStream(OkHttpClientStream stream) {
435
    Preconditions.checkState(
1✔
436
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
437
    streams.put(nextStreamId, stream);
1✔
438
    setInUse(stream);
1✔
439
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
440
    // found: 'this.lock'
441
    stream.transportState().start(nextStreamId);
1✔
442
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
443
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
444
        || stream.useGet()) {
1✔
445
      frameWriter.flush();
1✔
446
    }
447
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
448
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
449
      // correctly.
450
      nextStreamId = Integer.MAX_VALUE;
1✔
451
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
452
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
453
    } else {
454
      nextStreamId += 2;
1✔
455
    }
456
  }
1✔
457

458
  /**
459
   * Starts pending streams, returns true if at least one pending stream is started.
460
   */
461
  @GuardedBy("lock")
462
  private boolean startPendingStreams() {
463
    boolean hasStreamStarted = false;
1✔
464
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
465
      OkHttpClientStream stream = pendingStreams.poll();
1✔
466
      startStream(stream);
1✔
467
      hasStreamStarted = true;
1✔
468
    }
1✔
469
    return hasStreamStarted;
1✔
470
  }
471

472
  /**
473
   * Removes given pending stream, used when a pending stream is cancelled.
474
   */
475
  @GuardedBy("lock")
476
  void removePendingStream(OkHttpClientStream pendingStream) {
477
    pendingStreams.remove(pendingStream);
1✔
478
    maybeClearInUse(pendingStream);
1✔
479
  }
1✔
480

481
  @Override
482
  public Runnable start(Listener listener) {
483
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
484

485
    if (enableKeepAlive) {
1✔
486
      keepAliveManager = new KeepAliveManager(
×
487
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
488
          keepAliveWithoutCalls);
489
      keepAliveManager.onTransportStarted();
×
490
    }
491

492
    int maxQueuedControlFrames = 10000;
1✔
493
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
494
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
495
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
496

497
    synchronized (lock) {
1✔
498
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
499
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
500
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
501
      // the same TransportExceptionHandler instance so it is all mixed back together.
502
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
503
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
504
    }
1✔
505
    final CountDownLatch latch = new CountDownLatch(1);
1✔
506
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
507
    // The transport needs up to two threads to function once started,
508
    // but only needs one during handshaking. Start another thread during handshaking
509
    // to make sure there's still a free thread available. If the number of threads is exhausted,
510
    // it is better to kill the transport than for all the transports to hang unable to send.
511
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
512
    // Connecting in the serializingExecutor, so that some stream operations like synStream
513
    // will be executed after connected.
514

515
    serializingExecutor.execute(new Runnable() {
1✔
516
      @Override
517
      public void run() {
518
        // This is a hack to make sure the connection preface and initial settings to be sent out
519
        // without blocking the start. By doing this essentially prevents potential deadlock when
520
        // network is not available during startup while another thread holding lock to send the
521
        // initial preface.
522
        try {
523
          latch.await();
1✔
524
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
525
        } catch (InterruptedException e) {
×
526
          Thread.currentThread().interrupt();
×
527
        } catch (TimeoutException | BrokenBarrierException e) {
1✔
528
          startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
529
              .withDescription("Timed out waiting for second handshake thread. "
1✔
530
                + "The transport executor pool may have run out of threads"));
531
          return;
1✔
532
        }
1✔
533
        // Use closed source on failure so that the reader immediately shuts down.
534
        BufferedSource source = Okio.buffer(new Source() {
1✔
535
          @Override
536
          public long read(Buffer sink, long byteCount) {
537
            return -1;
1✔
538
          }
539

540
          @Override
541
          public Timeout timeout() {
542
            return Timeout.NONE;
×
543
          }
544

545
          @Override
546
          public void close() {
547
          }
1✔
548
        });
549
        Socket sock;
550
        SSLSession sslSession = null;
1✔
551
        try {
552
          if (proxiedAddr == null) {
1✔
553
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
554
          } else {
555
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
556
              sock = createHttpProxySocket(
1✔
557
                  proxiedAddr.getTargetAddress(),
1✔
558
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
559
                  proxiedAddr.getUsername(),
1✔
560
                  proxiedAddr.getPassword()
1✔
561
              );
562
            } else {
563
              throw Status.INTERNAL.withDescription(
×
564
                  "Unsupported SocketAddress implementation "
565
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
566
            }
567
          }
568
          if (sslSocketFactory != null) {
1✔
569
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
570
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
571
                connectionSpec);
1✔
572
            sslSession = sslSocket.getSession();
1✔
573
            sock = sslSocket;
1✔
574
          }
575
          sock.setTcpNoDelay(true);
1✔
576
          source = Okio.buffer(Okio.source(sock));
1✔
577
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
578

579
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
580
          attributes = attributes.toBuilder()
1✔
581
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
582
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
583
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
584
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
585
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
586
              .build();
1✔
587
        } catch (StatusException e) {
1✔
588
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
589
          return;
1✔
590
        } catch (Exception e) {
1✔
591
          onException(e);
1✔
592
          return;
1✔
593
        } finally {
594
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
595
          latchForExtraThread.countDown();
1✔
596
        }
597
        synchronized (lock) {
1✔
598
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
599
          if (sslSession != null) {
1✔
600
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
601
          }
602
        }
1✔
603
      }
1✔
604
    });
605

606
    executor.execute(new Runnable() {
1✔
607
      @Override
608
      public void run() {
609
        try {
610
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
611
          latchForExtraThread.await();
1✔
612
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
613
          // Something bad happened, maybe too few threads available!
614
          // This will be handled in the handshake thread.
615
        } catch (InterruptedException e) {
×
616
          Thread.currentThread().interrupt();
×
617
        }
1✔
618
      }
1✔
619
    });
620
    // Schedule to send connection preface & settings before any other write.
621
    try {
622
      sendConnectionPrefaceAndSettings();
1✔
623
    } finally {
624
      latch.countDown();
1✔
625
    }
626

627
    serializingExecutor.execute(new Runnable() {
1✔
628
      @Override
629
      public void run() {
630
        if (connectingCallback != null) {
1✔
631
          connectingCallback.run();
1✔
632
        }
633
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
634
        // may send goAway immediately.
635
        executor.execute(clientFrameHandler);
1✔
636
        synchronized (lock) {
1✔
637
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
638
          startPendingStreams();
1✔
639
        }
1✔
640
        if (connectedFuture != null) {
1✔
641
          connectedFuture.set(null);
1✔
642
        }
643
      }
1✔
644
    });
645
    return null;
1✔
646
  }
647

648
  /**
649
   * Should only be called once when the transport is first established.
650
   */
651
  private void sendConnectionPrefaceAndSettings() {
652
    synchronized (lock) {
1✔
653
      frameWriter.connectionPreface();
1✔
654
      Settings settings = new Settings();
1✔
655
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
656
      frameWriter.settings(settings);
1✔
657
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
658
        frameWriter.windowUpdate(
1✔
659
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
660
      }
661
    }
1✔
662
  }
1✔
663

664
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
665
      String proxyUsername, String proxyPassword) throws StatusException {
666
    Socket sock = null;
1✔
667
    try {
668
      // The proxy address may not be resolved
669
      if (proxyAddress.getAddress() != null) {
1✔
670
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
671
      } else {
672
        sock =
×
673
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
674
      }
675
      sock.setTcpNoDelay(true);
1✔
676
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
677
      // can cause reading from the socket to hang.
678
      sock.setSoTimeout(proxySocketTimeout);
1✔
679

680
      Source source = Okio.source(sock);
1✔
681
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
682

683
      // Prepare headers and request method line
684
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
685
      HttpUrl url = proxyRequest.httpUrl();
1✔
686
      String requestLine =
1✔
687
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
688

689
      // Write request to socket
690
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
691
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
692
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
693
            .writeUtf8(": ")
1✔
694
            .writeUtf8(proxyRequest.headers().value(i))
1✔
695
            .writeUtf8("\r\n");
1✔
696
      }
697
      sink.writeUtf8("\r\n");
1✔
698
      // Flush buffer (flushes socket and sends request)
699
      sink.flush();
1✔
700

701
      // Read status line, check if 2xx was returned
702
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
703
      // Drain rest of headers
704
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
705
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
706
        Buffer body = new Buffer();
1✔
707
        try {
708
          sock.shutdownOutput();
1✔
709
          source.read(body, 1024);
1✔
710
        } catch (IOException ex) {
×
711
          body.writeUtf8("Unable to read body: " + ex.toString());
×
712
        }
1✔
713
        try {
714
          sock.close();
1✔
715
        } catch (IOException ignored) {
×
716
          // ignored
717
        }
1✔
718
        String message = String.format(
1✔
719
            Locale.US,
720
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
721
              + "Response body:\n%s",
722
            statusLine.code, statusLine.message, body.readUtf8());
1✔
723
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
724
      }
725
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
726
      sock.setSoTimeout(0);
1✔
727
      return sock;
1✔
728
    } catch (IOException e) {
1✔
729
      if (sock != null) {
1✔
730
        GrpcUtil.closeQuietly(sock);
1✔
731
      }
732
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
733
          .asException();
1✔
734
    }
735
  }
736

737
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
738
                                         String proxyPassword) {
739
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
740
        .scheme("https")
1✔
741
        .host(address.getHostName())
1✔
742
        .port(address.getPort())
1✔
743
        .build();
1✔
744

745
    Request.Builder request = new Request.Builder()
1✔
746
        .url(tunnelUrl)
1✔
747
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
748
        .header("User-Agent", userAgent);
1✔
749

750
    // If we have proxy credentials, set them right away
751
    if (proxyUsername != null && proxyPassword != null) {
1✔
752
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
753
    }
754
    return request.build();
1✔
755
  }
756

757
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
758
    Buffer buffer = new Buffer();
1✔
759
    while (true) {
760
      if (source.read(buffer, 1) == -1) {
1✔
761
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
762
      }
763
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
764
        return buffer.readUtf8LineStrict();
1✔
765
      }
766
    }
767
  }
768

769
  @Override
770
  public String toString() {
771
    return MoreObjects.toStringHelper(this)
1✔
772
        .add("logId", logId.getId())
1✔
773
        .add("address", address)
1✔
774
        .toString();
1✔
775
  }
776

777
  @Override
778
  public InternalLogId getLogId() {
779
    return logId;
1✔
780
  }
781

782
  /**
783
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
784
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
785
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
786
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
787
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
788
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
789
   * connection to the port, independent of this function.
790
   *
791
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
792
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
793
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
794
   */
795
  @VisibleForTesting
796
  String getOverridenHost() {
797
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
798
    if (uri.getHost() != null) {
1✔
799
      return uri.getHost();
1✔
800
    }
801

802
    return defaultAuthority;
1✔
803
  }
804

805
  @VisibleForTesting
806
  int getOverridenPort() {
807
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
808
    if (uri.getPort() != -1) {
1✔
809
      return uri.getPort();
×
810
    }
811

812
    return address.getPort();
1✔
813
  }
814

815
  @Override
816
  public void shutdown(Status reason) {
817
    synchronized (lock) {
1✔
818
      if (goAwayStatus != null) {
1✔
819
        return;
1✔
820
      }
821

822
      goAwayStatus = reason;
1✔
823
      listener.transportShutdown(goAwayStatus);
1✔
824
      stopIfNecessary();
1✔
825
    }
1✔
826
  }
1✔
827

828
  @Override
829
  public void shutdownNow(Status reason) {
830
    shutdown(reason);
1✔
831
    synchronized (lock) {
1✔
832
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
833
      while (it.hasNext()) {
1✔
834
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
835
        it.remove();
1✔
836
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
837
        maybeClearInUse(entry.getValue());
1✔
838
      }
1✔
839

840
      for (OkHttpClientStream stream : pendingStreams) {
1✔
841
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
842
        // chance to retry and be routed to another connection.
843
        stream.transportState().transportReportStatus(
1✔
844
            reason, RpcProgress.MISCARRIED, true, new Metadata());
845
        maybeClearInUse(stream);
1✔
846
      }
1✔
847
      pendingStreams.clear();
1✔
848

849
      stopIfNecessary();
1✔
850
    }
1✔
851
  }
1✔
852

853
  @Override
854
  public Attributes getAttributes() {
855
    return attributes;
1✔
856
  }
857

858
  /**
859
   * Gets all active streams as an array.
860
   */
861
  @Override
862
  public OutboundFlowController.StreamState[] getActiveStreams() {
863
    synchronized (lock) {
1✔
864
      OutboundFlowController.StreamState[] flowStreams =
1✔
865
          new OutboundFlowController.StreamState[streams.size()];
1✔
866
      int i = 0;
1✔
867
      for (OkHttpClientStream stream : streams.values()) {
1✔
868
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
869
      }
1✔
870
      return flowStreams;
1✔
871
    }
872
  }
873

874
  @VisibleForTesting
875
  ClientFrameHandler getHandler() {
876
    return clientFrameHandler;
1✔
877
  }
878

879
  @VisibleForTesting
880
  SocketFactory getSocketFactory() {
881
    return socketFactory;
1✔
882
  }
883

884
  @VisibleForTesting
885
  int getPendingStreamSize() {
886
    synchronized (lock) {
1✔
887
      return pendingStreams.size();
1✔
888
    }
889
  }
890

891
  @VisibleForTesting
892
  void setNextStreamId(int nextStreamId) {
893
    synchronized (lock) {
1✔
894
      this.nextStreamId = nextStreamId;
1✔
895
    }
1✔
896
  }
1✔
897

898
  /**
899
   * Finish all active streams due to an IOException, then close the transport.
900
   */
901
  @Override
902
  public void onException(Throwable failureCause) {
903
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
904
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
905
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
906
  }
1✔
907

908
  /**
909
   * Send GOAWAY to the server, then finish all active streams and close the transport.
910
   */
911
  private void onError(ErrorCode errorCode, String moreDetail) {
912
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
913
  }
1✔
914

915
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
916
    synchronized (lock) {
1✔
917
      if (goAwayStatus == null) {
1✔
918
        goAwayStatus = status;
1✔
919
        listener.transportShutdown(status);
1✔
920
      }
921
      if (errorCode != null && !goAwaySent) {
1✔
922
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
923
        // streams. The GOAWAY is part of graceful shutdown.
924
        goAwaySent = true;
1✔
925
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
926
      }
927

928
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
929
      while (it.hasNext()) {
1✔
930
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
931
        if (entry.getKey() > lastKnownStreamId) {
1✔
932
          it.remove();
1✔
933
          entry.getValue().transportState().transportReportStatus(
1✔
934
              status, RpcProgress.REFUSED, false, new Metadata());
935
          maybeClearInUse(entry.getValue());
1✔
936
        }
937
      }
1✔
938

939
      for (OkHttpClientStream stream : pendingStreams) {
1✔
940
        stream.transportState().transportReportStatus(
1✔
941
            status, RpcProgress.MISCARRIED, true, new Metadata());
942
        maybeClearInUse(stream);
1✔
943
      }
1✔
944
      pendingStreams.clear();
1✔
945

946
      stopIfNecessary();
1✔
947
    }
1✔
948
  }
1✔
949

950
  /**
951
   * Called when a stream is closed. We do things like:
952
   * <ul>
953
   * <li>Removing the stream from the map.
954
   * <li>Optionally reporting the status.
955
   * <li>Starting pending streams if we can.
956
   * <li>Stopping the transport if this is the last live stream under a go-away status.
957
   * </ul>
958
   *
959
   * @param streamId the Id of the stream.
960
   * @param status the final status of this stream, null means no need to report.
961
   * @param stopDelivery interrupt queued messages in the deframer
962
   * @param errorCode reset the stream with this ErrorCode if not null.
963
   * @param trailers the trailers received if not null
964
   */
965
  void finishStream(
966
      int streamId,
967
      @Nullable Status status,
968
      RpcProgress rpcProgress,
969
      boolean stopDelivery,
970
      @Nullable ErrorCode errorCode,
971
      @Nullable Metadata trailers) {
972
    synchronized (lock) {
1✔
973
      OkHttpClientStream stream = streams.remove(streamId);
1✔
974
      if (stream != null) {
1✔
975
        if (errorCode != null) {
1✔
976
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
977
        }
978
        if (status != null) {
1✔
979
          stream
1✔
980
              .transportState()
1✔
981
              .transportReportStatus(
1✔
982
                  status,
983
                  rpcProgress,
984
                  stopDelivery,
985
                  trailers != null ? trailers : new Metadata());
1✔
986
        }
987
        if (!startPendingStreams()) {
1✔
988
          stopIfNecessary();
1✔
989
        }
990
        maybeClearInUse(stream);
1✔
991
      }
992
    }
1✔
993
  }
1✔
994

995
  /**
996
   * When the transport is in goAway state, we should stop it once all active streams finish.
997
   */
998
  @GuardedBy("lock")
999
  private void stopIfNecessary() {
1000
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1001
      return;
1✔
1002
    }
1003
    if (stopped) {
1✔
1004
      return;
1✔
1005
    }
1006
    stopped = true;
1✔
1007

1008
    if (keepAliveManager != null) {
1✔
1009
      keepAliveManager.onTransportTermination();
×
1010
    }
1011

1012
    if (ping != null) {
1✔
1013
      ping.failed(getPingFailure());
1✔
1014
      ping = null;
1✔
1015
    }
1016

1017
    if (!goAwaySent) {
1✔
1018
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1019
      // streams. The GOAWAY is part of graceful shutdown.
1020
      goAwaySent = true;
1✔
1021
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1022
    }
1023

1024
    // We will close the underlying socket in the writing thread to break out the reader
1025
    // thread, which will close the frameReader and notify the listener.
1026
    frameWriter.close();
1✔
1027
  }
1✔
1028

1029
  @GuardedBy("lock")
1030
  private void maybeClearInUse(OkHttpClientStream stream) {
1031
    if (hasStream) {
1✔
1032
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1033
        hasStream = false;
1✔
1034
        if (keepAliveManager != null) {
1✔
1035
          // We don't have any active streams. No need to do keepalives any more.
1036
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1037
          // and onTransportActive.
1038
          keepAliveManager.onTransportIdle();
×
1039
        }
1040
      }
1041
    }
1042
    if (stream.shouldBeCountedForInUse()) {
1✔
1043
      inUseState.updateObjectInUse(stream, false);
1✔
1044
    }
1045
  }
1✔
1046

1047
  @GuardedBy("lock")
1048
  private void setInUse(OkHttpClientStream stream) {
1049
    if (!hasStream) {
1✔
1050
      hasStream = true;
1✔
1051
      if (keepAliveManager != null) {
1✔
1052
        // We have a new stream. We might need to do keepalives now.
1053
        // Note that we have to do this inside the lock to avoid calling
1054
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1055
        // order.
1056
        keepAliveManager.onTransportActive();
×
1057
      }
1058
    }
1059
    if (stream.shouldBeCountedForInUse()) {
1✔
1060
      inUseState.updateObjectInUse(stream, true);
1✔
1061
    }
1062
  }
1✔
1063

1064
  private Throwable getPingFailure() {
1065
    synchronized (lock) {
1✔
1066
      if (goAwayStatus != null) {
1✔
1067
        return goAwayStatus.asException();
1✔
1068
      } else {
1069
        return Status.UNAVAILABLE.withDescription("Connection closed").asException();
×
1070
      }
1071
    }
1072
  }
1073

1074
  boolean mayHaveCreatedStream(int streamId) {
1075
    synchronized (lock) {
1✔
1076
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1077
    }
1078
  }
1079

1080
  OkHttpClientStream getStream(int streamId) {
1081
    synchronized (lock) {
1✔
1082
      return streams.get(streamId);
1✔
1083
    }
1084
  }
1085

1086
  /**
1087
   * Returns a Grpc status corresponding to the given ErrorCode.
1088
   */
1089
  @VisibleForTesting
1090
  static Status toGrpcStatus(ErrorCode code) {
1091
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1092
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1093
        "Unknown http2 error code: " + code.httpCode);
1094
  }
1095

1096
  @Override
1097
  public ListenableFuture<SocketStats> getStats() {
1098
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1099
    synchronized (lock) {
1✔
1100
      if (socket == null) {
1✔
1101
        ret.set(new SocketStats(
×
1102
            transportTracer.getStats(),
×
1103
            /*local=*/ null,
1104
            /*remote=*/ null,
1105
            new InternalChannelz.SocketOptions.Builder().build(),
×
1106
            /*security=*/ null));
1107
      } else {
1108
        ret.set(new SocketStats(
1✔
1109
            transportTracer.getStats(),
1✔
1110
            socket.getLocalSocketAddress(),
1✔
1111
            socket.getRemoteSocketAddress(),
1✔
1112
            Utils.getSocketOptions(socket),
1✔
1113
            securityInfo));
1114
      }
1115
      return ret;
1✔
1116
    }
1117
  }
1118

1119
  /**
1120
   * Runnable which reads frames and dispatches them to in flight calls.
1121
   */
1122
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1123

1124
    private final OkHttpFrameLogger logger =
1✔
1125
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1126
    FrameReader frameReader;
1127
    boolean firstSettings = true;
1✔
1128

1129
    ClientFrameHandler(FrameReader frameReader) {
1✔
1130
      this.frameReader = frameReader;
1✔
1131
    }
1✔
1132

1133
    @Override
1134
    @SuppressWarnings("Finally")
1135
    public void run() {
1136
      String threadName = Thread.currentThread().getName();
1✔
1137
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1138
      try {
1139
        // Read until the underlying socket closes.
1140
        while (frameReader.nextFrame(this)) {
1✔
1141
          if (keepAliveManager != null) {
1✔
1142
            keepAliveManager.onDataReceived();
×
1143
          }
1144
        }
1145
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1146
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1147
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1148
        Status status;
1149
        synchronized (lock) {
1✔
1150
          status = goAwayStatus;
1✔
1151
        }
1✔
1152
        if (status == null) {
1✔
1153
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1154
        }
1155
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1156
      } catch (Throwable t) {
1✔
1157
        // TODO(madongfly): Send the exception message to the server.
1158
        startGoAway(
1✔
1159
            0,
1160
            ErrorCode.PROTOCOL_ERROR,
1161
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1162
      } finally {
1163
        try {
1164
          frameReader.close();
1✔
1165
        } catch (IOException ex) {
×
1166
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1167
        } catch (RuntimeException e) {
×
1168
          // This same check is done in okhttp proper:
1169
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1170

1171
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1172
          // https://issuetracker.google.com/issues/177450597
1173
          if (!"bio == null".equals(e.getMessage())) {
×
1174
            throw e;
×
1175
          }
1176
        }
1✔
1177
        listener.transportTerminated();
1✔
1178
        Thread.currentThread().setName(threadName);
1✔
1179
      }
1180
    }
1✔
1181

1182
    /**
1183
     * Handle an HTTP2 DATA frame.
1184
     */
1185
    @SuppressWarnings("GuardedBy")
1186
    @Override
1187
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1188
                     int paddedLength)
1189
        throws IOException {
1190
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1191
          streamId, in.getBuffer(), length, inFinished);
1✔
1192
      OkHttpClientStream stream = getStream(streamId);
1✔
1193
      if (stream == null) {
1✔
1194
        if (mayHaveCreatedStream(streamId)) {
1✔
1195
          synchronized (lock) {
1✔
1196
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1197
          }
1✔
1198
          in.skip(length);
1✔
1199
        } else {
1200
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1201
          return;
1✔
1202
        }
1203
      } else {
1204
        // Wait until the frame is complete.
1205
        in.require(length);
1✔
1206

1207
        Buffer buf = new Buffer();
1✔
1208
        buf.write(in.getBuffer(), length);
1✔
1209
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1210
            stream.transportState().tag());
1✔
1211
        synchronized (lock) {
1✔
1212
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1213
          // instead found: 'OkHttpClientTransport.this.lock'
1214
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1215
        }
1✔
1216
      }
1217

1218
      // connection window update
1219
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1220
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1221
        synchronized (lock) {
1✔
1222
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1223
        }
1✔
1224
        connectionUnacknowledgedBytesRead = 0;
1✔
1225
      }
1226
    }
1✔
1227

1228
    /**
1229
     * Handle HTTP2 HEADER and CONTINUATION frames.
1230
     */
1231
    @SuppressWarnings("GuardedBy")
1232
    @Override
1233
    public void headers(boolean outFinished,
1234
        boolean inFinished,
1235
        int streamId,
1236
        int associatedStreamId,
1237
        List<Header> headerBlock,
1238
        HeadersMode headersMode) {
1239
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1240
      boolean unknownStream = false;
1✔
1241
      Status failedStatus = null;
1✔
1242
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1243
        int metadataSize = headerBlockSize(headerBlock);
1✔
1244
        if (metadataSize > maxInboundMetadataSize) {
1✔
1245
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1246
              String.format(
1✔
1247
                  Locale.US,
1248
                  "Response %s metadata larger than %d: %d",
1249
                  inFinished ? "trailer" : "header",
1✔
1250
                  maxInboundMetadataSize,
1✔
1251
                  metadataSize));
1✔
1252
        }
1253
      }
1254
      synchronized (lock) {
1✔
1255
        OkHttpClientStream stream = streams.get(streamId);
1✔
1256
        if (stream == null) {
1✔
1257
          if (mayHaveCreatedStream(streamId)) {
1✔
1258
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1259
          } else {
1260
            unknownStream = true;
1✔
1261
          }
1262
        } else {
1263
          if (failedStatus == null) {
1✔
1264
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1265
                stream.transportState().tag());
1✔
1266
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1267
            // instead found: 'OkHttpClientTransport.this.lock'
1268
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1269
          } else {
1270
            if (!inFinished) {
1✔
1271
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1272
            }
1273
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1274
          }
1275
        }
1276
      }
1✔
1277
      if (unknownStream) {
1✔
1278
        // We don't expect any server-initiated streams.
1279
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1280
      }
1281
    }
1✔
1282

1283
    private int headerBlockSize(List<Header> headerBlock) {
1284
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1285
      long size = 0;
1✔
1286
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1287
        Header header = headerBlock.get(i);
1✔
1288
        size += 32 + header.name.size() + header.value.size();
1✔
1289
      }
1290
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1291
      return (int) size;
1✔
1292
    }
1293

1294
    @Override
1295
    public void rstStream(int streamId, ErrorCode errorCode) {
1296
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1297
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1298
      boolean stopDelivery =
1✔
1299
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1300
      synchronized (lock) {
1✔
1301
        OkHttpClientStream stream = streams.get(streamId);
1✔
1302
        if (stream != null) {
1✔
1303
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1304
              stream.transportState().tag());
1✔
1305
          finishStream(
1✔
1306
              streamId, status,
1307
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1308
              stopDelivery, null, null);
1309
        }
1310
      }
1✔
1311
    }
1✔
1312

1313
    @Override
1314
    public void settings(boolean clearPrevious, Settings settings) {
1315
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1316
      boolean outboundWindowSizeIncreased = false;
1✔
1317
      synchronized (lock) {
1✔
1318
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1319
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1320
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1321
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1322
        }
1323

1324
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1325
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1326
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1327
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1328
        }
1329
        if (firstSettings) {
1✔
1330
          attributes = listener.filterTransport(attributes);
1✔
1331
          listener.transportReady();
1✔
1332
          firstSettings = false;
1✔
1333
        }
1334

1335
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1336
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1337
        // otherwise it will cause a stream error (RST_STREAM).
1338
        frameWriter.ackSettings(settings);
1✔
1339

1340
        // send any pending bytes / streams
1341
        if (outboundWindowSizeIncreased) {
1✔
1342
          outboundFlow.writeStreams();
1✔
1343
        }
1344
        startPendingStreams();
1✔
1345
      }
1✔
1346
    }
1✔
1347

1348
    @Override
1349
    public void ping(boolean ack, int payload1, int payload2) {
1350
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1351
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1352
      if (!ack) {
1✔
1353
        synchronized (lock) {
1✔
1354
          frameWriter.ping(true, payload1, payload2);
1✔
1355
        }
1✔
1356
      } else {
1357
        Http2Ping p = null;
1✔
1358
        synchronized (lock) {
1✔
1359
          if (ping != null) {
1✔
1360
            if (ping.payload() == ackPayload) {
1✔
1361
              p = ping;
1✔
1362
              ping = null;
1✔
1363
            } else {
1364
              log.log(Level.WARNING, String.format(
1✔
1365
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1366
                  ping.payload(), ackPayload));
1✔
1367
            }
1368
          } else {
1369
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1370
          }
1371
        }
1✔
1372
        // don't complete it while holding lock since callbacks could run immediately
1373
        if (p != null) {
1✔
1374
          p.complete();
1✔
1375
        }
1376
      }
1377
    }
1✔
1378

1379
    @Override
1380
    public void ackSettings() {
1381
      // Do nothing currently.
1382
    }
1✔
1383

1384
    @Override
1385
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1386
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1387
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1388
        String data = debugData.utf8();
1✔
1389
        log.log(Level.WARNING, String.format(
1✔
1390
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1391
        if ("too_many_pings".equals(data)) {
1✔
1392
          tooManyPingsRunnable.run();
1✔
1393
        }
1394
      }
1395
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1396
          .augmentDescription("Received Goaway");
1✔
1397
      if (debugData.size() > 0) {
1✔
1398
        // If a debug message was provided, use it.
1399
        status = status.augmentDescription(debugData.utf8());
1✔
1400
      }
1401
      startGoAway(lastGoodStreamId, null, status);
1✔
1402
    }
1✔
1403

1404
    @Override
1405
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1406
        throws IOException {
1407
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1408
          streamId, promisedStreamId, requestHeaders);
1409
      // We don't accept server initiated stream.
1410
      synchronized (lock) {
1✔
1411
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1412
      }
1✔
1413
    }
1✔
1414

1415
    @Override
1416
    public void windowUpdate(int streamId, long delta) {
1417
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1418
      if (delta == 0) {
1✔
1419
        String errorMsg = "Received 0 flow control window increment.";
×
1420
        if (streamId == 0) {
×
1421
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1422
        } else {
1423
          finishStream(
×
1424
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1425
              ErrorCode.PROTOCOL_ERROR, null);
1426
        }
1427
        return;
×
1428
      }
1429

1430
      boolean unknownStream = false;
1✔
1431
      synchronized (lock) {
1✔
1432
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1433
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1434
          return;
1✔
1435
        }
1436

1437
        OkHttpClientStream stream = streams.get(streamId);
1✔
1438
        if (stream != null) {
1✔
1439
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1440
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1441
          unknownStream = true;
1✔
1442
        }
1443
      }
1✔
1444
      if (unknownStream) {
1✔
1445
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1446
            "Received window_update for unknown stream: " + streamId);
1447
      }
1448
    }
1✔
1449

1450
    @Override
1451
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1452
      // Ignore priority change.
1453
      // TODO(madongfly): log
1454
    }
×
1455

1456
    @Override
1457
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1458
        int port, long maxAge) {
1459
      // TODO(madongfly): Deal with alternateService propagation
1460
    }
×
1461
  }
1462
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc