• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20080

13 Nov 2025 03:38PM UTC coverage: 88.52% (-0.02%) from 88.535%
#20080

push

github

ejona86
core: Fix NPE during address update with Happy Eyeballs

Fixes #12168

34643 of 39136 relevant lines covered (88.52%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.Lists;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.ConnectivityStateInfo;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.LoadBalancer;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext.ScheduledHandle;
37
import java.net.Inet4Address;
38
import java.net.InetSocketAddress;
39
import java.net.SocketAddress;
40
import java.util.ArrayList;
41
import java.util.Collections;
42
import java.util.HashMap;
43
import java.util.HashSet;
44
import java.util.List;
45
import java.util.Map;
46
import java.util.Random;
47
import java.util.Set;
48
import java.util.concurrent.TimeUnit;
49
import java.util.concurrent.atomic.AtomicBoolean;
50
import java.util.logging.Level;
51
import java.util.logging.Logger;
52
import javax.annotation.Nullable;
53

54
/**
55
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
56
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
57
 * list and sticking to the first that works.
58
 */
59
final class PickFirstLeafLoadBalancer extends LoadBalancer {
60
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
61
  @VisibleForTesting
62
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
63
  private final boolean enableHappyEyeballs = !isSerializingRetries()
1✔
64
      && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
1✔
65
  private final Helper helper;
66
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
67
  private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
1✔
68
  private int numTf = 0;
1✔
69
  private boolean firstPass = true;
1✔
70
  @Nullable
1✔
71
  private ScheduledHandle scheduleConnectionTask = null;
72
  private ConnectivityState rawConnectivityState = IDLE;
1✔
73
  private ConnectivityState concludedState = IDLE;
1✔
74
  private boolean notAPetiolePolicy = true; // means not under a petiole policy
1✔
75
  private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
1✔
76
  private BackoffPolicy reconnectPolicy;
77
  @Nullable
1✔
78
  private ScheduledHandle reconnectTask = null;
79
  private final boolean serializingRetries = isSerializingRetries();
1✔
80

81
  PickFirstLeafLoadBalancer(Helper helper) {
1✔
82
    this.helper = checkNotNull(helper, "helper");
1✔
83
  }
1✔
84

85
  static boolean isSerializingRetries() {
86
    return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
1✔
87
  }
88

89
  @Override
90
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
91
    if (rawConnectivityState == SHUTDOWN) {
1✔
92
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
93
    }
94

95
    // Check whether this is a petiole policy, which is based off of an address attribute
96
    Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
1✔
97
    this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
1✔
98

99
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
100

101
    // Validate the address list
102
    if (servers.isEmpty()) {
1✔
103
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
104
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
105
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
106
      handleNameResolutionError(unavailableStatus);
1✔
107
      return unavailableStatus;
1✔
108
    }
109
    for (EquivalentAddressGroup eag : servers) {
1✔
110
      if (eag == null) {
1✔
111
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
112
            "NameResolver returned address list with null endpoint. addrs="
113
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
114
                + resolvedAddresses.getAttributes());
1✔
115
        handleNameResolutionError(unavailableStatus);
1✔
116
        return unavailableStatus;
1✔
117
      }
118
    }
1✔
119

120
    // Since we have a new set of addresses, we are again at first pass
121
    firstPass = true;
1✔
122

123
    List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
1✔
124

125
    // We can optionally be configured to shuffle the address list. This can help better distribute
126
    // the load.
127
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
128
        instanceof PickFirstLeafLoadBalancerConfig) {
129
      PickFirstLeafLoadBalancerConfig config
1✔
130
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
131
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
132
        Collections.shuffle(cleanServers,
1✔
133
            config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
134
      }
135
    }
136

137
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
138
        ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
1✔
139

140
    if (rawConnectivityState == READY
1✔
141
        || (rawConnectivityState == CONNECTING
142
          && (!enableHappyEyeballs || addressIndex.isValid()))) {
1✔
143
      // If the previous ready (or connecting) subchannel exists in new address list,
144
      // keep this connection and don't create new subchannels. Happy Eyeballs is excluded when
145
      // connecting, because it allows multiple attempts simultaneously, thus is fine to start at
146
      // the beginning.
147
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
148
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
149
      if (addressIndex.seekTo(previousAddress)) {
1✔
150
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
151
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
152
        shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
153
        return Status.OK;
1✔
154
      }
155
      // Previous ready subchannel not in the new list of addresses
156
    } else {
1✔
157
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
158
    }
159

160
    // No old addresses means first time through, so we will do an explicit move to CONNECTING
161
    // which is what we implicitly started with
162
    boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
163

164
    if (noOldAddrs) {
1✔
165
      // Make tests happy; they don't properly assume starting in CONNECTING
166
      rawConnectivityState = CONNECTING;
1✔
167
      updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
168
    }
169

170
    if (rawConnectivityState == READY) {
1✔
171
      // connect from beginning when prompted
172
      rawConnectivityState = IDLE;
1✔
173
      updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
174

175
    } else if (rawConnectivityState == CONNECTING || rawConnectivityState == TRANSIENT_FAILURE) {
1✔
176
      // start connection attempt at first address
177
      cancelScheduleTask();
1✔
178
      requestConnection();
1✔
179
    }
180

181
    return Status.OK;
1✔
182
  }
183

184
  /**
185
   * Compute the difference between the flattened new addresses and the old addresses that had been
186
   * made into subchannels and then shutdown the matching subchannels.
187
   * @return true if there were no old addresses
188
   */
189
  private boolean shutdownRemovedAddresses(
190
      ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
191

192
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
193

194
    // Flatten the new EAGs addresses
195
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
196
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
197
      newAddrs.addAll(endpoint.getAddresses());
1✔
198
    }
1✔
199

200
    // Shut them down and remove them
201
    for (SocketAddress oldAddr : oldAddrs) {
1✔
202
      if (!newAddrs.contains(oldAddr)) {
1✔
203
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
204
      }
205
    }
1✔
206
    return oldAddrs.isEmpty();
1✔
207
  }
208

209
  private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
210
    Set<SocketAddress> seenAddresses = new HashSet<>();
1✔
211
    List<EquivalentAddressGroup> newGroups = new ArrayList<>();
1✔
212

213
    for (EquivalentAddressGroup group : groups) {
1✔
214
      List<SocketAddress> addrs = new ArrayList<>();
1✔
215
      for (SocketAddress addr : group.getAddresses()) {
1✔
216
        if (seenAddresses.add(addr)) {
1✔
217
          addrs.add(addr);
1✔
218
        }
219
      }
1✔
220
      if (!addrs.isEmpty()) {
1✔
221
        newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
1✔
222
      }
223
    }
1✔
224

225
    return newGroups;
1✔
226
  }
227

228
  @Override
229
  public void handleNameResolutionError(Status error) {
230
    if (rawConnectivityState == SHUTDOWN) {
1✔
231
      return;
×
232
    }
233

234
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
235
      subchannelData.getSubchannel().shutdown();
1✔
236
    }
1✔
237
    subchannels.clear();
1✔
238
    addressIndex.updateGroups(ImmutableList.of());
1✔
239
    rawConnectivityState = TRANSIENT_FAILURE;
1✔
240
    updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
1✔
241
  }
1✔
242

243
  void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
244
    ConnectivityState newState = stateInfo.getState();
1✔
245

246
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
247
    // To prevent pickers from returning these obsolete subchannels, this logic
248
    // is included to check if the current list of active subchannels includes this subchannel.
249
    if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
1✔
250
      return;
1✔
251
    }
252

253
    if (newState == SHUTDOWN) {
1✔
254
      return;
1✔
255
    }
256

257
    if (newState == IDLE && subchannelData.state == READY) {
1✔
258
      helper.refreshNameResolution();
1✔
259
    }
260

261
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
262
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
263
    // transient failure". Only a subchannel state change to READY will get the LB out of
264
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
265
    // keep retrying for a connection.
266

267
    // With the new pick first implementation, individual subchannels will have their own backoff
268
    // on a per-address basis. Thus, iterative requests for connections will not be requested
269
    // once the first pass through is complete.
270
    // However, every time there is an address update, we will perform a pass through for the new
271
    // addresses in the updated list.
272
    subchannelData.updateState(newState);
1✔
273
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
274
      if (newState == CONNECTING) {
1✔
275
        // each subchannel is responsible for its own backoff
276
        return;
1✔
277
      } else if (newState == IDLE) {
1✔
278
        requestConnection();
1✔
279
        return;
1✔
280
      }
281
    }
282

283
    switch (newState) {
1✔
284
      case IDLE:
285
        // Shutdown when ready: connect from beginning when prompted
286
        addressIndex.reset();
1✔
287
        rawConnectivityState = IDLE;
1✔
288
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
289
        break;
1✔
290

291
      case CONNECTING:
292
        rawConnectivityState = CONNECTING;
1✔
293
        updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
294
        break;
1✔
295

296
      case READY:
297
        shutdownRemaining(subchannelData);
1✔
298
        addressIndex.seekTo(getAddress(subchannelData.subchannel));
1✔
299
        rawConnectivityState = READY;
1✔
300
        updateHealthCheckedState(subchannelData);
1✔
301
        break;
1✔
302

303
      case TRANSIENT_FAILURE:
304
        // If we are looking at current channel, request a connection if possible
305
        if (addressIndex.isValid()
1✔
306
            && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
1✔
307
          if (addressIndex.increment()) {
1✔
308
            cancelScheduleTask();
1✔
309
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
310
          } else {
311
            if (subchannels.size() >= addressIndex.size()) {
1✔
312
              scheduleBackoff();
1✔
313
            } else {
314
              // We must have done a seek to the middle of the list lets start over from the
315
              // beginning
316
              addressIndex.reset();
1✔
317
              requestConnection();
1✔
318
            }
319
          }
320
        }
321

322
        if (isPassComplete()) {
1✔
323
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
324
          updateBalancingState(TRANSIENT_FAILURE,
1✔
325
              new Picker(PickResult.withError(stateInfo.getStatus())));
1✔
326

327
          // Refresh Name Resolution, but only when all 3 conditions are met
328
          // * We are at the end of addressIndex
329
          // * have had status reported for all subchannels.
330
          // * And one of the following conditions:
331
          //    * Have had enough TF reported since we completed first pass
332
          //    * Just completed the first pass
333
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
334
            firstPass = false;
1✔
335
            numTf = 0;
1✔
336
            helper.refreshNameResolution();
1✔
337
          }
338
        }
339
        break;
340

341
      default:
342
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
343
    }
344
  }
1✔
345

346
  /**
347
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
348
   */
349
  private void scheduleBackoff() {
350
    if (!serializingRetries) {
1✔
351
      return;
1✔
352
    }
353

354
    class EndOfCurrentBackoff implements Runnable {
1✔
355
      @Override
356
      public void run() {
357
        reconnectTask = null;
1✔
358
        addressIndex.reset();
1✔
359
        requestConnection();
1✔
360
      }
1✔
361
    }
362

363
    // Just allow the previous one to trigger when ready if we're already in backoff
364
    if (reconnectTask != null) {
1✔
365
      return;
×
366
    }
367

368
    if (reconnectPolicy == null) {
1✔
369
      reconnectPolicy = bkoffPolProvider.get();
1✔
370
    }
371
    long delayNanos = reconnectPolicy.nextBackoffNanos();
1✔
372
    reconnectTask = helper.getSynchronizationContext().schedule(
1✔
373
        new EndOfCurrentBackoff(),
374
        delayNanos,
375
        TimeUnit.NANOSECONDS,
376
        helper.getScheduledExecutorService());
1✔
377
  }
1✔
378

379
  private void updateHealthCheckedState(SubchannelData subchannelData) {
380
    if (subchannelData.state != READY) {
1✔
381
      return;
1✔
382
    }
383

384
    if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
1✔
385
      updateBalancingState(READY,
1✔
386
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
387
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
388
      updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
1✔
389
          subchannelData.healthStateInfo.getStatus())));
1✔
390
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
391
      updateBalancingState(subchannelData.getHealthState(),
1✔
392
          new Picker(PickResult.withNoResult()));
1✔
393
    }
394
  }
1✔
395

396
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
397
    // an optimization: de-dup IDLE or CONNECTING notification.
398
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
399
      return;
1✔
400
    }
401
    concludedState = state;
1✔
402
    helper.updateBalancingState(state, picker);
1✔
403
  }
1✔
404

405
  @Override
406
  public void shutdown() {
407
    log.log(Level.FINE,
1✔
408
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
409
    rawConnectivityState = SHUTDOWN;
1✔
410
    concludedState = SHUTDOWN;
1✔
411
    cancelScheduleTask();
1✔
412
    if (reconnectTask != null) {
1✔
413
      reconnectTask.cancel();
1✔
414
      reconnectTask = null;
1✔
415
    }
416
    reconnectPolicy = null;
1✔
417

418
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
419
      subchannelData.getSubchannel().shutdown();
1✔
420
    }
1✔
421

422
    subchannels.clear();
1✔
423
  }
1✔
424

425
  /**
426
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
427
  * that all other subchannels must be shutdown.
428
  */
429
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
430
    if (reconnectTask != null) {
1✔
431
      reconnectTask.cancel();
1✔
432
      reconnectTask = null;
1✔
433
    }
434
    reconnectPolicy = null;
1✔
435

436
    cancelScheduleTask();
1✔
437
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
438
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
439
        subchannelData.getSubchannel().shutdown();
1✔
440
      }
441
    }
1✔
442
    subchannels.clear();
1✔
443
    activeSubchannelData.updateState(READY);
1✔
444
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
445
  }
1✔
446

447
  /**
448
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
449
   * Schedules a connection to next address in list as well.
450
   * If the current channel has already attempted a connection, we attempt a connection
451
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
452
   * return null.
453
   */
454
  @Override
455
  public void requestConnection() {
456
    if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
1✔
457
      return;
1✔
458
    }
459

460
    SocketAddress currentAddress = addressIndex.getCurrentAddress();
1✔
461
    SubchannelData subchannelData = subchannels.get(currentAddress);
1✔
462
    if (subchannelData == null) {
1✔
463
      subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
464
    }
465

466
    ConnectivityState subchannelState = subchannelData.getState();
1✔
467
    switch (subchannelState) {
1✔
468
      case IDLE:
469
        subchannelData.subchannel.requestConnection();
1✔
470
        subchannelData.updateState(CONNECTING);
1✔
471
        scheduleNextConnection();
1✔
472
        break;
1✔
473
      case CONNECTING:
474
        scheduleNextConnection();
1✔
475
        break;
1✔
476
      case TRANSIENT_FAILURE:
477
        if (!serializingRetries) {
1✔
478
          addressIndex.increment();
1✔
479
          requestConnection();
1✔
480
        } else {
481
          if (!addressIndex.isValid()) {
1✔
482
            scheduleBackoff();
×
483
          } else {
484
            subchannelData.subchannel.requestConnection();
1✔
485
            subchannelData.updateState(CONNECTING);
1✔
486
          }
487
        }
488
        break;
1✔
489
      default:
490
        // Wait for current subchannel to change state
491
    }
492
  }
1✔
493

494

495
  /**
496
  * Happy Eyeballs
497
  * Schedules connection attempt to happen after a delay to the next available address.
498
  */
499
  private void scheduleNextConnection() {
500
    if (!enableHappyEyeballs
1✔
501
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
502
      return;
1✔
503
    }
504

505
    class StartNextConnection implements Runnable {
1✔
506
      @Override
507
      public void run() {
508
        scheduleConnectionTask = null;
1✔
509
        if (addressIndex.increment()) {
1✔
510
          requestConnection();
1✔
511
        }
512
      }
1✔
513
    }
514

515
    scheduleConnectionTask = helper.getSynchronizationContext().schedule(
1✔
516
        new StartNextConnection(),
517
        CONNECTION_DELAY_INTERVAL_MS,
518
        TimeUnit.MILLISECONDS,
519
        helper.getScheduledExecutorService());
1✔
520
  }
1✔
521

522
  private void cancelScheduleTask() {
523
    if (scheduleConnectionTask != null) {
1✔
524
      scheduleConnectionTask.cancel();
1✔
525
      scheduleConnectionTask = null;
1✔
526
    }
527
  }
1✔
528

529
  private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
530
    HealthListener hcListener = new HealthListener();
1✔
531
    final Subchannel subchannel = helper.createSubchannel(
1✔
532
        CreateSubchannelArgs.newBuilder()
1✔
533
            .setAddresses(Lists.newArrayList(
1✔
534
                new EquivalentAddressGroup(addr, attrs)))
535
            .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
536
            .addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
1✔
537
            .build());
1✔
538
    if (subchannel == null) {
1✔
539
      log.warning("Was not able to create subchannel for " + addr);
×
540
      throw new IllegalStateException("Can't create subchannel");
×
541
    }
542
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
1✔
543
    hcListener.subchannelData = subchannelData;
1✔
544
    subchannels.put(addr, subchannelData);
1✔
545
    Attributes scAttrs = subchannel.getAttributes();
1✔
546
    if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
547
      subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
548
    }
549
    subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
1✔
550
    return subchannelData;
1✔
551
  }
552

553
  private boolean isPassComplete() {
554
    if (subchannels.size() < addressIndex.size()) {
1✔
555
      return false;
1✔
556
    }
557
    for (SubchannelData sc : subchannels.values()) {
1✔
558
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
559
        return false;
1✔
560
      }
561
    }
1✔
562
    return true;
1✔
563
  }
564

565
  private final class HealthListener implements SubchannelStateListener {
1✔
566
    private SubchannelData subchannelData;
567

568
    @Override
569
    public void onSubchannelState(ConnectivityStateInfo newState) {
570
      if (notAPetiolePolicy) {
1✔
571
        log.log(Level.WARNING,
1✔
572
            "Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
573
            new Object[]{newState, subchannelData.subchannel});
1✔
574
        return;
1✔
575
      }
576

577
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
578
          new Object[]{newState, subchannelData.subchannel});
1✔
579
      subchannelData.healthStateInfo = newState;
1✔
580
      if (addressIndex.isValid()
1✔
581
          && subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
1✔
582
        updateHealthCheckedState(subchannelData);
1✔
583
      }
584
    }
1✔
585
  }
586

587
  private SocketAddress getAddress(Subchannel subchannel) {
588
    return subchannel.getAddresses().getAddresses().get(0);
1✔
589
  }
590

591
  @VisibleForTesting
592
  ConnectivityState getConcludedConnectivityState() {
593
    return this.concludedState;
1✔
594
  }
595

596
  /**
597
   * No-op picker which doesn't add any custom picking logic. It just passes already known result
598
   * received in constructor.
599
   */
600
  private static final class Picker extends SubchannelPicker {
601
    private final PickResult result;
602

603
    Picker(PickResult result) {
1✔
604
      this.result = checkNotNull(result, "result");
1✔
605
    }
1✔
606

607
    @Override
608
    public PickResult pickSubchannel(PickSubchannelArgs args) {
609
      return result;
1✔
610
    }
611

612
    @Override
613
    public String toString() {
614
      return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
×
615
    }
616
  }
617

618
  /**
619
   * Picker that requests connection during the first pick, and returns noResult.
620
   */
621
  private final class RequestConnectionPicker extends SubchannelPicker {
622
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
623
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
624

625
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
626
      this.pickFirstLeafLoadBalancer =
1✔
627
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
628
    }
1✔
629

630
    @Override
631
    public PickResult pickSubchannel(PickSubchannelArgs args) {
632
      if (connectionRequested.compareAndSet(false, true)) {
1✔
633
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
634
      }
635
      return PickResult.withNoResult();
1✔
636
    }
637
  }
638

639
  /**
640
   * This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry.
641
   * All updates should be done in a synchronization context.
642
   */
643
  @VisibleForTesting
644
  static final class Index {
645
    private List<UnwrappedEag> orderedAddresses;
646
    private int activeElement = 0;
1✔
647
    private boolean enableHappyEyeballs;
648

649
    Index(List<EquivalentAddressGroup> groups, boolean enableHappyEyeballs) {
1✔
650
      this.enableHappyEyeballs = enableHappyEyeballs;
1✔
651
      updateGroups(groups);
1✔
652
    }
1✔
653

654
    public boolean isValid() {
655
      return activeElement < orderedAddresses.size();
1✔
656
    }
657

658
    public boolean isAtBeginning() {
659
      return activeElement == 0;
1✔
660
    }
661

662
    /**
663
     * Move to next address in group.  If last address in group move to first address of next group.
664
     * @return false if went off end of the list, otherwise true
665
     */
666
    public boolean increment() {
667
      if (!isValid()) {
1✔
668
        return false;
1✔
669
      }
670

671
      activeElement++;
1✔
672

673
      return isValid();
1✔
674
    }
675

676
    public void reset() {
677
      activeElement = 0;
1✔
678
    }
1✔
679

680
    public SocketAddress getCurrentAddress() {
681
      if (!isValid()) {
1✔
682
        throw new IllegalStateException("Index is past the end of the address group list");
1✔
683
      }
684
      return orderedAddresses.get(activeElement).address;
1✔
685
    }
686

687
    public Attributes getCurrentEagAttributes() {
688
      if (!isValid()) {
1✔
689
        throw new IllegalStateException("Index is off the end of the address group list");
×
690
      }
691
      return orderedAddresses.get(activeElement).attributes;
1✔
692
    }
693

694
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
695
      return Collections.singletonList(getCurrentEag());
1✔
696
    }
697

698
    private EquivalentAddressGroup getCurrentEag() {
699
      if (!isValid()) {
1✔
700
        throw new IllegalStateException("Index is past the end of the address group list");
×
701
      }
702
      return orderedAddresses.get(activeElement).asEag();
1✔
703
    }
704

705
    /**
706
     * Update to new groups, resetting the current index.
707
     */
708
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
709
      checkNotNull(newGroups, "newGroups");
1✔
710
      orderedAddresses = enableHappyEyeballs
1✔
711
                             ? updateGroupsHE(newGroups)
1✔
712
                             : updateGroupsNonHE(newGroups);
1✔
713
      reset();
1✔
714
    }
1✔
715

716
    /**
717
     * Returns false if the needle was not found and the current index was left unchanged.
718
     */
719
    public boolean seekTo(SocketAddress needle) {
720
      checkNotNull(needle, "needle");
1✔
721
      for (int i = 0; i < orderedAddresses.size(); i++) {
1✔
722
        if (orderedAddresses.get(i).address.equals(needle)) {
1✔
723
          this.activeElement = i;
1✔
724
          return true;
1✔
725
        }
726
      }
727
      return false;
1✔
728
    }
729

730
    public int size() {
731
      return orderedAddresses.size();
1✔
732
    }
733

734
    private List<UnwrappedEag> updateGroupsNonHE(List<EquivalentAddressGroup> newGroups) {
735
      List<UnwrappedEag> entries = new ArrayList<>();
1✔
736
      for (int g = 0; g < newGroups.size(); g++) {
1✔
737
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
738
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
739
          SocketAddress addr = eag.getAddresses().get(a);
1✔
740
          entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
741
        }
742
      }
743

744
      return entries;
1✔
745
    }
746

747
    private List<UnwrappedEag> updateGroupsHE(List<EquivalentAddressGroup> newGroups) {
748
      Boolean firstIsV6 = null;
1✔
749
      List<UnwrappedEag> v4Entries = new ArrayList<>();
1✔
750
      List<UnwrappedEag> v6Entries = new ArrayList<>();
1✔
751
      for (int g = 0; g <  newGroups.size(); g++) {
1✔
752
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
753
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
754
          SocketAddress addr = eag.getAddresses().get(a);
1✔
755
          boolean isIpV4 = addr instanceof InetSocketAddress
1✔
756
              && ((InetSocketAddress) addr).getAddress() instanceof Inet4Address;
1✔
757
          if (isIpV4) {
1✔
758
            if (firstIsV6 == null) {
1✔
759
              firstIsV6 = false;
1✔
760
            }
761
            v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
762
          } else {
763
            if (firstIsV6 == null) {
1✔
764
              firstIsV6 = true;
1✔
765
            }
766
            v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
767
          }
768
        }
769
      }
770

771
      return firstIsV6 != null && firstIsV6
1✔
772
          ? interleave(v6Entries, v4Entries)
1✔
773
          : interleave(v4Entries, v6Entries);
1✔
774
    }
775

776
    private List<UnwrappedEag> interleave(List<UnwrappedEag> firstFamily,
777
                                          List<UnwrappedEag> secondFamily) {
778
      if (firstFamily.isEmpty()) {
1✔
779
        return secondFamily;
1✔
780
      }
781
      if (secondFamily.isEmpty()) {
1✔
782
        return firstFamily;
1✔
783
      }
784

785
      List<UnwrappedEag> result = new ArrayList<>(firstFamily.size() + secondFamily.size());
1✔
786
      for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) {
1✔
787
        if (i < firstFamily.size()) {
1✔
788
          result.add(firstFamily.get(i));
1✔
789
        }
790
        if (i < secondFamily.size()) {
1✔
791
          result.add(secondFamily.get(i));
1✔
792
        }
793
      }
794
      return result;
1✔
795
    }
796

797
    private static final class UnwrappedEag {
798
      private final Attributes attributes;
799
      private final SocketAddress address;
800

801
      public UnwrappedEag(Attributes attributes, SocketAddress address) {
1✔
802
        this.attributes = attributes;
1✔
803
        this.address = address;
1✔
804
      }
1✔
805

806
      private EquivalentAddressGroup asEag() {
807
        return new EquivalentAddressGroup(address, attributes);
1✔
808
      }
809
    }
810
  }
811

812
  @VisibleForTesting
813
  int getIndexLocation() {
814
    return addressIndex.activeElement;
1✔
815
  }
816

817
  @VisibleForTesting
818
  boolean isIndexValid() {
819
    return addressIndex.isValid();
1✔
820
  }
821

822
  private static final class SubchannelData {
823
    private final Subchannel subchannel;
824
    private ConnectivityState state;
825
    private boolean completedConnectivityAttempt = false;
1✔
826
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
827

828
    public SubchannelData(Subchannel subchannel, ConnectivityState state) {
1✔
829
      this.subchannel = subchannel;
1✔
830
      this.state = state;
1✔
831
    }
1✔
832

833
    public Subchannel getSubchannel() {
834
      return this.subchannel;
1✔
835
    }
836

837
    public ConnectivityState getState() {
838
      return this.state;
1✔
839
    }
840

841
    public boolean isCompletedConnectivityAttempt() {
842
      return completedConnectivityAttempt;
1✔
843
    }
844

845
    private void updateState(ConnectivityState newState) {
846
      this.state = newState;
1✔
847
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
848
        completedConnectivityAttempt = true;
1✔
849
      } else if (newState == IDLE) {
1✔
850
        completedConnectivityAttempt = false;
1✔
851
      }
852
    }
1✔
853

854
    private ConnectivityState getHealthState() {
855
      return healthStateInfo.getState();
1✔
856
    }
857
  }
858

859
  public static final class PickFirstLeafLoadBalancerConfig {
860

861
    @Nullable
862
    public final Boolean shuffleAddressList;
863

864
    // For testing purposes only, not meant to be parsed from a real config.
865
    @Nullable
866
    final Long randomSeed;
867

868
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
869
      this(shuffleAddressList, null);
1✔
870
    }
1✔
871

872
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
873
        @Nullable Long randomSeed) {
1✔
874
      this.shuffleAddressList = shuffleAddressList;
1✔
875
      this.randomSeed = randomSeed;
1✔
876
    }
1✔
877
  }
878

879
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc