• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20136

06 Jan 2026 05:27AM UTC coverage: 88.693% (+0.01%) from 88.681%
#20136

push

github

web-flow
core: Implement oobChannel with resolvingOobChannel

The most important part of this change is to ensure that CallCredentials
are not propagated to the OOB channel. Because the authority of the OOB
channel doesn't match the parent channel, we must ensure that any bearer
tokens are not sent to the different server. However, this was not a
problem because resolvingOobChannel has the same constraint. (RLS has a
different constraint, but we were able to let RLS manage that itself.)

This commit does change the behavior of channelz, shutdown, and metrics
for the OOB channel. Previously the OOB channel was registered with
channelz, but that is only a TODO for the resolving channel. Channel shutdown
no longer shuts down the OOB channel and it no longer waits for the OOB
channel to terminate before becoming terminated itself. That is also a
pre-existing TODO. Since ManagedChannelImplBuilder is now being used,
global configurators and census are enabled. The proper behavior here is
still being determined, but we would want it to be the same for
resolving OOB channel and OOB channel.

The OOB channel used to refresh the name resolution when the subchannel
went IDLE or TF. That is an older behavior from back when regular
subchannels would also cause the name resolver to refresh. Nowadays
that goes through the LB tree. gRPC-LB already refreshes name resolution
when its RPC closes, so no longer doing it automatically should be fine.

balancerRpcExecutorPool no longer has its lifetime managed by the child.
It'd be easiest to not use it at all from OOB channel, which wouldn't
actually change the regular behavior, as channels already use the same
executor by default. However, the tests are making use of the executor
being injected, so some propagation needs to be preserved.

Lots of OOB channel tests were deleted, but these were either testing
OobChannel, which is now gone, or things like channelz, which are known
to no longer work like before.

35361 of 39869 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.96
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.collect.ImmutableList;
28
import com.google.common.collect.Lists;
29
import io.grpc.Attributes;
30
import io.grpc.ConnectivityState;
31
import io.grpc.ConnectivityStateInfo;
32
import io.grpc.EquivalentAddressGroup;
33
import io.grpc.LoadBalancer;
34
import io.grpc.Status;
35
import io.grpc.SynchronizationContext.ScheduledHandle;
36
import java.net.Inet4Address;
37
import java.net.InetSocketAddress;
38
import java.net.SocketAddress;
39
import java.util.ArrayList;
40
import java.util.Collections;
41
import java.util.HashMap;
42
import java.util.HashSet;
43
import java.util.List;
44
import java.util.Map;
45
import java.util.Random;
46
import java.util.Set;
47
import java.util.concurrent.TimeUnit;
48
import java.util.concurrent.atomic.AtomicBoolean;
49
import java.util.logging.Level;
50
import java.util.logging.Logger;
51
import javax.annotation.Nullable;
52

53
/**
54
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
55
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
56
 * list and sticking to the first that works.
57
 */
58
final class PickFirstLeafLoadBalancer extends LoadBalancer {
59
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
60
  @VisibleForTesting
61
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
62
  private final boolean enableHappyEyeballs = !isSerializingRetries()
1✔
63
      && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
1✔
64
  private final Helper helper;
65
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
66
  private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
1✔
67
  private int numTf = 0;
1✔
68
  private boolean firstPass = true;
1✔
69
  @Nullable
1✔
70
  private ScheduledHandle scheduleConnectionTask = null;
71
  private ConnectivityState rawConnectivityState = IDLE;
1✔
72
  private ConnectivityState concludedState = IDLE;
1✔
73
  private boolean notAPetiolePolicy = true; // means not under a petiole policy
1✔
74
  private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
1✔
75
  private BackoffPolicy reconnectPolicy;
76
  @Nullable
1✔
77
  private ScheduledHandle reconnectTask = null;
78
  private final boolean serializingRetries = isSerializingRetries();
1✔
79

80
  /**
   * Creates a balancer that uses {@code helper} to create subchannels, schedule timers and
   * publish balancing state.
   *
   * @param helper the helper to delegate to; must not be null
   * @throws NullPointerException if {@code helper} is null
   */
  PickFirstLeafLoadBalancer(Helper helper) {
    checkNotNull(helper, "helper");
    this.helper = helper;
  }
1✔
83

84
  static boolean isSerializingRetries() {
85
    return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
1✔
86
  }
87

88
  @Override
89
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
90
    if (rawConnectivityState == SHUTDOWN) {
1✔
91
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
92
    }
93

94
    // Check whether this is a petiole policy, which is based off of an address attribute
95
    Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
1✔
96
    this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
1✔
97

98
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
99

100
    // Validate the address list
101
    if (servers.isEmpty()) {
1✔
102
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
103
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
104
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
105
      handleNameResolutionError(unavailableStatus);
1✔
106
      return unavailableStatus;
1✔
107
    }
108
    for (EquivalentAddressGroup eag : servers) {
1✔
109
      if (eag == null) {
1✔
110
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
111
            "NameResolver returned address list with null endpoint. addrs="
112
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
113
                + resolvedAddresses.getAttributes());
1✔
114
        handleNameResolutionError(unavailableStatus);
1✔
115
        return unavailableStatus;
1✔
116
      }
117
    }
1✔
118

119
    // Since we have a new set of addresses, we are again at first pass
120
    firstPass = true;
1✔
121

122
    List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
1✔
123

124
    // We can optionally be configured to shuffle the address list. This can help better distribute
125
    // the load.
126
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
127
        instanceof PickFirstLeafLoadBalancerConfig) {
128
      PickFirstLeafLoadBalancerConfig config
1✔
129
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
130
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
131
        Collections.shuffle(cleanServers,
1✔
132
            config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
133
      }
134
    }
135

136
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
137
        ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
1✔
138

139
    if (rawConnectivityState == READY
1✔
140
        || (rawConnectivityState == CONNECTING
141
          && (!enableHappyEyeballs || addressIndex.isValid()))) {
1✔
142
      // If the previous ready (or connecting) subchannel exists in new address list,
143
      // keep this connection and don't create new subchannels. Happy Eyeballs is excluded when
144
      // connecting, because it allows multiple attempts simultaneously, thus is fine to start at
145
      // the beginning.
146
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
147
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
148
      if (addressIndex.seekTo(previousAddress)) {
1✔
149
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
150
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
151
        shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
152
        return Status.OK;
1✔
153
      }
154
      // Previous ready subchannel not in the new list of addresses
155
    } else {
1✔
156
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
157
    }
158

159
    // No old addresses means first time through, so we will do an explicit move to CONNECTING
160
    // which is what we implicitly started with
161
    boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
162

163
    if (noOldAddrs) {
1✔
164
      // Make tests happy; they don't properly assume starting in CONNECTING
165
      rawConnectivityState = CONNECTING;
1✔
166
      updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
167
    }
168

169
    if (rawConnectivityState == READY) {
1✔
170
      // connect from beginning when prompted
171
      rawConnectivityState = IDLE;
1✔
172
      updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
173

174
    } else if (rawConnectivityState == CONNECTING || rawConnectivityState == TRANSIENT_FAILURE) {
1✔
175
      // start connection attempt at first address
176
      cancelScheduleTask();
1✔
177
      requestConnection();
1✔
178
    }
179

180
    return Status.OK;
1✔
181
  }
182

183
  /**
184
   * Compute the difference between the flattened new addresses and the old addresses that had been
185
   * made into subchannels and then shutdown the matching subchannels.
186
   * @return true if there were no old addresses
187
   */
188
  private boolean shutdownRemovedAddresses(
189
      ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
190

191
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
192

193
    // Flatten the new EAGs addresses
194
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
195
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
196
      newAddrs.addAll(endpoint.getAddresses());
1✔
197
    }
1✔
198

199
    // Shut them down and remove them
200
    for (SocketAddress oldAddr : oldAddrs) {
1✔
201
      if (!newAddrs.contains(oldAddr)) {
1✔
202
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
203
      }
204
    }
1✔
205
    return oldAddrs.isEmpty();
1✔
206
  }
207

208
  private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
209
    Set<SocketAddress> seenAddresses = new HashSet<>();
1✔
210
    List<EquivalentAddressGroup> newGroups = new ArrayList<>();
1✔
211

212
    for (EquivalentAddressGroup group : groups) {
1✔
213
      List<SocketAddress> addrs = new ArrayList<>();
1✔
214
      for (SocketAddress addr : group.getAddresses()) {
1✔
215
        if (seenAddresses.add(addr)) {
1✔
216
          addrs.add(addr);
1✔
217
        }
218
      }
1✔
219
      if (!addrs.isEmpty()) {
1✔
220
        newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
1✔
221
      }
222
    }
1✔
223

224
    return newGroups;
1✔
225
  }
226

227
  @Override
228
  public void handleNameResolutionError(Status error) {
229
    if (rawConnectivityState == SHUTDOWN) {
1✔
230
      return;
×
231
    }
232

233
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
234
      subchannelData.getSubchannel().shutdown();
1✔
235
    }
1✔
236
    subchannels.clear();
1✔
237
    addressIndex.updateGroups(ImmutableList.of());
1✔
238
    rawConnectivityState = TRANSIENT_FAILURE;
1✔
239
    updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
240
  }
1✔
241

242
  void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
243
    ConnectivityState newState = stateInfo.getState();
1✔
244

245
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
246
    // To prevent pickers from returning these obsolete subchannels, this logic
247
    // is included to check if the current list of active subchannels includes this subchannel.
248
    if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
1✔
249
      return;
1✔
250
    }
251

252
    if (newState == SHUTDOWN) {
1✔
253
      return;
1✔
254
    }
255

256
    if (newState == IDLE && subchannelData.state == READY) {
1✔
257
      helper.refreshNameResolution();
1✔
258
    }
259

260
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
261
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
262
    // transient failure". Only a subchannel state change to READY will get the LB out of
263
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
264
    // keep retrying for a connection.
265

266
    // With the new pick first implementation, individual subchannels will have their own backoff
267
    // on a per-address basis. Thus, iterative requests for connections will not be requested
268
    // once the first pass through is complete.
269
    // However, every time there is an address update, we will perform a pass through for the new
270
    // addresses in the updated list.
271
    subchannelData.updateState(newState);
1✔
272
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
273
      if (newState == CONNECTING) {
1✔
274
        // each subchannel is responsible for its own backoff
275
        return;
1✔
276
      } else if (newState == IDLE) {
1✔
277
        requestConnection();
1✔
278
        return;
1✔
279
      }
280
    }
281

282
    switch (newState) {
1✔
283
      case IDLE:
284
        // Shutdown when ready: connect from beginning when prompted
285
        addressIndex.reset();
1✔
286
        rawConnectivityState = IDLE;
1✔
287
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
288
        break;
1✔
289

290
      case CONNECTING:
291
        rawConnectivityState = CONNECTING;
1✔
292
        updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
293
        break;
1✔
294

295
      case READY:
296
        shutdownRemaining(subchannelData);
1✔
297
        addressIndex.seekTo(getAddress(subchannelData.subchannel));
1✔
298
        rawConnectivityState = READY;
1✔
299
        updateHealthCheckedState(subchannelData);
1✔
300
        break;
1✔
301

302
      case TRANSIENT_FAILURE:
303
        // If we are looking at current channel, request a connection if possible
304
        if (addressIndex.isValid()
1✔
305
            && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
1✔
306
          if (addressIndex.increment()) {
1✔
307
            cancelScheduleTask();
1✔
308
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
309
          } else {
310
            if (subchannels.size() >= addressIndex.size()) {
1✔
311
              scheduleBackoff();
1✔
312
            } else {
313
              // We must have done a seek to the middle of the list lets start over from the
314
              // beginning
315
              addressIndex.reset();
1✔
316
              requestConnection();
1✔
317
            }
318
          }
319
        }
320

321
        if (isPassComplete()) {
1✔
322
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
323
          updateBalancingState(TRANSIENT_FAILURE,
1✔
324
              new FixedResultPicker(PickResult.withError(stateInfo.getStatus())));
1✔
325

326
          // Refresh Name Resolution, but only when all 3 conditions are met
327
          // * We are at the end of addressIndex
328
          // * have had status reported for all subchannels.
329
          // * And one of the following conditions:
330
          //    * Have had enough TF reported since we completed first pass
331
          //    * Just completed the first pass
332
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
333
            firstPass = false;
1✔
334
            numTf = 0;
1✔
335
            helper.refreshNameResolution();
1✔
336
          }
337
        }
338
        break;
339

340
      default:
341
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
342
    }
343
  }
1✔
344

345
  /**
346
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
347
   */
348
  private void scheduleBackoff() {
349
    if (!serializingRetries) {
1✔
350
      return;
1✔
351
    }
352

353
    class EndOfCurrentBackoff implements Runnable {
1✔
354
      @Override
355
      public void run() {
356
        reconnectTask = null;
1✔
357
        addressIndex.reset();
1✔
358
        requestConnection();
1✔
359
      }
1✔
360
    }
361

362
    // Just allow the previous one to trigger when ready if we're already in backoff
363
    if (reconnectTask != null) {
1✔
364
      return;
×
365
    }
366

367
    if (reconnectPolicy == null) {
1✔
368
      reconnectPolicy = bkoffPolProvider.get();
1✔
369
    }
370
    long delayNanos = reconnectPolicy.nextBackoffNanos();
1✔
371
    reconnectTask = helper.getSynchronizationContext().schedule(
1✔
372
        new EndOfCurrentBackoff(),
373
        delayNanos,
374
        TimeUnit.NANOSECONDS,
375
        helper.getScheduledExecutorService());
1✔
376
  }
1✔
377

378
  private void updateHealthCheckedState(SubchannelData subchannelData) {
379
    if (subchannelData.state != READY) {
1✔
380
      return;
1✔
381
    }
382

383
    if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
1✔
384
      updateBalancingState(READY,
1✔
385
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
386
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
387
      updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(
1✔
388
          subchannelData.healthStateInfo.getStatus())));
1✔
389
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
390
      updateBalancingState(subchannelData.getHealthState(),
1✔
391
          new FixedResultPicker(PickResult.withNoResult()));
1✔
392
    }
393
  }
1✔
394

395
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
396
    // an optimization: de-dup IDLE or CONNECTING notification.
397
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
398
      return;
1✔
399
    }
400
    concludedState = state;
1✔
401
    helper.updateBalancingState(state, picker);
1✔
402
  }
1✔
403

404
  @Override
405
  public void shutdown() {
406
    log.log(Level.FINE,
1✔
407
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
408
    rawConnectivityState = SHUTDOWN;
1✔
409
    concludedState = SHUTDOWN;
1✔
410
    cancelScheduleTask();
1✔
411
    if (reconnectTask != null) {
1✔
412
      reconnectTask.cancel();
1✔
413
      reconnectTask = null;
1✔
414
    }
415
    reconnectPolicy = null;
1✔
416

417
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
418
      subchannelData.getSubchannel().shutdown();
1✔
419
    }
1✔
420

421
    subchannels.clear();
1✔
422
  }
1✔
423

424
  /**
425
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
426
  * that all other subchannels must be shutdown.
427
  */
428
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
429
    if (reconnectTask != null) {
1✔
430
      reconnectTask.cancel();
1✔
431
      reconnectTask = null;
1✔
432
    }
433
    reconnectPolicy = null;
1✔
434

435
    cancelScheduleTask();
1✔
436
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
437
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
438
        subchannelData.getSubchannel().shutdown();
1✔
439
      }
440
    }
1✔
441
    subchannels.clear();
1✔
442
    activeSubchannelData.updateState(READY);
1✔
443
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
444
  }
1✔
445

446
  /**
447
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
448
   * Schedules a connection to next address in list as well.
449
   * If the current channel has already attempted a connection, we attempt a connection
450
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
451
   * return null.
452
   */
453
  @Override
454
  public void requestConnection() {
455
    if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
1✔
456
      return;
1✔
457
    }
458

459
    SocketAddress currentAddress = addressIndex.getCurrentAddress();
1✔
460
    SubchannelData subchannelData = subchannels.get(currentAddress);
1✔
461
    if (subchannelData == null) {
1✔
462
      subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
463
    }
464

465
    ConnectivityState subchannelState = subchannelData.getState();
1✔
466
    switch (subchannelState) {
1✔
467
      case IDLE:
468
        subchannelData.subchannel.requestConnection();
1✔
469
        subchannelData.updateState(CONNECTING);
1✔
470
        scheduleNextConnection();
1✔
471
        break;
1✔
472
      case CONNECTING:
473
        scheduleNextConnection();
1✔
474
        break;
1✔
475
      case TRANSIENT_FAILURE:
476
        if (!serializingRetries) {
1✔
477
          addressIndex.increment();
1✔
478
          requestConnection();
1✔
479
        } else {
480
          if (!addressIndex.isValid()) {
1✔
481
            scheduleBackoff();
×
482
          } else {
483
            subchannelData.subchannel.requestConnection();
1✔
484
            subchannelData.updateState(CONNECTING);
1✔
485
          }
486
        }
487
        break;
1✔
488
      default:
489
        // Wait for current subchannel to change state
490
    }
491
  }
1✔
492

493

494
  /**
495
  * Happy Eyeballs
496
  * Schedules connection attempt to happen after a delay to the next available address.
497
  */
498
  private void scheduleNextConnection() {
499
    if (!enableHappyEyeballs
1✔
500
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
501
      return;
1✔
502
    }
503

504
    class StartNextConnection implements Runnable {
1✔
505
      @Override
506
      public void run() {
507
        scheduleConnectionTask = null;
1✔
508
        if (addressIndex.increment()) {
1✔
509
          requestConnection();
1✔
510
        }
511
      }
1✔
512
    }
513

514
    scheduleConnectionTask = helper.getSynchronizationContext().schedule(
1✔
515
        new StartNextConnection(),
516
        CONNECTION_DELAY_INTERVAL_MS,
517
        TimeUnit.MILLISECONDS,
518
        helper.getScheduledExecutorService());
1✔
519
  }
1✔
520

521
  private void cancelScheduleTask() {
522
    if (scheduleConnectionTask != null) {
1✔
523
      scheduleConnectionTask.cancel();
1✔
524
      scheduleConnectionTask = null;
1✔
525
    }
526
  }
1✔
527

528
  private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
529
    HealthListener hcListener = new HealthListener();
1✔
530
    final Subchannel subchannel = helper.createSubchannel(
1✔
531
        CreateSubchannelArgs.newBuilder()
1✔
532
            .setAddresses(Lists.newArrayList(
1✔
533
                new EquivalentAddressGroup(addr, attrs)))
534
            .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
535
            .addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
1✔
536
            .build());
1✔
537
    if (subchannel == null) {
1✔
538
      log.warning("Was not able to create subchannel for " + addr);
×
539
      throw new IllegalStateException("Can't create subchannel");
×
540
    }
541
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
1✔
542
    hcListener.subchannelData = subchannelData;
1✔
543
    subchannels.put(addr, subchannelData);
1✔
544
    Attributes scAttrs = subchannel.getAttributes();
1✔
545
    if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
546
      subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
547
    }
548
    subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
1✔
549
    return subchannelData;
1✔
550
  }
551

552
  private boolean isPassComplete() {
553
    if (subchannels.size() < addressIndex.size()) {
1✔
554
      return false;
1✔
555
    }
556
    for (SubchannelData sc : subchannels.values()) {
1✔
557
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
558
        return false;
1✔
559
      }
560
    }
1✔
561
    return true;
1✔
562
  }
563

564
  private final class HealthListener implements SubchannelStateListener {
1✔
565
    private SubchannelData subchannelData;
566

567
    @Override
568
    public void onSubchannelState(ConnectivityStateInfo newState) {
569
      if (notAPetiolePolicy) {
1✔
570
        log.log(Level.WARNING,
1✔
571
            "Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
572
            new Object[]{newState, subchannelData.subchannel});
1✔
573
        return;
1✔
574
      }
575

576
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
577
          new Object[]{newState, subchannelData.subchannel});
1✔
578
      subchannelData.healthStateInfo = newState;
1✔
579
      if (addressIndex.isValid()
1✔
580
          && subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
1✔
581
        updateHealthCheckedState(subchannelData);
1✔
582
      }
583
    }
1✔
584
  }
585

586
  private SocketAddress getAddress(Subchannel subchannel) {
587
    return subchannel.getAddresses().getAddresses().get(0);
1✔
588
  }
589

590
  @VisibleForTesting
591
  ConnectivityState getConcludedConnectivityState() {
592
    return this.concludedState;
1✔
593
  }
594

595
  /**
596
   * Picker that requests connection during the first pick, and returns noResult.
597
   */
598
  private final class RequestConnectionPicker extends SubchannelPicker {
599
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
600
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
601

602
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
603
      this.pickFirstLeafLoadBalancer =
1✔
604
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
605
    }
1✔
606

607
    @Override
608
    public PickResult pickSubchannel(PickSubchannelArgs args) {
609
      if (connectionRequested.compareAndSet(false, true)) {
1✔
610
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
611
      }
612
      return PickResult.withNoResult();
1✔
613
    }
614
  }
615

616
  /**
617
   * This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry.
618
   * All updates should be done in a synchronization context.
619
   */
620
  @VisibleForTesting
621
  static final class Index {
622
    private List<UnwrappedEag> orderedAddresses;
623
    private int activeElement = 0;
1✔
624
    private boolean enableHappyEyeballs;
625

626
    /**
     * Creates an index over {@code groups}, positioned at the first address.
     *
     * @param groups the address groups to flatten into the ordered address list
     * @param enableHappyEyeballs whether the Happy Eyeballs ordering is used when flattening
     */
    Index(List<EquivalentAddressGroup> groups, boolean enableHappyEyeballs) {
      // The flag must be stored first because updateGroups reads it.
      this.enableHappyEyeballs = enableHappyEyeballs;
      updateGroups(groups);
    }
1✔
630

631
    public boolean isValid() {
632
      return activeElement < orderedAddresses.size();
1✔
633
    }
634

635
    public boolean isAtBeginning() {
636
      return activeElement == 0;
1✔
637
    }
638

639
    /**
640
     * Move to next address in group.  If last address in group move to first address of next group.
641
     * @return false if went off end of the list, otherwise true
642
     */
643
    public boolean increment() {
644
      if (!isValid()) {
1✔
645
        return false;
1✔
646
      }
647

648
      activeElement++;
1✔
649

650
      return isValid();
1✔
651
    }
652

653
    public void reset() {
654
      activeElement = 0;
1✔
655
    }
1✔
656

657
    public SocketAddress getCurrentAddress() {
658
      if (!isValid()) {
1✔
659
        throw new IllegalStateException("Index is past the end of the address group list");
1✔
660
      }
661
      return orderedAddresses.get(activeElement).address;
1✔
662
    }
663

664
    public Attributes getCurrentEagAttributes() {
665
      if (!isValid()) {
1✔
666
        throw new IllegalStateException("Index is off the end of the address group list");
×
667
      }
668
      return orderedAddresses.get(activeElement).attributes;
1✔
669
    }
670

671
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
672
      return Collections.singletonList(getCurrentEag());
1✔
673
    }
674

675
    private EquivalentAddressGroup getCurrentEag() {
676
      if (!isValid()) {
1✔
677
        throw new IllegalStateException("Index is past the end of the address group list");
×
678
      }
679
      return orderedAddresses.get(activeElement).asEag();
1✔
680
    }
681

682
    /**
683
     * Update to new groups, resetting the current index.
684
     */
685
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
686
      checkNotNull(newGroups, "newGroups");
1✔
687
      orderedAddresses = enableHappyEyeballs
1✔
688
                             ? updateGroupsHE(newGroups)
1✔
689
                             : updateGroupsNonHE(newGroups);
1✔
690
      reset();
1✔
691
    }
1✔
692

693
    /**
694
     * Returns false if the needle was not found and the current index was left unchanged.
695
     */
696
    public boolean seekTo(SocketAddress needle) {
697
      checkNotNull(needle, "needle");
1✔
698
      for (int i = 0; i < orderedAddresses.size(); i++) {
1✔
699
        if (orderedAddresses.get(i).address.equals(needle)) {
1✔
700
          this.activeElement = i;
1✔
701
          return true;
1✔
702
        }
703
      }
704
      return false;
1✔
705
    }
706

707
    public int size() {
708
      return orderedAddresses.size();
1✔
709
    }
710

711
    private List<UnwrappedEag> updateGroupsNonHE(List<EquivalentAddressGroup> newGroups) {
712
      List<UnwrappedEag> entries = new ArrayList<>();
1✔
713
      for (int g = 0; g < newGroups.size(); g++) {
1✔
714
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
715
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
716
          SocketAddress addr = eag.getAddresses().get(a);
1✔
717
          entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
718
        }
719
      }
720

721
      return entries;
1✔
722
    }
723

724
    private List<UnwrappedEag> updateGroupsHE(List<EquivalentAddressGroup> newGroups) {
725
      Boolean firstIsV6 = null;
1✔
726
      List<UnwrappedEag> v4Entries = new ArrayList<>();
1✔
727
      List<UnwrappedEag> v6Entries = new ArrayList<>();
1✔
728
      for (int g = 0; g <  newGroups.size(); g++) {
1✔
729
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
730
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
731
          SocketAddress addr = eag.getAddresses().get(a);
1✔
732
          boolean isIpV4 = addr instanceof InetSocketAddress
1✔
733
              && ((InetSocketAddress) addr).getAddress() instanceof Inet4Address;
1✔
734
          if (isIpV4) {
1✔
735
            if (firstIsV6 == null) {
1✔
736
              firstIsV6 = false;
1✔
737
            }
738
            v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
739
          } else {
740
            if (firstIsV6 == null) {
1✔
741
              firstIsV6 = true;
1✔
742
            }
743
            v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
744
          }
745
        }
746
      }
747

748
      return firstIsV6 != null && firstIsV6
1✔
749
          ? interleave(v6Entries, v4Entries)
1✔
750
          : interleave(v4Entries, v6Entries);
1✔
751
    }
752

753
    private List<UnwrappedEag> interleave(List<UnwrappedEag> firstFamily,
754
                                          List<UnwrappedEag> secondFamily) {
755
      if (firstFamily.isEmpty()) {
1✔
756
        return secondFamily;
1✔
757
      }
758
      if (secondFamily.isEmpty()) {
1✔
759
        return firstFamily;
1✔
760
      }
761

762
      List<UnwrappedEag> result = new ArrayList<>(firstFamily.size() + secondFamily.size());
1✔
763
      for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) {
1✔
764
        if (i < firstFamily.size()) {
1✔
765
          result.add(firstFamily.get(i));
1✔
766
        }
767
        if (i < secondFamily.size()) {
1✔
768
          result.add(secondFamily.get(i));
1✔
769
        }
770
      }
771
      return result;
1✔
772
    }
773

774
    private static final class UnwrappedEag {
775
      private final Attributes attributes;
776
      private final SocketAddress address;
777

778
      public UnwrappedEag(Attributes attributes, SocketAddress address) {
1✔
779
        this.attributes = attributes;
1✔
780
        this.address = address;
1✔
781
      }
1✔
782

783
      private EquivalentAddressGroup asEag() {
784
        return new EquivalentAddressGroup(address, attributes);
1✔
785
      }
786
    }
787
  }
788

789
  @VisibleForTesting
790
  int getIndexLocation() {
791
    return addressIndex.activeElement;
1✔
792
  }
793

794
  @VisibleForTesting
795
  boolean isIndexValid() {
796
    return addressIndex.isValid();
1✔
797
  }
798

799
  private static final class SubchannelData {
800
    private final Subchannel subchannel;
801
    private ConnectivityState state;
802
    private boolean completedConnectivityAttempt = false;
1✔
803
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
804

805
    public SubchannelData(Subchannel subchannel, ConnectivityState state) {
1✔
806
      this.subchannel = subchannel;
1✔
807
      this.state = state;
1✔
808
    }
1✔
809

810
    public Subchannel getSubchannel() {
811
      return this.subchannel;
1✔
812
    }
813

814
    public ConnectivityState getState() {
815
      return this.state;
1✔
816
    }
817

818
    public boolean isCompletedConnectivityAttempt() {
819
      return completedConnectivityAttempt;
1✔
820
    }
821

822
    private void updateState(ConnectivityState newState) {
823
      this.state = newState;
1✔
824
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
825
        completedConnectivityAttempt = true;
1✔
826
      } else if (newState == IDLE) {
1✔
827
        completedConnectivityAttempt = false;
1✔
828
      }
829
    }
1✔
830

831
    private ConnectivityState getHealthState() {
832
      return healthStateInfo.getState();
1✔
833
    }
834
  }
835

836
  public static final class PickFirstLeafLoadBalancerConfig {
837

838
    @Nullable
839
    public final Boolean shuffleAddressList;
840

841
    // For testing purposes only, not meant to be parsed from a real config.
842
    @Nullable
843
    final Long randomSeed;
844

845
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
846
      this(shuffleAddressList, null);
1✔
847
    }
1✔
848

849
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
850
        @Nullable Long randomSeed) {
1✔
851
      this.shuffleAddressList = shuffleAddressList;
1✔
852
      this.randomSeed = randomSeed;
1✔
853
    }
1✔
854
  }
855

856
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc