• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19447

31 Aug 2024 11:07PM CUT coverage: 84.561% (+0.04%) from 84.523%
#19447

push

github

web-flow
xds: Fix load reporting when pick first is used for locality-routing. (#11495)

* Determine subchannel's network locality from connected address, instead of assuming that all addresses for a subchannel are in the same locality.

33554 of 39680 relevant lines covered (84.56%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.06
/../core/src/main/java/io/grpc/internal/InternalSubchannel.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static io.grpc.ConnectivityState.CONNECTING;
20
import static io.grpc.ConnectivityState.IDLE;
21
import static io.grpc.ConnectivityState.READY;
22
import static io.grpc.ConnectivityState.SHUTDOWN;
23
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24

25
import com.google.common.annotations.VisibleForTesting;
26
import com.google.common.base.MoreObjects;
27
import com.google.common.base.Preconditions;
28
import com.google.common.base.Stopwatch;
29
import com.google.common.base.Supplier;
30
import com.google.common.util.concurrent.ListenableFuture;
31
import com.google.common.util.concurrent.SettableFuture;
32
import com.google.errorprone.annotations.ForOverride;
33
import io.grpc.Attributes;
34
import io.grpc.CallOptions;
35
import io.grpc.ChannelLogger;
36
import io.grpc.ChannelLogger.ChannelLogLevel;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.ClientTransportFilter;
39
import io.grpc.ConnectivityState;
40
import io.grpc.ConnectivityStateInfo;
41
import io.grpc.EquivalentAddressGroup;
42
import io.grpc.HttpConnectProxiedSocketAddress;
43
import io.grpc.InternalChannelz;
44
import io.grpc.InternalChannelz.ChannelStats;
45
import io.grpc.InternalInstrumented;
46
import io.grpc.InternalLogId;
47
import io.grpc.InternalWithLogId;
48
import io.grpc.Metadata;
49
import io.grpc.MethodDescriptor;
50
import io.grpc.Status;
51
import io.grpc.SynchronizationContext;
52
import io.grpc.SynchronizationContext.ScheduledHandle;
53
import java.net.SocketAddress;
54
import java.util.ArrayList;
55
import java.util.Collection;
56
import java.util.Collections;
57
import java.util.List;
58
import java.util.concurrent.ScheduledExecutorService;
59
import java.util.concurrent.TimeUnit;
60
import javax.annotation.Nullable;
61
import javax.annotation.concurrent.ThreadSafe;
62

63
/**
64
 * Transports for a single {@link SocketAddress}.
65
 */
66
@ThreadSafe
67
final class InternalSubchannel implements InternalInstrumented<ChannelStats>, TransportProvider {
68

69
  private final InternalLogId logId;
70
  private final String authority;
71
  private final String userAgent;
72
  private final BackoffPolicy.Provider backoffPolicyProvider;
73
  private final Callback callback;
74
  private final ClientTransportFactory transportFactory;
75
  private final ScheduledExecutorService scheduledExecutor;
76
  private final InternalChannelz channelz;
77
  private final CallTracer callsTracer;
78
  private final ChannelTracer channelTracer;
79
  private final ChannelLogger channelLogger;
80

81
  private final List<ClientTransportFilter> transportFilters;
82

83
  /**
84
   * All fields must be mutated in the syncContext.
85
   */
86
  private final SynchronizationContext syncContext;
87

88
  /**
89
   * The index of the address corresponding to pendingTransport/activeTransport, or at beginning if
90
   * both are null.
91
   *
92
   * <p>Note: any {@link Index#updateGroups(List)} should also update {@link #addressGroups}.
93
   */
94
  private final Index addressIndex;
95

96
  /**
97
   * A volatile accessor to {@link Index#getGroups()}. A few methods ({@link
98
   * #getAddressGroups()} and {@link #toString()}) read this value without running in the
99
   * {@link #syncContext}, where it is otherwise supposed to be accessed. Ideally the list held
100
   * by {@link Index} could itself be volatile, so we would not need to maintain this accessor.
101
   * However, this accessor reduces unnecessary volatile reads and makes the intent clearer.
102
   */
103
  private volatile List<EquivalentAddressGroup> addressGroups;
104

105
  /**
106
   * The policy to control back off between reconnects. Non-{@code null} when a reconnect task is
107
   * scheduled.
108
   */
109
  private BackoffPolicy reconnectPolicy;
110

111
  /**
112
   * Timer monitoring duration since entering CONNECTING state.
113
   */
114
  private final Stopwatch connectingTimer;
115

116
  @Nullable
117
  private ScheduledHandle reconnectTask;
118
  @Nullable
119
  private ScheduledHandle shutdownDueToUpdateTask;
120
  @Nullable
121
  private ManagedClientTransport shutdownDueToUpdateTransport;
122

123
  /**
124
   * All transports that are not terminated. At the very least the value of {@link #activeTransport}
125
   * will be present, but previously used transports that still have streams or are stopping may
126
   * also be present.
127
   */
128
  private final Collection<ConnectionClientTransport> transports = new ArrayList<>();
1✔
129

130
  // Must only be used from syncContext
131
  private final InUseStateAggregator<ConnectionClientTransport> inUseStateAggregator =
1✔
132
      new InUseStateAggregator<ConnectionClientTransport>() {
1✔
133
        @Override
134
        protected void handleInUse() {
135
          callback.onInUse(InternalSubchannel.this);
1✔
136
        }
1✔
137

138
        @Override
139
        protected void handleNotInUse() {
140
          callback.onNotInUse(InternalSubchannel.this);
1✔
141
        }
1✔
142
      };
143

144
  /**
145
   * The to-be active transport, which is not ready yet.
146
   */
147
  @Nullable
148
  private ConnectionClientTransport pendingTransport;
149

150
  /**
151
   * The transport for new outgoing requests. Non-null only in READY state.
152
   */
153
  @Nullable
154
  private volatile ManagedClientTransport activeTransport;
155

156
  private volatile ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
1✔
157

158
  private Status shutdownReason;
159

160
  private volatile Attributes connectedAddressAttributes;
161

162
  InternalSubchannel(List<EquivalentAddressGroup> addressGroups, String authority, String userAgent,
163
      BackoffPolicy.Provider backoffPolicyProvider,
164
      ClientTransportFactory transportFactory, ScheduledExecutorService scheduledExecutor,
165
      Supplier<Stopwatch> stopwatchSupplier, SynchronizationContext syncContext, Callback callback,
166
      InternalChannelz channelz, CallTracer callsTracer, ChannelTracer channelTracer,
167
      InternalLogId logId, ChannelLogger channelLogger,
168
      List<ClientTransportFilter> transportFilters) {
1✔
169
    Preconditions.checkNotNull(addressGroups, "addressGroups");
1✔
170
    Preconditions.checkArgument(!addressGroups.isEmpty(), "addressGroups is empty");
1✔
171
    checkListHasNoNulls(addressGroups, "addressGroups contains null entry");
1✔
172
    List<EquivalentAddressGroup> unmodifiableAddressGroups =
1✔
173
        Collections.unmodifiableList(new ArrayList<>(addressGroups));
1✔
174
    this.addressGroups = unmodifiableAddressGroups;
1✔
175
    this.addressIndex = new Index(unmodifiableAddressGroups);
1✔
176
    this.authority = authority;
1✔
177
    this.userAgent = userAgent;
1✔
178
    this.backoffPolicyProvider = backoffPolicyProvider;
1✔
179
    this.transportFactory = transportFactory;
1✔
180
    this.scheduledExecutor = scheduledExecutor;
1✔
181
    this.connectingTimer = stopwatchSupplier.get();
1✔
182
    this.syncContext = syncContext;
1✔
183
    this.callback = callback;
1✔
184
    this.channelz = channelz;
1✔
185
    this.callsTracer = callsTracer;
1✔
186
    this.channelTracer = Preconditions.checkNotNull(channelTracer, "channelTracer");
1✔
187
    this.logId = Preconditions.checkNotNull(logId, "logId");
1✔
188
    this.channelLogger = Preconditions.checkNotNull(channelLogger, "channelLogger");
1✔
189
    this.transportFilters = transportFilters;
1✔
190
  }
1✔
191

192
  ChannelLogger getChannelLogger() {
193
    return channelLogger;
1✔
194
  }
195

196
  @Override
197
  public ClientTransport obtainActiveTransport() {
198
    ClientTransport savedTransport = activeTransport;
1✔
199
    if (savedTransport != null) {
1✔
200
      return savedTransport;
1✔
201
    }
202
    syncContext.execute(new Runnable() {
1✔
203
      @Override
204
      public void run() {
205
        if (state.getState() == IDLE) {
1✔
206
          channelLogger.log(ChannelLogLevel.INFO, "CONNECTING as requested");
1✔
207
          gotoNonErrorState(CONNECTING);
1✔
208
          startNewTransport();
1✔
209
        }
210
      }
1✔
211
    });
212
    return null;
1✔
213
  }
214

215
  /**
216
   * Returns a READY transport if there is any, without trying to connect.
217
   */
218
  @Nullable
219
  ClientTransport getTransport() {
220
    return activeTransport;
1✔
221
  }
222

223
  /**
224
   * Returns the authority string associated with this Subchannel.
225
   */
226
  String getAuthority() {
227
    return authority;
×
228
  }
229

230
  private void startNewTransport() {
231
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
232

233
    Preconditions.checkState(reconnectTask == null, "Should have no reconnectTask scheduled");
1✔
234

235
    if (addressIndex.isAtBeginning()) {
1✔
236
      connectingTimer.reset().start();
1✔
237
    }
238
    SocketAddress address = addressIndex.getCurrentAddress();
1✔
239

240
    HttpConnectProxiedSocketAddress proxiedAddr = null;
1✔
241
    if (address instanceof HttpConnectProxiedSocketAddress) {
1✔
242
      proxiedAddr = (HttpConnectProxiedSocketAddress) address;
×
243
      address = proxiedAddr.getTargetAddress();
×
244
    }
245

246
    Attributes currentEagAttributes = addressIndex.getCurrentEagAttributes();
1✔
247
    String eagChannelAuthority = currentEagAttributes
1✔
248
            .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE);
1✔
249
    ClientTransportFactory.ClientTransportOptions options =
1✔
250
        new ClientTransportFactory.ClientTransportOptions()
251
          .setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
1✔
252
          .setEagAttributes(currentEagAttributes)
1✔
253
          .setUserAgent(userAgent)
1✔
254
          .setHttpConnectProxiedSocketAddress(proxiedAddr);
1✔
255
    TransportLogger transportLogger = new TransportLogger();
1✔
256
    // In case the transport logs in the constructor, use the subchannel logId
257
    transportLogger.logId = getLogId();
1✔
258
    ConnectionClientTransport transport =
1✔
259
        new CallTracingTransport(
260
            transportFactory
261
                .newClientTransport(address, options, transportLogger), callsTracer);
1✔
262
    transportLogger.logId = transport.getLogId();
1✔
263
    channelz.addClientSocket(transport);
1✔
264
    pendingTransport = transport;
1✔
265
    transports.add(transport);
1✔
266
    Runnable runnable = transport.start(new TransportListener(transport));
1✔
267
    if (runnable != null) {
1✔
268
      syncContext.executeLater(runnable);
1✔
269
    }
270
    channelLogger.log(ChannelLogLevel.INFO, "Started transport {0}", transportLogger.logId);
1✔
271
  }
1✔
272

273
  /**
274
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
275
   * @param status the causal status when the channel begins transition to
276
   *     TRANSIENT_FAILURE.
277
   */
278
  private void scheduleBackoff(final Status status) {
279
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
280

281
    class EndOfCurrentBackoff implements Runnable {
1✔
282
      @Override
283
      public void run() {
284
        reconnectTask = null;
1✔
285
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING after backoff");
1✔
286
        gotoNonErrorState(CONNECTING);
1✔
287
        startNewTransport();
1✔
288
      }
1✔
289
    }
290

291
    gotoState(ConnectivityStateInfo.forTransientFailure(status));
1✔
292
    if (reconnectPolicy == null) {
1✔
293
      reconnectPolicy = backoffPolicyProvider.get();
1✔
294
    }
295
    long delayNanos =
1✔
296
        reconnectPolicy.nextBackoffNanos() - connectingTimer.elapsed(TimeUnit.NANOSECONDS);
1✔
297
    channelLogger.log(
1✔
298
        ChannelLogLevel.INFO,
299
        "TRANSIENT_FAILURE ({0}). Will reconnect after {1} ns",
300
        printShortStatus(status), delayNanos);
1✔
301
    Preconditions.checkState(reconnectTask == null, "previous reconnectTask is not done");
1✔
302
    reconnectTask = syncContext.schedule(
1✔
303
        new EndOfCurrentBackoff(),
304
        delayNanos,
305
        TimeUnit.NANOSECONDS,
306
        scheduledExecutor);
307
  }
1✔
308

309
  /**
310
   * Immediately attempt to reconnect if the current state is TRANSIENT_FAILURE. Otherwise this
311
   * method has no effect.
312
   */
313
  void resetConnectBackoff() {
314
    syncContext.execute(new Runnable() {
1✔
315
      @Override
316
      public void run() {
317
        if (state.getState() != TRANSIENT_FAILURE) {
1✔
318
          return;
1✔
319
        }
320
        cancelReconnectTask();
1✔
321
        channelLogger.log(ChannelLogLevel.INFO, "CONNECTING; backoff interrupted");
1✔
322
        gotoNonErrorState(CONNECTING);
1✔
323
        startNewTransport();
1✔
324
      }
1✔
325
    });
326
  }
1✔
327

328
  private void gotoNonErrorState(final ConnectivityState newState) {
329
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
330

331
    gotoState(ConnectivityStateInfo.forNonError(newState));
1✔
332
  }
1✔
333

334
  private void gotoState(final ConnectivityStateInfo newState) {
335
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
336

337
    if (state.getState() != newState.getState()) {
1✔
338
      Preconditions.checkState(state.getState() != SHUTDOWN,
1✔
339
          "Cannot transition out of SHUTDOWN to " + newState);
340
      state = newState;
1✔
341
      callback.onStateChange(InternalSubchannel.this, newState);
1✔
342
    }
343
  }
1✔
344

345
  /** Replaces the existing addresses, avoiding unnecessary reconnects. */
346
  public void updateAddresses(final List<EquivalentAddressGroup> newAddressGroups) {
347
    Preconditions.checkNotNull(newAddressGroups, "newAddressGroups");
1✔
348
    checkListHasNoNulls(newAddressGroups, "newAddressGroups contains null entry");
1✔
349
    Preconditions.checkArgument(!newAddressGroups.isEmpty(), "newAddressGroups is empty");
1✔
350
    final List<EquivalentAddressGroup> newImmutableAddressGroups =
1✔
351
        Collections.unmodifiableList(new ArrayList<>(newAddressGroups));
1✔
352

353
    syncContext.execute(new Runnable() {
1✔
354
      @Override
355
      public void run() {
356
        ManagedClientTransport savedTransport = null;
1✔
357
        SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
358
        addressIndex.updateGroups(newImmutableAddressGroups);
1✔
359
        addressGroups = newImmutableAddressGroups;
1✔
360
        if (state.getState() == READY || state.getState() == CONNECTING) {
1✔
361
          if (!addressIndex.seekTo(previousAddress)) {
1✔
362
            // Forced to drop the connection
363
            if (state.getState() == READY) {
1✔
364
              savedTransport = activeTransport;
1✔
365
              activeTransport = null;
1✔
366
              addressIndex.reset();
1✔
367
              gotoNonErrorState(IDLE);
1✔
368
            } else {
369
              pendingTransport.shutdown(
1✔
370
                  Status.UNAVAILABLE.withDescription(
1✔
371
                    "InternalSubchannel closed pending transport due to address change"));
372
              pendingTransport = null;
1✔
373
              addressIndex.reset();
1✔
374
              startNewTransport();
1✔
375
            }
376
          }
377
        }
378
        if (savedTransport != null) {
1✔
379
          if (shutdownDueToUpdateTask != null) {
1✔
380
            // Keeping track of multiple shutdown tasks adds complexity, and shouldn't generally be
381
            // necessary. This transport has probably already had plenty of time.
382
            shutdownDueToUpdateTransport.shutdown(
1✔
383
                Status.UNAVAILABLE.withDescription(
1✔
384
                    "InternalSubchannel closed transport early due to address change"));
385
            shutdownDueToUpdateTask.cancel();
1✔
386
            shutdownDueToUpdateTask = null;
1✔
387
            shutdownDueToUpdateTransport = null;
1✔
388
          }
389
          // Avoid needless RPC failures by delaying the shutdown. See
390
          // https://github.com/grpc/grpc-java/issues/2562
391
          shutdownDueToUpdateTransport = savedTransport;
1✔
392
          shutdownDueToUpdateTask = syncContext.schedule(
1✔
393
              new Runnable() {
1✔
394
                @Override public void run() {
395
                  ManagedClientTransport transport = shutdownDueToUpdateTransport;
1✔
396
                  shutdownDueToUpdateTask = null;
1✔
397
                  shutdownDueToUpdateTransport = null;
1✔
398
                  transport.shutdown(
1✔
399
                      Status.UNAVAILABLE.withDescription(
1✔
400
                          "InternalSubchannel closed transport due to address change"));
401
                }
1✔
402
              },
403
              ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS,
404
              TimeUnit.SECONDS,
405
              scheduledExecutor);
1✔
406
        }
407
      }
1✔
408
    });
409
  }
1✔
410

411
  public void shutdown(final Status reason) {
412
    syncContext.execute(new Runnable() {
1✔
413
      @Override
414
      public void run() {
415
        ManagedClientTransport savedActiveTransport;
416
        ConnectionClientTransport savedPendingTransport;
417
        if (state.getState() == SHUTDOWN) {
1✔
418
          return;
1✔
419
        }
420
        shutdownReason = reason;
1✔
421
        savedActiveTransport = activeTransport;
1✔
422
        savedPendingTransport = pendingTransport;
1✔
423
        activeTransport = null;
1✔
424
        pendingTransport = null;
1✔
425
        gotoNonErrorState(SHUTDOWN);
1✔
426
        addressIndex.reset();
1✔
427
        if (transports.isEmpty()) {
1✔
428
          handleTermination();
1✔
429
        }  // else: the callback will be run once all transports have been terminated
430
        cancelReconnectTask();
1✔
431
        if (shutdownDueToUpdateTask != null) {
1✔
432
          shutdownDueToUpdateTask.cancel();
1✔
433
          shutdownDueToUpdateTransport.shutdown(reason);
1✔
434
          shutdownDueToUpdateTask = null;
1✔
435
          shutdownDueToUpdateTransport = null;
1✔
436
        }
437
        if (savedActiveTransport != null) {
1✔
438
          savedActiveTransport.shutdown(reason);
1✔
439
        }
440
        if (savedPendingTransport != null) {
1✔
441
          savedPendingTransport.shutdown(reason);
1✔
442
        }
443
      }
1✔
444
    });
445
  }
1✔
446

447
  @Override
448
  public String toString() {
449
    // addressGroupsCopy being a little stale is fine, just avoid calling toString with the lock
450
    // since there may be many addresses.
451
    return MoreObjects.toStringHelper(this)
×
452
        .add("logId", logId.getId())
×
453
        .add("addressGroups", addressGroups)
×
454
        .toString();
×
455
  }
456

457
  private void handleTermination() {
458
    syncContext.execute(new Runnable() {
1✔
459
      @Override
460
      public void run() {
461
        channelLogger.log(ChannelLogLevel.INFO, "Terminated");
1✔
462
        callback.onTerminated(InternalSubchannel.this);
1✔
463
      }
1✔
464
    });
465
  }
1✔
466

467
  private void handleTransportInUseState(
468
      final ConnectionClientTransport transport, final boolean inUse) {
469
    syncContext.execute(new Runnable() {
1✔
470
      @Override
471
      public void run() {
472
        inUseStateAggregator.updateObjectInUse(transport, inUse);
1✔
473
      }
1✔
474
    });
475
  }
1✔
476

477
  void shutdownNow(final Status reason) {
478
    shutdown(reason);
1✔
479
    syncContext.execute(new Runnable() {
1✔
480
      @Override
481
      public void run() {
482
        Collection<ManagedClientTransport> transportsCopy =
1✔
483
            new ArrayList<ManagedClientTransport>(transports);
1✔
484

485
        for (ManagedClientTransport transport : transportsCopy) {
1✔
486
          transport.shutdownNow(reason);
1✔
487
        }
1✔
488
      }
1✔
489
    });
490
  }
1✔
491

492
  List<EquivalentAddressGroup> getAddressGroups() {
493
    return addressGroups;
1✔
494
  }
495

496
  private void cancelReconnectTask() {
497
    syncContext.throwIfNotInThisSynchronizationContext();
1✔
498

499
    if (reconnectTask != null) {
1✔
500
      reconnectTask.cancel();
1✔
501
      reconnectTask = null;
1✔
502
      reconnectPolicy = null;
1✔
503
    }
504
  }
1✔
505

506
  @Override
507
  public InternalLogId getLogId() {
508
    return logId;
1✔
509
  }
510

511
  @Override
512
  public ListenableFuture<ChannelStats> getStats() {
513
    final SettableFuture<ChannelStats> channelStatsFuture = SettableFuture.create();
1✔
514
    syncContext.execute(new Runnable() {
1✔
515
      @Override
516
      public void run() {
517
        ChannelStats.Builder builder = new ChannelStats.Builder();
1✔
518
        List<EquivalentAddressGroup> addressGroupsSnapshot = addressIndex.getGroups();
1✔
519
        List<InternalWithLogId> transportsSnapshot = new ArrayList<InternalWithLogId>(transports);
1✔
520
        builder.setTarget(addressGroupsSnapshot.toString()).setState(getState());
1✔
521
        builder.setSockets(transportsSnapshot);
1✔
522
        callsTracer.updateBuilder(builder);
1✔
523
        channelTracer.updateBuilder(builder);
1✔
524
        channelStatsFuture.set(builder.build());
1✔
525
      }
1✔
526
    });
527
    return channelStatsFuture;
1✔
528
  }
529

530
  /**
531
   * Return attributes for server address connected by sub channel.
532
   */
533
  public Attributes getConnectedAddressAttributes() {
534
    return connectedAddressAttributes;
1✔
535
  }
536

537
  ConnectivityState getState() {
538
    return state.getState();
1✔
539
  }
540

541
  private static void checkListHasNoNulls(List<?> list, String msg) {
542
    for (Object item : list) {
1✔
543
      Preconditions.checkNotNull(item, msg);
1✔
544
    }
1✔
545
  }
1✔
546

547
  /** Listener for real transports. */
548
  private class TransportListener implements ManagedClientTransport.Listener {
549
    final ConnectionClientTransport transport;
550
    boolean shutdownInitiated = false;
1✔
551

552
    TransportListener(ConnectionClientTransport transport) {
1✔
553
      this.transport = transport;
1✔
554
    }
1✔
555

556
    @Override
557
    public Attributes filterTransport(Attributes attributes) {
558
      for (ClientTransportFilter filter : transportFilters) {
1✔
559
        attributes = Preconditions.checkNotNull(filter.transportReady(attributes),
1✔
560
            "Filter %s returned null", filter);
561
      }
1✔
562
      return attributes;
1✔
563
    }
564

565
    @Override
566
    public void transportReady() {
567
      channelLogger.log(ChannelLogLevel.INFO, "READY");
1✔
568
      syncContext.execute(new Runnable() {
1✔
569
        @Override
570
        public void run() {
571
          reconnectPolicy = null;
1✔
572
          if (shutdownReason != null) {
1✔
573
            // activeTransport should have already been set to null by shutdown(). We keep it null.
574
            Preconditions.checkState(activeTransport == null,
1✔
575
                "Unexpected non-null activeTransport");
576
            transport.shutdown(shutdownReason);
1✔
577
          } else if (pendingTransport == transport) {
1✔
578
            activeTransport = transport;
1✔
579
            pendingTransport = null;
1✔
580
            connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
1✔
581
            gotoNonErrorState(READY);
1✔
582
          }
583
        }
1✔
584
      });
585
    }
1✔
586

587
    @Override
588
    public void transportInUse(boolean inUse) {
589
      handleTransportInUseState(transport, inUse);
1✔
590
    }
1✔
591

592
    @Override
593
    public void transportShutdown(final Status s) {
594
      channelLogger.log(
1✔
595
          ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
1✔
596
      shutdownInitiated = true;
1✔
597
      syncContext.execute(new Runnable() {
1✔
598
        @Override
599
        public void run() {
600
          if (state.getState() == SHUTDOWN) {
1✔
601
            return;
1✔
602
          }
603
          if (activeTransport == transport) {
1✔
604
            activeTransport = null;
1✔
605
            addressIndex.reset();
1✔
606
            gotoNonErrorState(IDLE);
1✔
607
          } else if (pendingTransport == transport) {
1✔
608
            Preconditions.checkState(state.getState() == CONNECTING,
1✔
609
                "Expected state is CONNECTING, actual state is %s", state.getState());
1✔
610
            addressIndex.increment();
1✔
611
            // Continue reconnect if there are still addresses to try.
612
            if (!addressIndex.isValid()) {
1✔
613
              pendingTransport = null;
1✔
614
              addressIndex.reset();
1✔
615
              // Initiate backoff
616
              // Transition to TRANSIENT_FAILURE
617
              scheduleBackoff(s);
1✔
618
            } else {
619
              startNewTransport();
1✔
620
            }
621
          }
622
        }
1✔
623
      });
624
    }
1✔
625

626
    @Override
627
    public void transportTerminated() {
628
      Preconditions.checkState(
1✔
629
          shutdownInitiated, "transportShutdown() must be called before transportTerminated().");
630

631
      channelLogger.log(ChannelLogLevel.INFO, "{0} Terminated", transport.getLogId());
1✔
632
      channelz.removeClientSocket(transport);
1✔
633
      handleTransportInUseState(transport, false);
1✔
634
      for (ClientTransportFilter filter : transportFilters) {
1✔
635
        filter.transportTerminated(transport.getAttributes());
1✔
636
      }
1✔
637
      syncContext.execute(new Runnable() {
1✔
638
        @Override
639
        public void run() {
640
          transports.remove(transport);
1✔
641
          if (state.getState() == SHUTDOWN && transports.isEmpty()) {
1✔
642
            handleTermination();
1✔
643
          }
644
        }
1✔
645
      });
646
    }
1✔
647
  }
648

649
  // All methods are called in syncContext
650
  abstract static class Callback {
1✔
651
    /**
652
     * Called when the subchannel is terminated, which means it's shut down and all transports
653
     * have been terminated.
654
     */
655
    @ForOverride
656
    void onTerminated(InternalSubchannel is) { }
×
657

658
    /**
659
     * Called when the subchannel's connectivity state has changed.
660
     */
661
    @ForOverride
662
    void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { }
×
663

664
    /**
665
     * Called when the subchannel's in-use state has changed to true, which means at least one
666
     * transport is in use.
667
     */
668
    @ForOverride
669
    void onInUse(InternalSubchannel is) { }
1✔
670

671
    /**
672
     * Called when the subchannel's in-use state has changed to false, which means no transport is
673
     * in use.
674
     */
675
    @ForOverride
676
    void onNotInUse(InternalSubchannel is) { }
×
677
  }
678

679
  @VisibleForTesting
680
  static final class CallTracingTransport extends ForwardingConnectionClientTransport {
681
    private final ConnectionClientTransport delegate;
682
    private final CallTracer callTracer;
683

684
    private CallTracingTransport(ConnectionClientTransport delegate, CallTracer callTracer) {
1✔
685
      this.delegate = delegate;
1✔
686
      this.callTracer = callTracer;
1✔
687
    }
1✔
688

689
    @Override
690
    protected ConnectionClientTransport delegate() {
691
      return delegate;
1✔
692
    }
693

694
    @Override
695
    public ClientStream newStream(
696
        MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
697
        ClientStreamTracer[] tracers) {
698
      final ClientStream streamDelegate = super.newStream(method, headers, callOptions, tracers);
1✔
699
      return new ForwardingClientStream() {
1✔
700
        @Override
701
        protected ClientStream delegate() {
702
          return streamDelegate;
1✔
703
        }
704

705
        @Override
706
        public void start(final ClientStreamListener listener) {
707
          callTracer.reportCallStarted();
1✔
708
          super.start(new ForwardingClientStreamListener() {
1✔
709
            @Override
710
            protected ClientStreamListener delegate() {
711
              return listener;
1✔
712
            }
713

714
            @Override
715
            public void closed(
716
                Status status, RpcProgress rpcProgress, Metadata trailers) {
717
              callTracer.reportCallEnded(status.isOk());
1✔
718
              super.closed(status, rpcProgress, trailers);
1✔
719
            }
1✔
720
          });
721
        }
1✔
722
      };
723
    }
724
  }
725

726
  /** Index as in 'i', the pointer to an entry. Not a "search index." */
727
  @VisibleForTesting
728
  static final class Index {
729
    private List<EquivalentAddressGroup> addressGroups;
730
    private int groupIndex;
731
    private int addressIndex;
732

733
    public Index(List<EquivalentAddressGroup> groups) {
1✔
734
      this.addressGroups = groups;
1✔
735
    }
1✔
736

737
    public boolean isValid() {
738
      // addressIndex will never be invalid
739
      return groupIndex < addressGroups.size();
1✔
740
    }
741

742
    public boolean isAtBeginning() {
743
      return groupIndex == 0 && addressIndex == 0;
1✔
744
    }
745

746
    public void increment() {
747
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
748
      addressIndex++;
1✔
749
      if (addressIndex >= group.getAddresses().size()) {
1✔
750
        groupIndex++;
1✔
751
        addressIndex = 0;
1✔
752
      }
753
    }
1✔
754

755
    public void reset() {
756
      groupIndex = 0;
1✔
757
      addressIndex = 0;
1✔
758
    }
1✔
759

760
    public SocketAddress getCurrentAddress() {
761
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
762
    }
763

764
    public Attributes getCurrentEagAttributes() {
765
      return addressGroups.get(groupIndex).getAttributes();
1✔
766
    }
767

768
    public List<EquivalentAddressGroup> getGroups() {
769
      return addressGroups;
1✔
770
    }
771

772
    /** Update to new groups, resetting the current index. */
773
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
774
      addressGroups = newGroups;
1✔
775
      reset();
1✔
776
    }
1✔
777

778
    /** Returns false if the needle was not found and the current index was left unchanged. */
779
    public boolean seekTo(SocketAddress needle) {
780
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
781
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
782
        int j = group.getAddresses().indexOf(needle);
1✔
783
        if (j == -1) {
1✔
784
          continue;
1✔
785
        }
786
        this.groupIndex = i;
1✔
787
        this.addressIndex = j;
1✔
788
        return true;
1✔
789
      }
790
      return false;
1✔
791
    }
792
  }
793

794
  private String printShortStatus(Status status) {
795
    StringBuilder buffer = new StringBuilder();
1✔
796
    buffer.append(status.getCode());
1✔
797
    if (status.getDescription() != null) {
1✔
798
      buffer.append("(").append(status.getDescription()).append(")");
1✔
799
    }
800
    if (status.getCause() != null) {
1✔
801
      buffer.append("[").append(status.getCause()).append("]");
1✔
802
    }
803
    return buffer.toString();
1✔
804
  }
805

806
  @VisibleForTesting
807
  static final class TransportLogger extends ChannelLogger {
1✔
808
    // Changed just after construction to break a cyclic dependency.
809
    InternalLogId logId;
810

811
    @Override
812
    public void log(ChannelLogLevel level, String message) {
813
      ChannelLoggerImpl.logOnly(logId, level, message);
1✔
814
    }
1✔
815

816
    @Override
817
    public void log(ChannelLogLevel level, String messageFormat, Object... args) {
818
      ChannelLoggerImpl.logOnly(logId, level, messageFormat, args);
1✔
819
    }
1✔
820
  }
821
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc