• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20141

07 Jan 2026 03:51PM UTC coverage: 88.665% (+0.006%) from 88.659%
#20141

push

github

ejona86
Catch Errors when calling complex parsing code

When we have involved parsing/validation logic, it would be easy to have
bugs. There's little reason we can't handle those bugs, as the logic
shouldn't be modifying global state. While the current config is bad or
trigger a bug, we want to keep working until we get good/working config
again.

For XDS, we do have the problem of losing the exception's stack trace.
We could consider increasing the log level, but we could also consider
propagating the error to the listener. Let's skip figuring out that
exact behavior for now, and just move a step in the right direction.

35326 of 39842 relevant lines covered (88.67%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.21
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import com.google.errorprone.annotations.concurrent.GuardedBy;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.ChannelCredentials;
34
import io.grpc.ClientStreamTracer;
35
import io.grpc.Grpc;
36
import io.grpc.HttpConnectProxiedSocketAddress;
37
import io.grpc.InternalChannelz;
38
import io.grpc.InternalChannelz.SocketStats;
39
import io.grpc.InternalLogId;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.MethodDescriptor.MethodType;
43
import io.grpc.SecurityLevel;
44
import io.grpc.Status;
45
import io.grpc.Status.Code;
46
import io.grpc.StatusException;
47
import io.grpc.TlsChannelCredentials;
48
import io.grpc.internal.CertificateUtils;
49
import io.grpc.internal.ClientStreamListener.RpcProgress;
50
import io.grpc.internal.ConnectionClientTransport;
51
import io.grpc.internal.DisconnectError;
52
import io.grpc.internal.GoAwayDisconnectError;
53
import io.grpc.internal.GrpcAttributes;
54
import io.grpc.internal.GrpcUtil;
55
import io.grpc.internal.Http2Ping;
56
import io.grpc.internal.InUseStateAggregator;
57
import io.grpc.internal.KeepAliveManager;
58
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
59
import io.grpc.internal.NoopSslSession;
60
import io.grpc.internal.SerializingExecutor;
61
import io.grpc.internal.SimpleDisconnectError;
62
import io.grpc.internal.StatsTraceContext;
63
import io.grpc.internal.TransportTracer;
64
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
65
import io.grpc.okhttp.OkHttpChannelBuilder.OkHttpTransportFactory;
66
import io.grpc.okhttp.internal.ConnectionSpec;
67
import io.grpc.okhttp.internal.Credentials;
68
import io.grpc.okhttp.internal.OkHostnameVerifier;
69
import io.grpc.okhttp.internal.StatusLine;
70
import io.grpc.okhttp.internal.framed.ErrorCode;
71
import io.grpc.okhttp.internal.framed.FrameReader;
72
import io.grpc.okhttp.internal.framed.FrameWriter;
73
import io.grpc.okhttp.internal.framed.Header;
74
import io.grpc.okhttp.internal.framed.HeadersMode;
75
import io.grpc.okhttp.internal.framed.Http2;
76
import io.grpc.okhttp.internal.framed.Settings;
77
import io.grpc.okhttp.internal.framed.Variant;
78
import io.grpc.okhttp.internal.proxy.HttpUrl;
79
import io.grpc.okhttp.internal.proxy.Request;
80
import io.perfmark.PerfMark;
81
import java.io.EOFException;
82
import java.io.IOException;
83
import java.lang.reflect.InvocationTargetException;
84
import java.lang.reflect.Method;
85
import java.net.InetSocketAddress;
86
import java.net.Socket;
87
import java.net.URI;
88
import java.security.GeneralSecurityException;
89
import java.security.KeyStore;
90
import java.security.cert.Certificate;
91
import java.security.cert.X509Certificate;
92
import java.util.Collections;
93
import java.util.Deque;
94
import java.util.EnumMap;
95
import java.util.HashMap;
96
import java.util.Iterator;
97
import java.util.LinkedHashMap;
98
import java.util.LinkedList;
99
import java.util.List;
100
import java.util.Locale;
101
import java.util.Map;
102
import java.util.Random;
103
import java.util.concurrent.BrokenBarrierException;
104
import java.util.concurrent.CountDownLatch;
105
import java.util.concurrent.CyclicBarrier;
106
import java.util.concurrent.Executor;
107
import java.util.concurrent.ScheduledExecutorService;
108
import java.util.concurrent.TimeUnit;
109
import java.util.concurrent.TimeoutException;
110
import java.util.logging.Level;
111
import java.util.logging.Logger;
112
import javax.annotation.Nullable;
113
import javax.net.SocketFactory;
114
import javax.net.ssl.HostnameVerifier;
115
import javax.net.ssl.SSLParameters;
116
import javax.net.ssl.SSLPeerUnverifiedException;
117
import javax.net.ssl.SSLSession;
118
import javax.net.ssl.SSLSocket;
119
import javax.net.ssl.SSLSocketFactory;
120
import javax.net.ssl.TrustManager;
121
import javax.net.ssl.TrustManagerFactory;
122
import javax.net.ssl.X509TrustManager;
123
import okio.Buffer;
124
import okio.BufferedSink;
125
import okio.BufferedSource;
126
import okio.ByteString;
127
import okio.Okio;
128
import okio.Source;
129
import okio.Timeout;
130

131
/**
132
 * A okhttp-based {@link ConnectionClientTransport} implementation.
133
 */
134
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
135
      OutboundFlowController.Transport, ClientKeepAlivePinger.TransportWithDisconnectReason {
136
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
137
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
138
  private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK =
139
          "GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK";
140
  static boolean enablePerRpcAuthorityCheck =
1✔
141
          GrpcUtil.getFlag(GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK, false);
1✔
142
  private Socket sock;
143
  private SSLSession sslSession;
144

145
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
146
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
147
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
148
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
149
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
150
        Status.INTERNAL.withDescription("Protocol error"));
1✔
151
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
152
        Status.INTERNAL.withDescription("Internal error"));
1✔
153
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
154
        Status.INTERNAL.withDescription("Flow control error"));
1✔
155
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
156
        Status.INTERNAL.withDescription("Stream closed"));
1✔
157
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
158
        Status.INTERNAL.withDescription("Frame too large"));
1✔
159
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
160
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
161
    errorToStatus.put(ErrorCode.CANCEL,
1✔
162
        Status.CANCELLED.withDescription("Cancelled"));
1✔
163
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
164
        Status.INTERNAL.withDescription("Compression error"));
1✔
165
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
166
        Status.INTERNAL.withDescription("Connect error"));
1✔
167
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
168
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
169
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
170
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
171
    return Collections.unmodifiableMap(errorToStatus);
1✔
172
  }
173

174
  private static final Class<?> x509ExtendedTrustManagerClass;
175
  private static final Method checkServerTrustedMethod;
176

177
  static {
178
    Class<?> x509ExtendedTrustManagerClass1 = null;
1✔
179
    Method checkServerTrustedMethod1 = null;
1✔
180
    try {
181
      x509ExtendedTrustManagerClass1 = Class.forName("javax.net.ssl.X509ExtendedTrustManager");
1✔
182
      checkServerTrustedMethod1 = x509ExtendedTrustManagerClass1.getMethod("checkServerTrusted",
1✔
183
              X509Certificate[].class, String.class, Socket.class);
184
    } catch (ClassNotFoundException e) {
×
185
      // Per-rpc authority override via call options will be disallowed.
186
    } catch (NoSuchMethodException e) {
×
187
      // Should never happen since X509ExtendedTrustManager was introduced in Android API level 24
188
      // along with checkServerTrusted.
189
    }
1✔
190
    x509ExtendedTrustManagerClass = x509ExtendedTrustManagerClass1;
1✔
191
    checkServerTrustedMethod = checkServerTrustedMethod1;
1✔
192
  }
1✔
193

194
  private final InetSocketAddress address;
195
  private final String defaultAuthority;
196
  private final String userAgent;
197
  private final Random random = new Random();
1✔
198
  // Returns new unstarted stopwatches
199
  private final Supplier<Stopwatch> stopwatchFactory;
200
  private final int initialWindowSize;
201
  private final Variant variant;
202
  private Listener listener;
203
  @GuardedBy("lock")
204
  private ExceptionHandlingFrameWriter frameWriter;
205
  private OutboundFlowController outboundFlow;
206
  private final Object lock = new Object();
1✔
207
  private final InternalLogId logId;
208
  @GuardedBy("lock")
209
  private int nextStreamId;
210
  @GuardedBy("lock")
1✔
211
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
212
  private final Executor executor;
213
  // Wrap on executor, to guarantee some operations be executed serially.
214
  private final SerializingExecutor serializingExecutor;
215
  private final ScheduledExecutorService scheduler;
216
  private final int maxMessageSize;
217
  private int connectionUnacknowledgedBytesRead;
218
  private ClientFrameHandler clientFrameHandler;
219
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
220
  private Attributes attributes;
221
  /**
222
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
223
   * streams may continue.
224
   */
225
  @GuardedBy("lock")
226
  private Status goAwayStatus;
227
  @GuardedBy("lock")
228
  private boolean goAwaySent;
229
  @GuardedBy("lock")
230
  private Http2Ping ping;
231
  @GuardedBy("lock")
232
  private boolean stopped;
233
  @GuardedBy("lock")
234
  private boolean hasStream;
235
  private final SocketFactory socketFactory;
236
  private SSLSocketFactory sslSocketFactory;
237
  private HostnameVerifier hostnameVerifier;
238
  private Socket socket;
239
  @GuardedBy("lock")
1✔
240
  private int maxConcurrentStreams = 0;
241
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
242
  @GuardedBy("lock")
243
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
244
  private final ConnectionSpec connectionSpec;
245
  private KeepAliveManager keepAliveManager;
246
  private boolean enableKeepAlive;
247
  private long keepAliveTimeNanos;
248
  private long keepAliveTimeoutNanos;
249
  private boolean keepAliveWithoutCalls;
250
  private final Runnable tooManyPingsRunnable;
251
  private final int maxInboundMetadataSize;
252
  private final boolean useGetForSafeMethods;
253
  @GuardedBy("lock")
254
  private final TransportTracer transportTracer;
255
  private final TrustManager x509TrustManager;
256

257
  @SuppressWarnings("serial")
258
  private static class LruCache<T> extends LinkedHashMap<String, T> {
259
    @Override
260
    protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
261
      return size() > 100;
1✔
262
    }
263
  }
264

265
  @GuardedBy("lock")
1✔
266
  private final Map<String, Status> authorityVerificationResults = new LruCache<>();
267

268
  @GuardedBy("lock")
1✔
269
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
270
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
271
        @Override
272
        protected void handleInUse() {
273
          listener.transportInUse(true);
1✔
274
        }
1✔
275

276
        @Override
277
        protected void handleNotInUse() {
278
          listener.transportInUse(false);
1✔
279
        }
1✔
280
      };
281
  @GuardedBy("lock")
282
  private InternalChannelz.Security securityInfo;
283

284
  @VisibleForTesting
285
  @Nullable
286
  final HttpConnectProxiedSocketAddress proxiedAddr;
287

288
  @VisibleForTesting
1✔
289
  int proxySocketTimeout = 30000;
290

291
  // The following fields should only be used for test.
292
  Runnable connectingCallback;
293
  SettableFuture<Void> connectedFuture;
294

295
  public OkHttpClientTransport(
296
          OkHttpTransportFactory transportFactory,
297
          InetSocketAddress address,
298
          String authority,
299
          @Nullable String userAgent,
300
          Attributes eagAttrs,
301
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
302
          Runnable tooManyPingsRunnable,
303
          ChannelCredentials channelCredentials) {
304
    this(
1✔
305
        transportFactory,
306
        address,
307
        authority,
308
        userAgent,
309
        eagAttrs,
310
        GrpcUtil.STOPWATCH_SUPPLIER,
311
        new Http2(),
312
        proxiedAddr,
313
        tooManyPingsRunnable,
314
        channelCredentials);
315
  }
1✔
316

317
  private OkHttpClientTransport(
318
          OkHttpTransportFactory transportFactory,
319
          InetSocketAddress address,
320
          String authority,
321
          @Nullable String userAgent,
322
          Attributes eagAttrs,
323
          Supplier<Stopwatch> stopwatchFactory,
324
          Variant variant,
325
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
326
          Runnable tooManyPingsRunnable,
327
          ChannelCredentials channelCredentials) {
1✔
328
    this.address = Preconditions.checkNotNull(address, "address");
1✔
329
    this.defaultAuthority = authority;
1✔
330
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
331
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
332
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
333
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
334
    this.scheduler = Preconditions.checkNotNull(
1✔
335
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
336
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
337
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
338
    nextStreamId = 3;
1✔
339
    this.socketFactory = transportFactory.socketFactory == null
1✔
340
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
341
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
342
    this.hostnameVerifier = transportFactory.hostnameVerifier != null
1✔
343
        ? transportFactory.hostnameVerifier : OkHostnameVerifier.INSTANCE;
1✔
344
    this.connectionSpec = Preconditions.checkNotNull(
1✔
345
        transportFactory.connectionSpec, "connectionSpec");
346
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
347
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
348
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
349
    this.proxiedAddr = proxiedAddr;
1✔
350
    this.tooManyPingsRunnable =
1✔
351
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
352
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
353
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
354
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
355
    this.attributes = Attributes.newBuilder()
1✔
356
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
357
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
358
    initTransportTracer();
1✔
359
    TrustManager tempX509TrustManager;
360
    if (channelCredentials instanceof TlsChannelCredentials
1✔
361
        && x509ExtendedTrustManagerClass != null) {
362
      try {
363
        tempX509TrustManager = getTrustManager(
1✔
364
                (TlsChannelCredentials) channelCredentials);
365
      } catch (GeneralSecurityException e) {
×
366
        tempX509TrustManager = null;
×
367
        log.log(Level.WARNING, "Obtaining X509ExtendedTrustManager for the transport failed."
×
368
            + "Per-rpc authority overrides will be disallowed.", e);
369
      }
1✔
370
    } else {
371
      tempX509TrustManager = null;
1✔
372
    }
373
    x509TrustManager = tempX509TrustManager;
1✔
374
  }
1✔
375

376
  /**
377
   * Create a transport connected to a fake peer for test.
378
   */
379
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
380
  @VisibleForTesting
381
  OkHttpClientTransport(
382
      OkHttpTransportFactory transportFactory,
383
      String userAgent,
384
      Supplier<Stopwatch> stopwatchFactory,
385
      Variant variant,
386
      @Nullable Runnable connectingCallback,
387
      SettableFuture<Void> connectedFuture,
388
      Runnable tooManyPingsRunnable) {
389
    this(
1✔
390
        transportFactory,
391
        new InetSocketAddress("127.0.0.1", 80),
392
        "notarealauthority:80",
393
        userAgent,
394
        Attributes.EMPTY,
395
        stopwatchFactory,
396
        variant,
397
        null,
398
        tooManyPingsRunnable,
399
        null);
400
    this.connectingCallback = connectingCallback;
1✔
401
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
1✔
402
  }
1✔
403

404
  // sslSocketFactory is set to null when use plaintext.
405
  boolean isUsingPlaintext() {
406
    return sslSocketFactory == null;
1✔
407
  }
408

409
  private void initTransportTracer() {
410
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
411
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
412
        @Override
413
        public TransportTracer.FlowControlWindows read() {
414
          synchronized (lock) {
1✔
415
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
416
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
417
            // provide a lower bound.
418
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
419
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
420
          }
421
        }
422
      });
423
    }
1✔
424
  }
1✔
425

426
  /**
427
   * Enable keepalive with custom delay and timeout.
428
   */
429
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
430
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
431
    enableKeepAlive = enable;
×
432
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
433
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
434
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
435
  }
×
436

437
  @Override
438
  public void ping(final PingCallback callback, Executor executor) {
439
    long data = 0;
1✔
440
    Http2Ping p;
441
    boolean writePing;
442
    synchronized (lock) {
1✔
443
      checkState(frameWriter != null);
1✔
444
      if (stopped) {
1✔
445
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
446
        return;
1✔
447
      }
448
      if (ping != null) {
1✔
449
        // we only allow one outstanding ping at a time, so just add the callback to
450
        // any outstanding operation
451
        p = ping;
1✔
452
        writePing = false;
1✔
453
      } else {
454
        // set outstanding operation and then write the ping after releasing lock
455
        data = random.nextLong();
1✔
456
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
457
        stopwatch.start();
1✔
458
        p = ping = new Http2Ping(data, stopwatch);
1✔
459
        writePing = true;
1✔
460
        transportTracer.reportKeepAliveSent();
1✔
461
      }
462
      if (writePing) {
1✔
463
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
464
      }
465
    }
1✔
466
    // If transport concurrently failed/stopped since we released the lock above, this could
467
    // immediately invoke callback (which we shouldn't do while holding a lock)
468
    p.addCallback(callback, executor);
1✔
469
  }
1✔
470

471
  @Override
472
  public OkHttpClientStream newStream(
473
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
474
      ClientStreamTracer[] tracers) {
475
    Preconditions.checkNotNull(method, "method");
1✔
476
    Preconditions.checkNotNull(headers, "headers");
1✔
477
    StatsTraceContext statsTraceContext =
1✔
478
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
479

480
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
481
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
482
      return new OkHttpClientStream(
1✔
483
          method,
484
          headers,
485
          frameWriter,
486
          OkHttpClientTransport.this,
487
          outboundFlow,
488
          lock,
489
          maxMessageSize,
490
          initialWindowSize,
491
          defaultAuthority,
492
          userAgent,
493
          statsTraceContext,
494
          transportTracer,
495
          callOptions,
496
          useGetForSafeMethods);
497
    }
498
  }
499

500
  private TrustManager getTrustManager(TlsChannelCredentials tlsCreds)
501
      throws GeneralSecurityException {
502
    TrustManager[] tm;
503
    // Using the same way of creating TrustManager from OkHttpChannelBuilder.sslSocketFactoryFrom()
504
    if (tlsCreds.getTrustManagers() != null) {
1✔
505
      tm = tlsCreds.getTrustManagers().toArray(new TrustManager[0]);
1✔
506
    } else if (tlsCreds.getRootCertificates() != null) {
1✔
507
      tm = CertificateUtils.createTrustManager(tlsCreds.getRootCertificates());
1✔
508
    } else { // else use system default
509
      TrustManagerFactory tmf = TrustManagerFactory.getInstance(
1✔
510
          TrustManagerFactory.getDefaultAlgorithm());
1✔
511
      tmf.init((KeyStore) null);
1✔
512
      tm = tmf.getTrustManagers();
1✔
513
    }
514
    for (TrustManager trustManager: tm) {
1✔
515
      if (trustManager instanceof X509TrustManager) {
1✔
516
        return trustManager;
1✔
517
      }
518
    }
519
    return null;
×
520
  }
521

522
  @GuardedBy("lock")
523
  void streamReadyToStart(OkHttpClientStream clientStream, String authority) {
524
    if (goAwayStatus != null) {
1✔
525
      clientStream.transportState().transportReportStatus(
1✔
526
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
527
    } else {
528
      if (socket instanceof SSLSocket && !authority.equals(defaultAuthority)) {
1✔
529
        Status authorityVerificationResult;
530
        if (authorityVerificationResults.containsKey(authority)) {
1✔
531
          authorityVerificationResult = authorityVerificationResults.get(authority);
×
532
        } else {
533
          authorityVerificationResult = verifyAuthority(authority);
1✔
534
          authorityVerificationResults.put(authority, authorityVerificationResult);
1✔
535
        }
536
        if (!authorityVerificationResult.isOk()) {
1✔
537
          if (enablePerRpcAuthorityCheck) {
1✔
538
            clientStream.transportState().transportReportStatus(
1✔
539
                    authorityVerificationResult, RpcProgress.PROCESSED, true, new Metadata());
540
            return;
1✔
541
          }
542
        }
543
      }
544
      if (streams.size() >= maxConcurrentStreams) {
1✔
545
        pendingStreams.add(clientStream);
1✔
546
        setInUse(clientStream);
1✔
547
      } else {
548
        startStream(clientStream);
1✔
549
      }
550
    }
551
  }
1✔
552

553
  private Status verifyAuthority(String authority) {
554
    Status authorityVerificationResult;
555
    if (hostnameVerifier.verify(authority, ((SSLSocket) socket).getSession())) {
1✔
556
      authorityVerificationResult = Status.OK;
1✔
557
    } else {
558
      authorityVerificationResult = Status.UNAVAILABLE.withDescription(String.format(
1✔
559
              "HostNameVerifier verification failed for authority '%s'",
560
              authority));
561
    }
562
    if (!authorityVerificationResult.isOk() && !enablePerRpcAuthorityCheck) {
1✔
563
      log.log(Level.WARNING, String.format("HostNameVerifier verification failed for "
1✔
564
                      + "authority '%s'. This will be an error in the future.",
565
              authority));
566
    }
567
    if (authorityVerificationResult.isOk()) {
1✔
568
      // The status is trivially assigned in this case, but we are still making use of the
569
      // cache to keep track that a warning log had been logged for the authority when
570
      // enablePerRpcAuthorityCheck is false. When we permanently enable the feature, the
571
      // status won't need to be cached for case when x509TrustManager is null.
572
      if (x509TrustManager == null) {
1✔
573
        authorityVerificationResult = Status.UNAVAILABLE.withDescription(
1✔
574
                String.format("Could not verify authority '%s' for the rpc with no "
1✔
575
                                + "X509TrustManager available",
576
                        authority));
577
      } else if (x509ExtendedTrustManagerClass.isInstance(x509TrustManager)) {
1✔
578
        try {
579
          Certificate[] peerCertificates = sslSession.getPeerCertificates();
1✔
580
          X509Certificate[] x509PeerCertificates =
1✔
581
                  new X509Certificate[peerCertificates.length];
582
          for (int i = 0; i < peerCertificates.length; i++) {
1✔
583
            x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
1✔
584
          }
585
          checkServerTrustedMethod.invoke(x509TrustManager, x509PeerCertificates,
1✔
586
                  "RSA", new SslSocketWrapper((SSLSocket) socket, authority));
587
          authorityVerificationResult = Status.OK;
1✔
588
        } catch (SSLPeerUnverifiedException | InvocationTargetException
1✔
589
                 | IllegalAccessException e) {
590
          authorityVerificationResult = Status.UNAVAILABLE.withCause(e).withDescription(
1✔
591
                  "Peer verification failed");
592
        }
1✔
593
        if (authorityVerificationResult.getCause() != null) {
1✔
594
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
595
                          + ". This will be an error in the future.",
596
                  authorityVerificationResult.getCause());
1✔
597
        } else {
598
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
599
                  + ". This will be an error in the future.");
600
        }
601
      }
602
    }
603
    return authorityVerificationResult;
1✔
604
  }
605

606
  @SuppressWarnings("GuardedBy")
607
  @GuardedBy("lock")
608
  private void startStream(OkHttpClientStream stream) {
609
    checkState(
1✔
610
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
611
    streams.put(nextStreamId, stream);
1✔
612
    setInUse(stream);
1✔
613
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
614
    // found: 'this.lock'
615
    stream.transportState().start(nextStreamId);
1✔
616
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
617
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
618
        || stream.useGet()) {
1✔
619
      frameWriter.flush();
1✔
620
    }
621
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
622
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
623
      // correctly.
624
      nextStreamId = Integer.MAX_VALUE;
1✔
625
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
626
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
627
    } else {
628
      nextStreamId += 2;
1✔
629
    }
630
  }
1✔
631

632
  /**
633
   * Starts pending streams, returns true if at least one pending stream is started.
634
   */
635
  @GuardedBy("lock")
636
  private boolean startPendingStreams() {
637
    boolean hasStreamStarted = false;
1✔
638
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
639
      OkHttpClientStream stream = pendingStreams.poll();
1✔
640
      startStream(stream);
1✔
641
      hasStreamStarted = true;
1✔
642
    }
1✔
643
    return hasStreamStarted;
1✔
644
  }
645

646
  /**
647
   * Removes given pending stream, used when a pending stream is cancelled.
648
   */
649
  @GuardedBy("lock")
650
  void removePendingStream(OkHttpClientStream pendingStream) {
651
    pendingStreams.remove(pendingStream);
1✔
652
    maybeClearInUse(pendingStream);
1✔
653
  }
1✔
654

655
  @Override
656
  public Runnable start(Listener listener) {
657
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
658

659
    if (enableKeepAlive) {
1✔
660
      keepAliveManager = new KeepAliveManager(
×
661
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
662
          keepAliveWithoutCalls);
663
      keepAliveManager.onTransportStarted();
×
664
    }
665

666
    int maxQueuedControlFrames = 10000;
1✔
667
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
668
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
669
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
670

671
    synchronized (lock) {
1✔
672
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
673
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
674
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
675
      // the same TransportExceptionHandler instance so it is all mixed back together.
676
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
677
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
678
    }
1✔
679
    final CountDownLatch latch = new CountDownLatch(1);
1✔
680
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
681
    // The transport needs up to two threads to function once started,
682
    // but only needs one during handshaking. Start another thread during handshaking
683
    // to make sure there's still a free thread available. If the number of threads is exhausted,
684
    // it is better to kill the transport than for all the transports to hang unable to send.
685
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
686
    // Connecting in the serializingExecutor, so that some stream operations like synStream
687
    // will be executed after connected.
688

689
    serializingExecutor.execute(new Runnable() {
1✔
690
      @Override
691
      public void run() {
692
        // Use closed source on failure so that the reader immediately shuts down.
693
        BufferedSource source = Okio.buffer(new Source() {
1✔
694
          @Override
695
          public long read(Buffer sink, long byteCount) {
696
            return -1;
1✔
697
          }
698

699
          @Override
700
          public Timeout timeout() {
701
            return Timeout.NONE;
×
702
          }
703

704
          @Override
705
          public void close() {
706
          }
1✔
707
        });
708
        try {
709
          // This is a hack to make sure the connection preface and initial settings to be sent out
710
          // without blocking the start. By doing this essentially prevents potential deadlock when
711
          // network is not available during startup while another thread holding lock to send the
712
          // initial preface.
713
          try {
714
            latch.await();
1✔
715
            barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
716
          } catch (InterruptedException e) {
×
717
            Thread.currentThread().interrupt();
×
718
          } catch (TimeoutException | BrokenBarrierException e) {
1✔
719
            startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
720
                .withDescription("Timed out waiting for second handshake thread. "
1✔
721
                    + "The transport executor pool may have run out of threads"));
722
            return;
1✔
723
          }
1✔
724

725
          if (proxiedAddr == null) {
1✔
726
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
727
          } else {
728
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
729
              sock = createHttpProxySocket(
1✔
730
                  proxiedAddr.getTargetAddress(),
1✔
731
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
732
                  proxiedAddr.getUsername(),
1✔
733
                  proxiedAddr.getPassword()
1✔
734
              );
735
            } else {
736
              throw Status.INTERNAL.withDescription(
×
737
                  "Unsupported SocketAddress implementation "
738
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
739
            }
740
          }
741
          if (sslSocketFactory != null) {
1✔
742
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
743
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
744
                connectionSpec);
1✔
745
            sslSession = sslSocket.getSession();
1✔
746
            sock = sslSocket;
1✔
747
          }
748
          sock.setTcpNoDelay(true);
1✔
749
          source = Okio.buffer(Okio.source(sock));
1✔
750
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
751

752
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
753
          attributes = attributes.toBuilder()
1✔
754
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
755
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
756
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
757
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
758
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
759
              .build();
1✔
760
        } catch (StatusException e) {
1✔
761
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
762
          return;
1✔
763
        } catch (Exception e) {
1✔
764
          onException(e);
1✔
765
          return;
1✔
766
        } finally {
767
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
768
          latchForExtraThread.countDown();
1✔
769
        }
770
        synchronized (lock) {
1✔
771
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
772
          if (sslSession != null) {
1✔
773
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
774
          }
775
        }
1✔
776
      }
1✔
777
    });
778

779
    executor.execute(new Runnable() {
1✔
780
      @Override
781
      public void run() {
782
        try {
783
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
784
          latchForExtraThread.await();
1✔
785
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
786
          // Something bad happened, maybe too few threads available!
787
          // This will be handled in the handshake thread.
788
        } catch (InterruptedException e) {
1✔
789
          Thread.currentThread().interrupt();
1✔
790
        }
1✔
791
      }
1✔
792
    });
793
    // Schedule to send connection preface & settings before any other write.
794
    try {
795
      sendConnectionPrefaceAndSettings();
1✔
796
    } finally {
797
      latch.countDown();
1✔
798
    }
799

800
    serializingExecutor.execute(new Runnable() {
1✔
801
      @Override
802
      public void run() {
803
        if (connectingCallback != null) {
1✔
804
          connectingCallback.run();
1✔
805
        }
806
        synchronized (lock) {
1✔
807
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
808
          checkState(pendingStreams.isEmpty(),
1✔
809
              "Pending streams detected during transport start."
810
                  + " RPCs should not be started before transport is ready.");
811
        }
1✔
812
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
813
        // may send goAway immediately.
814
        executor.execute(clientFrameHandler);
1✔
815
        if (connectedFuture != null) {
1✔
816
          connectedFuture.set(null);
1✔
817
        }
818
      }
1✔
819
    });
820
    return null;
1✔
821
  }
822

823
  /**
824
   * Should only be called once when the transport is first established.
825
   */
826
  private void sendConnectionPrefaceAndSettings() {
827
    synchronized (lock) {
1✔
828
      frameWriter.connectionPreface();
1✔
829
      Settings settings = new Settings();
1✔
830
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
831
      frameWriter.settings(settings);
1✔
832
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
833
        frameWriter.windowUpdate(
1✔
834
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
835
      }
836
    }
1✔
837
  }
1✔
838

839
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
840
      String proxyUsername, String proxyPassword) throws StatusException {
841
    Socket sock = null;
1✔
842
    try {
843
      // The proxy address may not be resolved
844
      if (proxyAddress.getAddress() != null) {
1✔
845
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
846
      } else {
847
        sock =
×
848
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
849
      }
850
      sock.setTcpNoDelay(true);
1✔
851
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
852
      // can cause reading from the socket to hang.
853
      sock.setSoTimeout(proxySocketTimeout);
1✔
854

855
      Source source = Okio.source(sock);
1✔
856
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
857

858
      // Prepare headers and request method line
859
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
860
      HttpUrl url = proxyRequest.httpUrl();
1✔
861
      String requestLine =
1✔
862
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
863

864
      // Write request to socket
865
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
866
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
867
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
868
            .writeUtf8(": ")
1✔
869
            .writeUtf8(proxyRequest.headers().value(i))
1✔
870
            .writeUtf8("\r\n");
1✔
871
      }
872
      sink.writeUtf8("\r\n");
1✔
873
      // Flush buffer (flushes socket and sends request)
874
      sink.flush();
1✔
875

876
      // Read status line, check if 2xx was returned
877
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
878
      // Drain rest of headers
879
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
880
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
881
        Buffer body = new Buffer();
1✔
882
        try {
883
          sock.shutdownOutput();
1✔
884
          source.read(body, 1024);
1✔
885
        } catch (IOException ex) {
×
886
          body.writeUtf8("Unable to read body: " + ex.toString());
×
887
        }
1✔
888
        try {
889
          sock.close();
1✔
890
        } catch (IOException ignored) {
×
891
          // ignored
892
        }
1✔
893
        String message = String.format(
1✔
894
            Locale.US,
895
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
896
              + "Response body:\n%s",
897
            statusLine.code, statusLine.message, body.readUtf8());
1✔
898
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
899
      }
900
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
901
      sock.setSoTimeout(0);
1✔
902
      return sock;
1✔
903
    } catch (IOException e) {
1✔
904
      if (sock != null) {
1✔
905
        GrpcUtil.closeQuietly(sock);
1✔
906
      }
907
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
908
          .asException();
1✔
909
    }
910
  }
911

912
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
913
                                         String proxyPassword) {
914
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
915
        .scheme("https")
1✔
916
        .host(address.getHostName())
1✔
917
        .port(address.getPort())
1✔
918
        .build();
1✔
919

920
    Request.Builder request = new Request.Builder()
1✔
921
        .url(tunnelUrl)
1✔
922
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
923
        .header("User-Agent", userAgent);
1✔
924

925
    // If we have proxy credentials, set them right away
926
    if (proxyUsername != null && proxyPassword != null) {
1✔
927
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
928
    }
929
    return request.build();
1✔
930
  }
931

932
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
933
    Buffer buffer = new Buffer();
1✔
934
    while (true) {
935
      if (source.read(buffer, 1) == -1) {
1✔
936
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
937
      }
938
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
939
        return buffer.readUtf8LineStrict();
1✔
940
      }
941
    }
942
  }
943

944
  @Override
945
  public String toString() {
946
    return MoreObjects.toStringHelper(this)
1✔
947
        .add("logId", logId.getId())
1✔
948
        .add("address", address)
1✔
949
        .toString();
1✔
950
  }
951

952
  @Override
953
  public InternalLogId getLogId() {
954
    return logId;
1✔
955
  }
956

957
  /**
958
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
959
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
960
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
961
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
962
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
963
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
964
   * connection to the port, independent of this function.
965
   *
966
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
967
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
968
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
969
   */
970
  @VisibleForTesting
971
  String getOverridenHost() {
972
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
973
    if (uri.getHost() != null) {
1✔
974
      return uri.getHost();
1✔
975
    }
976

977
    return defaultAuthority;
1✔
978
  }
979

980
  @VisibleForTesting
981
  int getOverridenPort() {
982
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
983
    if (uri.getPort() != -1) {
1✔
984
      return uri.getPort();
×
985
    }
986

987
    return address.getPort();
1✔
988
  }
989

990
  @Override
991
  public void shutdown(Status reason) {
992
    synchronized (lock) {
1✔
993
      if (goAwayStatus != null) {
1✔
994
        return;
1✔
995
      }
996

997
      goAwayStatus = reason;
1✔
998
      listener.transportShutdown(goAwayStatus, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
999
      stopIfNecessary();
1✔
1000
    }
1✔
1001
  }
1✔
1002

1003
  @Override
1004
  public void shutdownNow(Status reason) {
1005
    shutdownNow(reason, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
1006
  }
1✔
1007

1008
  @Override
1009
  public void shutdownNow(Status reason, DisconnectError disconnectError) {
1010
    shutdown(reason);
1✔
1011
    synchronized (lock) {
1✔
1012
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1013
      while (it.hasNext()) {
1✔
1014
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1015
        it.remove();
1✔
1016
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
1017
        maybeClearInUse(entry.getValue());
1✔
1018
      }
1✔
1019

1020
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1021
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
1022
        // chance to retry and be routed to another connection.
1023
        stream.transportState().transportReportStatus(
1✔
1024
            reason, RpcProgress.MISCARRIED, true, new Metadata());
1025
        maybeClearInUse(stream);
1✔
1026
      }
1✔
1027
      pendingStreams.clear();
1✔
1028

1029
      stopIfNecessary();
1✔
1030
    }
1✔
1031
  }
1✔
1032

1033
  @Override
1034
  public Attributes getAttributes() {
1035
    return attributes;
1✔
1036
  }
1037

1038
  /**
1039
   * Gets all active streams as an array.
1040
   */
1041
  @Override
1042
  public OutboundFlowController.StreamState[] getActiveStreams() {
1043
    synchronized (lock) {
1✔
1044
      OutboundFlowController.StreamState[] flowStreams =
1✔
1045
          new OutboundFlowController.StreamState[streams.size()];
1✔
1046
      int i = 0;
1✔
1047
      for (OkHttpClientStream stream : streams.values()) {
1✔
1048
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
1049
      }
1✔
1050
      return flowStreams;
1✔
1051
    }
1052
  }
1053

1054
  @VisibleForTesting
1055
  ClientFrameHandler getHandler() {
1056
    return clientFrameHandler;
1✔
1057
  }
1058

1059
  @VisibleForTesting
1060
  SocketFactory getSocketFactory() {
1061
    return socketFactory;
1✔
1062
  }
1063

1064
  @VisibleForTesting
1065
  int getPendingStreamSize() {
1066
    synchronized (lock) {
1✔
1067
      return pendingStreams.size();
1✔
1068
    }
1069
  }
1070

1071
  @VisibleForTesting
1072
  void setNextStreamId(int nextStreamId) {
1073
    synchronized (lock) {
1✔
1074
      this.nextStreamId = nextStreamId;
1✔
1075
    }
1✔
1076
  }
1✔
1077

1078
  /**
1079
   * Finish all active streams due to an IOException, then close the transport.
1080
   */
1081
  @Override
1082
  public void onException(Throwable failureCause) {
1083
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
1084
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
1085
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1086
  }
1✔
1087

1088
  /**
1089
   * Send GOAWAY to the server, then finish all active streams and close the transport.
1090
   */
1091
  private void onError(ErrorCode errorCode, String moreDetail) {
1092
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
1093
  }
1✔
1094

1095
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
1096
    synchronized (lock) {
1✔
1097
      if (goAwayStatus == null) {
1✔
1098
        goAwayStatus = status;
1✔
1099
        GrpcUtil.Http2Error http2Error;
1100
        if (errorCode == null) {
1✔
1101
          http2Error = GrpcUtil.Http2Error.NO_ERROR;
1✔
1102
        } else {
1103
          http2Error = GrpcUtil.Http2Error.forCode(errorCode.httpCode);
1✔
1104
        }
1105
        listener.transportShutdown(status, new GoAwayDisconnectError(http2Error));
1✔
1106
      }
1107
      if (errorCode != null && !goAwaySent) {
1✔
1108
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1109
        // streams. The GOAWAY is part of graceful shutdown.
1110
        goAwaySent = true;
1✔
1111
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
1112
      }
1113

1114
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1115
      while (it.hasNext()) {
1✔
1116
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1117
        if (entry.getKey() > lastKnownStreamId) {
1✔
1118
          it.remove();
1✔
1119
          entry.getValue().transportState().transportReportStatus(
1✔
1120
              status, RpcProgress.REFUSED, false, new Metadata());
1121
          maybeClearInUse(entry.getValue());
1✔
1122
        }
1123
      }
1✔
1124

1125
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1126
        stream.transportState().transportReportStatus(
1✔
1127
            status, RpcProgress.MISCARRIED, true, new Metadata());
1128
        maybeClearInUse(stream);
1✔
1129
      }
1✔
1130
      pendingStreams.clear();
1✔
1131

1132
      stopIfNecessary();
1✔
1133
    }
1✔
1134
  }
1✔
1135

1136
  /**
1137
   * Called when a stream is closed. We do things like:
1138
   * <ul>
1139
   * <li>Removing the stream from the map.
1140
   * <li>Optionally reporting the status.
1141
   * <li>Starting pending streams if we can.
1142
   * <li>Stopping the transport if this is the last live stream under a go-away status.
1143
   * </ul>
1144
   *
1145
   * @param streamId the Id of the stream.
1146
   * @param status the final status of this stream, null means no need to report.
1147
   * @param stopDelivery interrupt queued messages in the deframer
1148
   * @param errorCode reset the stream with this ErrorCode if not null.
1149
   * @param trailers the trailers received if not null
1150
   */
1151
  void finishStream(
1152
      int streamId,
1153
      @Nullable Status status,
1154
      RpcProgress rpcProgress,
1155
      boolean stopDelivery,
1156
      @Nullable ErrorCode errorCode,
1157
      @Nullable Metadata trailers) {
1158
    synchronized (lock) {
1✔
1159
      OkHttpClientStream stream = streams.remove(streamId);
1✔
1160
      if (stream != null) {
1✔
1161
        if (errorCode != null) {
1✔
1162
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1163
        }
1164
        if (status != null) {
1✔
1165
          stream
1✔
1166
              .transportState()
1✔
1167
              .transportReportStatus(
1✔
1168
                  status,
1169
                  rpcProgress,
1170
                  stopDelivery,
1171
                  trailers != null ? trailers : new Metadata());
1✔
1172
        }
1173
        if (!startPendingStreams()) {
1✔
1174
          stopIfNecessary();
1✔
1175
        }
1176
        maybeClearInUse(stream);
1✔
1177
      }
1178
    }
1✔
1179
  }
1✔
1180

1181
  /**
1182
   * When the transport is in goAway state, we should stop it once all active streams finish.
1183
   */
1184
  @GuardedBy("lock")
1185
  private void stopIfNecessary() {
1186
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1187
      return;
1✔
1188
    }
1189
    if (stopped) {
1✔
1190
      return;
1✔
1191
    }
1192
    stopped = true;
1✔
1193

1194
    if (keepAliveManager != null) {
1✔
1195
      keepAliveManager.onTransportTermination();
×
1196
    }
1197

1198
    if (ping != null) {
1✔
1199
      ping.failed(getPingFailure());
1✔
1200
      ping = null;
1✔
1201
    }
1202

1203
    if (!goAwaySent) {
1✔
1204
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1205
      // streams. The GOAWAY is part of graceful shutdown.
1206
      goAwaySent = true;
1✔
1207
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1208
    }
1209

1210
    // We will close the underlying socket in the writing thread to break out the reader
1211
    // thread, which will close the frameReader and notify the listener.
1212
    frameWriter.close();
1✔
1213
  }
1✔
1214

1215
  @GuardedBy("lock")
1216
  private void maybeClearInUse(OkHttpClientStream stream) {
1217
    if (hasStream) {
1✔
1218
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1219
        hasStream = false;
1✔
1220
        if (keepAliveManager != null) {
1✔
1221
          // We don't have any active streams. No need to do keepalives any more.
1222
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1223
          // and onTransportActive.
1224
          keepAliveManager.onTransportIdle();
×
1225
        }
1226
      }
1227
    }
1228
    if (stream.shouldBeCountedForInUse()) {
1✔
1229
      inUseState.updateObjectInUse(stream, false);
1✔
1230
    }
1231
  }
1✔
1232

1233
  @GuardedBy("lock")
1234
  private void setInUse(OkHttpClientStream stream) {
1235
    if (!hasStream) {
1✔
1236
      hasStream = true;
1✔
1237
      if (keepAliveManager != null) {
1✔
1238
        // We have a new stream. We might need to do keepalives now.
1239
        // Note that we have to do this inside the lock to avoid calling
1240
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1241
        // order.
1242
        keepAliveManager.onTransportActive();
×
1243
      }
1244
    }
1245
    if (stream.shouldBeCountedForInUse()) {
1✔
1246
      inUseState.updateObjectInUse(stream, true);
1✔
1247
    }
1248
  }
1✔
1249

1250
  private Status getPingFailure() {
1251
    synchronized (lock) {
1✔
1252
      if (goAwayStatus != null) {
1✔
1253
        return goAwayStatus;
1✔
1254
      } else {
1255
        return Status.UNAVAILABLE.withDescription("Connection closed");
×
1256
      }
1257
    }
1258
  }
1259

1260
  boolean mayHaveCreatedStream(int streamId) {
1261
    synchronized (lock) {
1✔
1262
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1263
    }
1264
  }
1265

1266
  OkHttpClientStream getStream(int streamId) {
1267
    synchronized (lock) {
1✔
1268
      return streams.get(streamId);
1✔
1269
    }
1270
  }
1271

1272
  /**
1273
   * Returns a Grpc status corresponding to the given ErrorCode.
1274
   */
1275
  @VisibleForTesting
1276
  static Status toGrpcStatus(ErrorCode code) {
1277
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1278
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1279
        "Unknown http2 error code: " + code.httpCode);
1280
  }
1281

1282
  @Override
1283
  public ListenableFuture<SocketStats> getStats() {
1284
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1285
    synchronized (lock) {
1✔
1286
      if (socket == null) {
1✔
1287
        ret.set(new SocketStats(
×
1288
            transportTracer.getStats(),
×
1289
            /*local=*/ null,
1290
            /*remote=*/ null,
1291
            new InternalChannelz.SocketOptions.Builder().build(),
×
1292
            /*security=*/ null));
1293
      } else {
1294
        ret.set(new SocketStats(
1✔
1295
            transportTracer.getStats(),
1✔
1296
            socket.getLocalSocketAddress(),
1✔
1297
            socket.getRemoteSocketAddress(),
1✔
1298
            Utils.getSocketOptions(socket),
1✔
1299
            securityInfo));
1300
      }
1301
      return ret;
1✔
1302
    }
1303
  }
1304

1305
  /**
1306
   * Runnable which reads frames and dispatches them to in flight calls.
1307
   */
1308
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1309

1310
    private final OkHttpFrameLogger logger =
1✔
1311
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1312
    FrameReader frameReader;
1313
    boolean firstSettings = true;
1✔
1314

1315
    ClientFrameHandler(FrameReader frameReader) {
1✔
1316
      this.frameReader = frameReader;
1✔
1317
    }
1✔
1318

1319
    @Override
1320
    @SuppressWarnings("Finally")
1321
    public void run() {
1322
      String threadName = Thread.currentThread().getName();
1✔
1323
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1324
      try {
1325
        // Read until the underlying socket closes.
1326
        while (frameReader.nextFrame(this)) {
1✔
1327
          if (keepAliveManager != null) {
1✔
1328
            keepAliveManager.onDataReceived();
×
1329
          }
1330
        }
1331
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1332
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1333
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1334
        Status status;
1335
        synchronized (lock) {
1✔
1336
          status = goAwayStatus;
1✔
1337
        }
1✔
1338
        if (status == null) {
1✔
1339
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1340
        }
1341
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1342
      } catch (Throwable t) {
1✔
1343
        // TODO(madongfly): Send the exception message to the server.
1344
        startGoAway(
1✔
1345
            0,
1346
            ErrorCode.PROTOCOL_ERROR,
1347
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1348
      } finally {
1349
        try {
1350
          frameReader.close();
1✔
1351
        } catch (IOException ex) {
×
1352
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1353
        } catch (RuntimeException e) {
×
1354
          // This same check is done in okhttp proper:
1355
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1356

1357
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1358
          // https://issuetracker.google.com/issues/177450597
1359
          if (!"bio == null".equals(e.getMessage())) {
×
1360
            throw e;
×
1361
          }
1362
        }
1✔
1363
        listener.transportTerminated();
1✔
1364
        Thread.currentThread().setName(threadName);
1✔
1365
      }
1366
    }
1✔
1367

1368
    /**
1369
     * Handle an HTTP2 DATA frame.
1370
     */
1371
    @SuppressWarnings("GuardedBy")
1372
    @Override
1373
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1374
                     int paddedLength)
1375
        throws IOException {
1376
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1377
          streamId, in.getBuffer(), length, inFinished);
1✔
1378
      OkHttpClientStream stream = getStream(streamId);
1✔
1379
      if (stream == null) {
1✔
1380
        if (mayHaveCreatedStream(streamId)) {
1✔
1381
          synchronized (lock) {
1✔
1382
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1383
          }
1✔
1384
          in.skip(length);
1✔
1385
        } else {
1386
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1387
          return;
1✔
1388
        }
1389
      } else {
1390
        // Wait until the frame is complete.
1391
        in.require(length);
1✔
1392

1393
        Buffer buf = new Buffer();
1✔
1394
        buf.write(in.getBuffer(), length);
1✔
1395
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1396
            stream.transportState().tag());
1✔
1397
        synchronized (lock) {
1✔
1398
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1399
          // instead found: 'OkHttpClientTransport.this.lock'
1400
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1401
        }
1✔
1402
      }
1403

1404
      // connection window update
1405
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1406
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1407
        synchronized (lock) {
1✔
1408
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1409
        }
1✔
1410
        connectionUnacknowledgedBytesRead = 0;
1✔
1411
      }
1412
    }
1✔
1413

1414
    /**
1415
     * Handle HTTP2 HEADER and CONTINUATION frames.
1416
     */
1417
    @SuppressWarnings("GuardedBy")
1418
    @Override
1419
    public void headers(boolean outFinished,
1420
        boolean inFinished,
1421
        int streamId,
1422
        int associatedStreamId,
1423
        List<Header> headerBlock,
1424
        HeadersMode headersMode) {
1425
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1426
      boolean unknownStream = false;
1✔
1427
      Status failedStatus = null;
1✔
1428
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1429
        int metadataSize = headerBlockSize(headerBlock);
1✔
1430
        if (metadataSize > maxInboundMetadataSize) {
1✔
1431
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1432
              String.format(
1✔
1433
                  Locale.US,
1434
                  "Response %s metadata larger than %d: %d",
1435
                  inFinished ? "trailer" : "header",
1✔
1436
                  maxInboundMetadataSize,
1✔
1437
                  metadataSize));
1✔
1438
        }
1439
      }
1440
      synchronized (lock) {
1✔
1441
        OkHttpClientStream stream = streams.get(streamId);
1✔
1442
        if (stream == null) {
1✔
1443
          if (mayHaveCreatedStream(streamId)) {
1✔
1444
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1445
          } else {
1446
            unknownStream = true;
1✔
1447
          }
1448
        } else {
1449
          if (failedStatus == null) {
1✔
1450
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1451
                stream.transportState().tag());
1✔
1452
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1453
            // instead found: 'OkHttpClientTransport.this.lock'
1454
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1455
          } else {
1456
            if (!inFinished) {
1✔
1457
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1458
            }
1459
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1460
          }
1461
        }
1462
      }
1✔
1463
      if (unknownStream) {
1✔
1464
        // We don't expect any server-initiated streams.
1465
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1466
      }
1467
    }
1✔
1468

1469
    private int headerBlockSize(List<Header> headerBlock) {
1470
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1471
      long size = 0;
1✔
1472
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1473
        Header header = headerBlock.get(i);
1✔
1474
        size += 32 + header.name.size() + header.value.size();
1✔
1475
      }
1476
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1477
      return (int) size;
1✔
1478
    }
1479

1480
    @Override
1481
    public void rstStream(int streamId, ErrorCode errorCode) {
1482
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1483
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1484
      boolean stopDelivery =
1✔
1485
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1486
      synchronized (lock) {
1✔
1487
        OkHttpClientStream stream = streams.get(streamId);
1✔
1488
        if (stream != null) {
1✔
1489
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1490
              stream.transportState().tag());
1✔
1491
          finishStream(
1✔
1492
              streamId, status,
1493
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1494
              stopDelivery, null, null);
1495
        }
1496
      }
1✔
1497
    }
1✔
1498

1499
    @Override
1500
    public void settings(boolean clearPrevious, Settings settings) {
1501
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1502
      boolean outboundWindowSizeIncreased = false;
1✔
1503
      synchronized (lock) {
1✔
1504
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1505
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1506
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1507
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1508
        }
1509

1510
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1511
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1512
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1513
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1514
        }
1515
        if (firstSettings) {
1✔
1516
          attributes = listener.filterTransport(attributes);
1✔
1517
          listener.transportReady();
1✔
1518
          firstSettings = false;
1✔
1519
        }
1520

1521
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1522
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1523
        // otherwise it will cause a stream error (RST_STREAM).
1524
        frameWriter.ackSettings(settings);
1✔
1525

1526
        // send any pending bytes / streams
1527
        if (outboundWindowSizeIncreased) {
1✔
1528
          outboundFlow.writeStreams();
1✔
1529
        }
1530
        startPendingStreams();
1✔
1531
      }
1✔
1532
    }
1✔
1533

1534
    @Override
1535
    public void ping(boolean ack, int payload1, int payload2) {
1536
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1537
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1538
      if (!ack) {
1✔
1539
        synchronized (lock) {
1✔
1540
          frameWriter.ping(true, payload1, payload2);
1✔
1541
        }
1✔
1542
      } else {
1543
        Http2Ping p = null;
1✔
1544
        synchronized (lock) {
1✔
1545
          if (ping != null) {
1✔
1546
            if (ping.payload() == ackPayload) {
1✔
1547
              p = ping;
1✔
1548
              ping = null;
1✔
1549
            } else {
1550
              log.log(Level.WARNING, String.format(
1✔
1551
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1552
                  ping.payload(), ackPayload));
1✔
1553
            }
1554
          } else {
1555
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1556
          }
1557
        }
1✔
1558
        // don't complete it while holding lock since callbacks could run immediately
1559
        if (p != null) {
1✔
1560
          p.complete();
1✔
1561
        }
1562
      }
1563
    }
1✔
1564

1565
    @Override
1566
    public void ackSettings() {
1567
      // Do nothing currently.
1568
    }
1✔
1569

1570
    @Override
1571
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1572
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1573
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1574
        String data = debugData.utf8();
1✔
1575
        log.log(Level.WARNING, String.format(
1✔
1576
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1577
        if ("too_many_pings".equals(data)) {
1✔
1578
          tooManyPingsRunnable.run();
1✔
1579
        }
1580
      }
1581
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1582
          .augmentDescription("Received Goaway");
1✔
1583
      if (debugData.size() > 0) {
1✔
1584
        // If a debug message was provided, use it.
1585
        status = status.augmentDescription(debugData.utf8());
1✔
1586
      }
1587
      startGoAway(lastGoodStreamId, null, status);
1✔
1588
    }
1✔
1589

1590
    @Override
1591
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1592
        throws IOException {
1593
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1594
          streamId, promisedStreamId, requestHeaders);
1595
      // We don't accept server initiated stream.
1596
      synchronized (lock) {
1✔
1597
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1598
      }
1✔
1599
    }
1✔
1600

1601
    @Override
1602
    public void windowUpdate(int streamId, long delta) {
1603
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1604
      if (delta == 0) {
1✔
1605
        String errorMsg = "Received 0 flow control window increment.";
×
1606
        if (streamId == 0) {
×
1607
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1608
        } else {
1609
          finishStream(
×
1610
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1611
              ErrorCode.PROTOCOL_ERROR, null);
1612
        }
1613
        return;
×
1614
      }
1615

1616
      boolean unknownStream = false;
1✔
1617
      synchronized (lock) {
1✔
1618
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1619
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1620
          return;
1✔
1621
        }
1622

1623
        OkHttpClientStream stream = streams.get(streamId);
1✔
1624
        if (stream != null) {
1✔
1625
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1626
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1627
          unknownStream = true;
1✔
1628
        }
1629
      }
1✔
1630
      if (unknownStream) {
1✔
1631
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1632
            "Received window_update for unknown stream: " + streamId);
1633
      }
1634
    }
1✔
1635

1636
    @Override
1637
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1638
      // Ignore priority change.
1639
      // TODO(madongfly): log
1640
    }
×
1641

1642
    @Override
1643
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1644
        int port, long maxAge) {
1645
      // TODO(madongfly): Deal with alternateService propagation
1646
    }
×
1647
  }
1648

1649
  /**
1650
   * SSLSocket wrapper that provides a fake SSLSession for handshake session.
1651
   */
1652
  static final class SslSocketWrapper extends NoopSslSocket {
1653

1654
    private final SSLSession sslSession;
1655
    private final SSLSocket sslSocket;
1656

1657
    SslSocketWrapper(SSLSocket sslSocket, String peerHost) {
1✔
1658
      this.sslSocket = sslSocket;
1✔
1659
      this.sslSession = new FakeSslSession(peerHost);
1✔
1660
    }
1✔
1661

1662
    @Override
1663
    public SSLSession getHandshakeSession() {
1664
      return this.sslSession;
1✔
1665
    }
1666

1667
    @Override
1668
    public boolean isConnected() {
1669
      return sslSocket.isConnected();
1✔
1670
    }
1671

1672
    @Override
1673
    public SSLParameters getSSLParameters() {
1674
      return sslSocket.getSSLParameters();
1✔
1675
    }
1676
  }
1677

1678
  /**
1679
   * Fake SSLSession instance that provides the peer host name to verify for per-rpc check.
1680
   */
1681
  static class FakeSslSession extends NoopSslSession {
1682

1683
    private final String peerHost;
1684

1685
    FakeSslSession(String peerHost) {
1✔
1686
      this.peerHost = peerHost;
1✔
1687
    }
1✔
1688

1689
    @Override
1690
    public String getPeerHost() {
1691
      return peerHost;
1✔
1692
    }
1693
  }
1694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc