• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20125

23 Dec 2025 04:32AM UTC coverage: 88.706% (-0.01%) from 88.72%
#20125

push

github

web-flow
grpclb: pick_first delegation (#12568)

**Summary of Changes**
This pull request refactors the grpclb load balancer's PICK_FIRST mode
to delegate its logic to a standard pick_first load balancing policy.

The key changes are as follows:
1. **`grpclb/build.gradle`**

Added dependency on `grpc-util` module to access
`ForwardingLoadBalancerHelper`

2. **`grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java`**
- New imports:
LoadBalancer, LoadBalancerProvider, LoadBalancerRegistry,
ResolvedAddresses, FixedResultPicker, ForwardingLoadBalancerHelper
- New fields for PICK_FIRST delegation:
    - pickFirstLbProvider - Provider for creating child pick_first LB
    - pickFirstLb - The child LoadBalancer instance
- pickFirstLbState / pickFirstLbPicker - Track child LB's state and
picker
    - currentPickFirstLoadRecorder - Load recorder for token attachment
- Key behavioral changes:
- updateServerList() PICK_FIRST case: Instead of creating a single
subchannel, it now:
- Creates the child pick_first LB once and then updates it with new
addresses on subsequent updates.
        - Passes addresses to child LB via acceptResolvedAddresses()
- maybeUpdatePicker() PICK_FIRST case: Uses child LB's state and picker
wrapped with ChildLbPickerEntry
- RoundRobinEntry.picked() signature change: Changed from
picked(Metadata) to picked(PickSubchannelArgs) to allow child picker
delegation
- New ChildLbPickerEntry class: Wraps child LB's picker and attaches
TokenAttachingTracerFactory for token propagation
- New PickFirstLbHelper class: Forwarding helper that intercepts
updateBalancingState() to store child state and trigger grpclb picker
updates
- Updated shutdown(), requestConnection(), maybeUseFallbackBackends():
Handle the new child LB delegation model

3. **`grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java`**

- Updated tests to reflect the new delegation behavior:
- Initial state is now CONNECTING (not IDLE) since standard pick_first
eagerly connects
- Tests ... (continued)

35463 of 39978 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.03
/../okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
1
/*
2
 * Copyright 2014 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.okhttp;
18

19
import static com.google.common.base.Preconditions.checkState;
20
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_SIZE;
21
import static io.grpc.okhttp.Utils.DEFAULT_WINDOW_UPDATE_RATIO;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import com.google.common.base.MoreObjects;
25
import com.google.common.base.Preconditions;
26
import com.google.common.base.Stopwatch;
27
import com.google.common.base.Supplier;
28
import com.google.common.util.concurrent.ListenableFuture;
29
import com.google.common.util.concurrent.SettableFuture;
30
import com.google.errorprone.annotations.concurrent.GuardedBy;
31
import io.grpc.Attributes;
32
import io.grpc.CallOptions;
33
import io.grpc.ChannelCredentials;
34
import io.grpc.ClientStreamTracer;
35
import io.grpc.Grpc;
36
import io.grpc.HttpConnectProxiedSocketAddress;
37
import io.grpc.InternalChannelz;
38
import io.grpc.InternalChannelz.SocketStats;
39
import io.grpc.InternalLogId;
40
import io.grpc.Metadata;
41
import io.grpc.MethodDescriptor;
42
import io.grpc.MethodDescriptor.MethodType;
43
import io.grpc.SecurityLevel;
44
import io.grpc.Status;
45
import io.grpc.Status.Code;
46
import io.grpc.StatusException;
47
import io.grpc.TlsChannelCredentials;
48
import io.grpc.internal.CertificateUtils;
49
import io.grpc.internal.ClientStreamListener.RpcProgress;
50
import io.grpc.internal.ConnectionClientTransport;
51
import io.grpc.internal.GrpcAttributes;
52
import io.grpc.internal.GrpcUtil;
53
import io.grpc.internal.Http2Ping;
54
import io.grpc.internal.InUseStateAggregator;
55
import io.grpc.internal.KeepAliveManager;
56
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
57
import io.grpc.internal.NoopSslSession;
58
import io.grpc.internal.SerializingExecutor;
59
import io.grpc.internal.StatsTraceContext;
60
import io.grpc.internal.TransportTracer;
61
import io.grpc.okhttp.ExceptionHandlingFrameWriter.TransportExceptionHandler;
62
import io.grpc.okhttp.OkHttpChannelBuilder.OkHttpTransportFactory;
63
import io.grpc.okhttp.internal.ConnectionSpec;
64
import io.grpc.okhttp.internal.Credentials;
65
import io.grpc.okhttp.internal.OkHostnameVerifier;
66
import io.grpc.okhttp.internal.StatusLine;
67
import io.grpc.okhttp.internal.framed.ErrorCode;
68
import io.grpc.okhttp.internal.framed.FrameReader;
69
import io.grpc.okhttp.internal.framed.FrameWriter;
70
import io.grpc.okhttp.internal.framed.Header;
71
import io.grpc.okhttp.internal.framed.HeadersMode;
72
import io.grpc.okhttp.internal.framed.Http2;
73
import io.grpc.okhttp.internal.framed.Settings;
74
import io.grpc.okhttp.internal.framed.Variant;
75
import io.grpc.okhttp.internal.proxy.HttpUrl;
76
import io.grpc.okhttp.internal.proxy.Request;
77
import io.perfmark.PerfMark;
78
import java.io.EOFException;
79
import java.io.IOException;
80
import java.lang.reflect.InvocationTargetException;
81
import java.lang.reflect.Method;
82
import java.net.InetSocketAddress;
83
import java.net.Socket;
84
import java.net.URI;
85
import java.security.GeneralSecurityException;
86
import java.security.KeyStore;
87
import java.security.cert.Certificate;
88
import java.security.cert.X509Certificate;
89
import java.util.Collections;
90
import java.util.Deque;
91
import java.util.EnumMap;
92
import java.util.HashMap;
93
import java.util.Iterator;
94
import java.util.LinkedHashMap;
95
import java.util.LinkedList;
96
import java.util.List;
97
import java.util.Locale;
98
import java.util.Map;
99
import java.util.Random;
100
import java.util.concurrent.BrokenBarrierException;
101
import java.util.concurrent.CountDownLatch;
102
import java.util.concurrent.CyclicBarrier;
103
import java.util.concurrent.Executor;
104
import java.util.concurrent.ScheduledExecutorService;
105
import java.util.concurrent.TimeUnit;
106
import java.util.concurrent.TimeoutException;
107
import java.util.logging.Level;
108
import java.util.logging.Logger;
109
import javax.annotation.Nullable;
110
import javax.net.SocketFactory;
111
import javax.net.ssl.HostnameVerifier;
112
import javax.net.ssl.SSLParameters;
113
import javax.net.ssl.SSLPeerUnverifiedException;
114
import javax.net.ssl.SSLSession;
115
import javax.net.ssl.SSLSocket;
116
import javax.net.ssl.SSLSocketFactory;
117
import javax.net.ssl.TrustManager;
118
import javax.net.ssl.TrustManagerFactory;
119
import javax.net.ssl.X509TrustManager;
120
import okio.Buffer;
121
import okio.BufferedSink;
122
import okio.BufferedSource;
123
import okio.ByteString;
124
import okio.Okio;
125
import okio.Source;
126
import okio.Timeout;
127

128
/**
129
 * A okhttp-based {@link ConnectionClientTransport} implementation.
130
 */
131
class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler,
132
      OutboundFlowController.Transport {
133
  private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
1✔
134
  private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
1✔
135
  private static final String GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK =
136
          "GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK";
137
  static boolean enablePerRpcAuthorityCheck =
1✔
138
          GrpcUtil.getFlag(GRPC_ENABLE_PER_RPC_AUTHORITY_CHECK, false);
1✔
139
  private Socket sock;
140
  private SSLSession sslSession;
141

142
  private static Map<ErrorCode, Status> buildErrorCodeToStatusMap() {
143
    Map<ErrorCode, Status> errorToStatus = new EnumMap<>(ErrorCode.class);
1✔
144
    errorToStatus.put(ErrorCode.NO_ERROR,
1✔
145
        Status.INTERNAL.withDescription("No error: A GRPC status of OK should have been sent"));
1✔
146
    errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
1✔
147
        Status.INTERNAL.withDescription("Protocol error"));
1✔
148
    errorToStatus.put(ErrorCode.INTERNAL_ERROR,
1✔
149
        Status.INTERNAL.withDescription("Internal error"));
1✔
150
    errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
1✔
151
        Status.INTERNAL.withDescription("Flow control error"));
1✔
152
    errorToStatus.put(ErrorCode.STREAM_CLOSED,
1✔
153
        Status.INTERNAL.withDescription("Stream closed"));
1✔
154
    errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
1✔
155
        Status.INTERNAL.withDescription("Frame too large"));
1✔
156
    errorToStatus.put(ErrorCode.REFUSED_STREAM,
1✔
157
        Status.UNAVAILABLE.withDescription("Refused stream"));
1✔
158
    errorToStatus.put(ErrorCode.CANCEL,
1✔
159
        Status.CANCELLED.withDescription("Cancelled"));
1✔
160
    errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
1✔
161
        Status.INTERNAL.withDescription("Compression error"));
1✔
162
    errorToStatus.put(ErrorCode.CONNECT_ERROR,
1✔
163
        Status.INTERNAL.withDescription("Connect error"));
1✔
164
    errorToStatus.put(ErrorCode.ENHANCE_YOUR_CALM,
1✔
165
        Status.RESOURCE_EXHAUSTED.withDescription("Enhance your calm"));
1✔
166
    errorToStatus.put(ErrorCode.INADEQUATE_SECURITY,
1✔
167
        Status.PERMISSION_DENIED.withDescription("Inadequate security"));
1✔
168
    return Collections.unmodifiableMap(errorToStatus);
1✔
169
  }
170

171
  private static final Class<?> x509ExtendedTrustManagerClass;
172
  private static final Method checkServerTrustedMethod;
173

174
  static {
175
    Class<?> x509ExtendedTrustManagerClass1 = null;
1✔
176
    Method checkServerTrustedMethod1 = null;
1✔
177
    try {
178
      x509ExtendedTrustManagerClass1 = Class.forName("javax.net.ssl.X509ExtendedTrustManager");
1✔
179
      checkServerTrustedMethod1 = x509ExtendedTrustManagerClass1.getMethod("checkServerTrusted",
1✔
180
              X509Certificate[].class, String.class, Socket.class);
181
    } catch (ClassNotFoundException e) {
×
182
      // Per-rpc authority override via call options will be disallowed.
183
    } catch (NoSuchMethodException e) {
×
184
      // Should never happen since X509ExtendedTrustManager was introduced in Android API level 24
185
      // along with checkServerTrusted.
186
    }
1✔
187
    x509ExtendedTrustManagerClass = x509ExtendedTrustManagerClass1;
1✔
188
    checkServerTrustedMethod = checkServerTrustedMethod1;
1✔
189
  }
1✔
190

191
  private final InetSocketAddress address;
192
  private final String defaultAuthority;
193
  private final String userAgent;
194
  private final Random random = new Random();
1✔
195
  // Returns new unstarted stopwatches
196
  private final Supplier<Stopwatch> stopwatchFactory;
197
  private final int initialWindowSize;
198
  private final Variant variant;
199
  private Listener listener;
200
  @GuardedBy("lock")
201
  private ExceptionHandlingFrameWriter frameWriter;
202
  private OutboundFlowController outboundFlow;
203
  private final Object lock = new Object();
1✔
204
  private final InternalLogId logId;
205
  @GuardedBy("lock")
206
  private int nextStreamId;
207
  @GuardedBy("lock")
1✔
208
  private final Map<Integer, OkHttpClientStream> streams = new HashMap<>();
209
  private final Executor executor;
210
  // Wrap on executor, to guarantee some operations be executed serially.
211
  private final SerializingExecutor serializingExecutor;
212
  private final ScheduledExecutorService scheduler;
213
  private final int maxMessageSize;
214
  private int connectionUnacknowledgedBytesRead;
215
  private ClientFrameHandler clientFrameHandler;
216
  // Caution: Not synchronized, new value can only be safely read after the connection is complete.
217
  private Attributes attributes;
218
  /**
219
   * Indicates the transport is in go-away state: no new streams will be processed, but existing
220
   * streams may continue.
221
   */
222
  @GuardedBy("lock")
223
  private Status goAwayStatus;
224
  @GuardedBy("lock")
225
  private boolean goAwaySent;
226
  @GuardedBy("lock")
227
  private Http2Ping ping;
228
  @GuardedBy("lock")
229
  private boolean stopped;
230
  @GuardedBy("lock")
231
  private boolean hasStream;
232
  private final SocketFactory socketFactory;
233
  private SSLSocketFactory sslSocketFactory;
234
  private HostnameVerifier hostnameVerifier;
235
  private Socket socket;
236
  @GuardedBy("lock")
1✔
237
  private int maxConcurrentStreams = 0;
238
  @SuppressWarnings("JdkObsolete") // Usage is bursty; want low memory usage when empty
1✔
239
  @GuardedBy("lock")
240
  private final Deque<OkHttpClientStream> pendingStreams = new LinkedList<>();
241
  private final ConnectionSpec connectionSpec;
242
  private KeepAliveManager keepAliveManager;
243
  private boolean enableKeepAlive;
244
  private long keepAliveTimeNanos;
245
  private long keepAliveTimeoutNanos;
246
  private boolean keepAliveWithoutCalls;
247
  private final Runnable tooManyPingsRunnable;
248
  private final int maxInboundMetadataSize;
249
  private final boolean useGetForSafeMethods;
250
  @GuardedBy("lock")
251
  private final TransportTracer transportTracer;
252
  private final TrustManager x509TrustManager;
253

254
  @SuppressWarnings("serial")
255
  private static class LruCache<T> extends LinkedHashMap<String, T> {
256
    @Override
257
    protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
258
      return size() > 100;
1✔
259
    }
260
  }
261

262
  @GuardedBy("lock")
1✔
263
  private final Map<String, Status> authorityVerificationResults = new LruCache<>();
264

265
  @GuardedBy("lock")
1✔
266
  private final InUseStateAggregator<OkHttpClientStream> inUseState =
267
      new InUseStateAggregator<OkHttpClientStream>() {
1✔
268
        @Override
269
        protected void handleInUse() {
270
          listener.transportInUse(true);
1✔
271
        }
1✔
272

273
        @Override
274
        protected void handleNotInUse() {
275
          listener.transportInUse(false);
1✔
276
        }
1✔
277
      };
278
  @GuardedBy("lock")
279
  private InternalChannelz.Security securityInfo;
280

281
  @VisibleForTesting
282
  @Nullable
283
  final HttpConnectProxiedSocketAddress proxiedAddr;
284

285
  @VisibleForTesting
1✔
286
  int proxySocketTimeout = 30000;
287

288
  // The following fields should only be used for test.
289
  Runnable connectingCallback;
290
  SettableFuture<Void> connectedFuture;
291

292
  public OkHttpClientTransport(
293
          OkHttpTransportFactory transportFactory,
294
          InetSocketAddress address,
295
          String authority,
296
          @Nullable String userAgent,
297
          Attributes eagAttrs,
298
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
299
          Runnable tooManyPingsRunnable,
300
          ChannelCredentials channelCredentials) {
301
    this(
1✔
302
        transportFactory,
303
        address,
304
        authority,
305
        userAgent,
306
        eagAttrs,
307
        GrpcUtil.STOPWATCH_SUPPLIER,
308
        new Http2(),
309
        proxiedAddr,
310
        tooManyPingsRunnable,
311
        channelCredentials);
312
  }
1✔
313

314
  private OkHttpClientTransport(
315
          OkHttpTransportFactory transportFactory,
316
          InetSocketAddress address,
317
          String authority,
318
          @Nullable String userAgent,
319
          Attributes eagAttrs,
320
          Supplier<Stopwatch> stopwatchFactory,
321
          Variant variant,
322
          @Nullable HttpConnectProxiedSocketAddress proxiedAddr,
323
          Runnable tooManyPingsRunnable,
324
          ChannelCredentials channelCredentials) {
1✔
325
    this.address = Preconditions.checkNotNull(address, "address");
1✔
326
    this.defaultAuthority = authority;
1✔
327
    this.maxMessageSize = transportFactory.maxMessageSize;
1✔
328
    this.initialWindowSize = transportFactory.flowControlWindow;
1✔
329
    this.executor = Preconditions.checkNotNull(transportFactory.executor, "executor");
1✔
330
    serializingExecutor = new SerializingExecutor(transportFactory.executor);
1✔
331
    this.scheduler = Preconditions.checkNotNull(
1✔
332
        transportFactory.scheduledExecutorService, "scheduledExecutorService");
333
    // Client initiated streams are odd, server initiated ones are even. Server should not need to
334
    // use it. We start clients at 3 to avoid conflicting with HTTP negotiation.
335
    nextStreamId = 3;
1✔
336
    this.socketFactory = transportFactory.socketFactory == null
1✔
337
        ? SocketFactory.getDefault() : transportFactory.socketFactory;
1✔
338
    this.sslSocketFactory = transportFactory.sslSocketFactory;
1✔
339
    this.hostnameVerifier = transportFactory.hostnameVerifier != null
1✔
340
        ? transportFactory.hostnameVerifier : OkHostnameVerifier.INSTANCE;
1✔
341
    this.connectionSpec = Preconditions.checkNotNull(
1✔
342
        transportFactory.connectionSpec, "connectionSpec");
343
    this.stopwatchFactory = Preconditions.checkNotNull(stopwatchFactory, "stopwatchFactory");
1✔
344
    this.variant = Preconditions.checkNotNull(variant, "variant");
1✔
345
    this.userAgent = GrpcUtil.getGrpcUserAgent("okhttp", userAgent);
1✔
346
    this.proxiedAddr = proxiedAddr;
1✔
347
    this.tooManyPingsRunnable =
1✔
348
        Preconditions.checkNotNull(tooManyPingsRunnable, "tooManyPingsRunnable");
1✔
349
    this.maxInboundMetadataSize = transportFactory.maxInboundMetadataSize;
1✔
350
    this.transportTracer = transportFactory.transportTracerFactory.create();
1✔
351
    this.logId = InternalLogId.allocate(getClass(), address.toString());
1✔
352
    this.attributes = Attributes.newBuilder()
1✔
353
        .set(GrpcAttributes.ATTR_CLIENT_EAG_ATTRS, eagAttrs).build();
1✔
354
    this.useGetForSafeMethods = transportFactory.useGetForSafeMethods;
1✔
355
    initTransportTracer();
1✔
356
    TrustManager tempX509TrustManager;
357
    if (channelCredentials instanceof TlsChannelCredentials
1✔
358
        && x509ExtendedTrustManagerClass != null) {
359
      try {
360
        tempX509TrustManager = getTrustManager(
1✔
361
                (TlsChannelCredentials) channelCredentials);
362
      } catch (GeneralSecurityException e) {
×
363
        tempX509TrustManager = null;
×
364
        log.log(Level.WARNING, "Obtaining X509ExtendedTrustManager for the transport failed."
×
365
            + "Per-rpc authority overrides will be disallowed.", e);
366
      }
1✔
367
    } else {
368
      tempX509TrustManager = null;
1✔
369
    }
370
    x509TrustManager = tempX509TrustManager;
1✔
371
  }
1✔
372

373
  /**
374
   * Create a transport connected to a fake peer for test.
375
   */
376
  @SuppressWarnings("AddressSelection") // An IP address always returns one address
377
  @VisibleForTesting
378
  OkHttpClientTransport(
379
      OkHttpTransportFactory transportFactory,
380
      String userAgent,
381
      Supplier<Stopwatch> stopwatchFactory,
382
      Variant variant,
383
      @Nullable Runnable connectingCallback,
384
      SettableFuture<Void> connectedFuture,
385
      Runnable tooManyPingsRunnable) {
386
    this(
1✔
387
        transportFactory,
388
        new InetSocketAddress("127.0.0.1", 80),
389
        "notarealauthority:80",
390
        userAgent,
391
        Attributes.EMPTY,
392
        stopwatchFactory,
393
        variant,
394
        null,
395
        tooManyPingsRunnable,
396
        null);
397
    this.connectingCallback = connectingCallback;
1✔
398
    this.connectedFuture = Preconditions.checkNotNull(connectedFuture, "connectedFuture");
1✔
399
  }
1✔
400

401
  // sslSocketFactory is set to null when use plaintext.
402
  boolean isUsingPlaintext() {
403
    return sslSocketFactory == null;
1✔
404
  }
405

406
  private void initTransportTracer() {
407
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
408
      transportTracer.setFlowControlWindowReader(new TransportTracer.FlowControlReader() {
1✔
409
        @Override
410
        public TransportTracer.FlowControlWindows read() {
411
          synchronized (lock) {
1✔
412
            long local = outboundFlow == null ? -1 : outboundFlow.windowUpdate(null, 0);
1✔
413
            // connectionUnacknowledgedBytesRead is only readable by ClientFrameHandler, so we
414
            // provide a lower bound.
415
            long remote = (long) (initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO);
1✔
416
            return new TransportTracer.FlowControlWindows(local, remote);
1✔
417
          }
418
        }
419
      });
420
    }
1✔
421
  }
1✔
422

423
  /**
424
   * Enable keepalive with custom delay and timeout.
425
   */
426
  void enableKeepAlive(boolean enable, long keepAliveTimeNanos,
427
      long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
428
    enableKeepAlive = enable;
×
429
    this.keepAliveTimeNanos = keepAliveTimeNanos;
×
430
    this.keepAliveTimeoutNanos = keepAliveTimeoutNanos;
×
431
    this.keepAliveWithoutCalls = keepAliveWithoutCalls;
×
432
  }
×
433

434
  @Override
435
  public void ping(final PingCallback callback, Executor executor) {
436
    long data = 0;
1✔
437
    Http2Ping p;
438
    boolean writePing;
439
    synchronized (lock) {
1✔
440
      checkState(frameWriter != null);
1✔
441
      if (stopped) {
1✔
442
        Http2Ping.notifyFailed(callback, executor, getPingFailure());
1✔
443
        return;
1✔
444
      }
445
      if (ping != null) {
1✔
446
        // we only allow one outstanding ping at a time, so just add the callback to
447
        // any outstanding operation
448
        p = ping;
1✔
449
        writePing = false;
1✔
450
      } else {
451
        // set outstanding operation and then write the ping after releasing lock
452
        data = random.nextLong();
1✔
453
        Stopwatch stopwatch = stopwatchFactory.get();
1✔
454
        stopwatch.start();
1✔
455
        p = ping = new Http2Ping(data, stopwatch);
1✔
456
        writePing = true;
1✔
457
        transportTracer.reportKeepAliveSent();
1✔
458
      }
459
      if (writePing) {
1✔
460
        frameWriter.ping(false, (int) (data >>> 32), (int) data);
1✔
461
      }
462
    }
1✔
463
    // If transport concurrently failed/stopped since we released the lock above, this could
464
    // immediately invoke callback (which we shouldn't do while holding a lock)
465
    p.addCallback(callback, executor);
1✔
466
  }
1✔
467

468
  @Override
469
  public OkHttpClientStream newStream(
470
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
471
      ClientStreamTracer[] tracers) {
472
    Preconditions.checkNotNull(method, "method");
1✔
473
    Preconditions.checkNotNull(headers, "headers");
1✔
474
    StatsTraceContext statsTraceContext =
1✔
475
        StatsTraceContext.newClientContext(tracers, getAttributes(), headers);
1✔
476

477
    // FIXME: it is likely wrong to pass the transportTracer here as it'll exit the lock's scope
478
    synchronized (lock) { // to make @GuardedBy linter happy
1✔
479
      return new OkHttpClientStream(
1✔
480
          method,
481
          headers,
482
          frameWriter,
483
          OkHttpClientTransport.this,
484
          outboundFlow,
485
          lock,
486
          maxMessageSize,
487
          initialWindowSize,
488
          defaultAuthority,
489
          userAgent,
490
          statsTraceContext,
491
          transportTracer,
492
          callOptions,
493
          useGetForSafeMethods);
494
    }
495
  }
496

497
  private TrustManager getTrustManager(TlsChannelCredentials tlsCreds)
498
      throws GeneralSecurityException {
499
    TrustManager[] tm;
500
    // Using the same way of creating TrustManager from OkHttpChannelBuilder.sslSocketFactoryFrom()
501
    if (tlsCreds.getTrustManagers() != null) {
1✔
502
      tm = tlsCreds.getTrustManagers().toArray(new TrustManager[0]);
1✔
503
    } else if (tlsCreds.getRootCertificates() != null) {
1✔
504
      tm = CertificateUtils.createTrustManager(tlsCreds.getRootCertificates());
1✔
505
    } else { // else use system default
506
      TrustManagerFactory tmf = TrustManagerFactory.getInstance(
1✔
507
          TrustManagerFactory.getDefaultAlgorithm());
1✔
508
      tmf.init((KeyStore) null);
1✔
509
      tm = tmf.getTrustManagers();
1✔
510
    }
511
    for (TrustManager trustManager: tm) {
1✔
512
      if (trustManager instanceof X509TrustManager) {
1✔
513
        return trustManager;
1✔
514
      }
515
    }
516
    return null;
×
517
  }
518

519
  @GuardedBy("lock")
520
  void streamReadyToStart(OkHttpClientStream clientStream, String authority) {
521
    if (goAwayStatus != null) {
1✔
522
      clientStream.transportState().transportReportStatus(
1✔
523
          goAwayStatus, RpcProgress.MISCARRIED, true, new Metadata());
524
    } else {
525
      if (socket instanceof SSLSocket && !authority.equals(defaultAuthority)) {
1✔
526
        Status authorityVerificationResult;
527
        if (authorityVerificationResults.containsKey(authority)) {
1✔
528
          authorityVerificationResult = authorityVerificationResults.get(authority);
×
529
        } else {
530
          authorityVerificationResult = verifyAuthority(authority);
1✔
531
          authorityVerificationResults.put(authority, authorityVerificationResult);
1✔
532
        }
533
        if (!authorityVerificationResult.isOk()) {
1✔
534
          if (enablePerRpcAuthorityCheck) {
1✔
535
            clientStream.transportState().transportReportStatus(
1✔
536
                    authorityVerificationResult, RpcProgress.PROCESSED, true, new Metadata());
537
            return;
1✔
538
          }
539
        }
540
      }
541
      if (streams.size() >= maxConcurrentStreams) {
1✔
542
        pendingStreams.add(clientStream);
1✔
543
        setInUse(clientStream);
1✔
544
      } else {
545
        startStream(clientStream);
1✔
546
      }
547
    }
548
  }
1✔
549

550
  private Status verifyAuthority(String authority) {
551
    Status authorityVerificationResult;
552
    if (hostnameVerifier.verify(authority, ((SSLSocket) socket).getSession())) {
1✔
553
      authorityVerificationResult = Status.OK;
1✔
554
    } else {
555
      authorityVerificationResult = Status.UNAVAILABLE.withDescription(String.format(
1✔
556
              "HostNameVerifier verification failed for authority '%s'",
557
              authority));
558
    }
559
    if (!authorityVerificationResult.isOk() && !enablePerRpcAuthorityCheck) {
1✔
560
      log.log(Level.WARNING, String.format("HostNameVerifier verification failed for "
1✔
561
                      + "authority '%s'. This will be an error in the future.",
562
              authority));
563
    }
564
    if (authorityVerificationResult.isOk()) {
1✔
565
      // The status is trivially assigned in this case, but we are still making use of the
566
      // cache to keep track that a warning log had been logged for the authority when
567
      // enablePerRpcAuthorityCheck is false. When we permanently enable the feature, the
568
      // status won't need to be cached for case when x509TrustManager is null.
569
      if (x509TrustManager == null) {
1✔
570
        authorityVerificationResult = Status.UNAVAILABLE.withDescription(
1✔
571
                String.format("Could not verify authority '%s' for the rpc with no "
1✔
572
                                + "X509TrustManager available",
573
                        authority));
574
      } else if (x509ExtendedTrustManagerClass.isInstance(x509TrustManager)) {
1✔
575
        try {
576
          Certificate[] peerCertificates = sslSession.getPeerCertificates();
1✔
577
          X509Certificate[] x509PeerCertificates =
1✔
578
                  new X509Certificate[peerCertificates.length];
579
          for (int i = 0; i < peerCertificates.length; i++) {
1✔
580
            x509PeerCertificates[i] = (X509Certificate) peerCertificates[i];
1✔
581
          }
582
          checkServerTrustedMethod.invoke(x509TrustManager, x509PeerCertificates,
1✔
583
                  "RSA", new SslSocketWrapper((SSLSocket) socket, authority));
584
          authorityVerificationResult = Status.OK;
1✔
585
        } catch (SSLPeerUnverifiedException | InvocationTargetException
1✔
586
                 | IllegalAccessException e) {
587
          authorityVerificationResult = Status.UNAVAILABLE.withCause(e).withDescription(
1✔
588
                  "Peer verification failed");
589
        }
1✔
590
        if (authorityVerificationResult.getCause() != null) {
1✔
591
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
592
                          + ". This will be an error in the future.",
593
                  authorityVerificationResult.getCause());
1✔
594
        } else {
595
          log.log(Level.WARNING, authorityVerificationResult.getDescription()
1✔
596
                  + ". This will be an error in the future.");
597
        }
598
      }
599
    }
600
    return authorityVerificationResult;
1✔
601
  }
602

603
  @SuppressWarnings("GuardedBy")
604
  @GuardedBy("lock")
605
  private void startStream(OkHttpClientStream stream) {
606
    checkState(
1✔
607
        stream.transportState().id() == OkHttpClientStream.ABSENT_ID, "StreamId already assigned");
1✔
608
    streams.put(nextStreamId, stream);
1✔
609
    setInUse(stream);
1✔
610
    // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock'; instead
611
    // found: 'this.lock'
612
    stream.transportState().start(nextStreamId);
1✔
613
    // For unary and server streaming, there will be a data frame soon, no need to flush the header.
614
    if ((stream.getType() != MethodType.UNARY && stream.getType() != MethodType.SERVER_STREAMING)
1✔
615
        || stream.useGet()) {
1✔
616
      frameWriter.flush();
1✔
617
    }
618
    if (nextStreamId >= Integer.MAX_VALUE - 2) {
1✔
619
      // Make sure nextStreamId greater than all used id, so that mayHaveCreatedStream() performs
620
      // correctly.
621
      nextStreamId = Integer.MAX_VALUE;
1✔
622
      startGoAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR,
1✔
623
          Status.UNAVAILABLE.withDescription("Stream ids exhausted"));
1✔
624
    } else {
625
      nextStreamId += 2;
1✔
626
    }
627
  }
1✔
628

629
  /**
630
   * Starts pending streams, returns true if at least one pending stream is started.
631
   */
632
  @GuardedBy("lock")
633
  private boolean startPendingStreams() {
634
    boolean hasStreamStarted = false;
1✔
635
    while (!pendingStreams.isEmpty() && streams.size() < maxConcurrentStreams) {
1✔
636
      OkHttpClientStream stream = pendingStreams.poll();
1✔
637
      startStream(stream);
1✔
638
      hasStreamStarted = true;
1✔
639
    }
1✔
640
    return hasStreamStarted;
1✔
641
  }
642

643
  /**
644
   * Removes given pending stream, used when a pending stream is cancelled.
645
   */
646
  @GuardedBy("lock")
647
  void removePendingStream(OkHttpClientStream pendingStream) {
648
    pendingStreams.remove(pendingStream);
1✔
649
    maybeClearInUse(pendingStream);
1✔
650
  }
1✔
651

652
  @Override
653
  public Runnable start(Listener listener) {
654
    this.listener = Preconditions.checkNotNull(listener, "listener");
1✔
655

656
    if (enableKeepAlive) {
1✔
657
      keepAliveManager = new KeepAliveManager(
×
658
          new ClientKeepAlivePinger(this), scheduler, keepAliveTimeNanos, keepAliveTimeoutNanos,
659
          keepAliveWithoutCalls);
660
      keepAliveManager.onTransportStarted();
×
661
    }
662

663
    int maxQueuedControlFrames = 10000;
1✔
664
    final AsyncSink asyncSink = AsyncSink.sink(serializingExecutor, this, maxQueuedControlFrames);
1✔
665
    FrameWriter rawFrameWriter = asyncSink.limitControlFramesWriter(
1✔
666
        variant.newWriter(Okio.buffer(asyncSink), true));
1✔
667

668
    synchronized (lock) {
1✔
669
      // Handle FrameWriter exceptions centrally, since there are many callers. Note that errors
670
      // coming from rawFrameWriter are generally broken invariants/bugs, as AsyncSink does not
671
      // propagate syscall errors through the FrameWriter. But we handle the AsyncSink failures with
672
      // the same TransportExceptionHandler instance so it is all mixed back together.
673
      frameWriter = new ExceptionHandlingFrameWriter(this, rawFrameWriter);
1✔
674
      outboundFlow = new OutboundFlowController(this, frameWriter);
1✔
675
    }
1✔
676
    final CountDownLatch latch = new CountDownLatch(1);
1✔
677
    final CountDownLatch latchForExtraThread = new CountDownLatch(1);
1✔
678
    // The transport needs up to two threads to function once started,
679
    // but only needs one during handshaking. Start another thread during handshaking
680
    // to make sure there's still a free thread available. If the number of threads is exhausted,
681
    // it is better to kill the transport than for all the transports to hang unable to send.
682
    CyclicBarrier barrier = new CyclicBarrier(2);
1✔
683
    // Connecting in the serializingExecutor, so that some stream operations like synStream
684
    // will be executed after connected.
685

686
    serializingExecutor.execute(new Runnable() {
1✔
687
      @Override
688
      public void run() {
689
        // Use closed source on failure so that the reader immediately shuts down.
690
        BufferedSource source = Okio.buffer(new Source() {
1✔
691
          @Override
692
          public long read(Buffer sink, long byteCount) {
693
            return -1;
1✔
694
          }
695

696
          @Override
697
          public Timeout timeout() {
698
            return Timeout.NONE;
×
699
          }
700

701
          @Override
702
          public void close() {
703
          }
1✔
704
        });
705
        try {
706
          // This is a hack to make sure the connection preface and initial settings to be sent out
707
          // without blocking the start. By doing this essentially prevents potential deadlock when
708
          // network is not available during startup while another thread holding lock to send the
709
          // initial preface.
710
          try {
711
            latch.await();
1✔
712
            barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
713
          } catch (InterruptedException e) {
×
714
            Thread.currentThread().interrupt();
×
715
          } catch (TimeoutException | BrokenBarrierException e) {
1✔
716
            startGoAway(0, ErrorCode.INTERNAL_ERROR, Status.UNAVAILABLE
1✔
717
                .withDescription("Timed out waiting for second handshake thread. "
1✔
718
                    + "The transport executor pool may have run out of threads"));
719
            return;
1✔
720
          }
1✔
721

722
          if (proxiedAddr == null) {
1✔
723
            sock = socketFactory.createSocket(address.getAddress(), address.getPort());
1✔
724
          } else {
725
            if (proxiedAddr.getProxyAddress() instanceof InetSocketAddress) {
1✔
726
              sock = createHttpProxySocket(
1✔
727
                  proxiedAddr.getTargetAddress(),
1✔
728
                  (InetSocketAddress) proxiedAddr.getProxyAddress(),
1✔
729
                  proxiedAddr.getUsername(),
1✔
730
                  proxiedAddr.getPassword()
1✔
731
              );
732
            } else {
733
              throw Status.INTERNAL.withDescription(
×
734
                  "Unsupported SocketAddress implementation "
735
                  + proxiedAddr.getProxyAddress().getClass()).asException();
×
736
            }
737
          }
738
          if (sslSocketFactory != null) {
1✔
739
            SSLSocket sslSocket = OkHttpTlsUpgrader.upgrade(
1✔
740
                sslSocketFactory, hostnameVerifier, sock, getOverridenHost(), getOverridenPort(),
1✔
741
                connectionSpec);
1✔
742
            sslSession = sslSocket.getSession();
1✔
743
            sock = sslSocket;
1✔
744
          }
745
          sock.setTcpNoDelay(true);
1✔
746
          source = Okio.buffer(Okio.source(sock));
1✔
747
          asyncSink.becomeConnected(Okio.sink(sock), sock);
1✔
748

749
          // The return value of OkHttpTlsUpgrader.upgrade is an SSLSocket that has this info
750
          attributes = attributes.toBuilder()
1✔
751
              .set(Grpc.TRANSPORT_ATTR_REMOTE_ADDR, sock.getRemoteSocketAddress())
1✔
752
              .set(Grpc.TRANSPORT_ATTR_LOCAL_ADDR, sock.getLocalSocketAddress())
1✔
753
              .set(Grpc.TRANSPORT_ATTR_SSL_SESSION, sslSession)
1✔
754
              .set(GrpcAttributes.ATTR_SECURITY_LEVEL,
1✔
755
                  sslSession == null ? SecurityLevel.NONE : SecurityLevel.PRIVACY_AND_INTEGRITY)
1✔
756
              .build();
1✔
757
        } catch (StatusException e) {
1✔
758
          startGoAway(0, ErrorCode.INTERNAL_ERROR, e.getStatus());
1✔
759
          return;
1✔
760
        } catch (Exception e) {
1✔
761
          onException(e);
1✔
762
          return;
1✔
763
        } finally {
764
          clientFrameHandler = new ClientFrameHandler(variant.newReader(source, true));
1✔
765
          latchForExtraThread.countDown();
1✔
766
        }
767
        synchronized (lock) {
1✔
768
          socket = Preconditions.checkNotNull(sock, "socket");
1✔
769
          if (sslSession != null) {
1✔
770
            securityInfo = new InternalChannelz.Security(new InternalChannelz.Tls(sslSession));
1✔
771
          }
772
        }
1✔
773
      }
1✔
774
    });
775

776
    executor.execute(new Runnable() {
1✔
777
      @Override
778
      public void run() {
779
        try {
780
          barrier.await(1000, TimeUnit.MILLISECONDS);
1✔
781
          latchForExtraThread.await();
1✔
782
        } catch (BrokenBarrierException | TimeoutException e) {
1✔
783
          // Something bad happened, maybe too few threads available!
784
          // This will be handled in the handshake thread.
785
        } catch (InterruptedException e) {
×
786
          Thread.currentThread().interrupt();
×
787
        }
1✔
788
      }
1✔
789
    });
790
    // Schedule to send connection preface & settings before any other write.
791
    try {
792
      sendConnectionPrefaceAndSettings();
1✔
793
    } finally {
794
      latch.countDown();
1✔
795
    }
796

797
    serializingExecutor.execute(new Runnable() {
1✔
798
      @Override
799
      public void run() {
800
        if (connectingCallback != null) {
1✔
801
          connectingCallback.run();
1✔
802
        }
803
        synchronized (lock) {
1✔
804
          maxConcurrentStreams = Integer.MAX_VALUE;
1✔
805
          checkState(pendingStreams.isEmpty(),
1✔
806
              "Pending streams detected during transport start."
807
                  + " RPCs should not be started before transport is ready.");
808
        }
1✔
809
        // ClientFrameHandler need to be started after connectionPreface / settings, otherwise it
810
        // may send goAway immediately.
811
        executor.execute(clientFrameHandler);
1✔
812
        if (connectedFuture != null) {
1✔
813
          connectedFuture.set(null);
1✔
814
        }
815
      }
1✔
816
    });
817
    return null;
1✔
818
  }
819

820
  /**
821
   * Should only be called once when the transport is first established.
822
   */
823
  private void sendConnectionPrefaceAndSettings() {
824
    synchronized (lock) {
1✔
825
      frameWriter.connectionPreface();
1✔
826
      Settings settings = new Settings();
1✔
827
      OkHttpSettingsUtil.set(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE, initialWindowSize);
1✔
828
      frameWriter.settings(settings);
1✔
829
      if (initialWindowSize > DEFAULT_WINDOW_SIZE) {
1✔
830
        frameWriter.windowUpdate(
1✔
831
                Utils.CONNECTION_STREAM_ID, initialWindowSize - DEFAULT_WINDOW_SIZE);
832
      }
833
    }
1✔
834
  }
1✔
835

836
  private Socket createHttpProxySocket(InetSocketAddress address, InetSocketAddress proxyAddress,
837
      String proxyUsername, String proxyPassword) throws StatusException {
838
    Socket sock = null;
1✔
839
    try {
840
      // The proxy address may not be resolved
841
      if (proxyAddress.getAddress() != null) {
1✔
842
        sock = socketFactory.createSocket(proxyAddress.getAddress(), proxyAddress.getPort());
1✔
843
      } else {
844
        sock =
×
845
            socketFactory.createSocket(proxyAddress.getHostName(), proxyAddress.getPort());
×
846
      }
847
      sock.setTcpNoDelay(true);
1✔
848
      // A socket timeout is needed because lost network connectivity while reading from the proxy,
849
      // can cause reading from the socket to hang.
850
      sock.setSoTimeout(proxySocketTimeout);
1✔
851

852
      Source source = Okio.source(sock);
1✔
853
      BufferedSink sink = Okio.buffer(Okio.sink(sock));
1✔
854

855
      // Prepare headers and request method line
856
      Request proxyRequest = createHttpProxyRequest(address, proxyUsername, proxyPassword);
1✔
857
      HttpUrl url = proxyRequest.httpUrl();
1✔
858
      String requestLine =
1✔
859
          String.format(Locale.US, "CONNECT %s:%d HTTP/1.1", url.host(), url.port());
1✔
860

861
      // Write request to socket
862
      sink.writeUtf8(requestLine).writeUtf8("\r\n");
1✔
863
      for (int i = 0, size = proxyRequest.headers().size(); i < size; i++) {
1✔
864
        sink.writeUtf8(proxyRequest.headers().name(i))
1✔
865
            .writeUtf8(": ")
1✔
866
            .writeUtf8(proxyRequest.headers().value(i))
1✔
867
            .writeUtf8("\r\n");
1✔
868
      }
869
      sink.writeUtf8("\r\n");
1✔
870
      // Flush buffer (flushes socket and sends request)
871
      sink.flush();
1✔
872

873
      // Read status line, check if 2xx was returned
874
      StatusLine statusLine = StatusLine.parse(readUtf8LineStrictUnbuffered(source));
1✔
875
      // Drain rest of headers
876
      while (!readUtf8LineStrictUnbuffered(source).equals("")) {}
1✔
877
      if (statusLine.code < 200 || statusLine.code >= 300) {
1✔
878
        Buffer body = new Buffer();
1✔
879
        try {
880
          sock.shutdownOutput();
1✔
881
          source.read(body, 1024);
1✔
882
        } catch (IOException ex) {
×
883
          body.writeUtf8("Unable to read body: " + ex.toString());
×
884
        }
1✔
885
        try {
886
          sock.close();
1✔
887
        } catch (IOException ignored) {
×
888
          // ignored
889
        }
1✔
890
        String message = String.format(
1✔
891
            Locale.US,
892
            "Response returned from proxy was not successful (expected 2xx, got %d %s). "
893
              + "Response body:\n%s",
894
            statusLine.code, statusLine.message, body.readUtf8());
1✔
895
        throw Status.UNAVAILABLE.withDescription(message).asException();
1✔
896
      }
897
      // As the socket will be used for RPCs from here on, we want the socket timeout back to zero.
898
      sock.setSoTimeout(0);
1✔
899
      return sock;
1✔
900
    } catch (IOException e) {
1✔
901
      if (sock != null) {
1✔
902
        GrpcUtil.closeQuietly(sock);
1✔
903
      }
904
      throw Status.UNAVAILABLE.withDescription("Failed trying to connect with proxy").withCause(e)
1✔
905
          .asException();
1✔
906
    }
907
  }
908

909
  private Request createHttpProxyRequest(InetSocketAddress address, String proxyUsername,
910
                                         String proxyPassword) {
911
    HttpUrl tunnelUrl = new HttpUrl.Builder()
1✔
912
        .scheme("https")
1✔
913
        .host(address.getHostName())
1✔
914
        .port(address.getPort())
1✔
915
        .build();
1✔
916

917
    Request.Builder request = new Request.Builder()
1✔
918
        .url(tunnelUrl)
1✔
919
        .header("Host", tunnelUrl.host() + ":" + tunnelUrl.port())
1✔
920
        .header("User-Agent", userAgent);
1✔
921

922
    // If we have proxy credentials, set them right away
923
    if (proxyUsername != null && proxyPassword != null) {
1✔
924
      request.header("Proxy-Authorization", Credentials.basic(proxyUsername, proxyPassword));
×
925
    }
926
    return request.build();
1✔
927
  }
928

929
  private static String readUtf8LineStrictUnbuffered(Source source) throws IOException {
930
    Buffer buffer = new Buffer();
1✔
931
    while (true) {
932
      if (source.read(buffer, 1) == -1) {
1✔
933
        throw new EOFException("\\n not found: " + buffer.readByteString().hex());
1✔
934
      }
935
      if (buffer.getByte(buffer.size() - 1) == '\n') {
1✔
936
        return buffer.readUtf8LineStrict();
1✔
937
      }
938
    }
939
  }
940

941
  @Override
942
  public String toString() {
943
    return MoreObjects.toStringHelper(this)
1✔
944
        .add("logId", logId.getId())
1✔
945
        .add("address", address)
1✔
946
        .toString();
1✔
947
  }
948

949
  @Override
950
  public InternalLogId getLogId() {
951
    return logId;
1✔
952
  }
953

954
  /**
955
   * Gets the overridden authority hostname.  If the authority is overridden to be an invalid
956
   * authority, uri.getHost() will (rightly) return null, since the authority is no longer
957
   * an actual service.  This method overrides the behavior for practical reasons.  For example,
958
   * if an authority is in the form "invalid_authority" (note the "_"), rather than return null,
959
   * we return the input.  This is because the return value, in conjunction with getOverridenPort,
960
   * are used by the SSL library to reconstruct the actual authority.  It /already/ has a
961
   * connection to the port, independent of this function.
962
   *
963
   * <p>Note: if the defaultAuthority has a port number in it and is also bad, this code will do
964
   * the wrong thing.  An example wrong behavior would be "invalid_host:443".   Registry based
965
   * authorities do not have ports, so this is even more wrong than before.  Sorry.
966
   */
967
  @VisibleForTesting
968
  String getOverridenHost() {
969
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
970
    if (uri.getHost() != null) {
1✔
971
      return uri.getHost();
1✔
972
    }
973

974
    return defaultAuthority;
1✔
975
  }
976

977
  @VisibleForTesting
978
  int getOverridenPort() {
979
    URI uri = GrpcUtil.authorityToUri(defaultAuthority);
1✔
980
    if (uri.getPort() != -1) {
1✔
981
      return uri.getPort();
×
982
    }
983

984
    return address.getPort();
1✔
985
  }
986

987
  @Override
988
  public void shutdown(Status reason) {
989
    synchronized (lock) {
1✔
990
      if (goAwayStatus != null) {
1✔
991
        return;
1✔
992
      }
993

994
      goAwayStatus = reason;
1✔
995
      listener.transportShutdown(goAwayStatus);
1✔
996
      stopIfNecessary();
1✔
997
    }
1✔
998
  }
1✔
999

1000
  @Override
1001
  public void shutdownNow(Status reason) {
1002
    shutdown(reason);
1✔
1003
    synchronized (lock) {
1✔
1004
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1005
      while (it.hasNext()) {
1✔
1006
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1007
        it.remove();
1✔
1008
        entry.getValue().transportState().transportReportStatus(reason, false, new Metadata());
1✔
1009
        maybeClearInUse(entry.getValue());
1✔
1010
      }
1✔
1011

1012
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1013
        // in cases such as the connection fails to ACK keep-alive, pending streams should have a
1014
        // chance to retry and be routed to another connection.
1015
        stream.transportState().transportReportStatus(
1✔
1016
            reason, RpcProgress.MISCARRIED, true, new Metadata());
1017
        maybeClearInUse(stream);
1✔
1018
      }
1✔
1019
      pendingStreams.clear();
1✔
1020

1021
      stopIfNecessary();
1✔
1022
    }
1✔
1023
  }
1✔
1024

1025
  @Override
1026
  public Attributes getAttributes() {
1027
    return attributes;
1✔
1028
  }
1029

1030
  /**
1031
   * Gets all active streams as an array.
1032
   */
1033
  @Override
1034
  public OutboundFlowController.StreamState[] getActiveStreams() {
1035
    synchronized (lock) {
1✔
1036
      OutboundFlowController.StreamState[] flowStreams =
1✔
1037
          new OutboundFlowController.StreamState[streams.size()];
1✔
1038
      int i = 0;
1✔
1039
      for (OkHttpClientStream stream : streams.values()) {
1✔
1040
        flowStreams[i++] = stream.transportState().getOutboundFlowState();
1✔
1041
      }
1✔
1042
      return flowStreams;
1✔
1043
    }
1044
  }
1045

1046
  @VisibleForTesting
1047
  ClientFrameHandler getHandler() {
1048
    return clientFrameHandler;
1✔
1049
  }
1050

1051
  @VisibleForTesting
1052
  SocketFactory getSocketFactory() {
1053
    return socketFactory;
1✔
1054
  }
1055

1056
  @VisibleForTesting
1057
  int getPendingStreamSize() {
1058
    synchronized (lock) {
1✔
1059
      return pendingStreams.size();
1✔
1060
    }
1061
  }
1062

1063
  @VisibleForTesting
1064
  void setNextStreamId(int nextStreamId) {
1065
    synchronized (lock) {
1✔
1066
      this.nextStreamId = nextStreamId;
1✔
1067
    }
1✔
1068
  }
1✔
1069

1070
  /**
1071
   * Finish all active streams due to an IOException, then close the transport.
1072
   */
1073
  @Override
1074
  public void onException(Throwable failureCause) {
1075
    Preconditions.checkNotNull(failureCause, "failureCause");
1✔
1076
    Status status = Status.UNAVAILABLE.withCause(failureCause);
1✔
1077
    startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1078
  }
1✔
1079

1080
  /**
1081
   * Send GOAWAY to the server, then finish all active streams and close the transport.
1082
   */
1083
  private void onError(ErrorCode errorCode, String moreDetail) {
1084
    startGoAway(0, errorCode, toGrpcStatus(errorCode).augmentDescription(moreDetail));
1✔
1085
  }
1✔
1086

1087
  private void startGoAway(int lastKnownStreamId, ErrorCode errorCode, Status status) {
1088
    synchronized (lock) {
1✔
1089
      if (goAwayStatus == null) {
1✔
1090
        goAwayStatus = status;
1✔
1091
        listener.transportShutdown(status);
1✔
1092
      }
1093
      if (errorCode != null && !goAwaySent) {
1✔
1094
        // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1095
        // streams. The GOAWAY is part of graceful shutdown.
1096
        goAwaySent = true;
1✔
1097
        frameWriter.goAway(0, errorCode, new byte[0]);
1✔
1098
      }
1099

1100
      Iterator<Map.Entry<Integer, OkHttpClientStream>> it = streams.entrySet().iterator();
1✔
1101
      while (it.hasNext()) {
1✔
1102
        Map.Entry<Integer, OkHttpClientStream> entry = it.next();
1✔
1103
        if (entry.getKey() > lastKnownStreamId) {
1✔
1104
          it.remove();
1✔
1105
          entry.getValue().transportState().transportReportStatus(
1✔
1106
              status, RpcProgress.REFUSED, false, new Metadata());
1107
          maybeClearInUse(entry.getValue());
1✔
1108
        }
1109
      }
1✔
1110

1111
      for (OkHttpClientStream stream : pendingStreams) {
1✔
1112
        stream.transportState().transportReportStatus(
1✔
1113
            status, RpcProgress.MISCARRIED, true, new Metadata());
1114
        maybeClearInUse(stream);
1✔
1115
      }
1✔
1116
      pendingStreams.clear();
1✔
1117

1118
      stopIfNecessary();
1✔
1119
    }
1✔
1120
  }
1✔
1121

1122
  /**
1123
   * Called when a stream is closed. We do things like:
1124
   * <ul>
1125
   * <li>Removing the stream from the map.
1126
   * <li>Optionally reporting the status.
1127
   * <li>Starting pending streams if we can.
1128
   * <li>Stopping the transport if this is the last live stream under a go-away status.
1129
   * </ul>
1130
   *
1131
   * @param streamId the Id of the stream.
1132
   * @param status the final status of this stream, null means no need to report.
1133
   * @param stopDelivery interrupt queued messages in the deframer
1134
   * @param errorCode reset the stream with this ErrorCode if not null.
1135
   * @param trailers the trailers received if not null
1136
   */
1137
  void finishStream(
1138
      int streamId,
1139
      @Nullable Status status,
1140
      RpcProgress rpcProgress,
1141
      boolean stopDelivery,
1142
      @Nullable ErrorCode errorCode,
1143
      @Nullable Metadata trailers) {
1144
    synchronized (lock) {
1✔
1145
      OkHttpClientStream stream = streams.remove(streamId);
1✔
1146
      if (stream != null) {
1✔
1147
        if (errorCode != null) {
1✔
1148
          frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1149
        }
1150
        if (status != null) {
1✔
1151
          stream
1✔
1152
              .transportState()
1✔
1153
              .transportReportStatus(
1✔
1154
                  status,
1155
                  rpcProgress,
1156
                  stopDelivery,
1157
                  trailers != null ? trailers : new Metadata());
1✔
1158
        }
1159
        if (!startPendingStreams()) {
1✔
1160
          stopIfNecessary();
1✔
1161
        }
1162
        maybeClearInUse(stream);
1✔
1163
      }
1164
    }
1✔
1165
  }
1✔
1166

1167
  /**
1168
   * When the transport is in goAway state, we should stop it once all active streams finish.
1169
   */
1170
  @GuardedBy("lock")
1171
  private void stopIfNecessary() {
1172
    if (!(goAwayStatus != null && streams.isEmpty() && pendingStreams.isEmpty())) {
1✔
1173
      return;
1✔
1174
    }
1175
    if (stopped) {
1✔
1176
      return;
1✔
1177
    }
1178
    stopped = true;
1✔
1179

1180
    if (keepAliveManager != null) {
1✔
1181
      keepAliveManager.onTransportTermination();
×
1182
    }
1183

1184
    if (ping != null) {
1✔
1185
      ping.failed(getPingFailure());
1✔
1186
      ping = null;
1✔
1187
    }
1188

1189
    if (!goAwaySent) {
1✔
1190
      // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated
1191
      // streams. The GOAWAY is part of graceful shutdown.
1192
      goAwaySent = true;
1✔
1193
      frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
1✔
1194
    }
1195

1196
    // We will close the underlying socket in the writing thread to break out the reader
1197
    // thread, which will close the frameReader and notify the listener.
1198
    frameWriter.close();
1✔
1199
  }
1✔
1200

1201
  @GuardedBy("lock")
1202
  private void maybeClearInUse(OkHttpClientStream stream) {
1203
    if (hasStream) {
1✔
1204
      if (pendingStreams.isEmpty() && streams.isEmpty()) {
1✔
1205
        hasStream = false;
1✔
1206
        if (keepAliveManager != null) {
1✔
1207
          // We don't have any active streams. No need to do keepalives any more.
1208
          // Again, we have to call this inside the lock to avoid the race between onTransportIdle
1209
          // and onTransportActive.
1210
          keepAliveManager.onTransportIdle();
×
1211
        }
1212
      }
1213
    }
1214
    if (stream.shouldBeCountedForInUse()) {
1✔
1215
      inUseState.updateObjectInUse(stream, false);
1✔
1216
    }
1217
  }
1✔
1218

1219
  @GuardedBy("lock")
1220
  private void setInUse(OkHttpClientStream stream) {
1221
    if (!hasStream) {
1✔
1222
      hasStream = true;
1✔
1223
      if (keepAliveManager != null) {
1✔
1224
        // We have a new stream. We might need to do keepalives now.
1225
        // Note that we have to do this inside the lock to avoid calling
1226
        // KeepAliveManager.onTransportActive and KeepAliveManager.onTransportIdle in the wrong
1227
        // order.
1228
        keepAliveManager.onTransportActive();
×
1229
      }
1230
    }
1231
    if (stream.shouldBeCountedForInUse()) {
1✔
1232
      inUseState.updateObjectInUse(stream, true);
1✔
1233
    }
1234
  }
1✔
1235

1236
  private Status getPingFailure() {
1237
    synchronized (lock) {
1✔
1238
      if (goAwayStatus != null) {
1✔
1239
        return goAwayStatus;
1✔
1240
      } else {
1241
        return Status.UNAVAILABLE.withDescription("Connection closed");
×
1242
      }
1243
    }
1244
  }
1245

1246
  boolean mayHaveCreatedStream(int streamId) {
1247
    synchronized (lock) {
1✔
1248
      return streamId < nextStreamId && (streamId & 1) == 1;
1✔
1249
    }
1250
  }
1251

1252
  OkHttpClientStream getStream(int streamId) {
1253
    synchronized (lock) {
1✔
1254
      return streams.get(streamId);
1✔
1255
    }
1256
  }
1257

1258
  /**
1259
   * Returns a Grpc status corresponding to the given ErrorCode.
1260
   */
1261
  @VisibleForTesting
1262
  static Status toGrpcStatus(ErrorCode code) {
1263
    Status status = ERROR_CODE_TO_STATUS.get(code);
1✔
1264
    return status != null ? status : Status.UNKNOWN.withDescription(
1✔
1265
        "Unknown http2 error code: " + code.httpCode);
1266
  }
1267

1268
  @Override
1269
  public ListenableFuture<SocketStats> getStats() {
1270
    SettableFuture<SocketStats> ret = SettableFuture.create();
1✔
1271
    synchronized (lock) {
1✔
1272
      if (socket == null) {
1✔
1273
        ret.set(new SocketStats(
×
1274
            transportTracer.getStats(),
×
1275
            /*local=*/ null,
1276
            /*remote=*/ null,
1277
            new InternalChannelz.SocketOptions.Builder().build(),
×
1278
            /*security=*/ null));
1279
      } else {
1280
        ret.set(new SocketStats(
1✔
1281
            transportTracer.getStats(),
1✔
1282
            socket.getLocalSocketAddress(),
1✔
1283
            socket.getRemoteSocketAddress(),
1✔
1284
            Utils.getSocketOptions(socket),
1✔
1285
            securityInfo));
1286
      }
1287
      return ret;
1✔
1288
    }
1289
  }
1290

1291
  /**
1292
   * Runnable which reads frames and dispatches them to in flight calls.
1293
   */
1294
  class ClientFrameHandler implements FrameReader.Handler, Runnable {
1295

1296
    private final OkHttpFrameLogger logger =
1✔
1297
        new OkHttpFrameLogger(Level.FINE, OkHttpClientTransport.class);
1298
    FrameReader frameReader;
1299
    boolean firstSettings = true;
1✔
1300

1301
    ClientFrameHandler(FrameReader frameReader) {
1✔
1302
      this.frameReader = frameReader;
1✔
1303
    }
1✔
1304

1305
    @Override
1306
    @SuppressWarnings("Finally")
1307
    public void run() {
1308
      String threadName = Thread.currentThread().getName();
1✔
1309
      Thread.currentThread().setName("OkHttpClientTransport");
1✔
1310
      try {
1311
        // Read until the underlying socket closes.
1312
        while (frameReader.nextFrame(this)) {
1✔
1313
          if (keepAliveManager != null) {
1✔
1314
            keepAliveManager.onDataReceived();
×
1315
          }
1316
        }
1317
        // frameReader.nextFrame() returns false when the underlying read encounters an IOException,
1318
        // it may be triggered by the socket closing, in such case, the startGoAway() will do
1319
        // nothing, otherwise, we finish all streams since it's a real IO issue.
1320
        Status status;
1321
        synchronized (lock) {
1✔
1322
          status = goAwayStatus;
1✔
1323
        }
1✔
1324
        if (status == null) {
1✔
1325
          status = Status.UNAVAILABLE.withDescription("End of stream or IOException");
1✔
1326
        }
1327
        startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
1✔
1328
      } catch (Throwable t) {
1✔
1329
        // TODO(madongfly): Send the exception message to the server.
1330
        startGoAway(
1✔
1331
            0,
1332
            ErrorCode.PROTOCOL_ERROR,
1333
            Status.INTERNAL.withDescription("error in frame handler").withCause(t));
1✔
1334
      } finally {
1335
        try {
1336
          frameReader.close();
1✔
1337
        } catch (IOException ex) {
×
1338
          log.log(Level.INFO, "Exception closing frame reader", ex);
×
1339
        } catch (RuntimeException e) {
×
1340
          // This same check is done in okhttp proper:
1341
          // https://github.com/square/okhttp/blob/3cc0f4917cbda03cb31617f8ead1e0aeb19de2fb/okhttp/src/main/kotlin/okhttp3/internal/-UtilJvm.kt#L270
1342

1343
          // Conscrypt in Android 10 and 11 may throw closing an SSLSocket. This is safe to ignore.
1344
          // https://issuetracker.google.com/issues/177450597
1345
          if (!"bio == null".equals(e.getMessage())) {
×
1346
            throw e;
×
1347
          }
1348
        }
1✔
1349
        listener.transportTerminated();
1✔
1350
        Thread.currentThread().setName(threadName);
1✔
1351
      }
1352
    }
1✔
1353

1354
    /**
1355
     * Handle an HTTP2 DATA frame.
1356
     */
1357
    @SuppressWarnings("GuardedBy")
1358
    @Override
1359
    public void data(boolean inFinished, int streamId, BufferedSource in, int length,
1360
                     int paddedLength)
1361
        throws IOException {
1362
      logger.logData(OkHttpFrameLogger.Direction.INBOUND,
1✔
1363
          streamId, in.getBuffer(), length, inFinished);
1✔
1364
      OkHttpClientStream stream = getStream(streamId);
1✔
1365
      if (stream == null) {
1✔
1366
        if (mayHaveCreatedStream(streamId)) {
1✔
1367
          synchronized (lock) {
1✔
1368
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1369
          }
1✔
1370
          in.skip(length);
1✔
1371
        } else {
1372
          onError(ErrorCode.PROTOCOL_ERROR, "Received data for unknown stream: " + streamId);
1✔
1373
          return;
1✔
1374
        }
1375
      } else {
1376
        // Wait until the frame is complete.
1377
        in.require(length);
1✔
1378

1379
        Buffer buf = new Buffer();
1✔
1380
        buf.write(in.getBuffer(), length);
1✔
1381
        PerfMark.event("OkHttpClientTransport$ClientFrameHandler.data",
1✔
1382
            stream.transportState().tag());
1✔
1383
        synchronized (lock) {
1✔
1384
          // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1385
          // instead found: 'OkHttpClientTransport.this.lock'
1386
          stream.transportState().transportDataReceived(buf, inFinished, paddedLength - length);
1✔
1387
        }
1✔
1388
      }
1389

1390
      // connection window update
1391
      connectionUnacknowledgedBytesRead += paddedLength;
1✔
1392
      if (connectionUnacknowledgedBytesRead >= initialWindowSize * DEFAULT_WINDOW_UPDATE_RATIO) {
1✔
1393
        synchronized (lock) {
1✔
1394
          frameWriter.windowUpdate(0, connectionUnacknowledgedBytesRead);
1✔
1395
        }
1✔
1396
        connectionUnacknowledgedBytesRead = 0;
1✔
1397
      }
1398
    }
1✔
1399

1400
    /**
1401
     * Handle HTTP2 HEADER and CONTINUATION frames.
1402
     */
1403
    @SuppressWarnings("GuardedBy")
1404
    @Override
1405
    public void headers(boolean outFinished,
1406
        boolean inFinished,
1407
        int streamId,
1408
        int associatedStreamId,
1409
        List<Header> headerBlock,
1410
        HeadersMode headersMode) {
1411
      logger.logHeaders(OkHttpFrameLogger.Direction.INBOUND, streamId, headerBlock, inFinished);
1✔
1412
      boolean unknownStream = false;
1✔
1413
      Status failedStatus = null;
1✔
1414
      if (maxInboundMetadataSize != Integer.MAX_VALUE) {
1✔
1415
        int metadataSize = headerBlockSize(headerBlock);
1✔
1416
        if (metadataSize > maxInboundMetadataSize) {
1✔
1417
          failedStatus = Status.RESOURCE_EXHAUSTED.withDescription(
1✔
1418
              String.format(
1✔
1419
                  Locale.US,
1420
                  "Response %s metadata larger than %d: %d",
1421
                  inFinished ? "trailer" : "header",
1✔
1422
                  maxInboundMetadataSize,
1✔
1423
                  metadataSize));
1✔
1424
        }
1425
      }
1426
      synchronized (lock) {
1✔
1427
        OkHttpClientStream stream = streams.get(streamId);
1✔
1428
        if (stream == null) {
1✔
1429
          if (mayHaveCreatedStream(streamId)) {
1✔
1430
            frameWriter.rstStream(streamId, ErrorCode.STREAM_CLOSED);
1✔
1431
          } else {
1432
            unknownStream = true;
1✔
1433
          }
1434
        } else {
1435
          if (failedStatus == null) {
1✔
1436
            PerfMark.event("OkHttpClientTransport$ClientFrameHandler.headers",
1✔
1437
                stream.transportState().tag());
1✔
1438
            // TODO(b/145386688): This access should be guarded by 'stream.transportState().lock';
1439
            // instead found: 'OkHttpClientTransport.this.lock'
1440
            stream.transportState().transportHeadersReceived(headerBlock, inFinished);
1✔
1441
          } else {
1442
            if (!inFinished) {
1✔
1443
              frameWriter.rstStream(streamId, ErrorCode.CANCEL);
1✔
1444
            }
1445
            stream.transportState().transportReportStatus(failedStatus, false, new Metadata());
1✔
1446
          }
1447
        }
1448
      }
1✔
1449
      if (unknownStream) {
1✔
1450
        // We don't expect any server-initiated streams.
1451
        onError(ErrorCode.PROTOCOL_ERROR, "Received header for unknown stream: " + streamId);
1✔
1452
      }
1453
    }
1✔
1454

1455
    private int headerBlockSize(List<Header> headerBlock) {
1456
      // Calculate as defined for SETTINGS_MAX_HEADER_LIST_SIZE in RFC 7540 ยง6.5.2.
1457
      long size = 0;
1✔
1458
      for (int i = 0; i < headerBlock.size(); i++) {
1✔
1459
        Header header = headerBlock.get(i);
1✔
1460
        size += 32 + header.name.size() + header.value.size();
1✔
1461
      }
1462
      size = Math.min(size, Integer.MAX_VALUE);
1✔
1463
      return (int) size;
1✔
1464
    }
1465

1466
    @Override
1467
    public void rstStream(int streamId, ErrorCode errorCode) {
1468
      logger.logRstStream(OkHttpFrameLogger.Direction.INBOUND, streamId, errorCode);
1✔
1469
      Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
1✔
1470
      boolean stopDelivery =
1✔
1471
          (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
1✔
1472
      synchronized (lock) {
1✔
1473
        OkHttpClientStream stream = streams.get(streamId);
1✔
1474
        if (stream != null) {
1✔
1475
          PerfMark.event("OkHttpClientTransport$ClientFrameHandler.rstStream",
1✔
1476
              stream.transportState().tag());
1✔
1477
          finishStream(
1✔
1478
              streamId, status,
1479
              errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
1✔
1480
              stopDelivery, null, null);
1481
        }
1482
      }
1✔
1483
    }
1✔
1484

1485
    @Override
1486
    public void settings(boolean clearPrevious, Settings settings) {
1487
      logger.logSettings(OkHttpFrameLogger.Direction.INBOUND, settings);
1✔
1488
      boolean outboundWindowSizeIncreased = false;
1✔
1489
      synchronized (lock) {
1✔
1490
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
1✔
1491
          int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
1✔
1492
              settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS);
1493
          maxConcurrentStreams = receivedMaxConcurrentStreams;
1✔
1494
        }
1495

1496
        if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
1✔
1497
          int initialWindowSize = OkHttpSettingsUtil.get(
1✔
1498
              settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
1499
          outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
1✔
1500
        }
1501
        if (firstSettings) {
1✔
1502
          attributes = listener.filterTransport(attributes);
1✔
1503
          listener.transportReady();
1✔
1504
          firstSettings = false;
1✔
1505
        }
1506

1507
        // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
1508
        // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
1509
        // otherwise it will cause a stream error (RST_STREAM).
1510
        frameWriter.ackSettings(settings);
1✔
1511

1512
        // send any pending bytes / streams
1513
        if (outboundWindowSizeIncreased) {
1✔
1514
          outboundFlow.writeStreams();
1✔
1515
        }
1516
        startPendingStreams();
1✔
1517
      }
1✔
1518
    }
1✔
1519

1520
    @Override
1521
    public void ping(boolean ack, int payload1, int payload2) {
1522
      long ackPayload = (((long) payload1) << 32) | (payload2 & 0xffffffffL);
1✔
1523
      logger.logPing(OkHttpFrameLogger.Direction.INBOUND, ackPayload);
1✔
1524
      if (!ack) {
1✔
1525
        synchronized (lock) {
1✔
1526
          frameWriter.ping(true, payload1, payload2);
1✔
1527
        }
1✔
1528
      } else {
1529
        Http2Ping p = null;
1✔
1530
        synchronized (lock) {
1✔
1531
          if (ping != null) {
1✔
1532
            if (ping.payload() == ackPayload) {
1✔
1533
              p = ping;
1✔
1534
              ping = null;
1✔
1535
            } else {
1536
              log.log(Level.WARNING, String.format(
1✔
1537
                  Locale.US, "Received unexpected ping ack. Expecting %d, got %d",
1538
                  ping.payload(), ackPayload));
1✔
1539
            }
1540
          } else {
1541
            log.warning("Received unexpected ping ack. No ping outstanding");
×
1542
          }
1543
        }
1✔
1544
        // don't complete it while holding lock since callbacks could run immediately
1545
        if (p != null) {
1✔
1546
          p.complete();
1✔
1547
        }
1548
      }
1549
    }
1✔
1550

1551
    @Override
1552
    public void ackSettings() {
1553
      // Do nothing currently.
1554
    }
1✔
1555

1556
    @Override
1557
    public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
1558
      logger.logGoAway(OkHttpFrameLogger.Direction.INBOUND, lastGoodStreamId, errorCode, debugData);
1✔
1559
      if (errorCode == ErrorCode.ENHANCE_YOUR_CALM) {
1✔
1560
        String data = debugData.utf8();
1✔
1561
        log.log(Level.WARNING, String.format(
1✔
1562
            "%s: Received GOAWAY with ENHANCE_YOUR_CALM. Debug data: %s", this, data));
1563
        if ("too_many_pings".equals(data)) {
1✔
1564
          tooManyPingsRunnable.run();
1✔
1565
        }
1566
      }
1567
      Status status = GrpcUtil.Http2Error.statusForCode(errorCode.httpCode)
1✔
1568
          .augmentDescription("Received Goaway");
1✔
1569
      if (debugData.size() > 0) {
1✔
1570
        // If a debug message was provided, use it.
1571
        status = status.augmentDescription(debugData.utf8());
1✔
1572
      }
1573
      startGoAway(lastGoodStreamId, null, status);
1✔
1574
    }
1✔
1575

1576
    @Override
1577
    public void pushPromise(int streamId, int promisedStreamId, List<Header> requestHeaders)
1578
        throws IOException {
1579
      logger.logPushPromise(OkHttpFrameLogger.Direction.INBOUND,
1✔
1580
          streamId, promisedStreamId, requestHeaders);
1581
      // We don't accept server initiated stream.
1582
      synchronized (lock) {
1✔
1583
        frameWriter.rstStream(streamId, ErrorCode.PROTOCOL_ERROR);
1✔
1584
      }
1✔
1585
    }
1✔
1586

1587
    @Override
1588
    public void windowUpdate(int streamId, long delta) {
1589
      logger.logWindowsUpdate(OkHttpFrameLogger.Direction.INBOUND, streamId, delta);
1✔
1590
      if (delta == 0) {
1✔
1591
        String errorMsg = "Received 0 flow control window increment.";
×
1592
        if (streamId == 0) {
×
1593
          onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
×
1594
        } else {
1595
          finishStream(
×
1596
              streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
×
1597
              ErrorCode.PROTOCOL_ERROR, null);
1598
        }
1599
        return;
×
1600
      }
1601

1602
      boolean unknownStream = false;
1✔
1603
      synchronized (lock) {
1✔
1604
        if (streamId == Utils.CONNECTION_STREAM_ID) {
1✔
1605
          outboundFlow.windowUpdate(null, (int) delta);
1✔
1606
          return;
1✔
1607
        }
1608

1609
        OkHttpClientStream stream = streams.get(streamId);
1✔
1610
        if (stream != null) {
1✔
1611
          outboundFlow.windowUpdate(stream.transportState().getOutboundFlowState(), (int) delta);
1✔
1612
        } else if (!mayHaveCreatedStream(streamId)) {
1✔
1613
          unknownStream = true;
1✔
1614
        }
1615
      }
1✔
1616
      if (unknownStream) {
1✔
1617
        onError(ErrorCode.PROTOCOL_ERROR,
1✔
1618
            "Received window_update for unknown stream: " + streamId);
1619
      }
1620
    }
1✔
1621

1622
    @Override
1623
    public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
1624
      // Ignore priority change.
1625
      // TODO(madongfly): log
1626
    }
×
1627

1628
    @Override
1629
    public void alternateService(int streamId, String origin, ByteString protocol, String host,
1630
        int port, long maxAge) {
1631
      // TODO(madongfly): Deal with alternateService propagation
1632
    }
×
1633
  }
1634

1635
  /**
1636
   * SSLSocket wrapper that provides a fake SSLSession for handshake session.
1637
   */
1638
  static final class SslSocketWrapper extends NoopSslSocket {
1639

1640
    private final SSLSession sslSession;
1641
    private final SSLSocket sslSocket;
1642

1643
    SslSocketWrapper(SSLSocket sslSocket, String peerHost) {
1✔
1644
      this.sslSocket = sslSocket;
1✔
1645
      this.sslSession = new FakeSslSession(peerHost);
1✔
1646
    }
1✔
1647

1648
    @Override
1649
    public SSLSession getHandshakeSession() {
1650
      return this.sslSession;
1✔
1651
    }
1652

1653
    @Override
1654
    public boolean isConnected() {
1655
      return sslSocket.isConnected();
1✔
1656
    }
1657

1658
    @Override
1659
    public SSLParameters getSSLParameters() {
1660
      return sslSocket.getSSLParameters();
1✔
1661
    }
1662
  }
1663

1664
  /**
1665
   * Fake SSLSession instance that provides the peer host name to verify for per-rpc check.
1666
   */
1667
  static class FakeSslSession extends NoopSslSession {
1668

1669
    private final String peerHost;
1670

1671
    FakeSslSession(String peerHost) {
1✔
1672
      this.peerHost = peerHost;
1✔
1673
    }
1✔
1674

1675
    @Override
1676
    public String getPeerHost() {
1677
      return peerHost;
1✔
1678
    }
1679
  }
1680
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc