• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19871

18 Jun 2025 05:29AM UTC coverage: 88.538% (-0.005%) from 88.543%
#19871

push

github

web-flow
Mark a few test helper methods as @CanIgnoreReturnValue (#12162)

34635 of 39119 relevant lines covered (88.54%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.89
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import com.google.errorprone.annotations.concurrent.GuardedBy;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.ChannelCredentials;
34
import io.grpc.ClientStreamTracer;
35
import io.grpc.Grpc;
36
import io.grpc.HttpConnectProxiedSocketAddress;
37
import io.grpc.InternalChannelz;
38
import io.grpc.InternalChannelz.SocketStats;
39
import io.grpc.InternalLogId;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.MethodDescriptor.MethodType;
43
import io.grpc.SecurityLevel;
44
import io.grpc.Status;
45
import io.grpc.Status.Code;
46
import io.grpc.StatusException;
47
import io.grpc.TlsChannelCredentials;
48
import io.grpc.internal.CertificateUtils;
49
import io.grpc.internal.ClientStreamListener.RpcProgress;
50
import io.grpc.internal.ConnectionClientTransport;
51
import io.grpc.internal.GrpcAttributes;
52
import io.grpc.internal.GrpcUtil;
53
import io.grpc.internal.Http2Ping;
54
import io.grpc.internal.InUseStateAggregator;
55
import io.grpc.internal.KeepAliveManager;
56
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
57
import io.grpc.internal.NoopSslSession;
58
import io.grpc.internal.SerializingExecutor;
59
import io.grpc.internal.StatsTraceContext;
60
import io.grpc.internal.TransportTracer;
61
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
62
import io.grpc.okhttp.OkHttpChannelBuilder.OkHttpTransportFactory;
63
import io.grpc.okhttp.internal.ConnectionSpec;
64
import io.grpc.okhttp.internal.Credentials;
65
import io.grpc.okhttp.internal.OkHostnameVerifier;
66
import io.grpc.okhttp.internal.StatusLine;
67
import io.grpc.okhttp.internal.framed.ErrorCode;
68
import io.grpc.okhttp.internal.framed.FrameReader;
69
import io.grpc.okhttp.internal.framed.FrameWriter;
70
import io.grpc.okhttp.internal.framed.Header;
71
import io.grpc.okhttp.internal.framed.HeadersMode;
72
import io.grpc.okhttp.internal.framed.Http2;
73
import io.grpc.okhttp.internal.framed.Settings;
74
import io.grpc.okhttp.internal.framed.Variant;
75
import io.grpc.okhttp.internal.proxy.HttpUrl;
76
import io.grpc.okhttp.internal.proxy.Request;
77
import io.perfmark.PerfMark;
78
import java.io.EOFException;
79
import java.io.IOException;
80
import java.lang.reflect.InvocationTargetException;
81
import java.lang.reflect.Method;
82
import java.net.InetSocketAddress;
83
import java.net.Socket;
84
import java.net.URI;
85
import java.security.GeneralSecurityException;
86
import java.security.KeyStore;
87
import java.security.cert.Certificate;
88
import java.security.cert.X509Certificate;
89
import java.util.Collections;
90
import java.util.Deque;
91
import java.util.EnumMap;
92
import java.util.HashMap;
93
import java.util.Iterator;
94
import java.util.LinkedHashMap;
95
import java.util.LinkedList;
96
import java.util.List;
97
import java.util.Locale;
98
import java.util.Map;
99
import java.util.Random;
100
import java.util.concurrent.BrokenBarrierException;
101
import java.util.concurrent.CountDownLatch;
102
import java.util.concurrent.CyclicBarrier;
103
import java.util.concurrent.Executor;
104
import java.util.concurrent.ScheduledExecutorService;
105
import java.util.concurrent.TimeUnit;
106
import java.util.concurrent.TimeoutException;
107
import java.util.logging.Level;
108
import java.util.logging.Logger;
109
import javax.annotation.Nullable;
110
import javax.net.SocketFactory;
111
import javax.net.ssl.HostnameVerifier;
112
import javax.net.ssl.SSLParameters;
113
import javax.net.ssl.SSLPeerUnverifiedException;
114
import javax.net.ssl.SSLSession;
115
import javax.net.ssl.SSLSocket;
116
import javax.net.ssl.SSLSocketFactory;
117
import javax.net.ssl.TrustManager;
118
import javax.net.ssl.TrustManagerFactory;
119
import javax.net.ssl.X509TrustManager;
120
import okio.Buffer;
121
import okio.BufferedSink;
122
import okio.BufferedSource;
123
import okio.ByteString;
124
import okio.Okio;
125
import okio.Source;
126
import okio.Timeout;
127

128
/**
129
 * An OkHttp-based {@link ConnectionClientTransport} implementation.
130
 */
131
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
132
      OutboundFlowController.Transport {
133
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
134
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
135
  private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK =
136
          "GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK";
137
  static boolean enablePerRpcAuthorityCheck =
1✔
138
          GrpcUtil.getFlag(GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK, false);
1✔
139
  private Socket sock;
140
  private SSLSession sslSession;
141

142
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
143
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
144
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
145
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
146
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
147
        Status.INTERNAL.withDescription("Protocol error"));
1✔
148
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
149
        Status.INTERNAL.withDescription("Internal error"));
1✔
150
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
151
        Status.INTERNAL.withDescription("Flow control error"));
1✔
152
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
153
        Status.INTERNAL.withDescription("Stream closed"));
1✔
154
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
155
        Status.INTERNAL.withDescription("Frame too large"));
1✔
156
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
157
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
158
    errorToStatus.put(ErrorCode.CANCEL,
1✔
159
        Status.CANCELLED.withDescription("Cancelled"));
1✔
160
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
161
        Status.INTERNAL.withDescription("Compression error"));
1✔
162
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
163
        Status.INTERNAL.withDescription("Connect error"));
1✔
164
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
165
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
166
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
167
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
168
    return Collections.unmodifiableMap(errorToStatus);
1✔
169
  }
170

171
  // Reflective handles to javax.net.ssl.X509ExtendedTrustManager and its
  // checkServerTrusted(X509Certificate[], String, Socket) method. Either both are set or both
  // are null; when null, per-rpc authority overrides cannot be verified.
  private static final Class<?> x509ExtendedTrustManagerClass;
  private static final Method checkServerTrustedMethod;

  static {
    Class<?> trustManagerClass = null;
    Method checkServerTrusted = null;
    try {
      trustManagerClass = Class.forName("javax.net.ssl.X509ExtendedTrustManager");
      checkServerTrusted = trustManagerClass.getMethod("checkServerTrusted",
          X509Certificate[].class, String.class, Socket.class);
    } catch (ClassNotFoundException e) {
      // Per-rpc authority override via call options will be disallowed.
    } catch (NoSuchMethodException e) {
      // Should never happen since X509ExtendedTrustManager was introduced in Android API level 24
      // along with checkServerTrusted. If it does, drop the class handle as well so the feature
      // is consistently treated as unavailable instead of leaving a null method to be invoked.
      trustManagerClass = null;
    }
    x509ExtendedTrustManagerClass = trustManagerClass;
    checkServerTrustedMethod = checkServerTrusted;
  }
1✔
190

191
  private final InetSocketAddress address;
192
  private final String defaultAuthority;
193
  private final String userAgent;
194
  private final Random random = new Random();
1✔
195
  // Returns new unstarted stopwatches
196
  private final Supplier<Stopwatch> stopwatchFactory;
197
  private final int initialWindowSize;
198
  private final Variant variant;
199
  private Listener listener;
200
  @GuardedBy("lock")
201
  private ExceptionHandlingFrameWriter frameWriter;
202
  private OutboundFlowController outboundFlow;
203
  private final Object lock = new Object();
1✔
204
  private final InternalLogId logId;
205
  @GuardedBy("lock")
206
  private int nextStreamId;
207
  @GuardedBy("lock")
1✔
208
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
209
  private final Executor executor;
210
  // Wraps the executor to guarantee that some operations are executed serially.
211
  private final SerializingExecutor serializingExecutor;
212
  private final ScheduledExecutorService scheduler;
213
  private final int maxMessageSize;
214
  private int connectionUnacknowledgedBytesRead;
215
  private ClientFrameHandler clientFrameHandler;
216
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
217
  private Attributes attributes;
218
  /**
219
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
220
   * streams may continue.
221
   */
222
  @GuardedBy("lock")
223
  private Status goAwayStatus;
224
  @GuardedBy("lock")
225
  private boolean goAwaySent;
226
  @GuardedBy("lock")
227
  private Http2Ping ping;
228
  @GuardedBy("lock")
229
  private boolean stopped;
230
  @GuardedBy("lock")
231
  private boolean hasStream;
232
  private final SocketFactory socketFactory;
233
  private SSLSocketFactory sslSocketFactory;
234
  private HostnameVerifier hostnameVerifier;
235
  private Socket socket;
236
  @GuardedBy("lock")
1✔
237
  private int maxConcurrentStreams = 0;
238
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
239
  @GuardedBy("lock")
240
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
241
  private final ConnectionSpec connectionSpec;
242
  private KeepAliveManager keepAliveManager;
243
  private boolean enableKeepAlive;
244
  private long keepAliveTimeNanos;
245
  private long keepAliveTimeoutNanos;
246
  private boolean keepAliveWithoutCalls;
247
  private final Runnable tooManyPingsRunnable;
248
  private final int maxInboundMetadataSize;
249
  private final boolean useGetForSafeMethods;
250
  @GuardedBy("lock")
251
  private final TransportTracer transportTracer;
252
  private final TrustManager x509TrustManager;
253

254
  @SuppressWarnings("serial")
255
  private static class LruCache<T> extends LinkedHashMap<String, T> {
256
    @Override
257
    protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
258
      return size() > 100;
1✔
259
    }
260
  }
261

262
  @GuardedBy("lock")
1✔
263
  private final Map<String, Status> authorityVerificationResults = new LruCache<>();
264

265
  @GuardedBy("lock")
1✔
266
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
267
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
268
        @Override
269
        protected void handleInUse() {
270
          listener.transportInUse(true);
1✔
271
        }
1✔
272

273
        @Override
274
        protected void handleNotInUse() {
275
          listener.transportInUse(false);
1✔
276
        }
1✔
277
      };
278
  @GuardedBy("lock")
279
  private InternalChannelz.Security securityInfo;
280

281
  @VisibleForTesting
282
  @Nullable
283
  final HttpConnectProxiedSocketAddress proxiedAddr;
284

285
  @VisibleForTesting
1✔
286
  int proxySocketTimeout = 30000;
287

288
  // The following fields should only be used in tests.
289
  Runnable connectingCallback;
290
  SettableFuture<Void> connectedFuture;
291

292
  /**
   * Creates a transport for {@code address}. Delegates to the private constructor, supplying
   * {@link GrpcUtil#STOPWATCH_SUPPLIER} as the stopwatch factory and a new {@link Http2} as the
   * framing variant.
   */
  public OkHttpClientTransport(
      OkHttpTransportFactory transportFactory,
      InetSocketAddress address,
      String authority,
      @Nullable String userAgent,
      Attributes eagAttrs,
      @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
      Runnable tooManyPingsRunnable,
      ChannelCredentials channelCredentials) {
    this(
        transportFactory, address, authority, userAgent, eagAttrs,
        GrpcUtil.STOPWATCH_SUPPLIER, new Http2(),
        proxiedAddr, tooManyPingsRunnable, channelCredentials);
  }
1✔
313

314
  private OkHttpClientTransport(
315
          OkHttpTransportFactory transportFactory,
316
          InetSocketAddress address,
317
          String authority,
318
          @Nullable String userAgent,
319
          Attributes eagAttrs,
320
          Supplier<Stopwatch> stopwatchFactory,
321
          Variant variant,
322
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
323
          Runnable tooManyPingsRunnable,
324
          ChannelCredentials channelCredentials) {
1✔
325
    this.address = Preconditions.checkNotNull(address, "address");
1✔
326
    this.defaultAuthority = authority;
1✔
327
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
328
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
329
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
330
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
331
    this.scheduler = Preconditions.checkNotNull(
1✔
332
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
333
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
334
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
335
    nextStreamId = 3;
1✔
336
    this.socketFactory = transportFactory.socketFactory == null
1✔
337
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
338
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
339
    this.hostnameVerifier = transportFactory.hostnameVerifier != null
1✔
340
      ? transportFactory.hostnameVerifier : OkHostnameVerifier.INSTANCE;
1✔
341
    this.connectionSpec = Preconditions.checkNotNull(
1✔
342
        transportFactory.connectionSpec, "connectionSpec");
343
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
344
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
345
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
346
    this.proxiedAddr = proxiedAddr;
1✔
347
    this.tooManyPingsRunnable =
1✔
348
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
349
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
350
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
351
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
352
    this.attributes = Attributes.newBuilder()
1✔
353
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
354
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
355
    initTransportTracer();
1✔
356
    TrustManager tempX509TrustManager;
357
    if (channelCredentials instanceof TlsChannelCredentials
1✔
358
        && x509ExtendedTrustManagerClass != null) {
359
      try {
360
        tempX509TrustManager = getTrustManager(
1✔
361
                (TlsChannelCredentials) channelCredentials);
362
      } catch (GeneralSecurityException e) {
×
363
        tempX509TrustManager = null;
×
364
        log.log(Level.WARNING, "Obtaining X509ExtendedTrustManager for the transport failed."
×
365
            + "Per-rpc authority overrides will be disallowed.", e);
366
      }
1✔
367
    } else {
368
      tempX509TrustManager = null;
1✔
369
    }
370
    x509TrustManager = tempX509TrustManager;
1✔
371
  }
1✔
372

373
  /**
   * Test-only constructor: creates a transport connected to a fake peer. Uses the fixed address
   * {@code 127.0.0.1:80} and the authority {@code notarealauthority:80}, with no proxy and no
   * channel credentials.
   */
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
  @VisibleForTesting
  OkHttpClientTransport(
      OkHttpTransportFactory transportFactory,
      String userAgent,
      Supplier<Stopwatch> stopwatchFactory,
      Variant variant,
      @Nullable Runnable connectingCallback,
      SettableFuture<Void> connectedFuture,
      Runnable tooManyPingsRunnable) {
    this(
        transportFactory,
        new InetSocketAddress("127.0.0.1", 80),
        "notarealauthority:80",
        userAgent,
        Attributes.EMPTY,
        stopwatchFactory,
        variant,
        /* proxiedAddr= */ null,
        tooManyPingsRunnable,
        /* channelCredentials= */ null);
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
    this.connectingCallback = connectingCallback;
  }
1✔
400

401
  // sslSocketFactory is set to null when use plaintext.
402
  boolean isUsingPlaintext() {
403
    return sslSocketFactory == null;
1✔
404
  }
405

406
  private void initTransportTracer() {
407
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
408
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
409
        @Override
410
        public TransportTracer.FlowControlWindows read() {
411
          synchronized (lock) {
1✔
412
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
413
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
414
            // provide a lower bound.
415
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
416
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
417
          }
418
        }
419
      });
420
    }
1✔
421
  }
1✔
422

423
  /**
424
   * Enable keepalive with custom delay and timeout.
425
   */
426
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
427
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
428
    enableKeepAlive = enable;
×
429
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
430
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
431
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
432
  }
×
433

434
  @Override
435
  public void ping(final PingCallback callback, Executor executor) {
436
    long data = 0;
1✔
437
    Http2Ping p;
438
    boolean writePing;
439
    synchronized (lock) {
1✔
440
      checkState(frameWriter != null);
1✔
441
      if (stopped) {
1✔
442
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
443
        return;
1✔
444
      }
445
      if (ping != null) {
1✔
446
        // we only allow one outstanding ping at a time, so just add the callback to
447
        // any outstanding operation
448
        p = ping;
1✔
449
        writePing = false;
1✔
450
      } else {
451
        // set outstanding operation and then write the ping after releasing lock
452
        data = random.nextLong();
1✔
453
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
454
        stopwatch.start();
1✔
455
        p = ping = new Http2Ping(data, stopwatch);
1✔
456
        writePing = true;
1✔
457
        transportTracer.reportKeepAliveSent();
1✔
458
      }
459
      if (writePing) {
1✔
460
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
461
      }
462
    }
1✔
463
    // If transport concurrently failed/stopped since we released the lock above, this could
464
    // immediately invoke callback (which we shouldn't do while holding a lock)
465
    p.addCallback(callback, executor);
1✔
466
  }
1✔
467

468
  @Override
469
  public OkHttpClientStream newStream(
470
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
471
      ClientStreamTracer[] tracers) {
472
    Preconditions.checkNotNull(method, "method");
1✔
473
    Preconditions.checkNotNull(headers, "headers");
1✔
474
    StatsTraceContext statsTraceContext =
1✔
475
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
476

477
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
478
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
479
      return new OkHttpClientStream(
1✔
480
          method,
481
          headers,
482
          frameWriter,
483
          OkHttpClientTransport.this,
484
          outboundFlow,
485
          lock,
486
          maxMessageSize,
487
          initialWindowSize,
488
          defaultAuthority,
489
          userAgent,
490
          statsTraceContext,
491
          transportTracer,
492
          callOptions,
493
          useGetForSafeMethods);
494
    }
495
  }
496

497
  private TrustManager getTrustManager(TlsChannelCredentials tlsCreds)
498
      throws GeneralSecurityException {
499
    TrustManager[] tm;
500
    // Using the same way of creating TrustManager from OkHttpChannelBuilder.sslSocketFactoryFrom()
501
    if (tlsCreds.getTrustManagers() != null) {
1✔
502
      tm = tlsCreds.getTrustManagers().toArray(new TrustManager[0]);
1✔
503
    } else if (tlsCreds.getRootCertificates() != null) {
1✔
504
      tm = CertificateUtils.createTrustManager(tlsCreds.getRootCertificates());
1✔
505
    } else { // else use system default
506
      TrustManagerFactory tmf = TrustManagerFactory.getInstance(
1✔
507
          TrustManagerFactory.getDefaultAlgorithm());
1✔
508
      tmf.init((KeyStore) null);
1✔
509
      tm = tmf.getTrustManagers();
1✔
510
    }
511
    for (TrustManager trustManager: tm) {
1✔
512
      if (trustManager instanceof X509TrustManager) {
1✔
513
        return trustManager;
1✔
514
      }
515
    }
516
    return null;
×
517
  }
518

519
  @GuardedBy("lock")
520
  void streamReadyToStart(OkHttpClientStream clientStream, String authority) {
521
    if (goAwayStatus != null) {
1✔
522
      clientStream.transportState().transportReportStatus(
1✔
523
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
524
    } else {
525
      if (socket instanceof SSLSocket && !authority.equals(defaultAuthority)) {
1✔
526
        Status authorityVerificationResult;
527
        if (authorityVerificationResults.containsKey(authority)) {
1✔
528
          authorityVerificationResult = authorityVerificationResults.get(authority);
×
529
        } else {
530
          authorityVerificationResult = verifyAuthority(authority);
1✔
531
          authorityVerificationResults.put(authority, authorityVerificationResult);
1✔
532
        }
533
        if (!authorityVerificationResult.isOk()) {
1✔
534
          if (enablePerRpcAuthorityCheck) {
1✔
535
            clientStream.transportState().transportReportStatus(
1✔
536
                    authorityVerificationResult, RpcProgress.PROCESSED, true, new Metadata());
537
            return;
1✔
538
          }
539
        }
540
      }
541
      if (streams.size() >= maxConcurrentStreams) {
1✔
542
        pendingStreams.add(clientStream);
1✔
543
        setInUse(clientStream);
1✔
544
      } else {
545
        startStream(clientStream);
1✔
546
      }
547
    }
548
  }
1✔
549

550
  private Status verifyAuthority(String authority) {
551
    Status authorityVerificationResult;
552
    if (hostnameVerifier.verify(authority, ((SSLSocket) socket).getSession())) {
1✔
553
      authorityVerificationResult = Status.OK;
1✔
554
    } else {
555
      authorityVerificationResult = Status.UNAVAILABLE.withDescription(String.format(
1✔
556
              "HostNameVerifier verification failed for authority '%s'",
557
              authority));
558
    }
559
    if (!authorityVerificationResult.isOk() && !enablePerRpcAuthorityCheck) {
1✔
560
      log.log(Level.WARNING, String.format("HostNameVerifier verification failed for "
1✔
561
                      + "authority '%s'. This will be an error in the future.",
562
              authority));
563
    }
564
    if (authorityVerificationResult.isOk()) {
1✔
565
      // The status is trivially assigned in this case, but we are still making use of the
566
      // cache to keep track that a warning log had been logged for the authority when
567
      // enablePerRpcAuthorityCheck is false. When we permanently enable the feature, the
568
      // status won't need to be cached for case when x509TrustManager is null.
569
      if (x509TrustManager == null) {
1✔
570
        authorityVerificationResult = Status.UNAVAILABLE.withDescription(
1✔
571
                String.format("Could not verify authority '%s' for the rpc with no "
1✔
572
                                + "X509TrustManager available",
573
                        authority));
574
      } else if (x509ExtendedTrustManagerClass.isInstance(x509TrustManager)) {
1✔
575
        try {
576
          Certificate[] peerCertificates = sslSession.getPeerCertificates();
1✔
577
          X509Certificate[] x509PeerCertificates =
1✔
578
                  new X509Certificate[peerCertificates.length];
579
          for (int i = 0; i < peerCertificates.length; i++) {
1✔
580
            x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
1✔
581
          }
582
          checkServerTrustedMethod.invoke(x509TrustManager, x509PeerCertificates,
1✔
583
                  "RSA", new SslSocketWrapper((SSLSocket) socket, authority));
584
          authorityVerificationResult = Status.OK;
1✔
585
        } catch (SSLPeerUnverifiedException | InvocationTargetException
1✔
586
                 | IllegalAccessException e) {
587
          authorityVerificationResult = Status.UNAVAILABLE.withCause(e).withDescription(
1✔
588
                  "Peer verification failed");
589
        }
1✔
590
        if (authorityVerificationResult.getCause() != null) {
1✔
591
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
592
                          + ". This will be an error in the future.",
593
                  authorityVerificationResult.getCause());
1✔
594
        } else {
595
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
596
                  + ". This will be an error in the future.");
597
        }
598
      }
599
    }
600
    return authorityVerificationResult;
1✔
601
  }
602

603
  @SuppressWarnings("GuardedBy")
604
  @GuardedBy("lock")
605
  private void startStream(OkHttpClientStream stream) {
606
    checkState(
1✔
607
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
608
    streams.put(nextStreamId, stream);
1✔
609
    setInUse(stream);
1✔
610
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
611
    // found: 'this.lock'
612
    stream.transportState().start(nextStreamId);
1✔
613
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
614
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
615
        || stream.useGet()) {
1✔
616
      frameWriter.flush();
1✔
617
    }
618
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
619
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
620
      // correctly.
621
      nextStreamId = Integer.MAX_VALUE;
1✔
622
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
623
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
624
    } else {
625
      nextStreamId += 2;
1✔
626
    }
627
  }
1✔
628

629
  /**
630
   * Starts pending streams, returns true if at least one pending stream is started.
631
   */
632
  @GuardedBy("lock")
633
  private boolean startPendingStreams() {
634
    boolean hasStreamStarted = false;
1✔
635
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
636
      OkHttpClientStream stream = pendingStreams.poll();
1✔
637
      startStream(stream);
1✔
638
      hasStreamStarted = true;
1✔
639
    }
1✔
640
    return hasStreamStarted;
1✔
641
  }
642

643
  /**
644
   * Removes given pending stream, used when a pending stream is cancelled.
645
   */
646
  @GuardedBy("lock")
647
  void removePendingStream(OkHttpClientStream pendingStream) {
648
    pendingStreams.remove(pendingStream);
1✔
649
    maybeClearInUse(pendingStream);
1✔
650
  }
1✔
651

652
  @Override
653
  public Runnable start(Listener listener) {
654
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
655

656
    if (enableKeepAlive) {
1✔
657
      keepAliveManager = new KeepAliveManager(
×
658
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
659
          keepAliveWithoutCalls);
660
      keepAliveManager.onTransportStarted();
×
661
    }
662

663
    int maxQueuedControlFrames = 10000;
1✔
664
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
665
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
666
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
667

668
    synchronized (lock) {
1✔
669
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
670
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
671
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
672
      // the same TransportExceptionHandler instance so it is all mixed back together.
673
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
674
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
675
    }
1✔
676
    final CountDownLatch latch = new CountDownLatch(1);
1✔
677
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
678
    // The transport needs up to two threads to function once started,
679
    // but only needs one during handshaking. Start another thread during handshaking
680
    // to make sure there's still a free thread available. If the number of threads is exhausted,
681
    // it is better to kill the transport than for all the transports to hang unable to send.
682
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
683
    // Connecting in the serializingExecutor, so that some stream operations like synStream
684
    // will be executed after connected.
685

686
    serializingExecutor.execute(new Runnable() {
1✔
687
      @Override
688
      public void run() {
689
        // Use closed source on failure so that the reader immediately shuts down.
690
        BufferedSource source = Okio.buffer(new Source() {
1✔
691
          @Override
692
          public long read(Buffer sink, long byteCount) {
693
            return -1;
1✔
694
          }
695

696
          @Override
697
          public Timeout timeout() {
698
            return Timeout.NONE;
×
699
          }
700

701
          @Override
702
          public void close() {
703
          }
1✔
704
        });
705
        try {
706
          // This is a hack to make sure the connection preface and initial settings to be sent out
707
          // without blocking the start. By doing this essentially prevents potential deadlock when
708
          // network is not available during startup while another thread holding lock to send the
709
          // initial preface.
710
          try {
711
            latch.await();
1✔
712
            barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
713
          } catch (InterruptedException e) {
×
714
            Thread.currentThread().interrupt();
×
715
          } catch (TimeoutException | BrokenBarrierException e) {
1✔
716
            startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
717
                .withDescription("Timed out waiting for second handshake thread. "
1✔
718
                    + "The transport executor pool may have run out of threads"));
719
            return;
1✔
720
          }
1✔
721

722
          if (proxiedAddr == null) {
1✔
723
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
724
          } else {
725
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
726
              sock = createHttpProxySocket(
1✔
727
                  proxiedAddr.getTargetAddress(),
1✔
728
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
729
                  proxiedAddr.getUsername(),
1✔
730
                  proxiedAddr.getPassword()
1✔
731
              );
732
            } else {
733
              throw Status.INTERNAL.withDescription(
×
734
                  "Unsupported SocketAddress implementation "
735
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
736
            }
737
          }
738
          if (sslSocketFactory != null) {
1✔
739
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
740
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
741
                connectionSpec);
1✔
742
            sslSession = sslSocket.getSession();
1✔
743
            sock = sslSocket;
1✔
744
          }
745
          sock.setTcpNoDelay(true);
1✔
746
          source = Okio.buffer(Okio.source(sock));
1✔
747
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
748

749
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
750
          attributes = attributes.toBuilder()
1✔
751
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
752
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
753
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
754
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
755
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
756
              .build();
1✔
757
        } catch (StatusException e) {
1✔
758
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
759
          return;
1✔
760
        } catch (Exception e) {
1✔
761
          onException(e);
1✔
762
          return;
1✔
763
        } finally {
764
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
765
          latchForExtraThread.countDown();
1✔
766
        }
767
        synchronized (lock) {
1✔
768
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
769
          if (sslSession != null) {
1✔
770
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
771
          }
772
        }
1✔
773
      }
1✔
774
    });
775

776
    executor.execute(new Runnable() {
1✔
777
      @Override
778
      public void run() {
779
        try {
780
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
781
          latchForExtraThread.await();
1✔
782
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
783
          // Something bad happened, maybe too few threads available!
784
          // This will be handled in the handshake thread.
785
        } catch (InterruptedException e) {
×
786
          Thread.currentThread().interrupt();
×
787
        }
1✔
788
      }
1✔
789
    });
790
    // Schedule to send connection preface & settings before any other write.
791
    try {
792
      sendConnectionPrefaceAndSettings();
1✔
793
    } finally {
794
      latch.countDown();
1✔
795
    }
796

797
    serializingExecutor.execute(new Runnable() {
1✔
798
      @Override
799
      public void run() {
800
        if (connectingCallback != null) {
1✔
801
          connectingCallback.run();
1✔
802
        }
803
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
804
        // may send goAway immediately.
805
        executor.execute(clientFrameHandler);
1✔
806
        synchronized (lock) {
1✔
807
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
808
          startPendingStreams();
1✔
809
        }
1✔
810
        if (connectedFuture != null) {
1✔
811
          connectedFuture.set(null);
1✔
812
        }
813
      }
1✔
814
    });
815
    return null;
1✔
816
  }
817

818
  /**
819
   * Should only be called once when the transport is first established.
820
   */
821
  private void sendConnectionPrefaceAndSettings() {
822
    synchronized (lock) {
1✔
823
      frameWriter.connectionPreface();
1✔
824
      Settings settings = new Settings();
1✔
825
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
826
      frameWriter.settings(settings);
1✔
827
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
828
        frameWriter.windowUpdate(
1✔
829
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
830
      }
831
    }
1✔
832
  }
1✔
833

834
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
835
      String proxyUsername, String proxyPassword) throws StatusException {
836
    Socket sock = null;
1✔
837
    try {
838
      // The proxy address may not be resolved
839
      if (proxyAddress.getAddress() != null) {
1✔
840
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
841
      } else {
842
        sock =
×
843
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
844
      }
845
      sock.setTcpNoDelay(true);
1✔
846
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
847
      // can cause reading from the socket to hang.
848
      sock.setSoTimeout(proxySocketTimeout);
1✔
849

850
      Source source = Okio.source(sock);
1✔
851
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
852

853
      // Prepare headers and request method line
854
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
855
      HttpUrl url = proxyRequest.httpUrl();
1✔
856
      String requestLine =
1✔
857
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
858

859
      // Write request to socket
860
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
861
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
862
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
863
            .writeUtf8(": ")
1✔
864
            .writeUtf8(proxyRequest.headers().value(i))
1✔
865
            .writeUtf8("\r\n");
1✔
866
      }
867
      sink.writeUtf8("\r\n");
1✔
868
      // Flush buffer (flushes socket and sends request)
869
      sink.flush();
1✔
870

871
      // Read status line, check if 2xx was returned
872
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
873
      // Drain rest of headers
874
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
875
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
876
        Buffer body = new Buffer();
1✔
877
        try {
878
          sock.shutdownOutput();
1✔
879
          source.read(body, 1024);
1✔
880
        } catch (IOException ex) {
×
881
          body.writeUtf8("Unable to read body: " + ex.toString());
×
882
        }
1✔
883
        try {
884
          sock.close();
1✔
885
        } catch (IOException ignored) {
×
886
          // ignored
887
        }
1✔
888
        String message = String.format(
1✔
889
            Locale.US,
890
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
891
              + "Response body:\n%s",
892
            statusLine.code, statusLine.message, body.readUtf8());
1✔
893
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
894
      }
895
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
896
      sock.setSoTimeout(0);
1✔
897
      return sock;
1✔
898
    } catch (IOException e) {
1✔
899
      if (sock != null) {
1✔
900
        GrpcUtil.closeQuietly(sock);
1✔
901
      }
902
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
903
          .asException();
1✔
904
    }
905
  }
906

907
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
908
                                         String proxyPassword) {
909
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
910
        .scheme("https")
1✔
911
        .host(address.getHostName())
1✔
912
        .port(address.getPort())
1✔
913
        .build();
1✔
914

915
    Request.Builder request = new Request.Builder()
1✔
916
        .url(tunnelUrl)
1✔
917
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
918
        .header("User-Agent", userAgent);
1✔
919

920
    // If we have proxy credentials, set them right away
921
    if (proxyUsername != null && proxyPassword != null) {
1✔
922
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
923
    }
924
    return request.build();
1✔
925
  }
926

927
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
928
    Buffer buffer = new Buffer();
1✔
929
    while (true) {
930
      if (source.read(buffer, 1) == -1) {
1✔
931
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
×
932
      }
933
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
934
        return buffer.readUtf8LineStrict();
1✔
935
      }
936
    }
937
  }
938

939
  @Override
940
  public String toString() {
941
    return MoreObjects.toStringHelper(this)
1✔
942
        .add("logId", logId.getId())
1✔
943
        .add("address", address)
1✔
944
        .toString();
1✔
945
  }
946

947
  /** Returns the log id of this transport. */
  @Override
  public InternalLogId getLogId() {
    return logId;
  }
951

952
  /**
953
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
954
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
955
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
956
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
957
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
958
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
959
   * connection to the port, independent of this function.
960
   *
961
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
962
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
963
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
964
   */
965
  @VisibleForTesting
966
  String getOverridenHost() {
967
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
968
    if (uri.getHost() != null) {
1✔
969
      return uri.getHost();
1✔
970
    }
971

972
    return defaultAuthority;
1✔
973
  }
974

975
  @VisibleForTesting
976
  int getOverridenPort() {
977
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
978
    if (uri.getPort() != -1) {
1✔
979
      return uri.getPort();
×
980
    }
981

982
    return address.getPort();
1✔
983
  }
984

985
  @Override
986
  public void shutdown(Status reason) {
987
    synchronized (lock) {
1✔
988
      if (goAwayStatus != null) {
1✔
989
        return;
1✔
990
      }
991

992
      goAwayStatus = reason;
1✔
993
      listener.transportShutdown(goAwayStatus);
1✔
994
      stopIfNecessary();
1✔
995
    }
1✔
996
  }
1✔
997

998
  @Override
999
  public void shutdownNow(Status reason) {
1000
    shutdown(reason);
1✔
1001
    synchronized (lock) {
1✔
1002
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1003
      while (it.hasNext()) {
1✔
1004
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1005
        it.remove();
1✔
1006
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
1007
        maybeClearInUse(entry.getValue());
1✔
1008
      }
1✔
1009

1010
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1011
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
1012
        // chance to retry and be routed to another connection.
1013
        stream.transportState().transportReportStatus(
1✔
1014
            reason, RpcProgress.MISCARRIED, true, new Metadata());
1015
        maybeClearInUse(stream);
1✔
1016
      }
1✔
1017
      pendingStreams.clear();
1✔
1018

1019
      stopIfNecessary();
1✔
1020
    }
1✔
1021
  }
1✔
1022

1023
  /** Returns the current value of the transport's {@code attributes} field. */
  @Override
  public Attributes getAttributes() {
    return attributes;
  }
1027

1028
  /**
1029
   * Gets all active streams as an array.
1030
   */
1031
  @Override
1032
  public OutboundFlowController.StreamState[] getActiveStreams() {
1033
    synchronized (lock) {
1✔
1034
      OutboundFlowController.StreamState[] flowStreams =
1✔
1035
          new OutboundFlowController.StreamState[streams.size()];
1✔
1036
      int i = 0;
1✔
1037
      for (OkHttpClientStream stream : streams.values()) {
1✔
1038
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
1039
      }
1✔
1040
      return flowStreams;
1✔
1041
    }
1042
  }
1043

1044
  /** Returns the current {@code clientFrameHandler}; exposed for tests. */
  @VisibleForTesting
  ClientFrameHandler getHandler() {
    return clientFrameHandler;
  }
1048

1049
  /** Returns the {@code socketFactory} used by this transport; exposed for tests. */
  @VisibleForTesting
  SocketFactory getSocketFactory() {
    return socketFactory;
  }
1053

1054
  @VisibleForTesting
1055
  int getPendingStreamSize() {
1056
    synchronized (lock) {
1✔
1057
      return pendingStreams.size();
1✔
1058
    }
1059
  }
1060

1061
  /** Overwrites {@code nextStreamId} while holding {@code lock}; exposed for tests. */
  @VisibleForTesting
  void setNextStreamId(int nextStreamId) {
    synchronized (lock) {
      this.nextStreamId = nextStreamId;
    }
  }
1✔
1067

1068
  /**
1069
   * Finish all active streams due to an IOException, then close the transport.
1070
   */
1071
  @Override
1072
  public void onException(Throwable failureCause) {
1073
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
1074
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
1075
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1076
  }
1✔
1077

1078
  /**
1079
   * Send GOAWAY to the server, then finish all active streams and close the transport.
1080
   */
1081
  private void onError(ErrorCode errorCode, String moreDetail) {
1082
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
1083
  }
1✔
1084

1085
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
1086
    synchronized (lock) {
1✔
1087
      if (goAwayStatus == null) {
1✔
1088
        goAwayStatus = status;
1✔
1089
        listener.transportShutdown(status);
1✔
1090
      }
1091
      if (errorCode != null && !goAwaySent) {
1✔
1092
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1093
        // streams. The GOAWAY is part of graceful shutdown.
1094
        goAwaySent = true;
1✔
1095
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
1096
      }
1097

1098
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1099
      while (it.hasNext()) {
1✔
1100
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1101
        if (entry.getKey() > lastKnownStreamId) {
1✔
1102
          it.remove();
1✔
1103
          entry.getValue().transportState().transportReportStatus(
1✔
1104
              status, RpcProgress.REFUSED, false, new Metadata());
1105
          maybeClearInUse(entry.getValue());
1✔
1106
        }
1107
      }
1✔
1108

1109
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1110
        stream.transportState().transportReportStatus(
1✔
1111
            status, RpcProgress.MISCARRIED, true, new Metadata());
1112
        maybeClearInUse(stream);
1✔
1113
      }
1✔
1114
      pendingStreams.clear();
1✔
1115

1116
      stopIfNecessary();
1✔
1117
    }
1✔
1118
  }
1✔
1119

1120
  /**
1121
   * Called when a stream is closed. We do things like:
1122
   * <ul>
1123
   * <li>Removing the stream from the map.
1124
   * <li>Optionally reporting the status.
1125
   * <li>Starting pending streams if we can.
1126
   * <li>Stopping the transport if this is the last live stream under a go-away status.
1127
   * </ul>
1128
   *
1129
   * @param streamId the Id of the stream.
1130
   * @param status the final status of this stream, null means no need to report.
1131
   * @param stopDelivery interrupt queued messages in the deframer
1132
   * @param errorCode reset the stream with this ErrorCode if not null.
1133
   * @param trailers the trailers received if not null
1134
   */
1135
  void finishStream(
1136
      int streamId,
1137
      @Nullable Status status,
1138
      RpcProgress rpcProgress,
1139
      boolean stopDelivery,
1140
      @Nullable ErrorCode errorCode,
1141
      @Nullable Metadata trailers) {
1142
    synchronized (lock) {
1✔
1143
      OkHttpClientStream stream = streams.remove(streamId);
1✔
1144
      if (stream != null) {
1✔
1145
        if (errorCode != null) {
1✔
1146
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1147
        }
1148
        if (status != null) {
1✔
1149
          stream
1✔
1150
              .transportState()
1✔
1151
              .transportReportStatus(
1✔
1152
                  status,
1153
                  rpcProgress,
1154
                  stopDelivery,
1155
                  trailers != null ? trailers : new Metadata());
1✔
1156
        }
1157
        if (!startPendingStreams()) {
1✔
1158
          stopIfNecessary();
1✔
1159
        }
1160
        maybeClearInUse(stream);
1✔
1161
      }
1162
    }
1✔
1163
  }
1✔
1164

1165
  /**
1166
   * When the transport is in goAway state, we should stop it once all active streams finish.
1167
   */
1168
  @GuardedBy("lock")
1169
  private void stopIfNecessary() {
1170
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1171
      return;
1✔
1172
    }
1173
    if (stopped) {
1✔
1174
      return;
1✔
1175
    }
1176
    stopped = true;
1✔
1177

1178
    if (keepAliveManager != null) {
1✔
1179
      keepAliveManager.onTransportTermination();
×
1180
    }
1181

1182
    if (ping != null) {
1✔
1183
      ping.failed(getPingFailure());
1✔
1184
      ping = null;
1✔
1185
    }
1186

1187
    if (!goAwaySent) {
1✔
1188
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1189
      // streams. The GOAWAY is part of graceful shutdown.
1190
      goAwaySent = true;
1✔
1191
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1192
    }
1193

1194
    // We will close the underlying socket in the writing thread to break out the reader
1195
    // thread, which will close the frameReader and notify the listener.
1196
    frameWriter.close();
1✔
1197
  }
1✔
1198

1199
  @GuardedBy("lock")
1200
  private void maybeClearInUse(OkHttpClientStream stream) {
1201
    if (hasStream) {
1✔
1202
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1203
        hasStream = false;
1✔
1204
        if (keepAliveManager != null) {
1✔
1205
          // We don't have any active streams. No need to do keepalives any more.
1206
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1207
          // and onTransportActive.
1208
          keepAliveManager.onTransportIdle();
×
1209
        }
1210
      }
1211
    }
1212
    if (stream.shouldBeCountedForInUse()) {
1✔
1213
      inUseState.updateObjectInUse(stream, false);
1✔
1214
    }
1215
  }
1✔
1216

1217
  @GuardedBy("lock")
1218
  private void setInUse(OkHttpClientStream stream) {
1219
    if (!hasStream) {
1✔
1220
      hasStream = true;
1✔
1221
      if (keepAliveManager != null) {
1✔
1222
        // We have a new stream. We might need to do keepalives now.
1223
        // Note that we have to do this inside the lock to avoid calling
1224
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1225
        // order.
1226
        keepAliveManager.onTransportActive();
×
1227
      }
1228
    }
1229
    if (stream.shouldBeCountedForInUse()) {
1✔
1230
      inUseState.updateObjectInUse(stream, true);
1✔
1231
    }
1232
  }
1✔
1233

1234
  private Status getPingFailure() {
1235
    synchronized (lock) {
1✔
1236
      if (goAwayStatus != null) {
1✔
1237
        return goAwayStatus;
1✔
1238
      } else {
1239
        return Status.UNAVAILABLE.withDescription("Connection closed");
×
1240
      }
1241
    }
1242
  }
1243

1244
  boolean mayHaveCreatedStream(int streamId) {
1245
    synchronized (lock) {
1✔
1246
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1247
    }
1248
  }
1249

1250
  OkHttpClientStream getStream(int streamId) {
1251
    synchronized (lock) {
1✔
1252
      return streams.get(streamId);
1✔
1253
    }
1254
  }
1255

1256
  /**
1257
   * Returns a Grpc status corresponding to the given ErrorCode.
1258
   */
1259
  @VisibleForTesting
1260
  static Status toGrpcStatus(ErrorCode code) {
1261
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1262
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1263
        "Unknown http2 error code: " + code.httpCode);
1264
  }
1265

1266
  @Override
1267
  public ListenableFuture<SocketStats> getStats() {
1268
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1269
    synchronized (lock) {
1✔
1270
      if (socket == null) {
1✔
1271
        ret.set(new SocketStats(
×
1272
            transportTracer.getStats(),
×
1273
            /*local=*/ null,
1274
            /*remote=*/ null,
1275
            new InternalChannelz.SocketOptions.Builder().build(),
×
1276
            /*security=*/ null));
1277
      } else {
1278
        ret.set(new SocketStats(
1✔
1279
            transportTracer.getStats(),
1✔
1280
            socket.getLocalSocketAddress(),
1✔
1281
            socket.getRemoteSocketAddress(),
1✔
1282
            Utils.getSocketOptions(socket),
1✔
1283
            securityInfo));
1284
      }
1285
      return ret;
1✔
1286
    }
1287
  }
1288

1289
  /**
1290
   * Runnable which reads frames and dispatches them to in flight calls.
1291
   */
1292
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1293

1294
    private final OkHttpFrameLogger logger =
1✔
1295
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1296
    FrameReader frameReader;
1297
    boolean firstSettings = true;
1✔
1298

1299
    /** Creates a handler that reads frames from the given {@code frameReader}. */
    ClientFrameHandler(FrameReader frameReader) {
      this.frameReader = frameReader;
    }
1✔
1302

1303
    @Override
1304
    @SuppressWarnings("Finally")
1305
    public void run() {
1306
      String threadName = Thread.currentThread().getName();
1✔
1307
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1308
      try {
1309
        // Read until the underlying socket closes.
1310
        while (frameReader.nextFrame(this)) {
1✔
1311
          if (keepAliveManager != null) {
1✔
1312
            keepAliveManager.onDataReceived();
×
1313
          }
1314
        }
1315
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1316
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1317
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1318
        Status status;
1319
        synchronized (lock) {
1✔
1320
          status = goAwayStatus;
1✔
1321
        }
1✔
1322
        if (status == null) {
1✔
1323
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1324
        }
1325
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1326
      } catch (Throwable t) {
1✔
1327
        // TODO(madongfly): Send the exception message to the server.
1328
        startGoAway(
1✔
1329
            0,
1330
            ErrorCode.PROTOCOL_ERROR,
1331
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1332
      } finally {
1333
        try {
1334
          frameReader.close();
1✔
1335
        } catch (IOException ex) {
×
1336
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1337
        } catch (RuntimeException e) {
×
1338
          // This same check is done in okhttp proper:
1339
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1340

1341
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1342
          // https://issuetracker.google.com/issues/177450597
1343
          if (!"bio == null".equals(e.getMessage())) {
×
1344
            throw e;
×
1345
          }
1346
        }
1✔
1347
        listener.transportTerminated();
1✔
1348
        Thread.currentThread().setName(threadName);
1✔
1349
      }
1350
    }
1✔
1351

1352
    /**
1353
     * Handle an HTTP2 DATA frame.
1354
     */
1355
    @SuppressWarnings("GuardedBy")
1356
    @Override
1357
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1358
                     int paddedLength)
1359
        throws IOException {
1360
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1361
          streamId, in.getBuffer(), length, inFinished);
1✔
1362
      OkHttpClientStream stream = getStream(streamId);
1✔
1363
      if (stream == null) {
1✔
1364
        if (mayHaveCreatedStream(streamId)) {
1✔
1365
          synchronized (lock) {
1✔
1366
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1367
          }
1✔
1368
          in.skip(length);
1✔
1369
        } else {
1370
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1371
          return;
1✔
1372
        }
1373
      } else {
1374
        // Wait until the frame is complete.
1375
        in.require(length);
1✔
1376

1377
        Buffer buf = new Buffer();
1✔
1378
        buf.write(in.getBuffer(), length);
1✔
1379
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1380
            stream.transportState().tag());
1✔
1381
        synchronized (lock) {
1✔
1382
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1383
          // instead found: 'OkHttpClientTransport.this.lock'
1384
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1385
        }
1✔
1386
      }
1387

1388
      // connection window update
1389
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1390
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1391
        synchronized (lock) {
1✔
1392
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1393
        }
1✔
1394
        connectionUnacknowledgedBytesRead = 0;
1✔
1395
      }
1396
    }
1✔
1397

1398
    /**
1399
     * Handle HTTP2 HEADER and CONTINUATION frames.
1400
     */
1401
    @SuppressWarnings("GuardedBy")
1402
    @Override
1403
    public void headers(boolean outFinished,
1404
        boolean inFinished,
1405
        int streamId,
1406
        int associatedStreamId,
1407
        List<Header> headerBlock,
1408
        HeadersMode headersMode) {
1409
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1410
      boolean unknownStream = false;
1✔
1411
      Status failedStatus = null;
1✔
1412
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1413
        int metadataSize = headerBlockSize(headerBlock);
1✔
1414
        if (metadataSize > maxInboundMetadataSize) {
1✔
1415
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1416
              String.format(
1✔
1417
                  Locale.US,
1418
                  "Response %s metadata larger than %d: %d",
1419
                  inFinished ? "trailer" : "header",
1✔
1420
                  maxInboundMetadataSize,
1✔
1421
                  metadataSize));
1✔
1422
        }
1423
      }
1424
      synchronized (lock) {
1✔
1425
        OkHttpClientStream stream = streams.get(streamId);
1✔
1426
        if (stream == null) {
1✔
1427
          if (mayHaveCreatedStream(streamId)) {
1✔
1428
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1429
          } else {
1430
            unknownStream = true;
1✔
1431
          }
1432
        } else {
1433
          if (failedStatus == null) {
1✔
1434
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1435
                stream.transportState().tag());
1✔
1436
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1437
            // instead found: 'OkHttpClientTransport.this.lock'
1438
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1439
          } else {
1440
            if (!inFinished) {
1✔
1441
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1442
            }
1443
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1444
          }
1445
        }
1446
      }
1✔
1447
      if (unknownStream) {
1✔
1448
        // We don't expect any server-initiated streams.
1449
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1450
      }
1451
    }
1✔
1452

1453
    private int headerBlockSize(List<Header> headerBlock) {
1454
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1455
      long size = 0;
1✔
1456
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1457
        Header header = headerBlock.get(i);
1✔
1458
        size += 32 + header.name.size() + header.value.size();
1✔
1459
      }
1460
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1461
      return (int) size;
1✔
1462
    }
1463

1464
    @Override
1465
    public void rstStream(int streamId, ErrorCode errorCode) {
1466
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1467
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1468
      boolean stopDelivery =
1✔
1469
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1470
      synchronized (lock) {
1✔
1471
        OkHttpClientStream stream = streams.get(streamId);
1✔
1472
        if (stream != null) {
1✔
1473
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1474
              stream.transportState().tag());
1✔
1475
          finishStream(
1✔
1476
              streamId, status,
1477
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1478
              stopDelivery, null, null);
1479
        }
1480
      }
1✔
1481
    }
1✔
1482

1483
    @Override
1484
    public void settings(boolean clearPrevious, Settings settings) {
1485
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1486
      boolean outboundWindowSizeIncreased = false;
1✔
1487
      synchronized (lock) {
1✔
1488
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1489
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1490
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1491
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1492
        }
1493

1494
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1495
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1496
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1497
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1498
        }
1499
        if (firstSettings) {
1✔
1500
          attributes = listener.filterTransport(attributes);
1✔
1501
          listener.transportReady();
1✔
1502
          firstSettings = false;
1✔
1503
        }
1504

1505
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1506
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1507
        // otherwise it will cause a stream error (RST_STREAM).
1508
        frameWriter.ackSettings(settings);
1✔
1509

1510
        // send any pending bytes / streams
1511
        if (outboundWindowSizeIncreased) {
1✔
1512
          outboundFlow.writeStreams();
1✔
1513
        }
1514
        startPendingStreams();
1✔
1515
      }
1✔
1516
    }
1✔
1517

1518
    @Override
1519
    public void ping(boolean ack, int payload1, int payload2) {
1520
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1521
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1522
      if (!ack) {
1✔
1523
        synchronized (lock) {
1✔
1524
          frameWriter.ping(true, payload1, payload2);
1✔
1525
        }
1✔
1526
      } else {
1527
        Http2Ping p = null;
1✔
1528
        synchronized (lock) {
1✔
1529
          if (ping != null) {
1✔
1530
            if (ping.payload() == ackPayload) {
1✔
1531
              p = ping;
1✔
1532
              ping = null;
1✔
1533
            } else {
1534
              log.log(Level.WARNING, String.format(
1✔
1535
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1536
                  ping.payload(), ackPayload));
1✔
1537
            }
1538
          } else {
1539
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1540
          }
1541
        }
1✔
1542
        // don't complete it while holding lock since callbacks could run immediately
1543
        if (p != null) {
1✔
1544
          p.complete();
1✔
1545
        }
1546
      }
1547
    }
1✔
1548

1549
    @Override
1550
    public void ackSettings() {
1551
      // Do nothing currently.
1552
    }
1✔
1553

1554
    @Override
1555
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1556
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1557
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1558
        String data = debugData.utf8();
1✔
1559
        log.log(Level.WARNING, String.format(
1✔
1560
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1561
        if ("too_many_pings".equals(data)) {
1✔
1562
          tooManyPingsRunnable.run();
1✔
1563
        }
1564
      }
1565
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1566
          .augmentDescription("Received Goaway");
1✔
1567
      if (debugData.size() > 0) {
1✔
1568
        // If a debug message was provided, use it.
1569
        status = status.augmentDescription(debugData.utf8());
1✔
1570
      }
1571
      startGoAway(lastGoodStreamId, null, status);
1✔
1572
    }
1✔
1573

1574
    @Override
1575
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1576
        throws IOException {
1577
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1578
          streamId, promisedStreamId, requestHeaders);
1579
      // We don't accept server initiated stream.
1580
      synchronized (lock) {
1✔
1581
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1582
      }
1✔
1583
    }
1✔
1584

1585
    @Override
1586
    public void windowUpdate(int streamId, long delta) {
1587
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1588
      if (delta == 0) {
1✔
1589
        String errorMsg = "Received 0 flow control window increment.";
×
1590
        if (streamId == 0) {
×
1591
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1592
        } else {
1593
          finishStream(
×
1594
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1595
              ErrorCode.PROTOCOL_ERROR, null);
1596
        }
1597
        return;
×
1598
      }
1599

1600
      boolean unknownStream = false;
1✔
1601
      synchronized (lock) {
1✔
1602
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1603
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1604
          return;
1✔
1605
        }
1606

1607
        OkHttpClientStream stream = streams.get(streamId);
1✔
1608
        if (stream != null) {
1✔
1609
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1610
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1611
          unknownStream = true;
1✔
1612
        }
1613
      }
1✔
1614
      if (unknownStream) {
1✔
1615
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1616
            "Received window_update for unknown stream: " + streamId);
1617
      }
1618
    }
1✔
1619

1620
    @Override
1621
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1622
      // Ignore priority change.
1623
      // TODO(madongfly): log
1624
    }
×
1625

1626
    @Override
1627
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1628
        int port, long maxAge) {
1629
      // TODO(madongfly): Deal with alternateService propagation
1630
    }
×
1631
  }
1632

1633
  /**
1634
   * SSLSocket wrapper that provides a fake SSLSession for handshake session.
1635
   */
1636
  static final class SslSocketWrapper extends NoopSslSocket {
1637

1638
    private final SSLSession sslSession;
1639
    private final SSLSocket sslSocket;
1640

1641
    SslSocketWrapper(SSLSocket sslSocket, String peerHost) {
1✔
1642
      this.sslSocket = sslSocket;
1✔
1643
      this.sslSession = new FakeSslSession(peerHost);
1✔
1644
    }
1✔
1645

1646
    @Override
1647
    public SSLSession getHandshakeSession() {
1648
      return this.sslSession;
1✔
1649
    }
1650

1651
    @Override
1652
    public boolean isConnected() {
1653
      return sslSocket.isConnected();
1✔
1654
    }
1655

1656
    @Override
1657
    public SSLParameters getSSLParameters() {
1658
      return sslSocket.getSSLParameters();
1✔
1659
    }
1660
  }
1661

1662
  /**
1663
   * Fake SSLSession instance that provides the peer host name to verify for per-rpc check.
1664
   */
1665
  static class FakeSslSession extends NoopSslSession {
1666

1667
    private final String peerHost;
1668

1669
    FakeSslSession(String peerHost) {
1✔
1670
      this.peerHost = peerHost;
1✔
1671
    }
1✔
1672

1673
    @Override
1674
    public String getPeerHost() {
1675
      return peerHost;
1✔
1676
    }
1677
  }
1678
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc