• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19965

28 Aug 2025 10:58PM UTC coverage: 88.565% (+0.005%) from 88.56%
#19965

push

github

web-flow
binder: Replace queryIntentServices() hack with the new SystemApis.createContextAsUser()  (#12280)

createContextAsUser() wrapper makes the call to PackageManager's
resolveService() look the same in both the same-user and cross-user
cases. This is how the Android team recommends accessing XXXAsUser() APIs in
general.

We can also remove all the apologies for using reflection since
SystemApis already explains all that.

34691 of 39170 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.89
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import com.google.errorprone.annotations.concurrent.GuardedBy;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.ChannelCredentials;
34
import io.grpc.ClientStreamTracer;
35
import io.grpc.Grpc;
36
import io.grpc.HttpConnectProxiedSocketAddress;
37
import io.grpc.InternalChannelz;
38
import io.grpc.InternalChannelz.SocketStats;
39
import io.grpc.InternalLogId;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.MethodDescriptor.MethodType;
43
import io.grpc.SecurityLevel;
44
import io.grpc.Status;
45
import io.grpc.Status.Code;
46
import io.grpc.StatusException;
47
import io.grpc.TlsChannelCredentials;
48
import io.grpc.internal.CertificateUtils;
49
import io.grpc.internal.ClientStreamListener.RpcProgress;
50
import io.grpc.internal.ConnectionClientTransport;
51
import io.grpc.internal.GrpcAttributes;
52
import io.grpc.internal.GrpcUtil;
53
import io.grpc.internal.Http2Ping;
54
import io.grpc.internal.InUseStateAggregator;
55
import io.grpc.internal.KeepAliveManager;
56
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
57
import io.grpc.internal.NoopSslSession;
58
import io.grpc.internal.SerializingExecutor;
59
import io.grpc.internal.StatsTraceContext;
60
import io.grpc.internal.TransportTracer;
61
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
62
import io.grpc.okhttp.OkHttpChannelBuilder.OkHttpTransportFactory;
63
import io.grpc.okhttp.internal.ConnectionSpec;
64
import io.grpc.okhttp.internal.Credentials;
65
import io.grpc.okhttp.internal.OkHostnameVerifier;
66
import io.grpc.okhttp.internal.StatusLine;
67
import io.grpc.okhttp.internal.framed.ErrorCode;
68
import io.grpc.okhttp.internal.framed.FrameReader;
69
import io.grpc.okhttp.internal.framed.FrameWriter;
70
import io.grpc.okhttp.internal.framed.Header;
71
import io.grpc.okhttp.internal.framed.HeadersMode;
72
import io.grpc.okhttp.internal.framed.Http2;
73
import io.grpc.okhttp.internal.framed.Settings;
74
import io.grpc.okhttp.internal.framed.Variant;
75
import io.grpc.okhttp.internal.proxy.HttpUrl;
76
import io.grpc.okhttp.internal.proxy.Request;
77
import io.perfmark.PerfMark;
78
import java.io.EOFException;
79
import java.io.IOException;
80
import java.lang.reflect.InvocationTargetException;
81
import java.lang.reflect.Method;
82
import java.net.InetSocketAddress;
83
import java.net.Socket;
84
import java.net.URI;
85
import java.security.GeneralSecurityException;
86
import java.security.KeyStore;
87
import java.security.cert.Certificate;
88
import java.security.cert.X509Certificate;
89
import java.util.Collections;
90
import java.util.Deque;
91
import java.util.EnumMap;
92
import java.util.HashMap;
93
import java.util.Iterator;
94
import java.util.LinkedHashMap;
95
import java.util.LinkedList;
96
import java.util.List;
97
import java.util.Locale;
98
import java.util.Map;
99
import java.util.Random;
100
import java.util.concurrent.BrokenBarrierException;
101
import java.util.concurrent.CountDownLatch;
102
import java.util.concurrent.CyclicBarrier;
103
import java.util.concurrent.Executor;
104
import java.util.concurrent.ScheduledExecutorService;
105
import java.util.concurrent.TimeUnit;
106
import java.util.concurrent.TimeoutException;
107
import java.util.logging.Level;
108
import java.util.logging.Logger;
109
import javax.annotation.Nullable;
110
import javax.net.SocketFactory;
111
import javax.net.ssl.HostnameVerifier;
112
import javax.net.ssl.SSLParameters;
113
import javax.net.ssl.SSLPeerUnverifiedException;
114
import javax.net.ssl.SSLSession;
115
import javax.net.ssl.SSLSocket;
116
import javax.net.ssl.SSLSocketFactory;
117
import javax.net.ssl.TrustManager;
118
import javax.net.ssl.TrustManagerFactory;
119
import javax.net.ssl.X509TrustManager;
120
import okio.Buffer;
121
import okio.BufferedSink;
122
import okio.BufferedSource;
123
import okio.ByteString;
124
import okio.Okio;
125
import okio.Source;
126
import okio.Timeout;
127

128
/**
129
 * An OkHttp-based {@link ConnectionClientTransport} implementation.
130
 */
131
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
132
      OutboundFlowController.Transport {
133
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
134
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
135
  private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK =
136
          "GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK";
137
  static boolean enablePerRpcAuthorityCheck =
1✔
138
          GrpcUtil.getFlag(GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK, false);
1✔
139
  private Socket sock;
140
  private SSLSession sslSession;
141

142
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
143
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
144
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
145
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
146
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
147
        Status.INTERNAL.withDescription("Protocol error"));
1✔
148
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
149
        Status.INTERNAL.withDescription("Internal error"));
1✔
150
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
151
        Status.INTERNAL.withDescription("Flow control error"));
1✔
152
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
153
        Status.INTERNAL.withDescription("Stream closed"));
1✔
154
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
155
        Status.INTERNAL.withDescription("Frame too large"));
1✔
156
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
157
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
158
    errorToStatus.put(ErrorCode.CANCEL,
1✔
159
        Status.CANCELLED.withDescription("Cancelled"));
1✔
160
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
161
        Status.INTERNAL.withDescription("Compression error"));
1✔
162
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
163
        Status.INTERNAL.withDescription("Connect error"));
1✔
164
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
165
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
166
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
167
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
168
    return Collections.unmodifiableMap(errorToStatus);
1✔
169
  }
170

171
  // javax.net.ssl.X509ExtendedTrustManager and its socket-aware checkServerTrusted overload are
  // looked up reflectively because they may be missing at runtime. Either both fields are
  // non-null or both are null, so a null check on the class is sufficient before using the method.
  private static final Class<?> x509ExtendedTrustManagerClass;
  private static final Method checkServerTrustedMethod;

  static {
    Class<?> resolvedClass = null;
    Method resolvedMethod = null;
    try {
      resolvedClass = Class.forName("javax.net.ssl.X509ExtendedTrustManager");
      resolvedMethod = resolvedClass.getMethod(
          "checkServerTrusted", X509Certificate[].class, String.class, Socket.class);
    } catch (ClassNotFoundException e) {
      // Per-rpc authority override via call options will be disallowed.
    } catch (NoSuchMethodException e) {
      // Should never happen since X509ExtendedTrustManager was introduced in Android API level 24
      // along with checkServerTrusted. If it ever does, discard the class as well so that no
      // caller sees a usable class paired with a null method.
      resolvedClass = null;
    }
    x509ExtendedTrustManagerClass = resolvedClass;
    checkServerTrustedMethod = resolvedMethod;
  }
1✔
190

191
  private final InetSocketAddress address;
192
  private final String defaultAuthority;
193
  private final String userAgent;
194
  private final Random random = new Random();
1✔
195
  // Returns new unstarted stopwatches
196
  private final Supplier<Stopwatch> stopwatchFactory;
197
  private final int initialWindowSize;
198
  private final Variant variant;
199
  private Listener listener;
200
  @GuardedBy("lock")
201
  private ExceptionHandlingFrameWriter frameWriter;
202
  private OutboundFlowController outboundFlow;
203
  private final Object lock = new Object();
1✔
204
  private final InternalLogId logId;
205
  @GuardedBy("lock")
206
  private int nextStreamId;
207
  @GuardedBy("lock")
1✔
208
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
209
  private final Executor executor;
210
  // Wraps the executor to guarantee that some operations are executed serially.
211
  private final SerializingExecutor serializingExecutor;
212
  private final ScheduledExecutorService scheduler;
213
  private final int maxMessageSize;
214
  private int connectionUnacknowledgedBytesRead;
215
  private ClientFrameHandler clientFrameHandler;
216
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
217
  private Attributes attributes;
218
  /**
219
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
220
   * streams may continue.
221
   */
222
  @GuardedBy("lock")
223
  private Status goAwayStatus;
224
  @GuardedBy("lock")
225
  private boolean goAwaySent;
226
  @GuardedBy("lock")
227
  private Http2Ping ping;
228
  @GuardedBy("lock")
229
  private boolean stopped;
230
  @GuardedBy("lock")
231
  private boolean hasStream;
232
  private final SocketFactory socketFactory;
233
  private SSLSocketFactory sslSocketFactory;
234
  private HostnameVerifier hostnameVerifier;
235
  private Socket socket;
236
  @GuardedBy("lock")
1✔
237
  private int maxConcurrentStreams = 0;
238
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
239
  @GuardedBy("lock")
240
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
241
  private final ConnectionSpec connectionSpec;
242
  private KeepAliveManager keepAliveManager;
243
  private boolean enableKeepAlive;
244
  private long keepAliveTimeNanos;
245
  private long keepAliveTimeoutNanos;
246
  private boolean keepAliveWithoutCalls;
247
  private final Runnable tooManyPingsRunnable;
248
  private final int maxInboundMetadataSize;
249
  private final boolean useGetForSafeMethods;
250
  @GuardedBy("lock")
251
  private final TransportTracer transportTracer;
252
  private final TrustManager x509TrustManager;
253

254
  @SuppressWarnings("serial")
255
  private static class LruCache<T> extends LinkedHashMap<String, T> {
256
    @Override
257
    protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
258
      return size() > 100;
1✔
259
    }
260
  }
261

262
  @GuardedBy("lock")
1✔
263
  private final Map<String, Status> authorityVerificationResults = new LruCache<>();
264

265
  @GuardedBy("lock")
1✔
266
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
267
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
268
        @Override
269
        protected void handleInUse() {
270
          listener.transportInUse(true);
1✔
271
        }
1✔
272

273
        @Override
274
        protected void handleNotInUse() {
275
          listener.transportInUse(false);
1✔
276
        }
1✔
277
      };
278
  @GuardedBy("lock")
279
  private InternalChannelz.Security securityInfo;
280

281
  @VisibleForTesting
282
  @Nullable
283
  final HttpConnectProxiedSocketAddress proxiedAddr;
284

285
  @VisibleForTesting
1✔
286
  int proxySocketTimeout = 30000;
287

288
  // The following fields should only be used for testing.
289
  Runnable connectingCallback;
290
  SettableFuture<Void> connectedFuture;
291

292
  /**
   * Creates a transport for production use. Delegates to the private constructor, supplying
   * {@link GrpcUtil#STOPWATCH_SUPPLIER} as the stopwatch factory and a fresh {@link Http2}
   * instance as the frame variant.
   *
   * @param transportFactory source of the executors, socket factories and limits for the transport
   * @param address the address to connect to; must not be null
   * @param authority the default authority of the transport
   * @param userAgent optional application user agent, combined with the "okhttp" transport name
   * @param eagAttrs attributes stored under {@link GrpcAttributes#ATTR_CLIENT_EAG_ATTRS}
   * @param proxiedAddr proxy details, or {@code null} for a direct connection
   * @param tooManyPingsRunnable must not be null
   * @param channelCredentials credentials used to look up a trust manager when they are
   *     {@link TlsChannelCredentials}
   */
  public OkHttpClientTransport(
      OkHttpTransportFactory transportFactory,
      InetSocketAddress address,
      String authority,
      @Nullable String userAgent,
      Attributes eagAttrs,
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
      Runnable tooManyPingsRunnable,
      ChannelCredentials channelCredentials) {
    this(
        transportFactory, address, authority, userAgent, eagAttrs,
        GrpcUtil.STOPWATCH_SUPPLIER, new Http2(),
        proxiedAddr, tooManyPingsRunnable, channelCredentials);
  }
1✔
313

314
  private OkHttpClientTransport(
315
          OkHttpTransportFactory transportFactory,
316
          InetSocketAddress address,
317
          String authority,
318
          @Nullable String userAgent,
319
          Attributes eagAttrs,
320
          Supplier<Stopwatch> stopwatchFactory,
321
          Variant variant,
322
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
323
          Runnable tooManyPingsRunnable,
324
          ChannelCredentials channelCredentials) {
1✔
325
    this.address = Preconditions.checkNotNull(address, "address");
1✔
326
    this.defaultAuthority = authority;
1✔
327
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
328
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
329
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
330
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
331
    this.scheduler = Preconditions.checkNotNull(
1✔
332
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
333
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
334
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
335
    nextStreamId = 3;
1✔
336
    this.socketFactory = transportFactory.socketFactory == null
1✔
337
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
338
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
339
    this.hostnameVerifier = transportFactory.hostnameVerifier != null
1✔
340
      ? transportFactory.hostnameVerifier : OkHostnameVerifier.INSTANCE;
1✔
341
    this.connectionSpec = Preconditions.checkNotNull(
1✔
342
        transportFactory.connectionSpec, "connectionSpec");
343
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
344
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
345
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
346
    this.proxiedAddr = proxiedAddr;
1✔
347
    this.tooManyPingsRunnable =
1✔
348
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
349
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
350
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
351
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
352
    this.attributes = Attributes.newBuilder()
1✔
353
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
354
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
355
    initTransportTracer();
1✔
356
    TrustManager tempX509TrustManager;
357
    if (channelCredentials instanceof TlsChannelCredentials
1✔
358
        && x509ExtendedTrustManagerClass != null) {
359
      try {
360
        tempX509TrustManager = getTrustManager(
1✔
361
                (TlsChannelCredentials) channelCredentials);
362
      } catch (GeneralSecurityException e) {
×
363
        tempX509TrustManager = null;
×
364
        log.log(Level.WARNING, "Obtaining X509ExtendedTrustManager for the transport failed."
×
365
            + "Per-rpc authority overrides will be disallowed.", e);
366
      }
1✔
367
    } else {
368
      tempX509TrustManager = null;
1✔
369
    }
370
    x509TrustManager = tempX509TrustManager;
1✔
371
  }
1✔
372

373
  /**
   * Creates a transport connected to a fake peer, for tests only. The transport targets
   * {@code 127.0.0.1:80} with the authority {@code notarealauthority:80}, uses empty EAG
   * attributes, no proxy and no channel credentials.
   *
   * @param connectingCallback optional callback stored in {@link #connectingCallback}
   * @param connectedFuture stored in {@link #connectedFuture}; must not be null
   */
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
  @VisibleForTesting
  OkHttpClientTransport(
      OkHttpTransportFactory transportFactory,
      String userAgent,
      Supplier<Stopwatch> stopwatchFactory,
      Variant variant,
      @Nullable Runnable connectingCallback,
      SettableFuture<Void> connectedFuture,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory,
        new InetSocketAddress("127.0.0.1", 80),
        "notarealauthority:80",
        userAgent,
        Attributes.EMPTY,
        stopwatchFactory,
        variant,
        /* proxiedAddr= */ null,
        tooManyPingsRunnable,
        /* channelCredentials= */ null);
    this.connectingCallback = connectingCallback;
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
  }
1✔
400

401
  // sslSocketFactory is set to null when use plaintext.
402
  boolean isUsingPlaintext() {
403
    return sslSocketFactory == null;
1✔
404
  }
405

406
  private void initTransportTracer() {
407
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
408
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
409
        @Override
410
        public TransportTracer.FlowControlWindows read() {
411
          synchronized (lock) {
1✔
412
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
413
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
414
            // provide a lower bound.
415
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
416
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
417
          }
418
        }
419
      });
420
    }
1✔
421
  }
1✔
422

423
  /**
424
   * Enable keepalive with custom delay and timeout.
425
   */
426
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
427
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
428
    enableKeepAlive = enable;
×
429
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
430
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
431
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
432
  }
×
433

434
  @Override
435
  public void ping(final PingCallback callback, Executor executor) {
436
    long data = 0;
1✔
437
    Http2Ping p;
438
    boolean writePing;
439
    synchronized (lock) {
1✔
440
      checkState(frameWriter != null);
1✔
441
      if (stopped) {
1✔
442
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
443
        return;
1✔
444
      }
445
      if (ping != null) {
1✔
446
        // we only allow one outstanding ping at a time, so just add the callback to
447
        // any outstanding operation
448
        p = ping;
1✔
449
        writePing = false;
1✔
450
      } else {
451
        // set outstanding operation and then write the ping after releasing lock
452
        data = random.nextLong();
1✔
453
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
454
        stopwatch.start();
1✔
455
        p = ping = new Http2Ping(data, stopwatch);
1✔
456
        writePing = true;
1✔
457
        transportTracer.reportKeepAliveSent();
1✔
458
      }
459
      if (writePing) {
1✔
460
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
461
      }
462
    }
1✔
463
    // If transport concurrently failed/stopped since we released the lock above, this could
464
    // immediately invoke callback (which we shouldn't do while holding a lock)
465
    p.addCallback(callback, executor);
1✔
466
  }
1✔
467

468
  @Override
469
  public OkHttpClientStream newStream(
470
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
471
      ClientStreamTracer[] tracers) {
472
    Preconditions.checkNotNull(method, "method");
1✔
473
    Preconditions.checkNotNull(headers, "headers");
1✔
474
    StatsTraceContext statsTraceContext =
1✔
475
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
476

477
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
478
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
479
      return new OkHttpClientStream(
1✔
480
          method,
481
          headers,
482
          frameWriter,
483
          OkHttpClientTransport.this,
484
          outboundFlow,
485
          lock,
486
          maxMessageSize,
487
          initialWindowSize,
488
          defaultAuthority,
489
          userAgent,
490
          statsTraceContext,
491
          transportTracer,
492
          callOptions,
493
          useGetForSafeMethods);
494
    }
495
  }
496

497
  private TrustManager getTrustManager(TlsChannelCredentials tlsCreds)
498
      throws GeneralSecurityException {
499
    TrustManager[] tm;
500
    // Using the same way of creating TrustManager from OkHttpChannelBuilder.sslSocketFactoryFrom()
501
    if (tlsCreds.getTrustManagers() != null) {
1✔
502
      tm = tlsCreds.getTrustManagers().toArray(new TrustManager[0]);
1✔
503
    } else if (tlsCreds.getRootCertificates() != null) {
1✔
504
      tm = CertificateUtils.createTrustManager(tlsCreds.getRootCertificates());
1✔
505
    } else { // else use system default
506
      TrustManagerFactory tmf = TrustManagerFactory.getInstance(
1✔
507
          TrustManagerFactory.getDefaultAlgorithm());
1✔
508
      tmf.init((KeyStore) null);
1✔
509
      tm = tmf.getTrustManagers();
1✔
510
    }
511
    for (TrustManager trustManager: tm) {
1✔
512
      if (trustManager instanceof X509TrustManager) {
1✔
513
        return trustManager;
1✔
514
      }
515
    }
516
    return null;
×
517
  }
518

519
  @GuardedBy("lock")
520
  void streamReadyToStart(OkHttpClientStream clientStream, String authority) {
521
    if (goAwayStatus != null) {
1✔
522
      clientStream.transportState().transportReportStatus(
1✔
523
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
524
    } else {
525
      if (socket instanceof SSLSocket && !authority.equals(defaultAuthority)) {
1✔
526
        Status authorityVerificationResult;
527
        if (authorityVerificationResults.containsKey(authority)) {
1✔
528
          authorityVerificationResult = authorityVerificationResults.get(authority);
×
529
        } else {
530
          authorityVerificationResult = verifyAuthority(authority);
1✔
531
          authorityVerificationResults.put(authority, authorityVerificationResult);
1✔
532
        }
533
        if (!authorityVerificationResult.isOk()) {
1✔
534
          if (enablePerRpcAuthorityCheck) {
1✔
535
            clientStream.transportState().transportReportStatus(
1✔
536
                    authorityVerificationResult, RpcProgress.PROCESSED, true, new Metadata());
537
            return;
1✔
538
          }
539
        }
540
      }
541
      if (streams.size() >= maxConcurrentStreams) {
1✔
542
        pendingStreams.add(clientStream);
1✔
543
        setInUse(clientStream);
1✔
544
      } else {
545
        startStream(clientStream);
1✔
546
      }
547
    }
548
  }
1✔
549

550
  private Status verifyAuthority(String authority) {
551
    Status authorityVerificationResult;
552
    if (hostnameVerifier.verify(authority, ((SSLSocket) socket).getSession())) {
1✔
553
      authorityVerificationResult = Status.OK;
1✔
554
    } else {
555
      authorityVerificationResult = Status.UNAVAILABLE.withDescription(String.format(
1✔
556
              "HostNameVerifier verification failed for authority '%s'",
557
              authority));
558
    }
559
    if (!authorityVerificationResult.isOk() && !enablePerRpcAuthorityCheck) {
1✔
560
      log.log(Level.WARNING, String.format("HostNameVerifier verification failed for "
1✔
561
                      + "authority '%s'. This will be an error in the future.",
562
              authority));
563
    }
564
    if (authorityVerificationResult.isOk()) {
1✔
565
      // The status is trivially assigned in this case, but we are still making use of the
566
      // cache to keep track that a warning log had been logged for the authority when
567
      // enablePerRpcAuthorityCheck is false. When we permanently enable the feature, the
568
      // status won't need to be cached for case when x509TrustManager is null.
569
      if (x509TrustManager == null) {
1✔
570
        authorityVerificationResult = Status.UNAVAILABLE.withDescription(
1✔
571
                String.format("Could not verify authority '%s' for the rpc with no "
1✔
572
                                + "X509TrustManager available",
573
                        authority));
574
      } else if (x509ExtendedTrustManagerClass.isInstance(x509TrustManager)) {
1✔
575
        try {
576
          Certificate[] peerCertificates = sslSession.getPeerCertificates();
1✔
577
          X509Certificate[] x509PeerCertificates =
1✔
578
                  new X509Certificate[peerCertificates.length];
579
          for (int i = 0; i < peerCertificates.length; i++) {
1✔
580
            x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
1✔
581
          }
582
          checkServerTrustedMethod.invoke(x509TrustManager, x509PeerCertificates,
1✔
583
                  "RSA", new SslSocketWrapper((SSLSocket) socket, authority));
584
          authorityVerificationResult = Status.OK;
1✔
585
        } catch (SSLPeerUnverifiedException | InvocationTargetException
1✔
586
                 | IllegalAccessException e) {
587
          authorityVerificationResult = Status.UNAVAILABLE.withCause(e).withDescription(
1✔
588
                  "Peer verification failed");
589
        }
1✔
590
        if (authorityVerificationResult.getCause() != null) {
1✔
591
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
592
                          + ". This will be an error in the future.",
593
                  authorityVerificationResult.getCause());
1✔
594
        } else {
595
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
596
                  + ". This will be an error in the future.");
597
        }
598
      }
599
    }
600
    return authorityVerificationResult;
1✔
601
  }
602

603
  @SuppressWarnings("GuardedBy")
604
  @GuardedBy("lock")
605
  private void startStream(OkHttpClientStream stream) {
606
    checkState(
1✔
607
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
608
    streams.put(nextStreamId, stream);
1✔
609
    setInUse(stream);
1✔
610
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
611
    // found: 'this.lock'
612
    stream.transportState().start(nextStreamId);
1✔
613
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
614
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
615
        || stream.useGet()) {
1✔
616
      frameWriter.flush();
1✔
617
    }
618
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
619
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
620
      // correctly.
621
      nextStreamId = Integer.MAX_VALUE;
1✔
622
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
623
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
624
    } else {
625
      nextStreamId += 2;
1✔
626
    }
627
  }
1✔
628

629
  /**
630
   * Starts pending streams, returns true if at least one pending stream is started.
631
   */
632
  @GuardedBy("lock")
633
  private boolean startPendingStreams() {
634
    boolean hasStreamStarted = false;
1✔
635
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
636
      OkHttpClientStream stream = pendingStreams.poll();
1✔
637
      startStream(stream);
1✔
638
      hasStreamStarted = true;
1✔
639
    }
1✔
640
    return hasStreamStarted;
1✔
641
  }
642

643
  /**
644
   * Removes given pending stream, used when a pending stream is cancelled.
645
   */
646
  @GuardedBy("lock")
647
  void removePendingStream(OkHttpClientStream pendingStream) {
648
    pendingStreams.remove(pendingStream);
1✔
649
    maybeClearInUse(pendingStream);
1✔
650
  }
1✔
651

652
  @Override
653
  public Runnable start(Listener listener) {
654
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
655

656
    if (enableKeepAlive) {
1✔
657
      keepAliveManager = new KeepAliveManager(
×
658
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
659
          keepAliveWithoutCalls);
660
      keepAliveManager.onTransportStarted();
×
661
    }
662

663
    int maxQueuedControlFrames = 10000;
1✔
664
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
665
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
666
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
667

668
    synchronized (lock) {
1✔
669
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
670
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
671
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
672
      // the same TransportExceptionHandler instance so it is all mixed back together.
673
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
674
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
675
    }
1✔
676
    final CountDownLatch latch = new CountDownLatch(1);
1✔
677
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
678
    // The transport needs up to two threads to function once started,
679
    // but only needs one during handshaking. Start another thread during handshaking
680
    // to make sure there's still a free thread available. If the number of threads is exhausted,
681
    // it is better to kill the transport than for all the transports to hang unable to send.
682
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
683
    // Connecting in the serializingExecutor, so that some stream operations like synStream
684
    // will be executed after connected.
685

686
    serializingExecutor.execute(new Runnable() {
1✔
687
      @Override
688
      public void run() {
689
        // Use closed source on failure so that the reader immediately shuts down.
690
        BufferedSource source = Okio.buffer(new Source() {
1✔
691
          @Override
692
          public long read(Buffer sink, long byteCount) {
693
            return -1;
1✔
694
          }
695

696
          @Override
697
          public Timeout timeout() {
698
            return Timeout.NONE;
×
699
          }
700

701
          @Override
702
          public void close() {
703
          }
1✔
704
        });
705
        try {
706
          // This is a hack to make sure the connection preface and initial settings to be sent out
707
          // without blocking the start. By doing this essentially prevents potential deadlock when
708
          // network is not available during startup while another thread holding lock to send the
709
          // initial preface.
710
          try {
711
            latch.await();
1✔
712
            barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
713
          } catch (InterruptedException e) {
×
714
            Thread.currentThread().interrupt();
×
715
          } catch (TimeoutException | BrokenBarrierException e) {
1✔
716
            startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
717
                .withDescription("Timed out waiting for second handshake thread. "
1✔
718
                    + "The transport executor pool may have run out of threads"));
719
            return;
1✔
720
          }
1✔
721

722
          if (proxiedAddr == null) {
1✔
723
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
724
          } else {
725
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
726
              sock = createHttpProxySocket(
1✔
727
                  proxiedAddr.getTargetAddress(),
1✔
728
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
729
                  proxiedAddr.getUsername(),
1✔
730
                  proxiedAddr.getPassword()
1✔
731
              );
732
            } else {
733
              throw Status.INTERNAL.withDescription(
×
734
                  "Unsupported SocketAddress implementation "
735
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
736
            }
737
          }
738
          if (sslSocketFactory != null) {
1✔
739
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
740
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
741
                connectionSpec);
1✔
742
            sslSession = sslSocket.getSession();
1✔
743
            sock = sslSocket;
1✔
744
          }
745
          sock.setTcpNoDelay(true);
1✔
746
          source = Okio.buffer(Okio.source(sock));
1✔
747
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
748

749
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
750
          attributes = attributes.toBuilder()
1✔
751
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
752
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
753
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
754
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
755
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
756
              .build();
1✔
757
        } catch (StatusException e) {
1✔
758
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
759
          return;
1✔
760
        } catch (Exception e) {
1✔
761
          onException(e);
1✔
762
          return;
1✔
763
        } finally {
764
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
765
          latchForExtraThread.countDown();
1✔
766
        }
767
        synchronized (lock) {
1✔
768
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
769
          if (sslSession != null) {
1✔
770
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
771
          }
772
        }
1✔
773
      }
1✔
774
    });
775

776
    executor.execute(new Runnable() {
1✔
777
      @Override
778
      public void run() {
779
        try {
780
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
781
          latchForExtraThread.await();
1✔
782
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
783
          // Something bad happened, maybe too few threads available!
784
          // This will be handled in the handshake thread.
785
        } catch (InterruptedException e) {
×
786
          Thread.currentThread().interrupt();
×
787
        }
1✔
788
      }
1✔
789
    });
790
    // Schedule to send connection preface & settings before any other write.
791
    try {
792
      sendConnectionPrefaceAndSettings();
1✔
793
    } finally {
794
      latch.countDown();
1✔
795
    }
796

797
    serializingExecutor.execute(new Runnable() {
1✔
798
      @Override
799
      public void run() {
800
        if (connectingCallback != null) {
1✔
801
          connectingCallback.run();
1✔
802
        }
803
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
804
        // may send goAway immediately.
805
        executor.execute(clientFrameHandler);
1✔
806
        synchronized (lock) {
1✔
807
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
808
          startPendingStreams();
1✔
809
        }
1✔
810
        if (connectedFuture != null) {
1✔
811
          connectedFuture.set(null);
1✔
812
        }
813
      }
1✔
814
    });
815
    return null;
1✔
816
  }
817

818
  /**
819
   * Should only be called once when the transport is first established.
820
   */
821
  private void sendConnectionPrefaceAndSettings() {
822
    synchronized (lock) {
1✔
823
      frameWriter.connectionPreface();
1✔
824
      Settings settings = new Settings();
1✔
825
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
826
      frameWriter.settings(settings);
1✔
827
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
828
        frameWriter.windowUpdate(
1✔
829
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
830
      }
831
    }
1✔
832
  }
1✔
833

834
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
835
      String proxyUsername, String proxyPassword) throws StatusException {
836
    Socket sock = null;
1✔
837
    try {
838
      // The proxy address may not be resolved
839
      if (proxyAddress.getAddress() != null) {
1✔
840
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
841
      } else {
842
        sock =
×
843
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
844
      }
845
      sock.setTcpNoDelay(true);
1✔
846
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
847
      // can cause reading from the socket to hang.
848
      sock.setSoTimeout(proxySocketTimeout);
1✔
849

850
      Source source = Okio.source(sock);
1✔
851
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
852

853
      // Prepare headers and request method line
854
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
855
      HttpUrl url = proxyRequest.httpUrl();
1✔
856
      String requestLine =
1✔
857
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
858

859
      // Write request to socket
860
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
861
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
862
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
863
            .writeUtf8(": ")
1✔
864
            .writeUtf8(proxyRequest.headers().value(i))
1✔
865
            .writeUtf8("\r\n");
1✔
866
      }
867
      sink.writeUtf8("\r\n");
1✔
868
      // Flush buffer (flushes socket and sends request)
869
      sink.flush();
1✔
870

871
      // Read status line, check if 2xx was returned
872
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
873
      // Drain rest of headers
874
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
875
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
876
        Buffer body = new Buffer();
1✔
877
        try {
878
          sock.shutdownOutput();
1✔
879
          source.read(body, 1024);
1✔
880
        } catch (IOException ex) {
×
881
          body.writeUtf8("Unable to read body: " + ex.toString());
×
882
        }
1✔
883
        try {
884
          sock.close();
1✔
885
        } catch (IOException ignored) {
×
886
          // ignored
887
        }
1✔
888
        String message = String.format(
1✔
889
            Locale.US,
890
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
891
              + "Response body:\n%s",
892
            statusLine.code, statusLine.message, body.readUtf8());
1✔
893
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
894
      }
895
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
896
      sock.setSoTimeout(0);
1✔
897
      return sock;
1✔
898
    } catch (IOException e) {
1✔
899
      if (sock != null) {
1✔
900
        GrpcUtil.closeQuietly(sock);
1✔
901
      }
902
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
903
          .asException();
1✔
904
    }
905
  }
906

907
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
908
                                         String proxyPassword) {
909
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
910
        .scheme("https")
1✔
911
        .host(address.getHostName())
1✔
912
        .port(address.getPort())
1✔
913
        .build();
1✔
914

915
    Request.Builder request = new Request.Builder()
1✔
916
        .url(tunnelUrl)
1✔
917
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
918
        .header("User-Agent", userAgent);
1✔
919

920
    // If we have proxy credentials, set them right away
921
    if (proxyUsername != null && proxyPassword != null) {
1✔
922
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
923
    }
924
    return request.build();
1✔
925
  }
926

927
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
928
    Buffer buffer = new Buffer();
1✔
929
    while (true) {
930
      if (source.read(buffer, 1) == -1) {
1✔
931
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
932
      }
933
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
934
        return buffer.readUtf8LineStrict();
1✔
935
      }
936
    }
937
  }
938

939
  @Override
940
  public String toString() {
941
    return MoreObjects.toStringHelper(this)
1✔
942
        .add("logId", logId.getId())
1✔
943
        .add("address", address)
1✔
944
        .toString();
1✔
945
  }
946

947
  @Override
948
  public InternalLogId getLogId() {
949
    return logId;
1✔
950
  }
951

952
  /**
953
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
954
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
955
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
956
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
957
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
958
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
959
   * connection to the port, independent of this function.
960
   *
961
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
962
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
963
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
964
   */
965
  @VisibleForTesting
966
  String getOverridenHost() {
967
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
968
    if (uri.getHost() != null) {
1✔
969
      return uri.getHost();
1✔
970
    }
971

972
    return defaultAuthority;
1✔
973
  }
974

975
  @VisibleForTesting
976
  int getOverridenPort() {
977
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
978
    if (uri.getPort() != -1) {
1✔
979
      return uri.getPort();
×
980
    }
981

982
    return address.getPort();
1✔
983
  }
984

985
  @Override
986
  public void shutdown(Status reason) {
987
    synchronized (lock) {
1✔
988
      if (goAwayStatus != null) {
1✔
989
        return;
1✔
990
      }
991

992
      goAwayStatus = reason;
1✔
993
      listener.transportShutdown(goAwayStatus);
1✔
994
      stopIfNecessary();
1✔
995
    }
1✔
996
  }
1✔
997

998
  @Override
999
  public void shutdownNow(Status reason) {
1000
    shutdown(reason);
1✔
1001
    synchronized (lock) {
1✔
1002
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1003
      while (it.hasNext()) {
1✔
1004
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1005
        it.remove();
1✔
1006
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
1007
        maybeClearInUse(entry.getValue());
1✔
1008
      }
1✔
1009

1010
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1011
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
1012
        // chance to retry and be routed to another connection.
1013
        stream.transportState().transportReportStatus(
1✔
1014
            reason, RpcProgress.MISCARRIED, true, new Metadata());
1015
        maybeClearInUse(stream);
1✔
1016
      }
1✔
1017
      pendingStreams.clear();
1✔
1018

1019
      stopIfNecessary();
1✔
1020
    }
1✔
1021
  }
1✔
1022

1023
  @Override
1024
  public Attributes getAttributes() {
1025
    return attributes;
1✔
1026
  }
1027

1028
  /**
1029
   * Gets all active streams as an array.
1030
   */
1031
  @Override
1032
  public OutboundFlowController.StreamState[] getActiveStreams() {
1033
    synchronized (lock) {
1✔
1034
      OutboundFlowController.StreamState[] flowStreams =
1✔
1035
          new OutboundFlowController.StreamState[streams.size()];
1✔
1036
      int i = 0;
1✔
1037
      for (OkHttpClientStream stream : streams.values()) {
1✔
1038
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
1039
      }
1✔
1040
      return flowStreams;
1✔
1041
    }
1042
  }
1043

1044
  @VisibleForTesting
1045
  ClientFrameHandler getHandler() {
1046
    return clientFrameHandler;
1✔
1047
  }
1048

1049
  @VisibleForTesting
1050
  SocketFactory getSocketFactory() {
1051
    return socketFactory;
1✔
1052
  }
1053

1054
  @VisibleForTesting
1055
  int getPendingStreamSize() {
1056
    synchronized (lock) {
1✔
1057
      return pendingStreams.size();
1✔
1058
    }
1059
  }
1060

1061
  @VisibleForTesting
1062
  void setNextStreamId(int nextStreamId) {
1063
    synchronized (lock) {
1✔
1064
      this.nextStreamId = nextStreamId;
1✔
1065
    }
1✔
1066
  }
1✔
1067

1068
  /**
1069
   * Finish all active streams due to an IOException, then close the transport.
1070
   */
1071
  @Override
1072
  public void onException(Throwable failureCause) {
1073
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
1074
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
1075
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1076
  }
1✔
1077

1078
  /**
1079
   * Send GOAWAY to the server, then finish all active streams and close the transport.
1080
   */
1081
  private void onError(ErrorCode errorCode, String moreDetail) {
1082
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
1083
  }
1✔
1084

1085
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
1086
    synchronized (lock) {
1✔
1087
      if (goAwayStatus == null) {
1✔
1088
        goAwayStatus = status;
1✔
1089
        listener.transportShutdown(status);
1✔
1090
      }
1091
      if (errorCode != null && !goAwaySent) {
1✔
1092
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1093
        // streams. The GOAWAY is part of graceful shutdown.
1094
        goAwaySent = true;
1✔
1095
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
1096
      }
1097

1098
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1099
      while (it.hasNext()) {
1✔
1100
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1101
        if (entry.getKey() > lastKnownStreamId) {
1✔
1102
          it.remove();
1✔
1103
          entry.getValue().transportState().transportReportStatus(
1✔
1104
              status, RpcProgress.REFUSED, false, new Metadata());
1105
          maybeClearInUse(entry.getValue());
1✔
1106
        }
1107
      }
1✔
1108

1109
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1110
        stream.transportState().transportReportStatus(
1✔
1111
            status, RpcProgress.MISCARRIED, true, new Metadata());
1112
        maybeClearInUse(stream);
1✔
1113
      }
1✔
1114
      pendingStreams.clear();
1✔
1115

1116
      stopIfNecessary();
1✔
1117
    }
1✔
1118
  }
1✔
1119

1120
  /**
1121
   * Called when a stream is closed. We do things like:
1122
   * <ul>
1123
   * <li>Removing the stream from the map.
1124
   * <li>Optionally reporting the status.
1125
   * <li>Starting pending streams if we can.
1126
   * <li>Stopping the transport if this is the last live stream under a go-away status.
1127
   * </ul>
1128
   *
1129
   * @param streamId the Id of the stream.
1130
   * @param status the final status of this stream, null means no need to report.
1131
   * @param stopDelivery interrupt queued messages in the deframer
1132
   * @param errorCode reset the stream with this ErrorCode if not null.
1133
   * @param trailers the trailers received if not null
1134
   */
1135
  void finishStream(
1136
      int streamId,
1137
      @Nullable Status status,
1138
      RpcProgress rpcProgress,
1139
      boolean stopDelivery,
1140
      @Nullable ErrorCode errorCode,
1141
      @Nullable Metadata trailers) {
1142
    synchronized (lock) {
1✔
1143
      OkHttpClientStream stream = streams.remove(streamId);
1✔
1144
      if (stream != null) {
1✔
1145
        if (errorCode != null) {
1✔
1146
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1147
        }
1148
        if (status != null) {
1✔
1149
          stream
1✔
1150
              .transportState()
1✔
1151
              .transportReportStatus(
1✔
1152
                  status,
1153
                  rpcProgress,
1154
                  stopDelivery,
1155
                  trailers != null ? trailers : new Metadata());
1✔
1156
        }
1157
        if (!startPendingStreams()) {
1✔
1158
          stopIfNecessary();
1✔
1159
        }
1160
        maybeClearInUse(stream);
1✔
1161
      }
1162
    }
1✔
1163
  }
1✔
1164

1165
  /**
1166
   * When the transport is in goAway state, we should stop it once all active streams finish.
1167
   */
1168
  @GuardedBy("lock")
1169
  private void stopIfNecessary() {
1170
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1171
      return;
1✔
1172
    }
1173
    if (stopped) {
1✔
1174
      return;
1✔
1175
    }
1176
    stopped = true;
1✔
1177

1178
    if (keepAliveManager != null) {
1✔
1179
      keepAliveManager.onTransportTermination();
×
1180
    }
1181

1182
    if (ping != null) {
1✔
1183
      ping.failed(getPingFailure());
1✔
1184
      ping = null;
1✔
1185
    }
1186

1187
    if (!goAwaySent) {
1✔
1188
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1189
      // streams. The GOAWAY is part of graceful shutdown.
1190
      goAwaySent = true;
1✔
1191
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1192
    }
1193

1194
    // We will close the underlying socket in the writing thread to break out the reader
1195
    // thread, which will close the frameReader and notify the listener.
1196
    frameWriter.close();
1✔
1197
  }
1✔
1198

1199
  @GuardedBy("lock")
1200
  private void maybeClearInUse(OkHttpClientStream stream) {
1201
    if (hasStream) {
1✔
1202
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1203
        hasStream = false;
1✔
1204
        if (keepAliveManager != null) {
1✔
1205
          // We don't have any active streams. No need to do keepalives any more.
1206
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1207
          // and onTransportActive.
1208
          keepAliveManager.onTransportIdle();
×
1209
        }
1210
      }
1211
    }
1212
    if (stream.shouldBeCountedForInUse()) {
1✔
1213
      inUseState.updateObjectInUse(stream, false);
1✔
1214
    }
1215
  }
1✔
1216

1217
  @GuardedBy("lock")
1218
  private void setInUse(OkHttpClientStream stream) {
1219
    if (!hasStream) {
1✔
1220
      hasStream = true;
1✔
1221
      if (keepAliveManager != null) {
1✔
1222
        // We have a new stream. We might need to do keepalives now.
1223
        // Note that we have to do this inside the lock to avoid calling
1224
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1225
        // order.
1226
        keepAliveManager.onTransportActive();
×
1227
      }
1228
    }
1229
    if (stream.shouldBeCountedForInUse()) {
1✔
1230
      inUseState.updateObjectInUse(stream, true);
1✔
1231
    }
1232
  }
1✔
1233

1234
  private Status getPingFailure() {
1235
    synchronized (lock) {
1✔
1236
      if (goAwayStatus != null) {
1✔
1237
        return goAwayStatus;
1✔
1238
      } else {
1239
        return Status.UNAVAILABLE.withDescription("Connection closed");
×
1240
      }
1241
    }
1242
  }
1243

1244
  boolean mayHaveCreatedStream(int streamId) {
1245
    synchronized (lock) {
1✔
1246
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1247
    }
1248
  }
1249

1250
  OkHttpClientStream getStream(int streamId) {
1251
    synchronized (lock) {
1✔
1252
      return streams.get(streamId);
1✔
1253
    }
1254
  }
1255

1256
  /**
1257
   * Returns a Grpc status corresponding to the given ErrorCode.
1258
   */
1259
  @VisibleForTesting
1260
  static Status toGrpcStatus(ErrorCode code) {
1261
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1262
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1263
        "Unknown http2 error code: " + code.httpCode);
1264
  }
1265

1266
  @Override
1267
  public ListenableFuture<SocketStats> getStats() {
1268
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1269
    synchronized (lock) {
1✔
1270
      if (socket == null) {
1✔
1271
        ret.set(new SocketStats(
×
1272
            transportTracer.getStats(),
×
1273
            /*local=*/ null,
1274
            /*remote=*/ null,
1275
            new InternalChannelz.SocketOptions.Builder().build(),
×
1276
            /*security=*/ null));
1277
      } else {
1278
        ret.set(new SocketStats(
1✔
1279
            transportTracer.getStats(),
1✔
1280
            socket.getLocalSocketAddress(),
1✔
1281
            socket.getRemoteSocketAddress(),
1✔
1282
            Utils.getSocketOptions(socket),
1✔
1283
            securityInfo));
1284
      }
1285
      return ret;
1✔
1286
    }
1287
  }
1288

1289
  /**
1290
   * Runnable which reads frames and dispatches them to in flight calls.
1291
   */
1292
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1293

1294
    private final OkHttpFrameLogger logger =
1✔
1295
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1296
    FrameReader frameReader;
1297
    boolean firstSettings = true;
1✔
1298

1299
    /**
     * Creates a handler that reads frames from the given reader.
     *
     * @param frameReader the reader that {@code run()} pulls frames from and closes when done
     */
    ClientFrameHandler(FrameReader frameReader) {
      this.frameReader = frameReader;
    }
1✔
1302

1303
    @Override
1304
    @SuppressWarnings("Finally")
1305
    public void run() {
1306
      String threadName = Thread.currentThread().getName();
1✔
1307
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1308
      try {
1309
        // Read until the underlying socket closes.
1310
        while (frameReader.nextFrame(this)) {
1✔
1311
          if (keepAliveManager != null) {
1✔
1312
            keepAliveManager.onDataReceived();
×
1313
          }
1314
        }
1315
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1316
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1317
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1318
        Status status;
1319
        synchronized (lock) {
1✔
1320
          status = goAwayStatus;
1✔
1321
        }
1✔
1322
        if (status == null) {
1✔
1323
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1324
        }
1325
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1326
      } catch (Throwable t) {
1✔
1327
        // TODO(madongfly): Send the exception message to the server.
1328
        startGoAway(
1✔
1329
            0,
1330
            ErrorCode.PROTOCOL_ERROR,
1331
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1332
      } finally {
1333
        try {
1334
          frameReader.close();
1✔
1335
        } catch (IOException ex) {
×
1336
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1337
        } catch (RuntimeException e) {
×
1338
          // This same check is done in okhttp proper:
1339
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1340

1341
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1342
          // https://issuetracker.google.com/issues/177450597
1343
          if (!"bio == null".equals(e.getMessage())) {
×
1344
            throw e;
×
1345
          }
1346
        }
1✔
1347
        listener.transportTerminated();
1✔
1348
        Thread.currentThread().setName(threadName);
1✔
1349
      }
1350
    }
1✔
1351

1352
    /**
1353
     * Handle an HTTP2 DATA frame.
1354
     */
1355
    @SuppressWarnings("GuardedBy")
1356
    @Override
1357
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1358
                     int paddedLength)
1359
        throws IOException {
1360
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1361
          streamId, in.getBuffer(), length, inFinished);
1✔
1362
      OkHttpClientStream stream = getStream(streamId);
1✔
1363
      if (stream == null) {
1✔
1364
        if (mayHaveCreatedStream(streamId)) {
1✔
1365
          synchronized (lock) {
1✔
1366
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1367
          }
1✔
1368
          in.skip(length);
1✔
1369
        } else {
1370
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1371
          return;
1✔
1372
        }
1373
      } else {
1374
        // Wait until the frame is complete.
1375
        in.require(length);
1✔
1376

1377
        Buffer buf = new Buffer();
1✔
1378
        buf.write(in.getBuffer(), length);
1✔
1379
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1380
            stream.transportState().tag());
1✔
1381
        synchronized (lock) {
1✔
1382
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1383
          // instead found: 'OkHttpClientTransport.this.lock'
1384
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1385
        }
1✔
1386
      }
1387

1388
      // connection window update
1389
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1390
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1391
        synchronized (lock) {
1✔
1392
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1393
        }
1✔
1394
        connectionUnacknowledgedBytesRead = 0;
1✔
1395
      }
1396
    }
1✔
1397

1398
    /**
1399
     * Handle HTTP2 HEADER and CONTINUATION frames.
1400
     */
1401
    @SuppressWarnings("GuardedBy")
1402
    @Override
1403
    public void headers(boolean outFinished,
1404
        boolean inFinished,
1405
        int streamId,
1406
        int associatedStreamId,
1407
        List<Header> headerBlock,
1408
        HeadersMode headersMode) {
1409
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1410
      boolean unknownStream = false;
1✔
1411
      Status failedStatus = null;
1✔
1412
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1413
        int metadataSize = headerBlockSize(headerBlock);
1✔
1414
        if (metadataSize > maxInboundMetadataSize) {
1✔
1415
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1416
              String.format(
1✔
1417
                  Locale.US,
1418
                  "Response %s metadata larger than %d: %d",
1419
                  inFinished ? "trailer" : "header",
1✔
1420
                  maxInboundMetadataSize,
1✔
1421
                  metadataSize));
1✔
1422
        }
1423
      }
1424
      synchronized (lock) {
1✔
1425
        OkHttpClientStream stream = streams.get(streamId);
1✔
1426
        if (stream == null) {
1✔
1427
          if (mayHaveCreatedStream(streamId)) {
1✔
1428
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1429
          } else {
1430
            unknownStream = true;
1✔
1431
          }
1432
        } else {
1433
          if (failedStatus == null) {
1✔
1434
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1435
                stream.transportState().tag());
1✔
1436
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1437
            // instead found: 'OkHttpClientTransport.this.lock'
1438
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1439
          } else {
1440
            if (!inFinished) {
1✔
1441
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1442
            }
1443
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1444
          }
1445
        }
1446
      }
1✔
1447
      if (unknownStream) {
1✔
1448
        // We don't expect any server-initiated streams.
1449
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1450
      }
1451
    }
1✔
1452

1453
    private int headerBlockSize(List<Header> headerBlock) {
1454
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1455
      long size = 0;
1✔
1456
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1457
        Header header = headerBlock.get(i);
1✔
1458
        size += 32 + header.name.size() + header.value.size();
1✔
1459
      }
1460
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1461
      return (int) size;
1✔
1462
    }
1463

1464
    @Override
1465
    public void rstStream(int streamId, ErrorCode errorCode) {
1466
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1467
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1468
      boolean stopDelivery =
1✔
1469
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1470
      synchronized (lock) {
1✔
1471
        OkHttpClientStream stream = streams.get(streamId);
1✔
1472
        if (stream != null) {
1✔
1473
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1474
              stream.transportState().tag());
1✔
1475
          finishStream(
1✔
1476
              streamId, status,
1477
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1478
              stopDelivery, null, null);
1479
        }
1480
      }
1✔
1481
    }
1✔
1482

1483
    @Override
1484
    public void settings(boolean clearPrevious, Settings settings) {
1485
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1486
      boolean outboundWindowSizeIncreased = false;
1✔
1487
      synchronized (lock) {
1✔
1488
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1489
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1490
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1491
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1492
        }
1493

1494
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1495
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1496
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1497
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1498
        }
1499
        if (firstSettings) {
1✔
1500
          attributes = listener.filterTransport(attributes);
1✔
1501
          listener.transportReady();
1✔
1502
          firstSettings = false;
1✔
1503
        }
1504

1505
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1506
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1507
        // otherwise it will cause a stream error (RST_STREAM).
1508
        frameWriter.ackSettings(settings);
1✔
1509

1510
        // send any pending bytes / streams
1511
        if (outboundWindowSizeIncreased) {
1✔
1512
          outboundFlow.writeStreams();
1✔
1513
        }
1514
        startPendingStreams();
1✔
1515
      }
1✔
1516
    }
1✔
1517

1518
    @Override
1519
    public void ping(boolean ack, int payload1, int payload2) {
1520
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1521
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1522
      if (!ack) {
1✔
1523
        synchronized (lock) {
1✔
1524
          frameWriter.ping(true, payload1, payload2);
1✔
1525
        }
1✔
1526
      } else {
1527
        Http2Ping p = null;
1✔
1528
        synchronized (lock) {
1✔
1529
          if (ping != null) {
1✔
1530
            if (ping.payload() == ackPayload) {
1✔
1531
              p = ping;
1✔
1532
              ping = null;
1✔
1533
            } else {
1534
              log.log(Level.WARNING, String.format(
1✔
1535
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1536
                  ping.payload(), ackPayload));
1✔
1537
            }
1538
          } else {
1539
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1540
          }
1541
        }
1✔
1542
        // don't complete it while holding lock since callbacks could run immediately
1543
        if (p != null) {
1✔
1544
          p.complete();
1✔
1545
        }
1546
      }
1547
    }
1✔
1548

1549
    @Override
1550
    public void ackSettings() {
1551
      // Do nothing currently.
1552
    }
1✔
1553

1554
    @Override
1555
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1556
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1557
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1558
        String data = debugData.utf8();
1✔
1559
        log.log(Level.WARNING, String.format(
1✔
1560
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1561
        if ("too_many_pings".equals(data)) {
1✔
1562
          tooManyPingsRunnable.run();
1✔
1563
        }
1564
      }
1565
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1566
          .augmentDescription("Received Goaway");
1✔
1567
      if (debugData.size() > 0) {
1✔
1568
        // If a debug message was provided, use it.
1569
        status = status.augmentDescription(debugData.utf8());
1✔
1570
      }
1571
      startGoAway(lastGoodStreamId, null, status);
1✔
1572
    }
1✔
1573

1574
    @Override
1575
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1576
        throws IOException {
1577
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1578
          streamId, promisedStreamId, requestHeaders);
1579
      // We don't accept server initiated stream.
1580
      synchronized (lock) {
1✔
1581
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1582
      }
1✔
1583
    }
1✔
1584

1585
    @Override
1586
    public void windowUpdate(int streamId, long delta) {
1587
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1588
      if (delta == 0) {
1✔
1589
        String errorMsg = "Received 0 flow control window increment.";
×
1590
        if (streamId == 0) {
×
1591
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1592
        } else {
1593
          finishStream(
×
1594
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1595
              ErrorCode.PROTOCOL_ERROR, null);
1596
        }
1597
        return;
×
1598
      }
1599

1600
      boolean unknownStream = false;
1✔
1601
      synchronized (lock) {
1✔
1602
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1603
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1604
          return;
1✔
1605
        }
1606

1607
        OkHttpClientStream stream = streams.get(streamId);
1✔
1608
        if (stream != null) {
1✔
1609
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1610
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1611
          unknownStream = true;
1✔
1612
        }
1613
      }
1✔
1614
      if (unknownStream) {
1✔
1615
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1616
            "Received window_update for unknown stream: " + streamId);
1617
      }
1618
    }
1✔
1619

1620
    @Override
1621
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1622
      // Ignore priority change.
1623
      // TODO(madongfly): log
1624
    }
×
1625

1626
    @Override
1627
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1628
        int port, long maxAge) {
1629
      // TODO(madongfly): Deal with alternateService propagation
1630
    }
×
1631
  }
1632

1633
  /**
1634
   * SSLSocket wrapper that provides a fake SSLSession for handshake session.
1635
   */
1636
  static final class SslSocketWrapper extends NoopSslSocket {
1637

1638
    private final SSLSession sslSession;
1639
    private final SSLSocket sslSocket;
1640

1641
    SslSocketWrapper(SSLSocket sslSocket, String peerHost) {
1✔
1642
      this.sslSocket = sslSocket;
1✔
1643
      this.sslSession = new FakeSslSession(peerHost);
1✔
1644
    }
1✔
1645

1646
    @Override
1647
    public SSLSession getHandshakeSession() {
1648
      return this.sslSession;
1✔
1649
    }
1650

1651
    @Override
1652
    public boolean isConnected() {
1653
      return sslSocket.isConnected();
1✔
1654
    }
1655

1656
    @Override
1657
    public SSLParameters getSSLParameters() {
1658
      return sslSocket.getSSLParameters();
1✔
1659
    }
1660
  }
1661

1662
  /**
1663
   * Fake SSLSession instance that provides the peer host name to verify for per-rpc check.
1664
   */
1665
  static class FakeSslSession extends NoopSslSession {
1666

1667
    private final String peerHost;
1668

1669
    FakeSslSession(String peerHost) {
1✔
1670
      this.peerHost = peerHost;
1✔
1671
    }
1✔
1672

1673
    @Override
1674
    public String getPeerHost() {
1675
      return peerHost;
1✔
1676
    }
1677
  }
1678
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc