• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19109

21 Mar 2024 11:59PM UTC coverage: 88.287% (+0.01%) from 88.277%
#19109

push

github

web-flow
Enable Happy Eyeballs by default (#11022)

* Flip the flag

* Fix test flakiness where IPv6 was not considered loopback

31198 of 35337 relevant lines covered (88.29%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.76
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.Lists;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.ConnectivityStateInfo;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.ExperimentalApi;
35
import io.grpc.LoadBalancer;
36
import io.grpc.Status;
37
import io.grpc.SynchronizationContext;
38
import io.grpc.SynchronizationContext.ScheduledHandle;
39
import java.net.SocketAddress;
40
import java.util.ArrayList;
41
import java.util.Collections;
42
import java.util.HashMap;
43
import java.util.HashSet;
44
import java.util.List;
45
import java.util.Map;
46
import java.util.Random;
47
import java.util.Set;
48
import java.util.concurrent.TimeUnit;
49
import java.util.concurrent.atomic.AtomicBoolean;
50
import java.util.logging.Level;
51
import java.util.logging.Logger;
52
import javax.annotation.Nullable;
53

54
/**
55
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
56
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
57
 * list and sticking to the first that works.
58
 */
59
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10383")
60
final class PickFirstLeafLoadBalancer extends LoadBalancer {
61
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
62
  @VisibleForTesting
63
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
64
  public static final String GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS =
65
      "GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS";
66
  private final Helper helper;
67
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
68
  private Index addressIndex;
69
  private int numTf = 0;
1✔
70
  private boolean firstPass = true;
1✔
71
  @Nullable
72
  private ScheduledHandle scheduleConnectionTask;
73
  private ConnectivityState rawConnectivityState = IDLE;
1✔
74
  private ConnectivityState concludedState = IDLE;
1✔
75
  private final boolean enableHappyEyeballs =
1✔
76
      GrpcUtil.getFlag(GRPC_EXPERIMENTAL_XDS_DUALSTACK_ENDPOINTS, true);
1✔
77

78
  PickFirstLeafLoadBalancer(Helper helper) {
1✔
79
    this.helper = checkNotNull(helper, "helper");
1✔
80
  }
1✔
81

82
  @Override
83
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
84
    if (rawConnectivityState == SHUTDOWN) {
1✔
85
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
86
    }
87

88
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
89

90
    // Validate the address list
91
    if (servers.isEmpty()) {
1✔
92
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
93
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
94
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
95
      handleNameResolutionError(unavailableStatus);
1✔
96
      return unavailableStatus;
1✔
97
    }
98
    for (EquivalentAddressGroup eag : servers) {
1✔
99
      if (eag == null) {
1✔
100
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
101
            "NameResolver returned address list with null endpoint. addrs="
102
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
103
                + resolvedAddresses.getAttributes());
1✔
104
        handleNameResolutionError(unavailableStatus);
1✔
105
        return unavailableStatus;
1✔
106
      }
107
    }
1✔
108

109
    // Since we have a new set of addresses, we are again at first pass
110
    firstPass = true;
1✔
111

112
    // We can optionally be configured to shuffle the address list. This can help better distribute
113
    // the load.
114
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
115
        instanceof PickFirstLeafLoadBalancerConfig) {
116
      PickFirstLeafLoadBalancerConfig config
1✔
117
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
118
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
119
        servers = new ArrayList<>(servers);
1✔
120
        Collections.shuffle(servers,
1✔
121
            config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
122
      }
123
    }
124

125
    // Make sure we're storing our own list rather than what was passed in
126
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
127
        ImmutableList.<EquivalentAddressGroup>builder().addAll(servers).build();
1✔
128

129
    if (addressIndex == null) {
1✔
130
      addressIndex = new Index(newImmutableAddressGroups);
1✔
131
    } else if (rawConnectivityState == READY) {
1✔
132
      // If the previous ready subchannel exists in new address list,
133
      // keep this connection and don't create new subchannels
134
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
135
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
136
      if (addressIndex.seekTo(previousAddress)) {
1✔
137
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
138
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
139
        return Status.OK;
1✔
140
      } else {
141
        addressIndex.reset(); // Previous ready subchannel not in the new list of addresses
1✔
142
      }
143
    } else {
1✔
144
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
145
    }
146

147
    // remove old subchannels that were not in new address list
148
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
149

150
    // Flatten the new EAGs addresses
151
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
152
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
153
      newAddrs.addAll(endpoint.getAddresses());
1✔
154
    }
1✔
155

156
    // Shut them down and remove them
157
    for (SocketAddress oldAddr : oldAddrs) {
1✔
158
      if (!newAddrs.contains(oldAddr)) {
1✔
159
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
160
      }
161
    }
1✔
162

163
    if (oldAddrs.size() == 0 || rawConnectivityState == CONNECTING
1✔
164
        || rawConnectivityState == READY) {
165
      // start connection attempt at first address
166
      rawConnectivityState = CONNECTING;
1✔
167
      updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
168
      cancelScheduleTask();
1✔
169
      requestConnection();
1✔
170

171
    } else if (rawConnectivityState == IDLE) {
1✔
172
      // start connection attempt at first address when requested
173
      SubchannelPicker picker = new RequestConnectionPicker(this);
1✔
174
      updateBalancingState(IDLE, picker);
1✔
175

176
    } else if (rawConnectivityState == TRANSIENT_FAILURE) {
1✔
177
      // start connection attempt at first address
178
      cancelScheduleTask();
1✔
179
      requestConnection();
1✔
180
    }
181

182
    return Status.OK;
1✔
183
  }
184

185
  @Override
186
  public void handleNameResolutionError(Status error) {
187
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
188
      subchannelData.getSubchannel().shutdown();
1✔
189
    }
1✔
190
    subchannels.clear();
1✔
191
    updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
1✔
192
  }
1✔
193

194
  void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo) {
195
    ConnectivityState newState = stateInfo.getState();
1✔
196
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
197
    // To prevent pickers from returning these obsolete subchannels, this logic
198
    // is included to check if the current list of active subchannels includes this subchannel.
199
    SubchannelData subchannelData = subchannels.get(getAddress(subchannel));
1✔
200
    if (subchannelData == null || subchannelData.getSubchannel() != subchannel) {
1✔
201
      return;
1✔
202
    }
203
    if (newState == SHUTDOWN) {
1✔
204
      return;
×
205
    }
206

207
    if (newState == IDLE) {
1✔
208
      helper.refreshNameResolution();
1✔
209
    }
210
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
211
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
212
    // transient failure". Only a subchannel state change to READY will get the LB out of
213
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
214
    // keep retrying for a connection.
215

216
    // With the new pick first implementation, individual subchannels will have their own backoff
217
    // on a per-address basis. Thus, iterative requests for connections will not be requested
218
    // once the first pass through is complete.
219
    // However, every time there is an address update, we will perform a pass through for the new
220
    // addresses in the updated list.
221
    subchannelData.updateState(newState);
1✔
222
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
223
      if (newState == CONNECTING) {
1✔
224
        // each subchannel is responsible for its own backoff
225
        return;
1✔
226
      } else if (newState == IDLE) {
1✔
227
        requestConnection();
1✔
228
        return;
1✔
229
      }
230
    }
231

232
    switch (newState) {
1✔
233
      case IDLE:
234
        // Shutdown when ready: connect from beginning when prompted
235
        addressIndex.reset();
1✔
236
        rawConnectivityState = IDLE;
1✔
237
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
238
        break;
1✔
239

240
      case CONNECTING:
241
        rawConnectivityState = CONNECTING;
1✔
242
        updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
243
        break;
1✔
244

245
      case READY:
246
        shutdownRemaining(subchannelData);
1✔
247
        addressIndex.seekTo(getAddress(subchannel));
1✔
248
        rawConnectivityState = READY;
1✔
249
        updateHealthCheckedState(subchannelData);
1✔
250
        break;
1✔
251

252
      case TRANSIENT_FAILURE:
253
        // If we are looking at current channel, request a connection if possible
254
        if (addressIndex.isValid()
1✔
255
            && subchannels.get(addressIndex.getCurrentAddress()).getSubchannel() == subchannel) {
1✔
256
          if (addressIndex.increment()) {
1✔
257
            cancelScheduleTask();
1✔
258
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
259
          }
260
        }
261

262
        if (isPassComplete()) {
1✔
263
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
264
          updateBalancingState(TRANSIENT_FAILURE,
1✔
265
              new Picker(PickResult.withError(stateInfo.getStatus())));
1✔
266

267
          // Refresh Name Resolution, but only when all 3 conditions are met
268
          // * We are at the end of addressIndex
269
          // * have had status reported for all subchannels.
270
          // * And one of the following conditions:
271
          //    * Have had enough TF reported since we completed first pass
272
          //    * Just completed the first pass
273
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
274
            firstPass = false;
1✔
275
            numTf = 0;
1✔
276
            helper.refreshNameResolution();
1✔
277
          }
278
        }
279
        break;
280

281
      default:
282
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
283
    }
284
  }
1✔
285

286
  private void updateHealthCheckedState(SubchannelData subchannelData) {
287
    if (subchannelData.state != READY) {
1✔
288
      return;
1✔
289
    }
290
    if (subchannelData.getHealthState() == READY) {
1✔
291
      updateBalancingState(READY,
1✔
292
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
293
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
294
      updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
1✔
295
          subchannelData.healthListener.healthStateInfo.getStatus())));
1✔
296
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
297
      updateBalancingState(subchannelData.getHealthState(),
1✔
298
          new Picker(PickResult.withNoResult()));
1✔
299
    }
300
  }
1✔
301

302
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
303
    // an optimization: de-dup IDLE or CONNECTING notification.
304
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
305
      return;
1✔
306
    }
307
    concludedState = state;
1✔
308
    helper.updateBalancingState(state, picker);
1✔
309
  }
1✔
310

311
  @Override
312
  public void shutdown() {
313
    log.log(Level.FINE,
1✔
314
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
315
    rawConnectivityState = SHUTDOWN;
1✔
316
    concludedState = SHUTDOWN;
1✔
317
    cancelScheduleTask();
1✔
318

319
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
320
      subchannelData.getSubchannel().shutdown();
1✔
321
    }
1✔
322

323
    subchannels.clear();
1✔
324
  }
1✔
325

326
  /**
327
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
328
  * that all other subchannels must be shutdown.
329
  */
330
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
331
    cancelScheduleTask();
1✔
332
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
333
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
334
        subchannelData.getSubchannel().shutdown();
1✔
335
      }
336
    }
1✔
337
    subchannels.clear();
1✔
338
    activeSubchannelData.updateState(READY);
1✔
339
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
340
  }
1✔
341

342
  /**
343
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
344
   * Schedules a connection to next address in list as well.
345
   * If the current channel has already attempted a connection, we attempt a connection
346
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
347
   * return null.
348
   */
349
  @Override
350
  public void requestConnection() {
351
    if (addressIndex == null || !addressIndex.isValid() || rawConnectivityState == SHUTDOWN ) {
1✔
352
      return;
1✔
353
    }
354

355
    Subchannel subchannel;
356
    SocketAddress currentAddress;
357
    currentAddress = addressIndex.getCurrentAddress();
1✔
358
    subchannel = subchannels.containsKey(currentAddress)
1✔
359
        ? subchannels.get(currentAddress).getSubchannel()
1✔
360
        : createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
361

362
    ConnectivityState subchannelState = subchannels.get(currentAddress).getState();
1✔
363
    switch (subchannelState) {
1✔
364
      case IDLE:
365
        subchannel.requestConnection();
1✔
366
        subchannels.get(currentAddress).updateState(CONNECTING);
1✔
367
        scheduleNextConnection();
1✔
368
        break;
1✔
369
      case CONNECTING:
370
        if (enableHappyEyeballs) {
1✔
371
          scheduleNextConnection();
1✔
372
        } else {
373
          subchannel.requestConnection();
1✔
374
        }
375
        break;
1✔
376
      case TRANSIENT_FAILURE:
377
        addressIndex.increment();
1✔
378
        requestConnection();
1✔
379
        break;
1✔
380
      case READY: // Shouldn't ever happen
381
        log.warning("Requesting a connection even though we have a READY subchannel");
1✔
382
        break;
1✔
383
      case SHUTDOWN:
384
      default:
385
        // Makes checkstyle happy
386
    }
387
  }
1✔
388

389

390
  /**
391
  * Happy Eyeballs
392
  * Schedules connection attempt to happen after a delay to the next available address.
393
  */
394
  private void scheduleNextConnection() {
395
    if (!enableHappyEyeballs
1✔
396
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
397
      return;
1✔
398
    }
399

400
    class StartNextConnection implements Runnable {
1✔
401
      @Override
402
      public void run() {
403
        scheduleConnectionTask = null;
1✔
404
        if (addressIndex.increment()) {
1✔
405
          requestConnection();
1✔
406
        }
407
      }
1✔
408
    }
409

410
    SynchronizationContext synchronizationContext = null;
1✔
411
    try {
412
      synchronizationContext = helper.getSynchronizationContext();
1✔
413
    } catch (NullPointerException e) {
×
414
      // All helpers should have a sync context, but if one doesn't (ex. user had a custom test)
415
      // we don't want to break previously working functionality.
416
      return;
×
417
    }
1✔
418

419
    scheduleConnectionTask = synchronizationContext.schedule(
1✔
420
        new StartNextConnection(),
421
        CONNECTION_DELAY_INTERVAL_MS,
422
        TimeUnit.MILLISECONDS,
423
        helper.getScheduledExecutorService());
1✔
424
  }
1✔
425

426
  private void cancelScheduleTask() {
427
    if (scheduleConnectionTask != null) {
1✔
428
      scheduleConnectionTask.cancel();
1✔
429
      scheduleConnectionTask = null;
1✔
430
    }
431
  }
1✔
432

433
  private Subchannel createNewSubchannel(SocketAddress addr, Attributes attrs) {
434
    HealthListener hcListener = new HealthListener();
1✔
435
    final Subchannel subchannel = helper.createSubchannel(
1✔
436
        CreateSubchannelArgs.newBuilder()
1✔
437
        .setAddresses(Lists.newArrayList(
1✔
438
            new EquivalentAddressGroup(addr, attrs)))
439
        .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
440
            .build());
1✔
441
    if (subchannel == null) {
1✔
442
      log.warning("Was not able to create subchannel for " + addr);
×
443
      throw new IllegalStateException("Can't create subchannel");
×
444
    }
445
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE, hcListener);
1✔
446
    hcListener.subchannelData = subchannelData;
1✔
447
    subchannels.put(addr, subchannelData);
1✔
448
    Attributes scAttrs = subchannel.getAttributes();
1✔
449
    if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
450
      hcListener.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
451
    }
452
    subchannel.start(stateInfo -> processSubchannelState(subchannel, stateInfo));
1✔
453
    return subchannel;
1✔
454
  }
455

456
  private boolean isPassComplete() {
457
    if (addressIndex == null || addressIndex.isValid()
1✔
458
        || subchannels.size() < addressIndex.size()) {
1✔
459
      return false;
1✔
460
    }
461
    for (SubchannelData sc : subchannels.values()) {
1✔
462
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
463
        return false;
1✔
464
      }
465
    }
1✔
466
    return true;
1✔
467
  }
468

469
  private final class HealthListener implements SubchannelStateListener {
1✔
470
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
471
    private SubchannelData subchannelData;
472

473
    @Override
474
    public void onSubchannelState(ConnectivityStateInfo newState) {
475
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
476
          new Object[]{newState, subchannelData.subchannel});
1✔
477
      healthStateInfo = newState;
1✔
478
      try {
479
        SubchannelData curSubChanData = subchannels.get(addressIndex.getCurrentAddress());
1✔
480
        if (curSubChanData != null && curSubChanData.healthListener == this) {
1✔
481
          updateHealthCheckedState(subchannelData);
1✔
482
        }
483
      } catch (IllegalStateException e) {
×
484
        log.fine("Health listener received state change after subchannel was removed");
×
485
      }
1✔
486
    }
1✔
487
  }
488

489
  private SocketAddress getAddress(Subchannel subchannel) {
490
    return subchannel.getAddresses().getAddresses().get(0);
1✔
491
  }
492

493
  @VisibleForTesting
494
  ConnectivityState getConcludedConnectivityState() {
495
    return this.concludedState;
1✔
496
  }
497

498
  /**
499
   * No-op picker which doesn't add any custom picking logic. It just passes already known result
500
   * received in constructor.
501
   */
502
  private static final class Picker extends SubchannelPicker {
503
    private final PickResult result;
504

505
    Picker(PickResult result) {
1✔
506
      this.result = checkNotNull(result, "result");
1✔
507
    }
1✔
508

509
    @Override
510
    public PickResult pickSubchannel(PickSubchannelArgs args) {
511
      return result;
1✔
512
    }
513

514
    @Override
515
    public String toString() {
516
      return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
1✔
517
    }
518
  }
519

520
  /**
521
   * Picker that requests connection during the first pick, and returns noResult.
522
   */
523
  private final class RequestConnectionPicker extends SubchannelPicker {
524
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
525
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
526

527
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
528
      this.pickFirstLeafLoadBalancer =
1✔
529
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
530
    }
1✔
531

532
    @Override
533
    public PickResult pickSubchannel(PickSubchannelArgs args) {
534
      if (connectionRequested.compareAndSet(false, true)) {
1✔
535
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
536
      }
537
      return PickResult.withNoResult();
1✔
538
    }
539
  }
540

541
  /**
542
   * Index as in 'i', the pointer to an entry. Not a "search index."
543
   * All updates should be done in a synchronization context.
544
   */
545
  @VisibleForTesting
546
  static final class Index {
547
    private List<EquivalentAddressGroup> addressGroups;
548
    private int groupIndex;
549
    private int addressIndex;
550

551
    public Index(List<EquivalentAddressGroup> groups) {
1✔
552
      this.addressGroups = groups != null ? groups : Collections.emptyList();
1✔
553
    }
1✔
554

555
    public boolean isValid() {
556
      // Is invalid if empty or has incremented off the end
557
      return groupIndex < addressGroups.size();
1✔
558
    }
559

560
    public boolean isAtBeginning() {
561
      return groupIndex == 0 && addressIndex == 0;
1✔
562
    }
563

564
    /**
565
     * Move to next address in group.  If last address in group move to first address of next group.
566
     * @return false if went off end of the list, otherwise true
567
     */
568
    public boolean increment() {
569
      if (!isValid()) {
1✔
570
        return false;
1✔
571
      }
572

573
      EquivalentAddressGroup group = addressGroups.get(groupIndex);
1✔
574
      addressIndex++;
1✔
575
      if (addressIndex >= group.getAddresses().size()) {
1✔
576
        groupIndex++;
1✔
577
        addressIndex = 0;
1✔
578
        return groupIndex < addressGroups.size();
1✔
579
      }
580

581
      return true;
1✔
582
    }
583

584
    public void reset() {
585
      groupIndex = 0;
1✔
586
      addressIndex = 0;
1✔
587
    }
1✔
588

589
    public SocketAddress getCurrentAddress() {
590
      if (!isValid()) {
1✔
591
        throw new IllegalStateException("Index is past the end of the address group list");
×
592
      }
593
      return addressGroups.get(groupIndex).getAddresses().get(addressIndex);
1✔
594
    }
595

596
    public Attributes getCurrentEagAttributes() {
597
      if (!isValid()) {
1✔
598
        throw new IllegalStateException("Index is off the end of the address group list");
×
599
      }
600
      return addressGroups.get(groupIndex).getAttributes();
1✔
601
    }
602

603
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
604
      return Collections.singletonList(
1✔
605
          new EquivalentAddressGroup(getCurrentAddress(), getCurrentEagAttributes()));
1✔
606
    }
607

608
    /**
609
     * Update to new groups, resetting the current index.
610
     */
611
    public void updateGroups(ImmutableList<EquivalentAddressGroup> newGroups) {
612
      addressGroups = newGroups != null ? newGroups : Collections.emptyList();
1✔
613
      reset();
1✔
614
    }
1✔
615

616
    /**
617
     * Returns false if the needle was not found and the current index was left unchanged.
618
     */
619
    public boolean seekTo(SocketAddress needle) {
620
      for (int i = 0; i < addressGroups.size(); i++) {
1✔
621
        EquivalentAddressGroup group = addressGroups.get(i);
1✔
622
        int j = group.getAddresses().indexOf(needle);
1✔
623
        if (j == -1) {
1✔
624
          continue;
1✔
625
        }
626
        this.groupIndex = i;
1✔
627
        this.addressIndex = j;
1✔
628
        return true;
1✔
629
      }
630
      return false;
1✔
631
    }
632

633
    public int size() {
634
      return (addressGroups != null) ? addressGroups.size() : 0;
1✔
635
    }
636
  }
637

638
  private static final class SubchannelData {
639
    private final Subchannel subchannel;
640
    private ConnectivityState state;
641
    private final HealthListener healthListener;
642
    private boolean completedConnectivityAttempt = false;
1✔
643

644
    public SubchannelData(Subchannel subchannel, ConnectivityState state,
645
                          HealthListener subchannelHealthListener) {
1✔
646
      this.subchannel = subchannel;
1✔
647
      this.state = state;
1✔
648
      this.healthListener = subchannelHealthListener;
1✔
649
    }
1✔
650

651
    public Subchannel getSubchannel() {
652
      return this.subchannel;
1✔
653
    }
654

655
    public ConnectivityState getState() {
656
      return this.state;
1✔
657
    }
658

659
    public boolean isCompletedConnectivityAttempt() {
660
      return completedConnectivityAttempt;
1✔
661
    }
662

663
    private void updateState(ConnectivityState newState) {
664
      this.state = newState;
1✔
665
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
666
        completedConnectivityAttempt = true;
1✔
667
      } else if (newState == IDLE) {
1✔
668
        completedConnectivityAttempt = false;
1✔
669
      }
670
    }
1✔
671

672
    private ConnectivityState getHealthState() {
673
      return healthListener.healthStateInfo.getState();
1✔
674
    }
675
  }
676

677
  public static final class PickFirstLeafLoadBalancerConfig {
678

679
    @Nullable
680
    public final Boolean shuffleAddressList;
681

682
    // For testing purposes only, not meant to be parsed from a real config.
683
    @Nullable
684
    final Long randomSeed;
685

686
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
687
      this(shuffleAddressList, null);
1✔
688
    }
1✔
689

690
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
691
        @Nullable Long randomSeed) {
1✔
692
      this.shuffleAddressList = shuffleAddressList;
1✔
693
      this.randomSeed = randomSeed;
1✔
694
    }
1✔
695
  }
696
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc