• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20126

23 Dec 2025 05:06AM UTC coverage: 88.691% (-0.02%) from 88.706%
#20126

push

github

web-flow
opentelemetry: plumb subchannel metrics disconnect error (#12342)

Finishes the remaining work of
[A94](https://github.com/grpc/proposal/pull/485/files) i.e. the plumbing the disconnect error

35472 of 39995 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.94
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import com.google.errorprone.annotations.concurrent.GuardedBy;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.ChannelCredentials;
34
import io.grpc.ClientStreamTracer;
35
import io.grpc.Grpc;
36
import io.grpc.HttpConnectProxiedSocketAddress;
37
import io.grpc.InternalChannelz;
38
import io.grpc.InternalChannelz.SocketStats;
39
import io.grpc.InternalLogId;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.MethodDescriptor.MethodType;
43
import io.grpc.SecurityLevel;
44
import io.grpc.Status;
45
import io.grpc.Status.Code;
46
import io.grpc.StatusException;
47
import io.grpc.TlsChannelCredentials;
48
import io.grpc.internal.CertificateUtils;
49
import io.grpc.internal.ClientStreamListener.RpcProgress;
50
import io.grpc.internal.ConnectionClientTransport;
51
import io.grpc.internal.DisconnectError;
52
import io.grpc.internal.GoAwayDisconnectError;
53
import io.grpc.internal.GrpcAttributes;
54
import io.grpc.internal.GrpcUtil;
55
import io.grpc.internal.Http2Ping;
56
import io.grpc.internal.InUseStateAggregator;
57
import io.grpc.internal.KeepAliveManager;
58
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
59
import io.grpc.internal.NoopSslSession;
60
import io.grpc.internal.SerializingExecutor;
61
import io.grpc.internal.SimpleDisconnectError;
62
import io.grpc.internal.StatsTraceContext;
63
import io.grpc.internal.TransportTracer;
64
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
65
import io.grpc.okhttp.OkHttpChannelBuilder.OkHttpTransportFactory;
66
import io.grpc.okhttp.internal.ConnectionSpec;
67
import io.grpc.okhttp.internal.Credentials;
68
import io.grpc.okhttp.internal.OkHostnameVerifier;
69
import io.grpc.okhttp.internal.StatusLine;
70
import io.grpc.okhttp.internal.framed.ErrorCode;
71
import io.grpc.okhttp.internal.framed.FrameReader;
72
import io.grpc.okhttp.internal.framed.FrameWriter;
73
import io.grpc.okhttp.internal.framed.Header;
74
import io.grpc.okhttp.internal.framed.HeadersMode;
75
import io.grpc.okhttp.internal.framed.Http2;
76
import io.grpc.okhttp.internal.framed.Settings;
77
import io.grpc.okhttp.internal.framed.Variant;
78
import io.grpc.okhttp.internal.proxy.HttpUrl;
79
import io.grpc.okhttp.internal.proxy.Request;
80
import io.perfmark.PerfMark;
81
import java.io.EOFException;
82
import java.io.IOException;
83
import java.lang.reflect.InvocationTargetException;
84
import java.lang.reflect.Method;
85
import java.net.InetSocketAddress;
86
import java.net.Socket;
87
import java.net.URI;
88
import java.security.GeneralSecurityException;
89
import java.security.KeyStore;
90
import java.security.cert.Certificate;
91
import java.security.cert.X509Certificate;
92
import java.util.Collections;
93
import java.util.Deque;
94
import java.util.EnumMap;
95
import java.util.HashMap;
96
import java.util.Iterator;
97
import java.util.LinkedHashMap;
98
import java.util.LinkedList;
99
import java.util.List;
100
import java.util.Locale;
101
import java.util.Map;
102
import java.util.Random;
103
import java.util.concurrent.BrokenBarrierException;
104
import java.util.concurrent.CountDownLatch;
105
import java.util.concurrent.CyclicBarrier;
106
import java.util.concurrent.Executor;
107
import java.util.concurrent.ScheduledExecutorService;
108
import java.util.concurrent.TimeUnit;
109
import java.util.concurrent.TimeoutException;
110
import java.util.logging.Level;
111
import java.util.logging.Logger;
112
import javax.annotation.Nullable;
113
import javax.net.SocketFactory;
114
import javax.net.ssl.HostnameVerifier;
115
import javax.net.ssl.SSLParameters;
116
import javax.net.ssl.SSLPeerUnverifiedException;
117
import javax.net.ssl.SSLSession;
118
import javax.net.ssl.SSLSocket;
119
import javax.net.ssl.SSLSocketFactory;
120
import javax.net.ssl.TrustManager;
121
import javax.net.ssl.TrustManagerFactory;
122
import javax.net.ssl.X509TrustManager;
123
import okio.Buffer;
124
import okio.BufferedSink;
125
import okio.BufferedSource;
126
import okio.ByteString;
127
import okio.Okio;
128
import okio.Source;
129
import okio.Timeout;
130

131
/**
132
 * A okhttp-based {@link ConnectionClientTransport} implementation.
133
 */
134
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
135
      OutboundFlowController.Transport, ClientKeepAlivePinger.TransportWithDisconnectReason {
136
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
137
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
138
  private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK =
139
          "GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK";
140
  static boolean enablePerRpcAuthorityCheck =
1✔
141
          GrpcUtil.getFlag(GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK, false);
1✔
142
  private Socket sock;
143
  private SSLSession sslSession;
144

145
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
146
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
147
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
148
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
149
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
150
        Status.INTERNAL.withDescription("Protocol error"));
1✔
151
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
152
        Status.INTERNAL.withDescription("Internal error"));
1✔
153
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
154
        Status.INTERNAL.withDescription("Flow control error"));
1✔
155
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
156
        Status.INTERNAL.withDescription("Stream closed"));
1✔
157
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
158
        Status.INTERNAL.withDescription("Frame too large"));
1✔
159
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
160
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
161
    errorToStatus.put(ErrorCode.CANCEL,
1✔
162
        Status.CANCELLED.withDescription("Cancelled"));
1✔
163
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
164
        Status.INTERNAL.withDescription("Compression error"));
1✔
165
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
166
        Status.INTERNAL.withDescription("Connect error"));
1✔
167
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
168
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
169
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
170
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
171
    return Collections.unmodifiableMap(errorToStatus);
1✔
172
  }
173

174
  private static final Class<?> x509ExtendedTrustManagerClass;
175
  private static final Method checkServerTrustedMethod;
176

177
  static {
178
    Class<?> x509ExtendedTrustManagerClass1 = null;
1✔
179
    Method checkServerTrustedMethod1 = null;
1✔
180
    try {
181
      x509ExtendedTrustManagerClass1 = Class.forName("javax.net.ssl.X509ExtendedTrustManager");
1✔
182
      checkServerTrustedMethod1 = x509ExtendedTrustManagerClass1.getMethod("checkServerTrusted",
1✔
183
              X509Certificate[].class, String.class, Socket.class);
184
    } catch (ClassNotFoundException e) {
×
185
      // Per-rpc authority override via call options will be disallowed.
186
    } catch (NoSuchMethodException e) {
×
187
      // Should never happen since X509ExtendedTrustManager was introduced in Android API level 24
188
      // along with checkServerTrusted.
189
    }
1✔
190
    x509ExtendedTrustManagerClass = x509ExtendedTrustManagerClass1;
1✔
191
    checkServerTrustedMethod = checkServerTrustedMethod1;
1✔
192
  }
1✔
193

194
  private final InetSocketAddress address;
195
  private final String defaultAuthority;
196
  private final String userAgent;
197
  private final Random random = new Random();
1✔
198
  // Returns new unstarted stopwatches
199
  private final Supplier<Stopwatch> stopwatchFactory;
200
  private final int initialWindowSize;
201
  private final Variant variant;
202
  private Listener listener;
203
  @GuardedBy("lock")
204
  private ExceptionHandlingFrameWriter frameWriter;
205
  private OutboundFlowController outboundFlow;
206
  private final Object lock = new Object();
1✔
207
  private final InternalLogId logId;
208
  @GuardedBy("lock")
209
  private int nextStreamId;
210
  @GuardedBy("lock")
1✔
211
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
212
  private final Executor executor;
213
  // Wrap on executor, to guarantee some operations be executed serially.
214
  private final SerializingExecutor serializingExecutor;
215
  private final ScheduledExecutorService scheduler;
216
  private final int maxMessageSize;
217
  private int connectionUnacknowledgedBytesRead;
218
  private ClientFrameHandler clientFrameHandler;
219
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
220
  private Attributes attributes;
221
  /**
222
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
223
   * streams may continue.
224
   */
225
  @GuardedBy("lock")
226
  private Status goAwayStatus;
227
  @GuardedBy("lock")
228
  private boolean goAwaySent;
229
  @GuardedBy("lock")
230
  private Http2Ping ping;
231
  @GuardedBy("lock")
232
  private boolean stopped;
233
  @GuardedBy("lock")
234
  private boolean hasStream;
235
  private final SocketFactory socketFactory;
236
  private SSLSocketFactory sslSocketFactory;
237
  private HostnameVerifier hostnameVerifier;
238
  private Socket socket;
239
  @GuardedBy("lock")
1✔
240
  private int maxConcurrentStreams = 0;
241
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
242
  @GuardedBy("lock")
243
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
244
  private final ConnectionSpec connectionSpec;
245
  private KeepAliveManager keepAliveManager;
246
  private boolean enableKeepAlive;
247
  private long keepAliveTimeNanos;
248
  private long keepAliveTimeoutNanos;
249
  private boolean keepAliveWithoutCalls;
250
  private final Runnable tooManyPingsRunnable;
251
  private final int maxInboundMetadataSize;
252
  private final boolean useGetForSafeMethods;
253
  @GuardedBy("lock")
254
  private final TransportTracer transportTracer;
255
  private final TrustManager x509TrustManager;
256

257
  @SuppressWarnings("serial")
258
  private static class LruCache<T> extends LinkedHashMap<String, T> {
259
    @Override
260
    protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
261
      return size() > 100;
1✔
262
    }
263
  }
264

265
  @GuardedBy("lock")
1✔
266
  private final Map<String, Status> authorityVerificationResults = new LruCache<>();
267

268
  @GuardedBy("lock")
1✔
269
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
270
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
271
        @Override
272
        protected void handleInUse() {
273
          listener.transportInUse(true);
1✔
274
        }
1✔
275

276
        @Override
277
        protected void handleNotInUse() {
278
          listener.transportInUse(false);
1✔
279
        }
1✔
280
      };
281
  @GuardedBy("lock")
282
  private InternalChannelz.Security securityInfo;
283

284
  @VisibleForTesting
285
  @Nullable
286
  final HttpConnectProxiedSocketAddress proxiedAddr;
287

288
  @VisibleForTesting
1✔
289
  int proxySocketTimeout = 30000;
290

291
  // The following fields should only be used for test.
292
  Runnable connectingCallback;
293
  SettableFuture<Void> connectedFuture;
294

295
  public OkHttpClientTransport(
296
          OkHttpTransportFactory transportFactory,
297
          InetSocketAddress address,
298
          String authority,
299
          @Nullable String userAgent,
300
          Attributes eagAttrs,
301
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
302
          Runnable tooManyPingsRunnable,
303
          ChannelCredentials channelCredentials) {
304
    this(
1✔
305
        transportFactory,
306
        address,
307
        authority,
308
        userAgent,
309
        eagAttrs,
310
        GrpcUtil.STOPWATCH_SUPPLIER,
311
        new Http2(),
312
        proxiedAddr,
313
        tooManyPingsRunnable,
314
        channelCredentials);
315
  }
1✔
316

317
  private OkHttpClientTransport(
318
          OkHttpTransportFactory transportFactory,
319
          InetSocketAddress address,
320
          String authority,
321
          @Nullable String userAgent,
322
          Attributes eagAttrs,
323
          Supplier<Stopwatch> stopwatchFactory,
324
          Variant variant,
325
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
326
          Runnable tooManyPingsRunnable,
327
          ChannelCredentials channelCredentials) {
1✔
328
    this.address = Preconditions.checkNotNull(address, "address");
1✔
329
    this.defaultAuthority = authority;
1✔
330
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
331
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
332
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
333
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
334
    this.scheduler = Preconditions.checkNotNull(
1✔
335
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
336
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
337
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
338
    nextStreamId = 3;
1✔
339
    this.socketFactory = transportFactory.socketFactory == null
1✔
340
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
341
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
342
    this.hostnameVerifier = transportFactory.hostnameVerifier != null
1✔
343
        ? transportFactory.hostnameVerifier : OkHostnameVerifier.INSTANCE;
1✔
344
    this.connectionSpec = Preconditions.checkNotNull(
1✔
345
        transportFactory.connectionSpec, "connectionSpec");
346
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
347
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
348
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
349
    this.proxiedAddr = proxiedAddr;
1✔
350
    this.tooManyPingsRunnable =
1✔
351
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
352
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
353
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
354
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
355
    this.attributes = Attributes.newBuilder()
1✔
356
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
357
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
358
    initTransportTracer();
1✔
359
    TrustManager tempX509TrustManager;
360
    if (channelCredentials instanceof TlsChannelCredentials
1✔
361
        && x509ExtendedTrustManagerClass != null) {
362
      try {
363
        tempX509TrustManager = getTrustManager(
1✔
364
                (TlsChannelCredentials) channelCredentials);
365
      } catch (GeneralSecurityException e) {
×
366
        tempX509TrustManager = null;
×
367
        log.log(Level.WARNING, "Obtaining X509ExtendedTrustManager for the transport failed."
×
368
            + "Per-rpc authority overrides will be disallowed.", e);
369
      }
1✔
370
    } else {
371
      tempX509TrustManager = null;
1✔
372
    }
373
    x509TrustManager = tempX509TrustManager;
1✔
374
  }
1✔
375

376
  /**
377
   * Create a transport connected to a fake peer for test.
378
   */
379
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
380
  @VisibleForTesting
381
  OkHttpClientTransport(
382
      OkHttpTransportFactory transportFactory,
383
      String userAgent,
384
      Supplier<Stopwatch> stopwatchFactory,
385
      Variant variant,
386
      @Nullable Runnable connectingCallback,
387
      SettableFuture<Void> connectedFuture,
388
      Runnable tooManyPingsRunnable) {
389
    this(
1✔
390
        transportFactory,
391
        new InetSocketAddress("127.0.0.1", 80),
392
        "notarealauthority:80",
393
        userAgent,
394
        Attributes.EMPTY,
395
        stopwatchFactory,
396
        variant,
397
        null,
398
        tooManyPingsRunnable,
399
        null);
400
    this.connectingCallback = connectingCallback;
1✔
401
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
1✔
402
  }
1✔
403

404
  // sslSocketFactory is set to null when use plaintext.
405
  boolean isUsingPlaintext() {
406
    return sslSocketFactory == null;
1✔
407
  }
408

409
  private void initTransportTracer() {
410
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
411
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
412
        @Override
413
        public TransportTracer.FlowControlWindows read() {
414
          synchronized (lock) {
1✔
415
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
416
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
417
            // provide a lower bound.
418
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
419
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
420
          }
421
        }
422
      });
423
    }
1✔
424
  }
1✔
425

426
  /**
427
   * Enable keepalive with custom delay and timeout.
428
   */
429
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
430
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
431
    enableKeepAlive = enable;
×
432
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
433
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
434
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
435
  }
×
436

437
  @Override
438
  public void ping(final PingCallback callback, Executor executor) {
439
    long data = 0;
1✔
440
    Http2Ping p;
441
    boolean writePing;
442
    synchronized (lock) {
1✔
443
      checkState(frameWriter != null);
1✔
444
      if (stopped) {
1✔
445
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
446
        return;
1✔
447
      }
448
      if (ping != null) {
1✔
449
        // we only allow one outstanding ping at a time, so just add the callback to
450
        // any outstanding operation
451
        p = ping;
1✔
452
        writePing = false;
1✔
453
      } else {
454
        // set outstanding operation and then write the ping after releasing lock
455
        data = random.nextLong();
1✔
456
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
457
        stopwatch.start();
1✔
458
        p = ping = new Http2Ping(data, stopwatch);
1✔
459
        writePing = true;
1✔
460
        transportTracer.reportKeepAliveSent();
1✔
461
      }
462
      if (writePing) {
1✔
463
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
464
      }
465
    }
1✔
466
    // If transport concurrently failed/stopped since we released the lock above, this could
467
    // immediately invoke callback (which we shouldn't do while holding a lock)
468
    p.addCallback(callback, executor);
1✔
469
  }
1✔
470

471
  @Override
472
  public OkHttpClientStream newStream(
473
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
474
      ClientStreamTracer[] tracers) {
475
    Preconditions.checkNotNull(method, "method");
1✔
476
    Preconditions.checkNotNull(headers, "headers");
1✔
477
    StatsTraceContext statsTraceContext =
1✔
478
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
479

480
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
481
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
482
      return new OkHttpClientStream(
1✔
483
          method,
484
          headers,
485
          frameWriter,
486
          OkHttpClientTransport.this,
487
          outboundFlow,
488
          lock,
489
          maxMessageSize,
490
          initialWindowSize,
491
          defaultAuthority,
492
          userAgent,
493
          statsTraceContext,
494
          transportTracer,
495
          callOptions,
496
          useGetForSafeMethods);
497
    }
498
  }
499

500
  private TrustManager getTrustManager(TlsChannelCredentials tlsCreds)
501
      throws GeneralSecurityException {
502
    TrustManager[] tm;
503
    // Using the same way of creating TrustManager from OkHttpChannelBuilder.sslSocketFactoryFrom()
504
    if (tlsCreds.getTrustManagers() != null) {
1✔
505
      tm = tlsCreds.getTrustManagers().toArray(new TrustManager[0]);
1✔
506
    } else if (tlsCreds.getRootCertificates() != null) {
1✔
507
      tm = CertificateUtils.createTrustManager(tlsCreds.getRootCertificates());
1✔
508
    } else { // else use system default
509
      TrustManagerFactory tmf = TrustManagerFactory.getInstance(
1✔
510
          TrustManagerFactory.getDefaultAlgorithm());
1✔
511
      tmf.init((KeyStore) null);
1✔
512
      tm = tmf.getTrustManagers();
1✔
513
    }
514
    for (TrustManager trustManager: tm) {
1✔
515
      if (trustManager instanceof X509TrustManager) {
1✔
516
        return trustManager;
1✔
517
      }
518
    }
519
    return null;
×
520
  }
521

522
  @GuardedBy("lock")
523
  void streamReadyToStart(OkHttpClientStream clientStream, String authority) {
524
    if (goAwayStatus != null) {
1✔
525
      clientStream.transportState().transportReportStatus(
1✔
526
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
527
    } else {
528
      if (socket instanceof SSLSocket && !authority.equals(defaultAuthority)) {
1✔
529
        Status authorityVerificationResult;
530
        if (authorityVerificationResults.containsKey(authority)) {
1✔
531
          authorityVerificationResult = authorityVerificationResults.get(authority);
×
532
        } else {
533
          authorityVerificationResult = verifyAuthority(authority);
1✔
534
          authorityVerificationResults.put(authority, authorityVerificationResult);
1✔
535
        }
536
        if (!authorityVerificationResult.isOk()) {
1✔
537
          if (enablePerRpcAuthorityCheck) {
1✔
538
            clientStream.transportState().transportReportStatus(
1✔
539
                    authorityVerificationResult, RpcProgress.PROCESSED, true, new Metadata());
540
            return;
1✔
541
          }
542
        }
543
      }
544
      if (streams.size() >= maxConcurrentStreams) {
1✔
545
        pendingStreams.add(clientStream);
1✔
546
        setInUse(clientStream);
1✔
547
      } else {
548
        startStream(clientStream);
1✔
549
      }
550
    }
551
  }
1✔
552

553
  private Status verifyAuthority(String authority) {
554
    Status authorityVerificationResult;
555
    if (hostnameVerifier.verify(authority, ((SSLSocket) socket).getSession())) {
1✔
556
      authorityVerificationResult = Status.OK;
1✔
557
    } else {
558
      authorityVerificationResult = Status.UNAVAILABLE.withDescription(String.format(
1✔
559
              "HostNameVerifier verification failed for authority '%s'",
560
              authority));
561
    }
562
    if (!authorityVerificationResult.isOk() && !enablePerRpcAuthorityCheck) {
1✔
563
      log.log(Level.WARNING, String.format("HostNameVerifier verification failed for "
1✔
564
                      + "authority '%s'. This will be an error in the future.",
565
              authority));
566
    }
567
    if (authorityVerificationResult.isOk()) {
1✔
568
      // The status is trivially assigned in this case, but we are still making use of the
569
      // cache to keep track that a warning log had been logged for the authority when
570
      // enablePerRpcAuthorityCheck is false. When we permanently enable the feature, the
571
      // status won't need to be cached for case when x509TrustManager is null.
572
      if (x509TrustManager == null) {
1✔
573
        authorityVerificationResult = Status.UNAVAILABLE.withDescription(
1✔
574
                String.format("Could not verify authority '%s' for the rpc with no "
1✔
575
                                + "X509TrustManager available",
576
                        authority));
577
      } else if (x509ExtendedTrustManagerClass.isInstance(x509TrustManager)) {
1✔
578
        try {
579
          Certificate[] peerCertificates = sslSession.getPeerCertificates();
1✔
580
          X509Certificate[] x509PeerCertificates =
1✔
581
                  new X509Certificate[peerCertificates.length];
582
          for (int i = 0; i < peerCertificates.length; i++) {
1✔
583
            x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
1✔
584
          }
585
          checkServerTrustedMethod.invoke(x509TrustManager, x509PeerCertificates,
1✔
586
                  "RSA", new SslSocketWrapper((SSLSocket) socket, authority));
587
          authorityVerificationResult = Status.OK;
1✔
588
        } catch (SSLPeerUnverifiedException | InvocationTargetException
1✔
589
                 | IllegalAccessException e) {
590
          authorityVerificationResult = Status.UNAVAILABLE.withCause(e).withDescription(
1✔
591
                  "Peer verification failed");
592
        }
1✔
593
        if (authorityVerificationResult.getCause() != null) {
1✔
594
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
595
                          + ". This will be an error in the future.",
596
                  authorityVerificationResult.getCause());
1✔
597
        } else {
598
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
599
                  + ". This will be an error in the future.");
600
        }
601
      }
602
    }
603
    return authorityVerificationResult;
1✔
604
  }
605

606
  @SuppressWarnings("GuardedBy")
607
  @GuardedBy("lock")
608
  private void startStream(OkHttpClientStream stream) {
609
    checkState(
1✔
610
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
611
    streams.put(nextStreamId, stream);
1✔
612
    setInUse(stream);
1✔
613
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
614
    // found: 'this.lock'
615
    stream.transportState().start(nextStreamId);
1✔
616
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
617
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
618
        || stream.useGet()) {
1✔
619
      frameWriter.flush();
1✔
620
    }
621
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
622
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
623
      // correctly.
624
      nextStreamId = Integer.MAX_VALUE;
1✔
625
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
626
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
627
    } else {
628
      nextStreamId += 2;
1✔
629
    }
630
  }
1✔
631

632
  /**
633
   * Starts pending streams, returns true if at least one pending stream is started.
634
   */
635
  @GuardedBy("lock")
636
  private boolean startPendingStreams() {
637
    boolean hasStreamStarted = false;
1✔
638
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
639
      OkHttpClientStream stream = pendingStreams.poll();
1✔
640
      startStream(stream);
1✔
641
      hasStreamStarted = true;
1✔
642
    }
1✔
643
    return hasStreamStarted;
1✔
644
  }
645

646
  /**
647
   * Removes given pending stream, used when a pending stream is cancelled.
648
   */
649
  @GuardedBy("lock")
650
  void removePendingStream(OkHttpClientStream pendingStream) {
651
    pendingStreams.remove(pendingStream);
1✔
652
    maybeClearInUse(pendingStream);
1✔
653
  }
1✔
654

655
  @Override
656
  public Runnable start(Listener listener) {
657
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
658

659
    if (enableKeepAlive) {
1✔
660
      keepAliveManager = new KeepAliveManager(
×
661
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
662
          keepAliveWithoutCalls);
663
      keepAliveManager.onTransportStarted();
×
664
    }
665

666
    int maxQueuedControlFrames = 10000;
1✔
667
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
668
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
669
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
670

671
    synchronized (lock) {
1✔
672
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
673
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
674
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
675
      // the same TransportExceptionHandler instance so it is all mixed back together.
676
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
677
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
678
    }
1✔
679
    final CountDownLatch latch = new CountDownLatch(1);
1✔
680
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
681
    // The transport needs up to two threads to function once started,
682
    // but only needs one during handshaking. Start another thread during handshaking
683
    // to make sure there's still a free thread available. If the number of threads is exhausted,
684
    // it is better to kill the transport than for all the transports to hang unable to send.
685
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
686
    // Connecting in the serializingExecutor, so that some stream operations like synStream
687
    // will be executed after connected.
688

689
    serializingExecutor.execute(new Runnable() {
1✔
690
      @Override
691
      public void run() {
692
        // Use closed source on failure so that the reader immediately shuts down.
693
        BufferedSource source = Okio.buffer(new Source() {
1✔
694
          @Override
695
          public long read(Buffer sink, long byteCount) {
696
            return -1;
1✔
697
          }
698

699
          @Override
700
          public Timeout timeout() {
701
            return Timeout.NONE;
×
702
          }
703

704
          @Override
705
          public void close() {
706
          }
1✔
707
        });
708
        try {
709
          // This is a hack to make sure the connection preface and initial settings to be sent out
710
          // without blocking the start. By doing this essentially prevents potential deadlock when
711
          // network is not available during startup while another thread holding lock to send the
712
          // initial preface.
713
          try {
714
            latch.await();
1✔
715
            barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
716
          } catch (InterruptedException e) {
×
717
            Thread.currentThread().interrupt();
×
718
          } catch (TimeoutException | BrokenBarrierException e) {
1✔
719
            startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
720
                .withDescription("Timed out waiting for second handshake thread. "
1✔
721
                    + "The transport executor pool may have run out of threads"));
722
            return;
1✔
723
          }
1✔
724

725
          if (proxiedAddr == null) {
1✔
726
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
727
          } else {
728
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
729
              sock = createHttpProxySocket(
1✔
730
                  proxiedAddr.getTargetAddress(),
1✔
731
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
732
                  proxiedAddr.getUsername(),
1✔
733
                  proxiedAddr.getPassword()
1✔
734
              );
735
            } else {
736
              throw Status.INTERNAL.withDescription(
×
737
                  "Unsupported SocketAddress implementation "
738
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
739
            }
740
          }
741
          if (sslSocketFactory != null) {
1✔
742
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
743
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
744
                connectionSpec);
1✔
745
            sslSession = sslSocket.getSession();
1✔
746
            sock = sslSocket;
1✔
747
          }
748
          sock.setTcpNoDelay(true);
1✔
749
          source = Okio.buffer(Okio.source(sock));
1✔
750
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
751

752
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
753
          attributes = attributes.toBuilder()
1✔
754
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
755
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
756
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
757
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
758
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
759
              .build();
1✔
760
        } catch (StatusException e) {
1✔
761
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
762
          return;
1✔
763
        } catch (Exception e) {
1✔
764
          onException(e);
1✔
765
          return;
1✔
766
        } finally {
767
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
768
          latchForExtraThread.countDown();
1✔
769
        }
770
        synchronized (lock) {
1✔
771
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
772
          if (sslSession != null) {
1✔
773
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
774
          }
775
        }
1✔
776
      }
1✔
777
    });
778

779
    executor.execute(new Runnable() {
1✔
780
      @Override
781
      public void run() {
782
        try {
783
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
784
          latchForExtraThread.await();
1✔
785
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
786
          // Something bad happened, maybe too few threads available!
787
          // This will be handled in the handshake thread.
788
        } catch (InterruptedException e) {
×
789
          Thread.currentThread().interrupt();
×
790
        }
1✔
791
      }
1✔
792
    });
793
    // Schedule to send connection preface & settings before any other write.
794
    try {
795
      sendConnectionPrefaceAndSettings();
1✔
796
    } finally {
797
      latch.countDown();
1✔
798
    }
799

800
    serializingExecutor.execute(new Runnable() {
1✔
801
      @Override
802
      public void run() {
803
        if (connectingCallback != null) {
1✔
804
          connectingCallback.run();
1✔
805
        }
806
        synchronized (lock) {
1✔
807
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
808
          checkState(pendingStreams.isEmpty(),
1✔
809
              "Pending streams detected during transport start."
810
                  + " RPCs should not be started before transport is ready.");
811
        }
1✔
812
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
813
        // may send goAway immediately.
814
        executor.execute(clientFrameHandler);
1✔
815
        if (connectedFuture != null) {
1✔
816
          connectedFuture.set(null);
1✔
817
        }
818
      }
1✔
819
    });
820
    return null;
1✔
821
  }
822

823
  /**
824
   * Should only be called once when the transport is first established.
825
   */
826
  private void sendConnectionPrefaceAndSettings() {
827
    synchronized (lock) {
1✔
828
      frameWriter.connectionPreface();
1✔
829
      Settings settings = new Settings();
1✔
830
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
831
      frameWriter.settings(settings);
1✔
832
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
833
        frameWriter.windowUpdate(
1✔
834
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
835
      }
836
    }
1✔
837
  }
1✔
838

839
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
840
      String proxyUsername, String proxyPassword) throws StatusException {
841
    Socket sock = null;
1✔
842
    try {
843
      // The proxy address may not be resolved
844
      if (proxyAddress.getAddress() != null) {
1✔
845
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
846
      } else {
847
        sock =
×
848
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
849
      }
850
      sock.setTcpNoDelay(true);
1✔
851
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
852
      // can cause reading from the socket to hang.
853
      sock.setSoTimeout(proxySocketTimeout);
1✔
854

855
      Source source = Okio.source(sock);
1✔
856
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
857

858
      // Prepare headers and request method line
859
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
860
      HttpUrl url = proxyRequest.httpUrl();
1✔
861
      String requestLine =
1✔
862
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
863

864
      // Write request to socket
865
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
866
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
867
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
868
            .writeUtf8(": ")
1✔
869
            .writeUtf8(proxyRequest.headers().value(i))
1✔
870
            .writeUtf8("\r\n");
1✔
871
      }
872
      sink.writeUtf8("\r\n");
1✔
873
      // Flush buffer (flushes socket and sends request)
874
      sink.flush();
1✔
875

876
      // Read status line, check if 2xx was returned
877
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
878
      // Drain rest of headers
879
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
880
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
881
        Buffer body = new Buffer();
1✔
882
        try {
883
          sock.shutdownOutput();
1✔
884
          source.read(body, 1024);
1✔
885
        } catch (IOException ex) {
×
886
          body.writeUtf8("Unable to read body: " + ex.toString());
×
887
        }
1✔
888
        try {
889
          sock.close();
1✔
890
        } catch (IOException ignored) {
×
891
          // ignored
892
        }
1✔
893
        String message = String.format(
1✔
894
            Locale.US,
895
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
896
              + "Response body:\n%s",
897
            statusLine.code, statusLine.message, body.readUtf8());
1✔
898
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
899
      }
900
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
901
      sock.setSoTimeout(0);
1✔
902
      return sock;
1✔
903
    } catch (IOException e) {
1✔
904
      if (sock != null) {
1✔
905
        GrpcUtil.closeQuietly(sock);
1✔
906
      }
907
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
908
          .asException();
1✔
909
    }
910
  }
911

912
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
913
                                         String proxyPassword) {
914
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
915
        .scheme("https")
1✔
916
        .host(address.getHostName())
1✔
917
        .port(address.getPort())
1✔
918
        .build();
1✔
919

920
    Request.Builder request = new Request.Builder()
1✔
921
        .url(tunnelUrl)
1✔
922
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
923
        .header("User-Agent", userAgent);
1✔
924

925
    // If we have proxy credentials, set them right away
926
    if (proxyUsername != null && proxyPassword != null) {
1✔
927
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
928
    }
929
    return request.build();
1✔
930
  }
931

932
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
933
    Buffer buffer = new Buffer();
1✔
934
    while (true) {
935
      if (source.read(buffer, 1) == -1) {
1✔
936
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
937
      }
938
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
939
        return buffer.readUtf8LineStrict();
1✔
940
      }
941
    }
942
  }
943

944
  @Override
945
  public String toString() {
946
    return MoreObjects.toStringHelper(this)
1✔
947
        .add("logId", logId.getId())
1✔
948
        .add("address", address)
1✔
949
        .toString();
1✔
950
  }
951

952
  @Override
953
  public InternalLogId getLogId() {
954
    return logId;
1✔
955
  }
956

957
  /**
958
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
959
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
960
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
961
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
962
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
963
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
964
   * connection to the port, independent of this function.
965
   *
966
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
967
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
968
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
969
   */
970
  @VisibleForTesting
971
  String getOverridenHost() {
972
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
973
    if (uri.getHost() != null) {
1✔
974
      return uri.getHost();
1✔
975
    }
976

977
    return defaultAuthority;
1✔
978
  }
979

980
  @VisibleForTesting
981
  int getOverridenPort() {
982
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
983
    if (uri.getPort() != -1) {
1✔
984
      return uri.getPort();
×
985
    }
986

987
    return address.getPort();
1✔
988
  }
989

990
  @Override
991
  public void shutdown(Status reason) {
992
    synchronized (lock) {
1✔
993
      if (goAwayStatus != null) {
1✔
994
        return;
1✔
995
      }
996

997
      goAwayStatus = reason;
1✔
998
      listener.transportShutdown(goAwayStatus, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
999
      stopIfNecessary();
1✔
1000
    }
1✔
1001
  }
1✔
1002

1003
  @Override
1004
  public void shutdownNow(Status reason) {
1005
    shutdownNow(reason, SimpleDisconnectError.SUBCHANNEL_SHUTDOWN);
1✔
1006
  }
1✔
1007

1008
  @Override
1009
  public void shutdownNow(Status reason, DisconnectError disconnectError) {
1010
    shutdown(reason);
1✔
1011
    synchronized (lock) {
1✔
1012
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1013
      while (it.hasNext()) {
1✔
1014
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1015
        it.remove();
1✔
1016
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
1017
        maybeClearInUse(entry.getValue());
1✔
1018
      }
1✔
1019

1020
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1021
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
1022
        // chance to retry and be routed to another connection.
1023
        stream.transportState().transportReportStatus(
1✔
1024
            reason, RpcProgress.MISCARRIED, true, new Metadata());
1025
        maybeClearInUse(stream);
1✔
1026
      }
1✔
1027
      pendingStreams.clear();
1✔
1028

1029
      stopIfNecessary();
1✔
1030
    }
1✔
1031
  }
1✔
1032

1033
  @Override
1034
  public Attributes getAttributes() {
1035
    return attributes;
1✔
1036
  }
1037

1038
  /**
1039
   * Gets all active streams as an array.
1040
   */
1041
  @Override
1042
  public OutboundFlowController.StreamState[] getActiveStreams() {
1043
    synchronized (lock) {
1✔
1044
      OutboundFlowController.StreamState[] flowStreams =
1✔
1045
          new OutboundFlowController.StreamState[streams.size()];
1✔
1046
      int i = 0;
1✔
1047
      for (OkHttpClientStream stream : streams.values()) {
1✔
1048
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
1049
      }
1✔
1050
      return flowStreams;
1✔
1051
    }
1052
  }
1053

1054
  @VisibleForTesting
1055
  ClientFrameHandler getHandler() {
1056
    return clientFrameHandler;
1✔
1057
  }
1058

1059
  @VisibleForTesting
1060
  SocketFactory getSocketFactory() {
1061
    return socketFactory;
1✔
1062
  }
1063

1064
  @VisibleForTesting
1065
  int getPendingStreamSize() {
1066
    synchronized (lock) {
1✔
1067
      return pendingStreams.size();
1✔
1068
    }
1069
  }
1070

1071
  @VisibleForTesting
1072
  void setNextStreamId(int nextStreamId) {
1073
    synchronized (lock) {
1✔
1074
      this.nextStreamId = nextStreamId;
1✔
1075
    }
1✔
1076
  }
1✔
1077

1078
  /**
1079
   * Finish all active streams due to an IOException, then close the transport.
1080
   */
1081
  @Override
1082
  public void onException(Throwable failureCause) {
1083
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
1084
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
1085
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1086
  }
1✔
1087

1088
  /**
1089
   * Send GOAWAY to the server, then finish all active streams and close the transport.
1090
   */
1091
  private void onError(ErrorCode errorCode, String moreDetail) {
1092
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
1093
  }
1✔
1094

1095
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
1096
    synchronized (lock) {
1✔
1097
      if (goAwayStatus == null) {
1✔
1098
        goAwayStatus = status;
1✔
1099
        GrpcUtil.Http2Error http2Error;
1100
        if (errorCode == null) {
1✔
1101
          http2Error = GrpcUtil.Http2Error.NO_ERROR;
1✔
1102
        } else {
1103
          http2Error = GrpcUtil.Http2Error.forCode(errorCode.httpCode);
1✔
1104
        }
1105
        listener.transportShutdown(status, new GoAwayDisconnectError(http2Error));
1✔
1106
      }
1107
      if (errorCode != null && !goAwaySent) {
1✔
1108
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1109
        // streams. The GOAWAY is part of graceful shutdown.
1110
        goAwaySent = true;
1✔
1111
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
1112
      }
1113

1114
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1115
      while (it.hasNext()) {
1✔
1116
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1117
        if (entry.getKey() > lastKnownStreamId) {
1✔
1118
          it.remove();
1✔
1119
          entry.getValue().transportState().transportReportStatus(
1✔
1120
              status, RpcProgress.REFUSED, false, new Metadata());
1121
          maybeClearInUse(entry.getValue());
1✔
1122
        }
1123
      }
1✔
1124

1125
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1126
        stream.transportState().transportReportStatus(
1✔
1127
            status, RpcProgress.MISCARRIED, true, new Metadata());
1128
        maybeClearInUse(stream);
1✔
1129
      }
1✔
1130
      pendingStreams.clear();
1✔
1131

1132
      stopIfNecessary();
1✔
1133
    }
1✔
1134
  }
1✔
1135

1136
  /**
1137
   * Called when a stream is closed. We do things like:
1138
   * <ul>
1139
   * <li>Removing the stream from the map.
1140
   * <li>Optionally reporting the status.
1141
   * <li>Starting pending streams if we can.
1142
   * <li>Stopping the transport if this is the last live stream under a go-away status.
1143
   * </ul>
1144
   *
1145
   * @param streamId the Id of the stream.
1146
   * @param status the final status of this stream, null means no need to report.
1147
   * @param stopDelivery interrupt queued messages in the deframer
1148
   * @param errorCode reset the stream with this ErrorCode if not null.
1149
   * @param trailers the trailers received if not null
1150
   */
1151
  void finishStream(
1152
      int streamId,
1153
      @Nullable Status status,
1154
      RpcProgress rpcProgress,
1155
      boolean stopDelivery,
1156
      @Nullable ErrorCode errorCode,
1157
      @Nullable Metadata trailers) {
1158
    synchronized (lock) {
1✔
1159
      OkHttpClientStream stream = streams.remove(streamId);
1✔
1160
      if (stream != null) {
1✔
1161
        if (errorCode != null) {
1✔
1162
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1163
        }
1164
        if (status != null) {
1✔
1165
          stream
1✔
1166
              .transportState()
1✔
1167
              .transportReportStatus(
1✔
1168
                  status,
1169
                  rpcProgress,
1170
                  stopDelivery,
1171
                  trailers != null ? trailers : new Metadata());
1✔
1172
        }
1173
        if (!startPendingStreams()) {
1✔
1174
          stopIfNecessary();
1✔
1175
        }
1176
        maybeClearInUse(stream);
1✔
1177
      }
1178
    }
1✔
1179
  }
1✔
1180

1181
  /**
1182
   * When the transport is in goAway state, we should stop it once all active streams finish.
1183
   */
1184
  @GuardedBy("lock")
1185
  private void stopIfNecessary() {
1186
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1187
      return;
1✔
1188
    }
1189
    if (stopped) {
1✔
1190
      return;
1✔
1191
    }
1192
    stopped = true;
1✔
1193

1194
    if (keepAliveManager != null) {
1✔
1195
      keepAliveManager.onTransportTermination();
×
1196
    }
1197

1198
    if (ping != null) {
1✔
1199
      ping.failed(getPingFailure());
1✔
1200
      ping = null;
1✔
1201
    }
1202

1203
    if (!goAwaySent) {
1✔
1204
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1205
      // streams. The GOAWAY is part of graceful shutdown.
1206
      goAwaySent = true;
1✔
1207
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1208
    }
1209

1210
    // We will close the underlying socket in the writing thread to break out the reader
1211
    // thread, which will close the frameReader and notify the listener.
1212
    frameWriter.close();
1✔
1213
  }
1✔
1214

1215
  @GuardedBy("lock")
1216
  private void maybeClearInUse(OkHttpClientStream stream) {
1217
    if (hasStream) {
1✔
1218
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1219
        hasStream = false;
1✔
1220
        if (keepAliveManager != null) {
1✔
1221
          // We don't have any active streams. No need to do keepalives any more.
1222
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1223
          // and onTransportActive.
1224
          keepAliveManager.onTransportIdle();
×
1225
        }
1226
      }
1227
    }
1228
    if (stream.shouldBeCountedForInUse()) {
1✔
1229
      inUseState.updateObjectInUse(stream, false);
1✔
1230
    }
1231
  }
1✔
1232

1233
  @GuardedBy("lock")
1234
  private void setInUse(OkHttpClientStream stream) {
1235
    if (!hasStream) {
1✔
1236
      hasStream = true;
1✔
1237
      if (keepAliveManager != null) {
1✔
1238
        // We have a new stream. We might need to do keepalives now.
1239
        // Note that we have to do this inside the lock to avoid calling
1240
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1241
        // order.
1242
        keepAliveManager.onTransportActive();
×
1243
      }
1244
    }
1245
    if (stream.shouldBeCountedForInUse()) {
1✔
1246
      inUseState.updateObjectInUse(stream, true);
1✔
1247
    }
1248
  }
1✔
1249

1250
  private Status getPingFailure() {
1251
    synchronized (lock) {
1✔
1252
      if (goAwayStatus != null) {
1✔
1253
        return goAwayStatus;
1✔
1254
      } else {
1255
        return Status.UNAVAILABLE.withDescription("Connection closed");
×
1256
      }
1257
    }
1258
  }
1259

1260
  boolean mayHaveCreatedStream(int streamId) {
1261
    synchronized (lock) {
1✔
1262
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1263
    }
1264
  }
1265

1266
  OkHttpClientStream getStream(int streamId) {
1267
    synchronized (lock) {
1✔
1268
      return streams.get(streamId);
1✔
1269
    }
1270
  }
1271

1272
  /**
1273
   * Returns a Grpc status corresponding to the given ErrorCode.
1274
   */
1275
  @VisibleForTesting
1276
  static Status toGrpcStatus(ErrorCode code) {
1277
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1278
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1279
        "Unknown http2 error code: " + code.httpCode);
1280
  }
1281

1282
  @Override
1283
  public ListenableFuture<SocketStats> getStats() {
1284
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1285
    synchronized (lock) {
1✔
1286
      if (socket == null) {
1✔
1287
        ret.set(new SocketStats(
×
1288
            transportTracer.getStats(),
×
1289
            /*local=*/ null,
1290
            /*remote=*/ null,
1291
            new InternalChannelz.SocketOptions.Builder().build(),
×
1292
            /*security=*/ null));
1293
      } else {
1294
        ret.set(new SocketStats(
1✔
1295
            transportTracer.getStats(),
1✔
1296
            socket.getLocalSocketAddress(),
1✔
1297
            socket.getRemoteSocketAddress(),
1✔
1298
            Utils.getSocketOptions(socket),
1✔
1299
            securityInfo));
1300
      }
1301
      return ret;
1✔
1302
    }
1303
  }
1304

1305
  /**
1306
   * Runnable which reads frames and dispatches them to in flight calls.
1307
   */
1308
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1309

1310
    private final OkHttpFrameLogger logger =
1✔
1311
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1312
    FrameReader frameReader;
1313
    boolean firstSettings = true;
1✔
1314

1315
    ClientFrameHandler(FrameReader frameReader) {
1✔
1316
      this.frameReader = frameReader;
1✔
1317
    }
1✔
1318

1319
    @Override
1320
    @SuppressWarnings("Finally")
1321
    public void run() {
1322
      String threadName = Thread.currentThread().getName();
1✔
1323
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1324
      try {
1325
        // Read until the underlying socket closes.
1326
        while (frameReader.nextFrame(this)) {
1✔
1327
          if (keepAliveManager != null) {
1✔
1328
            keepAliveManager.onDataReceived();
×
1329
          }
1330
        }
1331
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1332
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1333
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1334
        Status status;
1335
        synchronized (lock) {
1✔
1336
          status = goAwayStatus;
1✔
1337
        }
1✔
1338
        if (status == null) {
1✔
1339
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1340
        }
1341
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1342
      } catch (Throwable t) {
1✔
1343
        // TODO(madongfly): Send the exception message to the server.
1344
        startGoAway(
1✔
1345
            0,
1346
            ErrorCode.PROTOCOL_ERROR,
1347
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1348
      } finally {
1349
        try {
1350
          frameReader.close();
1✔
1351
        } catch (IOException ex) {
×
1352
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1353
        } catch (RuntimeException e) {
×
1354
          // This same check is done in okhttp proper:
1355
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1356

1357
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1358
          // https://issuetracker.google.com/issues/177450597
1359
          if (!"bio == null".equals(e.getMessage())) {
×
1360
            throw e;
×
1361
          }
1362
        }
1✔
1363
        listener.transportTerminated();
1✔
1364
        Thread.currentThread().setName(threadName);
1✔
1365
      }
1366
    }
1✔
1367

1368
    /**
1369
     * Handle an HTTP2 DATA frame.
1370
     */
1371
    @SuppressWarnings("GuardedBy")
1372
    @Override
1373
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1374
                     int paddedLength)
1375
        throws IOException {
1376
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1377
          streamId, in.getBuffer(), length, inFinished);
1✔
1378
      OkHttpClientStream stream = getStream(streamId);
1✔
1379
      if (stream == null) {
1✔
1380
        if (mayHaveCreatedStream(streamId)) {
1✔
1381
          synchronized (lock) {
1✔
1382
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1383
          }
1✔
1384
          in.skip(length);
1✔
1385
        } else {
1386
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1387
          return;
1✔
1388
        }
1389
      } else {
1390
        // Wait until the frame is complete.
1391
        in.require(length);
1✔
1392

1393
        Buffer buf = new Buffer();
1✔
1394
        buf.write(in.getBuffer(), length);
1✔
1395
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1396
            stream.transportState().tag());
1✔
1397
        synchronized (lock) {
1✔
1398
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1399
          // instead found: 'OkHttpClientTransport.this.lock'
1400
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1401
        }
1✔
1402
      }
1403

1404
      // connection window update
1405
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1406
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1407
        synchronized (lock) {
1✔
1408
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1409
        }
1✔
1410
        connectionUnacknowledgedBytesRead = 0;
1✔
1411
      }
1412
    }
1✔
1413

1414
    /**
1415
     * Handle HTTP2 HEADER and CONTINUATION frames.
1416
     */
1417
    @SuppressWarnings("GuardedBy")
1418
    @Override
1419
    public void headers(boolean outFinished,
1420
        boolean inFinished,
1421
        int streamId,
1422
        int associatedStreamId,
1423
        List<Header> headerBlock,
1424
        HeadersMode headersMode) {
1425
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1426
      boolean unknownStream = false;
1✔
1427
      Status failedStatus = null;
1✔
1428
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1429
        int metadataSize = headerBlockSize(headerBlock);
1✔
1430
        if (metadataSize > maxInboundMetadataSize) {
1✔
1431
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1432
              String.format(
1✔
1433
                  Locale.US,
1434
                  "Response %s metadata larger than %d: %d",
1435
                  inFinished ? "trailer" : "header",
1✔
1436
                  maxInboundMetadataSize,
1✔
1437
                  metadataSize));
1✔
1438
        }
1439
      }
1440
      synchronized (lock) {
1✔
1441
        OkHttpClientStream stream = streams.get(streamId);
1✔
1442
        if (stream == null) {
1✔
1443
          if (mayHaveCreatedStream(streamId)) {
1✔
1444
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1445
          } else {
1446
            unknownStream = true;
1✔
1447
          }
1448
        } else {
1449
          if (failedStatus == null) {
1✔
1450
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1451
                stream.transportState().tag());
1✔
1452
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1453
            // instead found: 'OkHttpClientTransport.this.lock'
1454
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1455
          } else {
1456
            if (!inFinished) {
1✔
1457
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1458
            }
1459
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1460
          }
1461
        }
1462
      }
1✔
1463
      if (unknownStream) {
1✔
1464
        // We don't expect any server-initiated streams.
1465
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1466
      }
1467
    }
1✔
1468

1469
    private int headerBlockSize(List<Header> headerBlock) {
1470
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1471
      long size = 0;
1✔
1472
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1473
        Header header = headerBlock.get(i);
1✔
1474
        size += 32 + header.name.size() + header.value.size();
1✔
1475
      }
1476
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1477
      return (int) size;
1✔
1478
    }
1479

1480
    @Override
1481
    public void rstStream(int streamId, ErrorCode errorCode) {
1482
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1483
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1484
      boolean stopDelivery =
1✔
1485
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1486
      synchronized (lock) {
1✔
1487
        OkHttpClientStream stream = streams.get(streamId);
1✔
1488
        if (stream != null) {
1✔
1489
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1490
              stream.transportState().tag());
1✔
1491
          finishStream(
1✔
1492
              streamId, status,
1493
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1494
              stopDelivery, null, null);
1495
        }
1496
      }
1✔
1497
    }
1✔
1498

1499
    @Override
1500
    public void settings(boolean clearPrevious, Settings settings) {
1501
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1502
      boolean outboundWindowSizeIncreased = false;
1✔
1503
      synchronized (lock) {
1✔
1504
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1505
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1506
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1507
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1508
        }
1509

1510
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1511
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1512
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1513
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1514
        }
1515
        if (firstSettings) {
1✔
1516
          attributes = listener.filterTransport(attributes);
1✔
1517
          listener.transportReady();
1✔
1518
          firstSettings = false;
1✔
1519
        }
1520

1521
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1522
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1523
        // otherwise it will cause a stream error (RST_STREAM).
1524
        frameWriter.ackSettings(settings);
1✔
1525

1526
        // send any pending bytes / streams
1527
        if (outboundWindowSizeIncreased) {
1✔
1528
          outboundFlow.writeStreams();
1✔
1529
        }
1530
        startPendingStreams();
1✔
1531
      }
1✔
1532
    }
1✔
1533

1534
    @Override
1535
    public void ping(boolean ack, int payload1, int payload2) {
1536
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1537
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1538
      if (!ack) {
1✔
1539
        synchronized (lock) {
1✔
1540
          frameWriter.ping(true, payload1, payload2);
1✔
1541
        }
1✔
1542
      } else {
1543
        Http2Ping p = null;
1✔
1544
        synchronized (lock) {
1✔
1545
          if (ping != null) {
1✔
1546
            if (ping.payload() == ackPayload) {
1✔
1547
              p = ping;
1✔
1548
              ping = null;
1✔
1549
            } else {
1550
              log.log(Level.WARNING, String.format(
1✔
1551
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1552
                  ping.payload(), ackPayload));
1✔
1553
            }
1554
          } else {
1555
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1556
          }
1557
        }
1✔
1558
        // don't complete it while holding lock since callbacks could run immediately
1559
        if (p != null) {
1✔
1560
          p.complete();
1✔
1561
        }
1562
      }
1563
    }
1✔
1564

1565
    @Override
1566
    public void ackSettings() {
1567
      // Do nothing currently.
1568
    }
1✔
1569

1570
    @Override
1571
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1572
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1573
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1574
        String data = debugData.utf8();
1✔
1575
        log.log(Level.WARNING, String.format(
1✔
1576
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1577
        if ("too_many_pings".equals(data)) {
1✔
1578
          tooManyPingsRunnable.run();
1✔
1579
        }
1580
      }
1581
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1582
          .augmentDescription("Received Goaway");
1✔
1583
      if (debugData.size() > 0) {
1✔
1584
        // If a debug message was provided, use it.
1585
        status = status.augmentDescription(debugData.utf8());
1✔
1586
      }
1587
      startGoAway(lastGoodStreamId, null, status);
1✔
1588
    }
1✔
1589

1590
    @Override
1591
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1592
        throws IOException {
1593
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1594
          streamId, promisedStreamId, requestHeaders);
1595
      // We don't accept server initiated stream.
1596
      synchronized (lock) {
1✔
1597
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1598
      }
1✔
1599
    }
1✔
1600

1601
    @Override
1602
    public void windowUpdate(int streamId, long delta) {
1603
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1604
      if (delta == 0) {
1✔
1605
        String errorMsg = "Received 0 flow control window increment.";
×
1606
        if (streamId == 0) {
×
1607
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1608
        } else {
1609
          finishStream(
×
1610
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1611
              ErrorCode.PROTOCOL_ERROR, null);
1612
        }
1613
        return;
×
1614
      }
1615

1616
      boolean unknownStream = false;
1✔
1617
      synchronized (lock) {
1✔
1618
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1619
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1620
          return;
1✔
1621
        }
1622

1623
        OkHttpClientStream stream = streams.get(streamId);
1✔
1624
        if (stream != null) {
1✔
1625
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1626
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1627
          unknownStream = true;
1✔
1628
        }
1629
      }
1✔
1630
      if (unknownStream) {
1✔
1631
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1632
            "Received window_update for unknown stream: " + streamId);
1633
      }
1634
    }
1✔
1635

1636
    @Override
1637
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1638
      // Ignore priority change.
1639
      // TODO(madongfly): log
1640
    }
×
1641

1642
    @Override
1643
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1644
        int port, long maxAge) {
1645
      // TODO(madongfly): Deal with alternateService propagation
1646
    }
×
1647
  }
1648

1649
  /**
1650
   * SSLSocket wrapper that provides a fake SSLSession for handshake session.
1651
   */
1652
  static final class SslSocketWrapper extends NoopSslSocket {
1653

1654
    private final SSLSession sslSession;
1655
    private final SSLSocket sslSocket;
1656

1657
    SslSocketWrapper(SSLSocket sslSocket, String peerHost) {
1✔
1658
      this.sslSocket = sslSocket;
1✔
1659
      this.sslSession = new FakeSslSession(peerHost);
1✔
1660
    }
1✔
1661

1662
    @Override
1663
    public SSLSession getHandshakeSession() {
1664
      return this.sslSession;
1✔
1665
    }
1666

1667
    @Override
1668
    public boolean isConnected() {
1669
      return sslSocket.isConnected();
1✔
1670
    }
1671

1672
    @Override
1673
    public SSLParameters getSSLParameters() {
1674
      return sslSocket.getSSLParameters();
1✔
1675
    }
1676
  }
1677

1678
  /**
1679
   * Fake SSLSession instance that provides the peer host name to verify for per-rpc check.
1680
   */
1681
  static class FakeSslSession extends NoopSslSession {
1682

1683
    private final String peerHost;
1684

1685
    FakeSslSession(String peerHost) {
1✔
1686
      this.peerHost = peerHost;
1✔
1687
    }
1✔
1688

1689
    @Override
1690
    public String getPeerHost() {
1691
      return peerHost;
1✔
1692
    }
1693
  }
1694
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc