• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19493

03 Oct 2024 12:03AM UTC coverage: 84.599% (+0.01%) from 84.586%
#19493

push

github

web-flow
Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time (#11520)

* Change PickFirstLeafLoadBalancer to only have 1 subchannel at a time if environment variable GRPC_SERIALIZE_RETRIES == true.

Cache serializingRetries value so that it doesn't have to look up the flag every time.

Clear the correct task when READY in processSubchannelState and move the logic to cancelScheduledTasks

Cleanup based on PR review

remove unneeded checks for shutdown.

* Fix previously broken tests

* Shutdown previous subchannel when run off end of index.

* Provide option to disable subchannel retries to let PFLeafLB take control of retries.

* InternalSubchannel internally goes to IDLE when sees TF when reconnect is disabled.
Remove an extra index.increment in LeafLB

33705 of 39841 relevant lines covered (84.6%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.11
/../core/src/main/java/io/grpc/internal/InternalSubchannel.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static io.grpc.ConnectivityState.CONNECTING;
20
import static io.grpc.ConnectivityState.IDLE;
21
import static io.grpc.ConnectivityState.READY;
22
import static io.grpc.ConnectivityState.SHUTDOWN;
23
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.MoreObjects;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Stopwatch;
29
import com.google.common.base.Supplier;
30
import com.google.common.util.concurrent.ListenableFuture;
31
import com.google.common.util.concurrent.SettableFuture;
32
import com.google.errorprone.annotations.ForOverride;
33
import io.grpc.Attributes;
34
import io.grpc.CallOptions;
35
import io.grpc.ChannelLogger;
36
import io.grpc.ChannelLogger.ChannelLogLevel;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.ClientTransportFilter;
39
import io.grpc.ConnectivityState;
40
import io.grpc.ConnectivityStateInfo;
41
import io.grpc.EquivalentAddressGroup;
42
import io.grpc.HttpConnectProxiedSocketAddress;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ChannelStats;
45
import io.grpc.InternalInstrumented;
46
import io.grpc.InternalLogId;
47
import io.grpc.InternalWithLogId;
48
import io.grpc.LoadBalancer;
49
import io.grpc.Metadata;
50
import io.grpc.MethodDescriptor;
51
import io.grpc.Status;
52
import io.grpc.SynchronizationContext;
53
import io.grpc.SynchronizationContext.ScheduledHandle;
54
import java.net.SocketAddress;
55
import java.util.ArrayList;
56
import java.util.Collection;
57
import java.util.Collections;
58
import java.util.List;
59
import java.util.concurrent.ScheduledExecutorService;
60
import java.util.concurrent.TimeUnit;
61
import javax.annotation.Nullable;
62
import javax.annotation.concurrent.ThreadSafe;
63

64
/**
65
 * Transports for a single {@link SocketAddress}.
66
 */
67
@ThreadSafe
68
final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider {
69

70
  private final InternalLogId logId;
71
  private final String authority;
72
  private final String userAgent;
73
  private final BackoffPolicy.Provider backoffPolicyProvider;
74
  private final Callback callback;
75
  private final ClientTransportFactory transportFactory;
76
  private final ScheduledExecutorService scheduledExecutor;
77
  private final InternalChannelz channelz;
78
  private final CallTracer callsTracer;
79
  private final ChannelTracer channelTracer;
80
  private final ChannelLogger channelLogger;
81
  private final boolean reconnectDisabled;
82

83
  private final List<ClientTransportFilter> transportFilters;
84

85
  /**
86
   * All field must be mutated in the syncContext.
87
   */
88
  private final SynchronizationContext syncContext;
89

90
  /**
91
   * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
92
   * both are null.
93
   *
94
   * <p>Note: any {@link Index#updateAddresses(List)} should also update {@link #addressGroups}.
95
   */
96
  private final Index addressIndex;
97

98
  /**
99
   * A volatile accessor to {@link Index#getAddressGroups()}. There are few methods ({@link
100
   * #getAddressGroups()} and {@link #toString()} access this value where they supposed to access
101
   * in the {@link #syncContext}. Ideally {@link Index#getAddressGroups()} can be volatile, so we
102
   * don't need to maintain this volatile accessor. Although, having this accessor can reduce
103
   * unnecessary volatile reads while it delivers clearer intention of why .
104
   */
105
  private volatile List<EquivalentAddressGroup> addressGroups;
106

107
  /**
108
   * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
109
   * scheduled.
110
   */
111
  private BackoffPolicy reconnectPolicy;
112

113
  /**
114
   * Timer monitoring duration since entering CONNECTING state.
115
   */
116
  private final Stopwatch connectingTimer;
117

118
  @Nullable
119
  private ScheduledHandle reconnectTask;
120
  @Nullable
121
  private ScheduledHandle shutdownDueToUpdateTask;
122
  @Nullable
123
  private ManagedClientTransport shutdownDueToUpdateTransport;
124

125
  /**
126
   * All transports that are not terminated. At the very least the value of {@link #activeTransport}
127
   * will be present, but previously used transports that still have streams or are stopping may
128
   * also be present.
129
   */
130
  private final Collection<ConnectionClientTransport> transports = new ArrayList<>();
1✔
131

132
  // Must only be used from syncContext
133
  private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
1✔
134
      new InUseStateAggregator<ConnectionClientTransport>() {
1✔
135
        @Override
136
        protected void handleInUse() {
137
          callback.onInUse(InternalSubchannel.this);
1✔
138
        }
1✔
139

140
        @Override
141
        protected void handleNotInUse() {
142
          callback.onNotInUse(InternalSubchannel.this);
1✔
143
        }
1✔
144
      };
145

146
  /**
147
   * The to-be active transport, which is not ready yet.
148
   */
149
  @Nullable
150
  private ConnectionClientTransport pendingTransport;
151

152
  /**
153
   * The transport for new outgoing requests. Non-null only in READY state.
154
   */
155
  @Nullable
156
  private volatile ManagedClientTransport activeTransport;
157

158
  private volatile ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
1✔
159

160
  private Status shutdownReason;
161

162
  private volatile Attributes connectedAddressAttributes;
163

164
  InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
165
                     BackoffPolicy.Provider backoffPolicyProvider,
166
                     ClientTransportFactory transportFactory,
167
                     ScheduledExecutorService scheduledExecutor,
168
                     Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext,
169
                     Callback callback, InternalChannelz channelz, CallTracer callsTracer,
170
                     ChannelTracer channelTracer, InternalLogId logId,
171
                     ChannelLogger channelLogger, List<ClientTransportFilter> transportFilters) {
1✔
172
    List<EquivalentAddressGroup> addressGroups = args.getAddresses();
1✔
173
    Preconditions.checkNotNull(addressGroups, "addressGroups");
1✔
174
    Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
1✔
175
    checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
1✔
176
    List<EquivalentAddressGroup> unmodifiableAddressGroups =
1✔
177
        Collections.unmodifiableList(new ArrayList<>(addressGroups));
1✔
178
    this.addressGroups = unmodifiableAddressGroups;
1✔
179
    this.addressIndex = new Index(unmodifiableAddressGroups);
1✔
180
    this.authority = authority;
1✔
181
    this.userAgent = userAgent;
1✔
182
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
183
    this.transportFactory = transportFactory;
1✔
184
    this.scheduledExecutor = scheduledExecutor;
1✔
185
    this.connectingTimer = stopwatchSupplier.get();
1✔
186
    this.syncContext = syncContext;
1✔
187
    this.callback = callback;
1✔
188
    this.channelz = channelz;
1✔
189
    this.callsTracer = callsTracer;
1✔
190
    this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
1✔
191
    this.logId = Preconditions.checkNotNull(logId, "logId");
1✔
192
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
193
    this.transportFilters = transportFilters;
1✔
194
    this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
1✔
195
  }
1✔
196

197
  ChannelLogger getChannelLogger() {
198
    return channelLogger;
1✔
199
  }
200

201
  @Override
202
  public ClientTransport obtainActiveTransport() {
203
    ClientTransport savedTransport = activeTransport;
1✔
204
    if (savedTransport != null) {
1✔
205
      return savedTransport;
1✔
206
    }
207
    syncContext.execute(new Runnable() {
1✔
208
      @Override
209
      public void run() {
210
        if (state.getState() == IDLE) {
1✔
211
          channelLogger.log(ChannelLogLevel.INFO, "CONNECTING as requested");
1✔
212
          gotoNonErrorState(CONNECTING);
1✔
213
          startNewTransport();
1✔
214
        }
215
      }
1✔
216
    });
217
    return null;
1✔
218
  }
219

220
  /**
221
   * Returns a READY transport if there is any, without trying to connect.
222
   */
223
  @Nullable
224
  ClientTransport getTransport() {
225
    return activeTransport;
1✔
226
  }
227

228
  /**
229
   * Returns the authority string associated with this Subchannel.
230
   */
231
  String getAuthority() {
232
    return authority;
×
233
  }
234

235
  private void startNewTransport() {
236
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
237

238
    Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
1✔
239

240
    if (addressIndex.isAtBeginning()) {
1✔
241
      connectingTimer.reset().start();
1✔
242
    }
243
    SocketAddress address = addressIndex.getCurrentAddress();
1✔
244

245
    HttpConnectProxiedSocketAddress proxiedAddr = null;
1✔
246
    if (address instanceof HttpConnectProxiedSocketAddress) {
1✔
247
      proxiedAddr = (HttpConnectProxiedSocketAddress) address;
×
248
      address = proxiedAddr.getTargetAddress();
×
249
    }
250

251
    Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes();
1✔
252
    String eagChannelAuthority = currentEagAttributes
1✔
253
            .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE);
1✔
254
    ClientTransportFactory.ClientTransportOptions options =
1✔
255
        new ClientTransportFactory.ClientTransportOptions()
256
          .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
1✔
257
          .setEagAttributes(currentEagAttributes)
1✔
258
          .setUserAgent(userAgent)
1✔
259
          .setHttpConnectProxiedSocketAddress(proxiedAddr);
1✔
260
    TransportLogger transportLogger = new TransportLogger();
1✔
261
    // In case the transport logs in the constructor, use the subchannel logId
262
    transportLogger.logId = getLogId();
1✔
263
    ConnectionClientTransport transport =
1✔
264
        new CallTracingTransport(
265
            transportFactory
266
                .newClientTransport(address, options, transportLogger), callsTracer);
1✔
267
    transportLogger.logId = transport.getLogId();
1✔
268
    channelz.addClientSocket(transport);
1✔
269
    pendingTransport = transport;
1✔
270
    transports.add(transport);
1✔
271
    Runnable runnable = transport.start(new TransportListener(transport));
1✔
272
    if (runnable != null) {
1✔
273
      syncContext.executeLater(runnable);
1✔
274
    }
275
    channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
1✔
276
  }
1✔
277

278
  /**
279
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
280
   * @param status the causal status when the channel begins transition to
281
   *     TRANSIENT_FAILURE.
282
   */
283
  private void scheduleBackoff(final Status status) {
284
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
285

286
    class EndOfCurrentBackoff implements Runnable {
1✔
287
      @Override
288
      public void run() {
289
        reconnectTask = null;
1✔
290
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING after backoff");
1✔
291
        gotoNonErrorState(CONNECTING);
1✔
292
        startNewTransport();
1✔
293
      }
1✔
294
    }
295

296
    gotoState(ConnectivityStateInfo.forTransientFailure(status));
1✔
297

298
    if (reconnectDisabled) {
1✔
299
      return;
1✔
300
    }
301

302
    if (reconnectPolicy == null) {
1✔
303
      reconnectPolicy = backoffPolicyProvider.get();
1✔
304
    }
305
    long delayNanos =
1✔
306
        reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS);
1✔
307
    channelLogger.log(
1✔
308
        ChannelLogLevel.INFO,
309
        "TRANSIENT_FAILURE ({0}). Will reconnect after {1} ns",
310
        printShortStatus(status), delayNanos);
1✔
311
    Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
1✔
312
    reconnectTask = syncContext.schedule(
1✔
313
        new EndOfCurrentBackoff(),
314
        delayNanos,
315
        TimeUnit.NANOSECONDS,
316
        scheduledExecutor);
317
  }
1✔
318

319
  /**
320
   * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this
321
   * method has no effect.
322
   */
323
  void resetConnectBackoff() {
324
    syncContext.execute(new Runnable() {
1✔
325
      @Override
326
      public void run() {
327
        if (state.getState() != TRANSIENT_FAILURE) {
1✔
328
          return;
1✔
329
        }
330
        cancelReconnectTask();
1✔
331
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING; backoff interrupted");
1✔
332
        gotoNonErrorState(CONNECTING);
1✔
333
        startNewTransport();
1✔
334
      }
1✔
335
    });
336
  }
1✔
337

338
  private void gotoNonErrorState(final ConnectivityState newState) {
339
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
340

341
    gotoState(ConnectivityStateInfo.forNonError(newState));
1✔
342
  }
1✔
343

344
  private void gotoState(final ConnectivityStateInfo newState) {
345
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
346

347
    if (state.getState() != newState.getState()) {
1✔
348
      Preconditions.checkState(state.getState() != SHUTDOWN,
1✔
349
          "Cannot transition out of SHUTDOWN to " + newState);
350
      if (reconnectDisabled && newState.getState() == TRANSIENT_FAILURE) {
1✔
351
        state = ConnectivityStateInfo.forNonError(IDLE);
1✔
352
      } else {
353
        state = newState;
1✔
354
      }
355
      callback.onStateChange(InternalSubchannel.this, newState);
1✔
356
    }
357
  }
1✔
358

359
  /** Replaces the existing addresses, avoiding unnecessary reconnects. */
360
  public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups) {
361
    Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
1✔
362
    checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
1✔
363
    Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
1✔
364
    final List<EquivalentAddressGroup> newImmutableAddressGroups =
1✔
365
        Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
1✔
366

367
    syncContext.execute(new Runnable() {
1✔
368
      @Override
369
      public void run() {
370
        ManagedClientTransport savedTransport = null;
1✔
371
        SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
372
        addressIndex.updateGroups(newImmutableAddressGroups);
1✔
373
        addressGroups = newImmutableAddressGroups;
1✔
374
        if (state.getState() == READY || state.getState() == CONNECTING) {
1✔
375
          if (!addressIndex.seekTo(previousAddress)) {
1✔
376
            // Forced to drop the connection
377
            if (state.getState() == READY) {
1✔
378
              savedTransport = activeTransport;
1✔
379
              activeTransport = null;
1✔
380
              addressIndex.reset();
1✔
381
              gotoNonErrorState(IDLE);
1✔
382
            } else {
383
              pendingTransport.shutdown(
1✔
384
                  Status.UNAVAILABLE.withDescription(
1✔
385
                    "InternalSubchannel closed pending transport due to address change"));
386
              pendingTransport = null;
1✔
387
              addressIndex.reset();
1✔
388
              startNewTransport();
1✔
389
            }
390
          }
391
        }
392
        if (savedTransport != null) {
1✔
393
          if (shutdownDueToUpdateTask != null) {
1✔
394
            // Keeping track of multiple shutdown tasks adds complexity, and shouldn't generally be
395
            // necessary. This transport has probably already had plenty of time.
396
            shutdownDueToUpdateTransport.shutdown(
1✔
397
                Status.UNAVAILABLE.withDescription(
1✔
398
                    "InternalSubchannel closed transport early due to address change"));
399
            shutdownDueToUpdateTask.cancel();
1✔
400
            shutdownDueToUpdateTask = null;
1✔
401
            shutdownDueToUpdateTransport = null;
1✔
402
          }
403
          // Avoid needless RPC failures by delaying the shutdown. See
404
          // https://github.com/grpc/grpc-java/issues/2562
405
          shutdownDueToUpdateTransport = savedTransport;
1✔
406
          shutdownDueToUpdateTask = syncContext.schedule(
1✔
407
              new Runnable() {
1✔
408
                @Override public void run() {
409
                  ManagedClientTransport transport = shutdownDueToUpdateTransport;
1✔
410
                  shutdownDueToUpdateTask = null;
1✔
411
                  shutdownDueToUpdateTransport = null;
1✔
412
                  transport.shutdown(
1✔
413
                      Status.UNAVAILABLE.withDescription(
1✔
414
                          "InternalSubchannel closed transport due to address change"));
415
                }
1✔
416
              },
417
              ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS,
418
              TimeUnit.SECONDS,
419
              scheduledExecutor);
1✔
420
        }
421
      }
1✔
422
    });
423
  }
1✔
424

425
  public void shutdown(final Status reason) {
426
    syncContext.execute(new Runnable() {
1✔
427
      @Override
428
      public void run() {
429
        ManagedClientTransport savedActiveTransport;
430
        ConnectionClientTransport savedPendingTransport;
431
        if (state.getState() == SHUTDOWN) {
1✔
432
          return;
1✔
433
        }
434
        shutdownReason = reason;
1✔
435
        savedActiveTransport = activeTransport;
1✔
436
        savedPendingTransport = pendingTransport;
1✔
437
        activeTransport = null;
1✔
438
        pendingTransport = null;
1✔
439
        gotoNonErrorState(SHUTDOWN);
1✔
440
        addressIndex.reset();
1✔
441
        if (transports.isEmpty()) {
1✔
442
          handleTermination();
1✔
443
        }  // else: the callback will be run once all transports have been terminated
444
        cancelReconnectTask();
1✔
445
        if (shutdownDueToUpdateTask != null) {
1✔
446
          shutdownDueToUpdateTask.cancel();
1✔
447
          shutdownDueToUpdateTransport.shutdown(reason);
1✔
448
          shutdownDueToUpdateTask = null;
1✔
449
          shutdownDueToUpdateTransport = null;
1✔
450
        }
451
        if (savedActiveTransport != null) {
1✔
452
          savedActiveTransport.shutdown(reason);
1✔
453
        }
454
        if (savedPendingTransport != null) {
1✔
455
          savedPendingTransport.shutdown(reason);
1✔
456
        }
457
      }
1✔
458
    });
459
  }
1✔
460

461
  @Override
462
  public String toString() {
463
    // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
464
    // since there may be many addresses.
465
    return MoreObjects.toStringHelper(this)
×
466
        .add("logId", logId.getId())
×
467
        .add("addressGroups", addressGroups)
×
468
        .toString();
×
469
  }
470

471
  private void handleTermination() {
472
    syncContext.execute(new Runnable() {
1✔
473
      @Override
474
      public void run() {
475
        channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
476
        callback.onTerminated(InternalSubchannel.this);
1✔
477
      }
1✔
478
    });
479
  }
1✔
480

481
  private void handleTransportInUseState(
482
      final ConnectionClientTransport transport, final boolean inUse) {
483
    syncContext.execute(new Runnable() {
1✔
484
      @Override
485
      public void run() {
486
        inUseStateAggregator.updateObjectInUse(transport, inUse);
1✔
487
      }
1✔
488
    });
489
  }
1✔
490

491
  void shutdownNow(final Status reason) {
492
    shutdown(reason);
1✔
493
    syncContext.execute(new Runnable() {
1✔
494
      @Override
495
      public void run() {
496
        Collection<ManagedClientTransport> transportsCopy =
1✔
497
            new ArrayList<ManagedClientTransport>(transports);
1✔
498

499
        for (ManagedClientTransport transport : transportsCopy) {
1✔
500
          transport.shutdownNow(reason);
1✔
501
        }
1✔
502
      }
1✔
503
    });
504
  }
1✔
505

506
  List<EquivalentAddressGroup> getAddressGroups() {
507
    return addressGroups;
1✔
508
  }
509

510
  private void cancelReconnectTask() {
511
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
512

513
    if (reconnectTask != null) {
1✔
514
      reconnectTask.cancel();
1✔
515
      reconnectTask = null;
1✔
516
      reconnectPolicy = null;
1✔
517
    }
518
  }
1✔
519

520
  @Override
521
  public InternalLogId getLogId() {
522
    return logId;
1✔
523
  }
524

525
  @Override
526
  public ListenableFuture<ChannelStats> getStats() {
527
    final SettableFuture<ChannelStats> channelStatsFuture = SettableFuture.create();
1✔
528
    syncContext.execute(new Runnable() {
1✔
529
      @Override
530
      public void run() {
531
        ChannelStats.Builder builder = new ChannelStats.Builder();
1✔
532
        List<EquivalentAddressGroup> addressGroupsSnapshot = addressIndex.getGroups();
1✔
533
        List<InternalWithLogId> transportsSnapshot = new ArrayList<InternalWithLogId>(transports);
1✔
534
        builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
1✔
535
        builder.setSockets(transportsSnapshot);
1✔
536
        callsTracer.updateBuilder(builder);
1✔
537
        channelTracer.updateBuilder(builder);
1✔
538
        channelStatsFuture.set(builder.build());
1✔
539
      }
1✔
540
    });
541
    return channelStatsFuture;
1✔
542
  }
543

544
  /**
545
   * Return attributes for server address connected by sub channel.
546
   */
547
  public Attributes getConnectedAddressAttributes() {
548
    return connectedAddressAttributes;
1✔
549
  }
550

551
  ConnectivityState getState() {
552
    return state.getState();
1✔
553
  }
554

555
  private static void checkListHasNoNulls(List<?> list, String msg) {
556
    for (Object item : list) {
1✔
557
      Preconditions.checkNotNull(item, msg);
1✔
558
    }
1✔
559
  }
1✔
560

561
  /** Listener for real transports. */
562
  private class TransportListener implements ManagedClientTransport.Listener {
563
    final ConnectionClientTransport transport;
564
    boolean shutdownInitiated = false;
1✔
565

566
    TransportListener(ConnectionClientTransport transport) {
1✔
567
      this.transport = transport;
1✔
568
    }
1✔
569

570
    @Override
571
    public Attributes filterTransport(Attributes attributes) {
572
      for (ClientTransportFilter filter : transportFilters) {
1✔
573
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
574
            "Filter %s returned null", filter);
575
      }
1✔
576
      return attributes;
1✔
577
    }
578

579
    @Override
580
    public void transportReady() {
581
      channelLogger.log(ChannelLogLevel.INFO, "READY");
1✔
582
      syncContext.execute(new Runnable() {
1✔
583
        @Override
584
        public void run() {
585
          reconnectPolicy = null;
1✔
586
          if (shutdownReason != null) {
1✔
587
            // activeTransport should have already been set to null by shutdown(). We keep it null.
588
            Preconditions.checkState(activeTransport == null,
1✔
589
                "Unexpected non-null activeTransport");
590
            transport.shutdown(shutdownReason);
1✔
591
          } else if (pendingTransport == transport) {
1✔
592
            activeTransport = transport;
1✔
593
            pendingTransport = null;
1✔
594
            connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
1✔
595
            gotoNonErrorState(READY);
1✔
596
          }
597
        }
1✔
598
      });
599
    }
1✔
600

601
    @Override
602
    public void transportInUse(boolean inUse) {
603
      handleTransportInUseState(transport, inUse);
1✔
604
    }
1✔
605

606
    @Override
607
    public void transportShutdown(final Status s) {
608
      channelLogger.log(
1✔
609
          ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
1✔
610
      shutdownInitiated = true;
1✔
611
      syncContext.execute(new Runnable() {
1✔
612
        @Override
613
        public void run() {
614
          if (state.getState() == SHUTDOWN) {
1✔
615
            return;
1✔
616
          }
617
          if (activeTransport == transport) {
1✔
618
            activeTransport = null;
1✔
619
            addressIndex.reset();
1✔
620
            gotoNonErrorState(IDLE);
1✔
621
          } else if (pendingTransport == transport) {
1✔
622
            Preconditions.checkState(state.getState() == CONNECTING,
1✔
623
                "Expected state is CONNECTING, actual state is %s", state.getState());
1✔
624
            addressIndex.increment();
1✔
625
            // Continue reconnect if there are still addresses to try.
626
            if (!addressIndex.isValid()) {
1✔
627
              pendingTransport = null;
1✔
628
              addressIndex.reset();
1✔
629
              // Initiate backoff
630
              // Transition to TRANSIENT_FAILURE
631
              scheduleBackoff(s);
1✔
632
            } else {
633
              startNewTransport();
1✔
634
            }
635
          }
636
        }
1✔
637
      });
638
    }
1✔
639

640
    @Override
641
    public void transportTerminated() {
642
      Preconditions.checkState(
1✔
643
          shutdownInitiated, "transportShutdown() must be called before transportTerminated().");
644

645
      channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
1✔
646
      channelz.removeClientSocket(transport);
1✔
647
      handleTransportInUseState(transport, false);
1✔
648
      for (ClientTransportFilter filter : transportFilters) {
1✔
649
        filter.transportTerminated(transport.getAttributes());
1✔
650
      }
1✔
651
      syncContext.execute(new Runnable() {
1✔
652
        @Override
653
        public void run() {
654
          transports.remove(transport);
1✔
655
          if (state.getState() == SHUTDOWN && transports.isEmpty()) {
1✔
656
            handleTermination();
1✔
657
          }
658
        }
1✔
659
      });
660
    }
1✔
661
  }
662

663
  // All methods are called in syncContext
664
  abstract static class Callback {
1✔
665
    /**
666
     * Called when the subchannel is terminated, which means it's shut down and all transports
667
     * have been terminated.
668
     */
669
    @ForOverride
670
    void onTerminated(InternalSubchannel is) { }
×
671

672
    /**
673
     * Called when the subchannel's connectivity state has changed.
674
     */
675
    @ForOverride
676
    void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
×
677

678
    /**
679
     * Called when the subchannel's in-use state has changed to true, which means at least one
680
     * transport is in use.
681
     */
682
    @ForOverride
683
    void onInUse(InternalSubchannel is) { }
1✔
684

685
    /**
686
     * Called when the subchannel's in-use state has changed to false, which means no transport is
687
     * in use.
688
     */
689
    @ForOverride
690
    void onNotInUse(InternalSubchannel is) { }
×
691
  }
692

693
  @VisibleForTesting
694
  static final class CallTracingTransport extends ForwardingConnectionClientTransport {
695
    private final ConnectionClientTransport delegate;
696
    private final CallTracer callTracer;
697

698
    private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
1✔
699
      this.delegate = delegate;
1✔
700
      this.callTracer = callTracer;
1✔
701
    }
1✔
702

703
    @Override
704
    protected ConnectionClientTransport delegate() {
705
      return delegate;
1✔
706
    }
707

708
    @Override
709
    public ClientStream newStream(
710
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
711
        ClientStreamTracer[] tracers) {
712
      final ClientStream streamDelegate = super.newStream(method, headers, callOptions, tracers);
1✔
713
      return new ForwardingClientStream() {
1✔
714
        @Override
715
        protected ClientStream delegate() {
716
          return streamDelegate;
1✔
717
        }
718

719
        @Override
720
        public void start(final ClientStreamListener listener) {
721
          callTracer.reportCallStarted();
1✔
722
          super.start(new ForwardingClientStreamListener() {
1✔
723
            @Override
724
            protected ClientStreamListener delegate() {
725
              return listener;
1✔
726
            }
727

728
            @Override
729
            public void closed(
730
                Status status, RpcProgress rpcProgress, Metadata trailers) {
731
              callTracer.reportCallEnded(status.isOk());
1✔
732
              super.closed(status, rpcProgress, trailers);
1✔
733
            }
1✔
734
          });
735
        }
1✔
736
      };
737
    }
738
  }
739

740
  /** Index as in 'i', the pointer to an entry. Not a "search index." */
741
  @VisibleForTesting
742
  static final class Index {
743
    private List<EquivalentAddressGroup> addressGroups;
744
    private int groupIndex;
745
    private int addressIndex;
746

747
    public Index(List<EquivalentAddressGroup> groups) {
1✔
748
      this.addressGroups = groups;
1✔
749
    }
1✔
750

751
    public boolean isValid() {
752
      // addressIndex will never be invalid
753
      return groupIndex < addressGroups.size();
1✔
754
    }
755

756
    public boolean isAtBeginning() {
757
      return groupIndex == 0 && addressIndex == 0;
1✔
758
    }
759

760
    public void increment() {
761
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
762
      addressIndex++;
1✔
763
      if (addressIndex >= group.getAddresses().size()) {
1✔
764
        groupIndex++;
1✔
765
        addressIndex = 0;
1✔
766
      }
767
    }
1✔
768

769
    public void reset() {
770
      groupIndex = 0;
1✔
771
      addressIndex = 0;
1✔
772
    }
1✔
773

774
    public SocketAddress getCurrentAddress() {
775
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
776
    }
777

778
    public Attributes getCurrentEagAttributes() {
779
      return addressGroups.get(groupIndex).getAttributes();
1✔
780
    }
781

782
    public List<EquivalentAddressGroup> getGroups() {
783
      return addressGroups;
1✔
784
    }
785

786
    /** Update to new groups, resetting the current index. */
787
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
788
      addressGroups = newGroups;
1✔
789
      reset();
1✔
790
    }
1✔
791

792
    /** Returns false if the needle was not found and the current index was left unchanged. */
793
    public boolean seekTo(SocketAddress needle) {
794
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
795
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
796
        int j = group.getAddresses().indexOf(needle);
1✔
797
        if (j == -1) {
1✔
798
          continue;
1✔
799
        }
800
        this.groupIndex = i;
1✔
801
        this.addressIndex = j;
1✔
802
        return true;
1✔
803
      }
804
      return false;
1✔
805
    }
806
  }
807

808
  private String printShortStatus(Status status) {
809
    StringBuilder buffer = new StringBuilder();
1✔
810
    buffer.append(status.getCode());
1✔
811
    if (status.getDescription() != null) {
1✔
812
      buffer.append("(").append(status.getDescription()).append(")");
1✔
813
    }
814
    if (status.getCause() != null) {
1✔
815
      buffer.append("[").append(status.getCause()).append("]");
1✔
816
    }
817
    return buffer.toString();
1✔
818
  }
819

820
  @VisibleForTesting
821
  static final class TransportLogger extends ChannelLogger {
1✔
822
    // Changed just after construction to break a cyclic dependency.
823
    InternalLogId logId;
824

825
    @Override
826
    public void log(ChannelLogLevel level, String message) {
827
      ChannelLoggerImpl.logOnly(logId, level, message);
1✔
828
    }
1✔
829

830
    @Override
831
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
832
      ChannelLoggerImpl.logOnly(logId, level, messageFormat, args);
1✔
833
    }
1✔
834
  }
835
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc