• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20160

28 Jan 2026 12:21AM UTC coverage: 88.71% (+0.04%) from 88.666%
#20160

push

github

ejona86
xds: Normalize weights before combining endpoint and locality weights

Previously, the number of endpoints in a locality would skew how much
traffic was sent to that locality. Also, if endpoints in localities had
wildly different weights, that would impact cross-locality weighting.

For example, consider:
  LocalityA weight=1 endpointWeights=[100, 100, 100, 100]
  LocalityB weight=1 endpointWeights=[1]

The endpoint in LocalityB should have an endpoint weight that is half
the total sum of endpoint weights, in order to receive half the traffic.
But the multiple endpoints in LocalityA would cause it to get 4x the
traffic and the endpoint weights in LocalityA causes them to get 100x
the traffic.

See gRFC A113

35415 of 39922 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.07
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.collect.ImmutableList;
28
import com.google.common.collect.Lists;
29
import com.google.errorprone.annotations.CheckReturnValue;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.ConnectivityStateInfo;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.InternalEquivalentAddressGroup;
35
import io.grpc.LoadBalancer;
36
import io.grpc.Status;
37
import io.grpc.SynchronizationContext.ScheduledHandle;
38
import java.net.Inet4Address;
39
import java.net.InetSocketAddress;
40
import java.net.SocketAddress;
41
import java.util.ArrayList;
42
import java.util.Collections;
43
import java.util.HashMap;
44
import java.util.HashSet;
45
import java.util.List;
46
import java.util.Map;
47
import java.util.Random;
48
import java.util.Set;
49
import java.util.concurrent.TimeUnit;
50
import java.util.concurrent.atomic.AtomicBoolean;
51
import java.util.logging.Level;
52
import java.util.logging.Logger;
53
import javax.annotation.Nullable;
54

55
/**
56
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
57
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
58
 * list and sticking to the first that works.
59
 */
60
final class PickFirstLeafLoadBalancer extends LoadBalancer {
61
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
62
  @VisibleForTesting
63
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
64
  private final boolean enableHappyEyeballs = !isSerializingRetries()
1✔
65
      && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
1✔
66
  static boolean weightedShuffling =
1✔
67
      GrpcUtil.getFlag("GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING", true);
1✔
68
  private final Helper helper;
69
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
70
  private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
1✔
71
  private int numTf = 0;
1✔
72
  private boolean firstPass = true;
1✔
73
  @Nullable
1✔
74
  private ScheduledHandle scheduleConnectionTask = null;
75
  private ConnectivityState rawConnectivityState = IDLE;
1✔
76
  private ConnectivityState concludedState = IDLE;
1✔
77
  private boolean notAPetiolePolicy = true; // means not under a petiole policy
1✔
78
  private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
1✔
79
  private BackoffPolicy reconnectPolicy;
80
  @Nullable
1✔
81
  private ScheduledHandle reconnectTask = null;
82
  private final boolean serializingRetries = isSerializingRetries();
1✔
83

84
  PickFirstLeafLoadBalancer(Helper helper) {
1✔
85
    this.helper = checkNotNull(helper, "helper");
1✔
86
  }
1✔
87

88
  static boolean isSerializingRetries() {
89
    return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
1✔
90
  }
91

92
  @Override
93
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
94
    if (rawConnectivityState == SHUTDOWN) {
1✔
95
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
96
    }
97

98
    // Check whether this is a petiole policy, which is based off of an address attribute
99
    Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
1✔
100
    this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
1✔
101

102
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
103

104
    // Validate the address list
105
    if (servers.isEmpty()) {
1✔
106
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
107
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
108
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
109
      handleNameResolutionError(unavailableStatus);
1✔
110
      return unavailableStatus;
1✔
111
    }
112
    for (EquivalentAddressGroup eag : servers) {
1✔
113
      if (eag == null) {
1✔
114
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
115
            "NameResolver returned address list with null endpoint. addrs="
116
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
117
                + resolvedAddresses.getAttributes());
1✔
118
        handleNameResolutionError(unavailableStatus);
1✔
119
        return unavailableStatus;
1✔
120
      }
121
    }
1✔
122

123
    // Since we have a new set of addresses, we are again at first pass
124
    firstPass = true;
1✔
125

126
    List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
1✔
127

128
    // We can optionally be configured to shuffle the address list. This can help better distribute
129
    // the load.
130
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
131
        instanceof PickFirstLeafLoadBalancerConfig) {
132
      PickFirstLeafLoadBalancerConfig config
1✔
133
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
134
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
135
        cleanServers = shuffle(
1✔
136
            cleanServers, config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
137
      }
138
    }
139

140
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
1✔
141
        ImmutableList.copyOf(cleanServers);
1✔
142

143
    if (rawConnectivityState == READY
1✔
144
        || (rawConnectivityState == CONNECTING
145
          && (!enableHappyEyeballs || addressIndex.isValid()))) {
1✔
146
      // If the previous ready (or connecting) subchannel exists in new address list,
147
      // keep this connection and don't create new subchannels. Happy Eyeballs is excluded when
148
      // connecting, because it allows multiple attempts simultaneously, thus is fine to start at
149
      // the beginning.
150
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
151
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
152
      if (addressIndex.seekTo(previousAddress)) {
1✔
153
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
154
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
155
        shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
156
        return Status.OK;
1✔
157
      }
158
      // Previous ready subchannel not in the new list of addresses
159
    } else {
1✔
160
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
161
    }
162

163
    // No old addresses means first time through, so we will do an explicit move to CONNECTING
164
    // which is what we implicitly started with
165
    boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
166

167
    if (noOldAddrs) {
1✔
168
      // Make tests happy; they don't properly assume starting in CONNECTING
169
      rawConnectivityState = CONNECTING;
1✔
170
      updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
171
    }
172

173
    if (rawConnectivityState == READY) {
1✔
174
      // connect from beginning when prompted
175
      rawConnectivityState = IDLE;
1✔
176
      updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
177

178
    } else if (rawConnectivityState == CONNECTING || rawConnectivityState == TRANSIENT_FAILURE) {
1✔
179
      // start connection attempt at first address
180
      cancelScheduleTask();
1✔
181
      requestConnection();
1✔
182
    }
183

184
    return Status.OK;
1✔
185
  }
186

187
  /**
188
   * Compute the difference between the flattened new addresses and the old addresses that had been
189
   * made into subchannels and then shutdown the matching subchannels.
190
   * @return true if there were no old addresses
191
   */
192
  private boolean shutdownRemovedAddresses(
193
      ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
194

195
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
196

197
    // Flatten the new EAGs addresses
198
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
199
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
200
      newAddrs.addAll(endpoint.getAddresses());
1✔
201
    }
1✔
202

203
    // Shut them down and remove them
204
    for (SocketAddress oldAddr : oldAddrs) {
1✔
205
      if (!newAddrs.contains(oldAddr)) {
1✔
206
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
207
      }
208
    }
1✔
209
    return oldAddrs.isEmpty();
1✔
210
  }
211

212
  private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
213
    Set<SocketAddress> seenAddresses = new HashSet<>();
1✔
214
    List<EquivalentAddressGroup> newGroups = new ArrayList<>();
1✔
215

216
    for (EquivalentAddressGroup group : groups) {
1✔
217
      List<SocketAddress> addrs = new ArrayList<>();
1✔
218
      for (SocketAddress addr : group.getAddresses()) {
1✔
219
        if (seenAddresses.add(addr)) {
1✔
220
          addrs.add(addr);
1✔
221
        }
222
      }
1✔
223
      if (!addrs.isEmpty()) {
1✔
224
        newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
1✔
225
      }
226
    }
1✔
227

228
    return newGroups;
1✔
229
  }
230

231
  // Also used by PickFirstLoadBalancer
232
  @CheckReturnValue
233
  static List<EquivalentAddressGroup> shuffle(List<EquivalentAddressGroup> eags, Random random) {
234
    if (weightedShuffling) {
1✔
235
      List<WeightEntry> weightedEntries = new ArrayList<>(eags.size());
1✔
236
      for (EquivalentAddressGroup eag : eags) {
1✔
237
        weightedEntries.add(new WeightEntry(eag, eagToWeight(eag, random)));
1✔
238
      }
1✔
239
      Collections.sort(weightedEntries, Collections.reverseOrder() /* descending */);
1✔
240
      return Lists.transform(weightedEntries, entry -> entry.eag);
1✔
241
    } else {
242
      List<EquivalentAddressGroup> eagsCopy = new ArrayList<>(eags);
1✔
243
      Collections.shuffle(eagsCopy, random);
1✔
244
      return eagsCopy;
1✔
245
    }
246
  }
247

248
  private static double eagToWeight(EquivalentAddressGroup eag, Random random) {
249
    Long weight = eag.getAttributes().get(InternalEquivalentAddressGroup.ATTR_WEIGHT);
1✔
250
    if (weight == null) {
1✔
251
      weight = 1L;
1✔
252
    }
253
    return Math.pow(random.nextDouble(), 1.0 / weight);
1✔
254
  }
255

256
  private static final class WeightEntry implements Comparable<WeightEntry> {
257
    final EquivalentAddressGroup eag;
258
    final double weight;
259

260
    public WeightEntry(EquivalentAddressGroup eag, double weight) {
1✔
261
      this.eag = eag;
1✔
262
      this.weight = weight;
1✔
263
    }
1✔
264

265
    @Override
266
    public int compareTo(WeightEntry entry) {
267
      return Double.compare(this.weight, entry.weight);
1✔
268
    }
269
  }
270

271
  @Override
272
  public void handleNameResolutionError(Status error) {
273
    if (rawConnectivityState == SHUTDOWN) {
1✔
274
      return;
×
275
    }
276

277
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
278
      subchannelData.getSubchannel().shutdown();
1✔
279
    }
1✔
280
    subchannels.clear();
1✔
281
    addressIndex.updateGroups(ImmutableList.of());
1✔
282
    rawConnectivityState = TRANSIENT_FAILURE;
1✔
283
    updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
284
  }
1✔
285

286
  void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
287
    ConnectivityState newState = stateInfo.getState();
1✔
288

289
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
290
    // To prevent pickers from returning these obsolete subchannels, this logic
291
    // is included to check if the current list of active subchannels includes this subchannel.
292
    if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
1✔
293
      return;
1✔
294
    }
295

296
    if (newState == SHUTDOWN) {
1✔
297
      return;
1✔
298
    }
299

300
    if (newState == IDLE && subchannelData.state == READY) {
1✔
301
      helper.refreshNameResolution();
1✔
302
    }
303

304
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
305
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
306
    // transient failure". Only a subchannel state change to READY will get the LB out of
307
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
308
    // keep retrying for a connection.
309

310
    // With the new pick first implementation, individual subchannels will have their own backoff
311
    // on a per-address basis. Thus, iterative requests for connections will not be requested
312
    // once the first pass through is complete.
313
    // However, every time there is an address update, we will perform a pass through for the new
314
    // addresses in the updated list.
315
    subchannelData.updateState(newState);
1✔
316
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
317
      if (newState == CONNECTING) {
1✔
318
        // each subchannel is responsible for its own backoff
319
        return;
1✔
320
      } else if (newState == IDLE) {
1✔
321
        requestConnection();
1✔
322
        return;
1✔
323
      }
324
    }
325

326
    switch (newState) {
1✔
327
      case IDLE:
328
        // Shutdown when ready: connect from beginning when prompted
329
        addressIndex.reset();
1✔
330
        rawConnectivityState = IDLE;
1✔
331
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
332
        break;
1✔
333

334
      case CONNECTING:
335
        rawConnectivityState = CONNECTING;
1✔
336
        updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
337
        break;
1✔
338

339
      case READY:
340
        shutdownRemaining(subchannelData);
1✔
341
        addressIndex.seekTo(getAddress(subchannelData.subchannel));
1✔
342
        rawConnectivityState = READY;
1✔
343
        updateHealthCheckedState(subchannelData);
1✔
344
        break;
1✔
345

346
      case TRANSIENT_FAILURE:
347
        // If we are looking at current channel, request a connection if possible
348
        if (addressIndex.isValid()
1✔
349
            && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
1✔
350
          if (addressIndex.increment()) {
1✔
351
            cancelScheduleTask();
1✔
352
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
353
          } else {
354
            if (subchannels.size() >= addressIndex.size()) {
1✔
355
              scheduleBackoff();
1✔
356
            } else {
357
              // We must have done a seek to the middle of the list lets start over from the
358
              // beginning
359
              addressIndex.reset();
1✔
360
              requestConnection();
1✔
361
            }
362
          }
363
        }
364

365
        if (isPassComplete()) {
1✔
366
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
367
          updateBalancingState(TRANSIENT_FAILURE,
1✔
368
              new FixedResultPicker(PickResult.withError(stateInfo.getStatus())));
1✔
369

370
          // Refresh Name Resolution, but only when all 3 conditions are met
371
          // * We are at the end of addressIndex
372
          // * have had status reported for all subchannels.
373
          // * And one of the following conditions:
374
          //    * Have had enough TF reported since we completed first pass
375
          //    * Just completed the first pass
376
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
377
            firstPass = false;
1✔
378
            numTf = 0;
1✔
379
            helper.refreshNameResolution();
1✔
380
          }
381
        }
382
        break;
383

384
      default:
385
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
386
    }
387
  }
1✔
388

389
  /**
390
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
391
   */
392
  private void scheduleBackoff() {
393
    if (!serializingRetries) {
1✔
394
      return;
1✔
395
    }
396

397
    class EndOfCurrentBackoff implements Runnable {
1✔
398
      @Override
399
      public void run() {
400
        reconnectTask = null;
1✔
401
        addressIndex.reset();
1✔
402
        requestConnection();
1✔
403
      }
1✔
404
    }
405

406
    // Just allow the previous one to trigger when ready if we're already in backoff
407
    if (reconnectTask != null) {
1✔
408
      return;
×
409
    }
410

411
    if (reconnectPolicy == null) {
1✔
412
      reconnectPolicy = bkoffPolProvider.get();
1✔
413
    }
414
    long delayNanos = reconnectPolicy.nextBackoffNanos();
1✔
415
    reconnectTask = helper.getSynchronizationContext().schedule(
1✔
416
        new EndOfCurrentBackoff(),
417
        delayNanos,
418
        TimeUnit.NANOSECONDS,
419
        helper.getScheduledExecutorService());
1✔
420
  }
1✔
421

422
  private void updateHealthCheckedState(SubchannelData subchannelData) {
423
    if (subchannelData.state != READY) {
1✔
424
      return;
1✔
425
    }
426

427
    if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
1✔
428
      updateBalancingState(READY,
1✔
429
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
430
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
431
      updateBalancingState(TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(
1✔
432
          subchannelData.healthStateInfo.getStatus())));
1✔
433
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
434
      updateBalancingState(subchannelData.getHealthState(),
1✔
435
          new FixedResultPicker(PickResult.withNoResult()));
1✔
436
    }
437
  }
1✔
438

439
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
440
    // an optimization: de-dup IDLE or CONNECTING notification.
441
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
442
      return;
1✔
443
    }
444
    concludedState = state;
1✔
445
    helper.updateBalancingState(state, picker);
1✔
446
  }
1✔
447

448
  @Override
449
  public void shutdown() {
450
    log.log(Level.FINE,
1✔
451
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
452
    rawConnectivityState = SHUTDOWN;
1✔
453
    concludedState = SHUTDOWN;
1✔
454
    cancelScheduleTask();
1✔
455
    if (reconnectTask != null) {
1✔
456
      reconnectTask.cancel();
1✔
457
      reconnectTask = null;
1✔
458
    }
459
    reconnectPolicy = null;
1✔
460

461
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
462
      subchannelData.getSubchannel().shutdown();
1✔
463
    }
1✔
464

465
    subchannels.clear();
1✔
466
  }
1✔
467

468
  /**
469
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
470
  * that all other subchannels must be shutdown.
471
  */
472
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
473
    if (reconnectTask != null) {
1✔
474
      reconnectTask.cancel();
1✔
475
      reconnectTask = null;
1✔
476
    }
477
    reconnectPolicy = null;
1✔
478

479
    cancelScheduleTask();
1✔
480
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
481
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
482
        subchannelData.getSubchannel().shutdown();
1✔
483
      }
484
    }
1✔
485
    subchannels.clear();
1✔
486
    activeSubchannelData.updateState(READY);
1✔
487
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
488
  }
1✔
489

490
  /**
491
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
492
   * Schedules a connection to next address in list as well.
493
   * If the current channel has already attempted a connection, we attempt a connection
494
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
495
   * return null.
496
   */
497
  @Override
498
  public void requestConnection() {
499
    if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
1✔
500
      return;
1✔
501
    }
502

503
    SocketAddress currentAddress = addressIndex.getCurrentAddress();
1✔
504
    SubchannelData subchannelData = subchannels.get(currentAddress);
1✔
505
    if (subchannelData == null) {
1✔
506
      subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
507
    }
508

509
    ConnectivityState subchannelState = subchannelData.getState();
1✔
510
    switch (subchannelState) {
1✔
511
      case IDLE:
512
        subchannelData.subchannel.requestConnection();
1✔
513
        subchannelData.updateState(CONNECTING);
1✔
514
        scheduleNextConnection();
1✔
515
        break;
1✔
516
      case CONNECTING:
517
        scheduleNextConnection();
1✔
518
        break;
1✔
519
      case TRANSIENT_FAILURE:
520
        if (!serializingRetries) {
1✔
521
          addressIndex.increment();
1✔
522
          requestConnection();
1✔
523
        } else {
524
          if (!addressIndex.isValid()) {
1✔
525
            scheduleBackoff();
×
526
          } else {
527
            subchannelData.subchannel.requestConnection();
1✔
528
            subchannelData.updateState(CONNECTING);
1✔
529
          }
530
        }
531
        break;
1✔
532
      default:
533
        // Wait for current subchannel to change state
534
    }
535
  }
1✔
536

537

538
  /**
539
  * Happy Eyeballs
540
  * Schedules connection attempt to happen after a delay to the next available address.
541
  */
542
  private void scheduleNextConnection() {
543
    if (!enableHappyEyeballs
1✔
544
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
545
      return;
1✔
546
    }
547

548
    class StartNextConnection implements Runnable {
1✔
549
      @Override
550
      public void run() {
551
        scheduleConnectionTask = null;
1✔
552
        if (addressIndex.increment()) {
1✔
553
          requestConnection();
1✔
554
        }
555
      }
1✔
556
    }
557

558
    scheduleConnectionTask = helper.getSynchronizationContext().schedule(
1✔
559
        new StartNextConnection(),
560
        CONNECTION_DELAY_INTERVAL_MS,
561
        TimeUnit.MILLISECONDS,
562
        helper.getScheduledExecutorService());
1✔
563
  }
1✔
564

565
  private void cancelScheduleTask() {
566
    if (scheduleConnectionTask != null) {
1✔
567
      scheduleConnectionTask.cancel();
1✔
568
      scheduleConnectionTask = null;
1✔
569
    }
570
  }
1✔
571

572
  private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
573
    HealthListener hcListener = new HealthListener();
1✔
574
    final Subchannel subchannel = helper.createSubchannel(
1✔
575
        CreateSubchannelArgs.newBuilder()
1✔
576
            .setAddresses(Lists.newArrayList(
1✔
577
                new EquivalentAddressGroup(addr, attrs)))
578
            .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
579
            .addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
1✔
580
            .build());
1✔
581
    if (subchannel == null) {
1✔
582
      log.warning("Was not able to create subchannel for " + addr);
×
583
      throw new IllegalStateException("Can't create subchannel");
×
584
    }
585
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
1✔
586
    hcListener.subchannelData = subchannelData;
1✔
587
    subchannels.put(addr, subchannelData);
1✔
588
    Attributes scAttrs = subchannel.getAttributes();
1✔
589
    if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
590
      subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
591
    }
592
    subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
1✔
593
    return subchannelData;
1✔
594
  }
595

596
  private boolean isPassComplete() {
597
    if (subchannels.size() < addressIndex.size()) {
1✔
598
      return false;
1✔
599
    }
600
    for (SubchannelData sc : subchannels.values()) {
1✔
601
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
602
        return false;
1✔
603
      }
604
    }
1✔
605
    return true;
1✔
606
  }
607

608
  private final class HealthListener implements SubchannelStateListener {
1✔
609
    private SubchannelData subchannelData;
610

611
    @Override
612
    public void onSubchannelState(ConnectivityStateInfo newState) {
613
      if (notAPetiolePolicy) {
1✔
614
        log.log(Level.WARNING,
1✔
615
            "Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
616
            new Object[]{newState, subchannelData.subchannel});
1✔
617
        return;
1✔
618
      }
619

620
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
621
          new Object[]{newState, subchannelData.subchannel});
1✔
622
      subchannelData.healthStateInfo = newState;
1✔
623
      if (addressIndex.isValid()
1✔
624
          && subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
1✔
625
        updateHealthCheckedState(subchannelData);
1✔
626
      }
627
    }
1✔
628
  }
629

630
  private SocketAddress getAddress(Subchannel subchannel) {
631
    return subchannel.getAddresses().getAddresses().get(0);
1✔
632
  }
633

634
  @VisibleForTesting
635
  ConnectivityState getConcludedConnectivityState() {
636
    return this.concludedState;
1✔
637
  }
638

639
  /**
640
   * Picker that requests connection during the first pick, and returns noResult.
641
   */
642
  private final class RequestConnectionPicker extends SubchannelPicker {
643
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
644
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
645

646
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
647
      this.pickFirstLeafLoadBalancer =
1✔
648
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
649
    }
1✔
650

651
    @Override
652
    public PickResult pickSubchannel(PickSubchannelArgs args) {
653
      if (connectionRequested.compareAndSet(false, true)) {
1✔
654
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
655
      }
656
      return PickResult.withNoResult();
1✔
657
    }
658
  }
659

660
  /**
661
   * This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry.
662
   * All updates should be done in a synchronization context.
663
   */
664
  @VisibleForTesting
665
  static final class Index {
666
    private List<UnwrappedEag> orderedAddresses;
667
    private int activeElement = 0;
1✔
668
    private boolean enableHappyEyeballs;
669

670
    Index(List<EquivalentAddressGroup> groups, boolean enableHappyEyeballs) {
1✔
671
      this.enableHappyEyeballs = enableHappyEyeballs;
1✔
672
      updateGroups(groups);
1✔
673
    }
1✔
674

675
    public boolean isValid() {
676
      return activeElement < orderedAddresses.size();
1✔
677
    }
678

679
    public boolean isAtBeginning() {
680
      return activeElement == 0;
1✔
681
    }
682

683
    /**
684
     * Move to next address in group.  If last address in group move to first address of next group.
685
     * @return false if went off end of the list, otherwise true
686
     */
687
    public boolean increment() {
688
      if (!isValid()) {
1✔
689
        return false;
1✔
690
      }
691

692
      activeElement++;
1✔
693

694
      return isValid();
1✔
695
    }
696

697
    public void reset() {
698
      activeElement = 0;
1✔
699
    }
1✔
700

701
    public SocketAddress getCurrentAddress() {
702
      if (!isValid()) {
1✔
703
        throw new IllegalStateException("Index is past the end of the address group list");
1✔
704
      }
705
      return orderedAddresses.get(activeElement).address;
1✔
706
    }
707

708
    public Attributes getCurrentEagAttributes() {
709
      if (!isValid()) {
1✔
710
        throw new IllegalStateException("Index is off the end of the address group list");
×
711
      }
712
      return orderedAddresses.get(activeElement).attributes;
1✔
713
    }
714

715
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
716
      return Collections.singletonList(getCurrentEag());
1✔
717
    }
718

719
    private EquivalentAddressGroup getCurrentEag() {
720
      if (!isValid()) {
1✔
721
        throw new IllegalStateException("Index is past the end of the address group list");
×
722
      }
723
      return orderedAddresses.get(activeElement).asEag();
1✔
724
    }
725

726
    /**
727
     * Update to new groups, resetting the current index.
728
     */
729
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
730
      checkNotNull(newGroups, "newGroups");
1✔
731
      orderedAddresses = enableHappyEyeballs
1✔
732
                             ? updateGroupsHE(newGroups)
1✔
733
                             : updateGroupsNonHE(newGroups);
1✔
734
      reset();
1✔
735
    }
1✔
736

737
    /**
738
     * Returns false if the needle was not found and the current index was left unchanged.
739
     */
740
    public boolean seekTo(SocketAddress needle) {
741
      checkNotNull(needle, "needle");
1✔
742
      for (int i = 0; i < orderedAddresses.size(); i++) {
1✔
743
        if (orderedAddresses.get(i).address.equals(needle)) {
1✔
744
          this.activeElement = i;
1✔
745
          return true;
1✔
746
        }
747
      }
748
      return false;
1✔
749
    }
750

751
    public int size() {
752
      return orderedAddresses.size();
1✔
753
    }
754

755
    private List<UnwrappedEag> updateGroupsNonHE(List<EquivalentAddressGroup> newGroups) {
756
      List<UnwrappedEag> entries = new ArrayList<>();
1✔
757
      for (int g = 0; g < newGroups.size(); g++) {
1✔
758
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
759
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
760
          SocketAddress addr = eag.getAddresses().get(a);
1✔
761
          entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
762
        }
763
      }
764

765
      return entries;
1✔
766
    }
767

768
    private List<UnwrappedEag> updateGroupsHE(List<EquivalentAddressGroup> newGroups) {
769
      Boolean firstIsV6 = null;
1✔
770
      List<UnwrappedEag> v4Entries = new ArrayList<>();
1✔
771
      List<UnwrappedEag> v6Entries = new ArrayList<>();
1✔
772
      for (int g = 0; g <  newGroups.size(); g++) {
1✔
773
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
774
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
775
          SocketAddress addr = eag.getAddresses().get(a);
1✔
776
          boolean isIpV4 = addr instanceof InetSocketAddress
1✔
777
              && ((InetSocketAddress) addr).getAddress() instanceof Inet4Address;
1✔
778
          if (isIpV4) {
1✔
779
            if (firstIsV6 == null) {
1✔
780
              firstIsV6 = false;
1✔
781
            }
782
            v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
783
          } else {
784
            if (firstIsV6 == null) {
1✔
785
              firstIsV6 = true;
1✔
786
            }
787
            v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
788
          }
789
        }
790
      }
791

792
      return firstIsV6 != null && firstIsV6
1✔
793
          ? interleave(v6Entries, v4Entries)
1✔
794
          : interleave(v4Entries, v6Entries);
1✔
795
    }
796

797
    private List<UnwrappedEag> interleave(List<UnwrappedEag> firstFamily,
798
                                          List<UnwrappedEag> secondFamily) {
799
      if (firstFamily.isEmpty()) {
1✔
800
        return secondFamily;
1✔
801
      }
802
      if (secondFamily.isEmpty()) {
1✔
803
        return firstFamily;
1✔
804
      }
805

806
      List<UnwrappedEag> result = new ArrayList<>(firstFamily.size() + secondFamily.size());
1✔
807
      for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) {
1✔
808
        if (i < firstFamily.size()) {
1✔
809
          result.add(firstFamily.get(i));
1✔
810
        }
811
        if (i < secondFamily.size()) {
1✔
812
          result.add(secondFamily.get(i));
1✔
813
        }
814
      }
815
      return result;
1✔
816
    }
817

818
    private static final class UnwrappedEag {
819
      private final Attributes attributes;
820
      private final SocketAddress address;
821

822
      public UnwrappedEag(Attributes attributes, SocketAddress address) {
1✔
823
        this.attributes = attributes;
1✔
824
        this.address = address;
1✔
825
      }
1✔
826

827
      private EquivalentAddressGroup asEag() {
828
        return new EquivalentAddressGroup(address, attributes);
1✔
829
      }
830
    }
831
  }
832

833
  @VisibleForTesting
834
  int getIndexLocation() {
835
    return addressIndex.activeElement;
1✔
836
  }
837

838
  @VisibleForTesting
839
  boolean isIndexValid() {
840
    return addressIndex.isValid();
1✔
841
  }
842

843
  private static final class SubchannelData {
844
    private final Subchannel subchannel;
845
    private ConnectivityState state;
846
    private boolean completedConnectivityAttempt = false;
1✔
847
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
848

849
    public SubchannelData(Subchannel subchannel, ConnectivityState state) {
1✔
850
      this.subchannel = subchannel;
1✔
851
      this.state = state;
1✔
852
    }
1✔
853

854
    public Subchannel getSubchannel() {
855
      return this.subchannel;
1✔
856
    }
857

858
    public ConnectivityState getState() {
859
      return this.state;
1✔
860
    }
861

862
    public boolean isCompletedConnectivityAttempt() {
863
      return completedConnectivityAttempt;
1✔
864
    }
865

866
    private void updateState(ConnectivityState newState) {
867
      this.state = newState;
1✔
868
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
869
        completedConnectivityAttempt = true;
1✔
870
      } else if (newState == IDLE) {
1✔
871
        completedConnectivityAttempt = false;
1✔
872
      }
873
    }
1✔
874

875
    private ConnectivityState getHealthState() {
876
      return healthStateInfo.getState();
1✔
877
    }
878
  }
879

880
  public static final class PickFirstLeafLoadBalancerConfig {
881

882
    @Nullable
883
    public final Boolean shuffleAddressList;
884

885
    // For testing purposes only, not meant to be parsed from a real config.
886
    @Nullable
887
    final Long randomSeed;
888

889
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
890
      this(shuffleAddressList, null);
1✔
891
    }
1✔
892

893
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
894
        @Nullable Long randomSeed) {
1✔
895
      this.shuffleAddressList = shuffleAddressList;
1✔
896
      this.randomSeed = randomSeed;
1✔
897
    }
1✔
898
  }
899

900
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc