• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19662

25 Jan 2025 12:42AM UTC coverage: 88.575% (-0.009%) from 88.584%
#19662

push

github

web-flow
core:Have acceptResolvedAddresses() do a seek when in CONNECTING state and cleanup removed subchannels when a seek was successful (#11849)

* Have acceptResolvedAddresses() do a seek when in CONNECTING state and cleanup removed subchannels when a seek was successful.
Move cleanup of removed subchannels into a method so it can be called from 2 places in acceptResolvedAddresses.
Since the seek could mean we never looked at the first address, if we go off the end of the index and haven't looked at the all of the addresses then instead of scheduleBackoff() we reset the index and request a connection.

33726 of 38076 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/../core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.SHUTDOWN;
24
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
25

26
import com.google.common.annotations.VisibleForTesting;
27
import com.google.common.base.MoreObjects;
28
import com.google.common.collect.ImmutableList;
29
import com.google.common.collect.Lists;
30
import io.grpc.Attributes;
31
import io.grpc.ConnectivityState;
32
import io.grpc.ConnectivityStateInfo;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.LoadBalancer;
35
import io.grpc.Status;
36
import io.grpc.SynchronizationContext.ScheduledHandle;
37
import java.net.Inet4Address;
38
import java.net.InetSocketAddress;
39
import java.net.SocketAddress;
40
import java.util.ArrayList;
41
import java.util.Collections;
42
import java.util.HashMap;
43
import java.util.HashSet;
44
import java.util.List;
45
import java.util.Map;
46
import java.util.Random;
47
import java.util.Set;
48
import java.util.concurrent.TimeUnit;
49
import java.util.concurrent.atomic.AtomicBoolean;
50
import java.util.logging.Level;
51
import java.util.logging.Logger;
52
import javax.annotation.Nullable;
53

54
/**
55
 * A {@link LoadBalancer} that provides no load-balancing over the addresses from the {@link
56
 * io.grpc.NameResolver}. The channel's default behavior is used, which is walking down the address
57
 * list and sticking to the first that works.
58
 */
59
final class PickFirstLeafLoadBalancer extends LoadBalancer {
60
  private static final Logger log = Logger.getLogger(PickFirstLeafLoadBalancer.class.getName());
1✔
61
  @VisibleForTesting
62
  static final int CONNECTION_DELAY_INTERVAL_MS = 250;
63
  private final boolean enableHappyEyeballs = !isSerializingRetries()
1✔
64
      && PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
1✔
65
  private final Helper helper;
66
  private final Map<SocketAddress, SubchannelData> subchannels = new HashMap<>();
1✔
67
  private final Index addressIndex = new Index(ImmutableList.of(), this.enableHappyEyeballs);
1✔
68
  private int numTf = 0;
1✔
69
  private boolean firstPass = true;
1✔
70
  @Nullable
1✔
71
  private ScheduledHandle scheduleConnectionTask = null;
72
  private ConnectivityState rawConnectivityState = IDLE;
1✔
73
  private ConnectivityState concludedState = IDLE;
1✔
74
  private boolean notAPetiolePolicy = true; // means not under a petiole policy
1✔
75
  private final BackoffPolicy.Provider bkoffPolProvider = new ExponentialBackoffPolicy.Provider();
1✔
76
  private BackoffPolicy reconnectPolicy;
77
  @Nullable
1✔
78
  private ScheduledHandle reconnectTask = null;
79
  private final boolean serializingRetries = isSerializingRetries();
1✔
80

81
  PickFirstLeafLoadBalancer(Helper helper) {
1✔
82
    this.helper = checkNotNull(helper, "helper");
1✔
83
  }
1✔
84

85
  static boolean isSerializingRetries() {
86
    return GrpcUtil.getFlag("GRPC_SERIALIZE_RETRIES", false);
1✔
87
  }
88

89
  @Override
90
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
91
    if (rawConnectivityState == SHUTDOWN) {
1✔
92
      return Status.FAILED_PRECONDITION.withDescription("Already shut down");
1✔
93
    }
94

95
    // Cache whether or not this is a petiole policy, which is based off of an address attribute
96
    Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
1✔
97
    this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;
1✔
98

99
    List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();
1✔
100

101
    // Validate the address list
102
    if (servers.isEmpty()) {
1✔
103
      Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
104
              "NameResolver returned no usable address. addrs=" + resolvedAddresses.getAddresses()
1✔
105
                      + ", attrs=" + resolvedAddresses.getAttributes());
1✔
106
      handleNameResolutionError(unavailableStatus);
1✔
107
      return unavailableStatus;
1✔
108
    }
109
    for (EquivalentAddressGroup eag : servers) {
1✔
110
      if (eag == null) {
1✔
111
        Status unavailableStatus = Status.UNAVAILABLE.withDescription(
1✔
112
            "NameResolver returned address list with null endpoint. addrs="
113
                + resolvedAddresses.getAddresses() + ", attrs="
1✔
114
                + resolvedAddresses.getAttributes());
1✔
115
        handleNameResolutionError(unavailableStatus);
1✔
116
        return unavailableStatus;
1✔
117
      }
118
    }
1✔
119

120
    // Since we have a new set of addresses, we are again at first pass
121
    firstPass = true;
1✔
122

123
    List<EquivalentAddressGroup> cleanServers = deDupAddresses(servers);
1✔
124

125
    // We can optionally be configured to shuffle the address list. This can help better distribute
126
    // the load.
127
    if (resolvedAddresses.getLoadBalancingPolicyConfig()
1✔
128
        instanceof PickFirstLeafLoadBalancerConfig) {
129
      PickFirstLeafLoadBalancerConfig config
1✔
130
          = (PickFirstLeafLoadBalancerConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
131
      if (config.shuffleAddressList != null && config.shuffleAddressList) {
1✔
132
        Collections.shuffle(cleanServers,
1✔
133
            config.randomSeed != null ? new Random(config.randomSeed) : new Random());
1✔
134
      }
135
    }
136

137
    final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
138
        ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
1✔
139

140
    if (rawConnectivityState == READY || rawConnectivityState == CONNECTING) {
1✔
141
      // If the previous ready (or connecting) subchannel exists in new address list,
142
      // keep this connection and don't create new subchannels
143
      SocketAddress previousAddress = addressIndex.getCurrentAddress();
1✔
144
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
145
      if (addressIndex.seekTo(previousAddress)) {
1✔
146
        SubchannelData subchannelData = subchannels.get(previousAddress);
1✔
147
        subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
1✔
148
        shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
149
        return Status.OK;
1✔
150
      }
151
      // Previous ready subchannel not in the new list of addresses
152
    } else {
1✔
153
      addressIndex.updateGroups(newImmutableAddressGroups);
1✔
154
    }
155

156
    // No old addresses means first time through, so we will do an explicit move to CONNECTING
157
    // which is what we implicitly started with
158
    boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);
1✔
159

160
    if (noOldAddrs) {
1✔
161
      // Make tests happy; they don't properly assume starting in CONNECTING
162
      rawConnectivityState = CONNECTING;
1✔
163
      updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
164
    }
165

166
    if (rawConnectivityState == READY) {
1✔
167
      // connect from beginning when prompted
168
      rawConnectivityState = IDLE;
1✔
169
      updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
170

171
    } else if (rawConnectivityState == CONNECTING || rawConnectivityState == TRANSIENT_FAILURE) {
1✔
172
      // start connection attempt at first address
173
      cancelScheduleTask();
1✔
174
      requestConnection();
1✔
175
    }
176

177
    return Status.OK;
1✔
178
  }
179

180
  /**
181
   * Compute the difference between the flattened new addresses and the old addresses that had been
182
   * made into subchannels and then shutdown the matching subchannels.
183
   * @return true if there were no old addresses
184
   */
185
  private boolean shutdownRemovedAddresses(
186
      ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
187

188
    Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
1✔
189

190
    // Flatten the new EAGs addresses
191
    Set<SocketAddress> newAddrs = new HashSet<>();
1✔
192
    for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
1✔
193
      newAddrs.addAll(endpoint.getAddresses());
1✔
194
    }
1✔
195

196
    // Shut them down and remove them
197
    for (SocketAddress oldAddr : oldAddrs) {
1✔
198
      if (!newAddrs.contains(oldAddr)) {
1✔
199
        subchannels.remove(oldAddr).getSubchannel().shutdown();
1✔
200
      }
201
    }
1✔
202
    return oldAddrs.isEmpty();
1✔
203
  }
204

205
  private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
206
    Set<SocketAddress> seenAddresses = new HashSet<>();
1✔
207
    List<EquivalentAddressGroup> newGroups = new ArrayList<>();
1✔
208

209
    for (EquivalentAddressGroup group : groups) {
1✔
210
      List<SocketAddress> addrs = new ArrayList<>();
1✔
211
      for (SocketAddress addr : group.getAddresses()) {
1✔
212
        if (seenAddresses.add(addr)) {
1✔
213
          addrs.add(addr);
1✔
214
        }
215
      }
1✔
216
      if (!addrs.isEmpty()) {
1✔
217
        newGroups.add(new EquivalentAddressGroup(addrs, group.getAttributes()));
1✔
218
      }
219
    }
1✔
220

221
    return newGroups;
1✔
222
  }
223

224
  @Override
225
  public void handleNameResolutionError(Status error) {
226
    if (rawConnectivityState == SHUTDOWN) {
1✔
227
      return;
×
228
    }
229

230
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
231
      subchannelData.getSubchannel().shutdown();
1✔
232
    }
1✔
233
    subchannels.clear();
1✔
234
    addressIndex.updateGroups(ImmutableList.of());
1✔
235
    rawConnectivityState = TRANSIENT_FAILURE;
1✔
236
    updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(error)));
1✔
237
  }
1✔
238

239
  void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo stateInfo) {
240
    ConnectivityState newState = stateInfo.getState();
1✔
241

242
    // Shutdown channels/previously relevant subchannels can still callback with state updates.
243
    // To prevent pickers from returning these obsolete subchannels, this logic
244
    // is included to check if the current list of active subchannels includes this subchannel.
245
    if (subchannelData != subchannels.get(getAddress(subchannelData.subchannel))) {
1✔
246
      return;
1✔
247
    }
248

249
    if (newState == SHUTDOWN) {
1✔
250
      return;
1✔
251
    }
252

253
    if (newState == IDLE && subchannelData.state == READY) {
1✔
254
      helper.refreshNameResolution();
1✔
255
    }
256

257
    // If we are transitioning from a TRANSIENT_FAILURE to CONNECTING or IDLE we ignore this state
258
    // transition and still keep the LB in TRANSIENT_FAILURE state. This is referred to as "sticky
259
    // transient failure". Only a subchannel state change to READY will get the LB out of
260
    // TRANSIENT_FAILURE. If the state is IDLE we additionally request a new connection so that we
261
    // keep retrying for a connection.
262

263
    // With the new pick first implementation, individual subchannels will have their own backoff
264
    // on a per-address basis. Thus, iterative requests for connections will not be requested
265
    // once the first pass through is complete.
266
    // However, every time there is an address update, we will perform a pass through for the new
267
    // addresses in the updated list.
268
    subchannelData.updateState(newState);
1✔
269
    if (rawConnectivityState == TRANSIENT_FAILURE || concludedState == TRANSIENT_FAILURE)  {
1✔
270
      if (newState == CONNECTING) {
1✔
271
        // each subchannel is responsible for its own backoff
272
        return;
1✔
273
      } else if (newState == IDLE) {
1✔
274
        requestConnection();
1✔
275
        return;
1✔
276
      }
277
    }
278

279
    switch (newState) {
1✔
280
      case IDLE:
281
        // Shutdown when ready: connect from beginning when prompted
282
        addressIndex.reset();
1✔
283
        rawConnectivityState = IDLE;
1✔
284
        updateBalancingState(IDLE, new RequestConnectionPicker(this));
1✔
285
        break;
1✔
286

287
      case CONNECTING:
288
        rawConnectivityState = CONNECTING;
1✔
289
        updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
1✔
290
        break;
1✔
291

292
      case READY:
293
        shutdownRemaining(subchannelData);
1✔
294
        addressIndex.seekTo(getAddress(subchannelData.subchannel));
1✔
295
        rawConnectivityState = READY;
1✔
296
        updateHealthCheckedState(subchannelData);
1✔
297
        break;
1✔
298

299
      case TRANSIENT_FAILURE:
300
        // If we are looking at current channel, request a connection if possible
301
        if (addressIndex.isValid()
1✔
302
            && subchannels.get(addressIndex.getCurrentAddress()) == subchannelData) {
1✔
303
          if (addressIndex.increment()) {
1✔
304
            cancelScheduleTask();
1✔
305
            requestConnection(); // is recursive so might hit the end of the addresses
1✔
306
          } else {
307
            if (subchannels.size() >= addressIndex.size()) {
1✔
308
              scheduleBackoff();
1✔
309
            } else {
310
              // We must have done a seek to the middle of the list lets start over from the
311
              // beginning
312
              addressIndex.reset();
1✔
313
              requestConnection();
1✔
314
            }
315
          }
316
        }
317

318
        if (isPassComplete()) {
1✔
319
          rawConnectivityState = TRANSIENT_FAILURE;
1✔
320
          updateBalancingState(TRANSIENT_FAILURE,
1✔
321
              new Picker(PickResult.withError(stateInfo.getStatus())));
1✔
322

323
          // Refresh Name Resolution, but only when all 3 conditions are met
324
          // * We are at the end of addressIndex
325
          // * have had status reported for all subchannels.
326
          // * And one of the following conditions:
327
          //    * Have had enough TF reported since we completed first pass
328
          //    * Just completed the first pass
329
          if (++numTf >= addressIndex.size() || firstPass) {
1✔
330
            firstPass = false;
1✔
331
            numTf = 0;
1✔
332
            helper.refreshNameResolution();
1✔
333
          }
334
        }
335
        break;
336

337
      default:
338
        throw new IllegalArgumentException("Unsupported state:" + newState);
×
339
    }
340
  }
1✔
341

342
  /**
343
   * Only called after all addresses attempted and failed (TRANSIENT_FAILURE).
344
   */
345
  private void scheduleBackoff() {
346
    if (!serializingRetries) {
1✔
347
      return;
1✔
348
    }
349

350
    class EndOfCurrentBackoff implements Runnable {
1✔
351
      @Override
352
      public void run() {
353
        reconnectTask = null;
1✔
354
        addressIndex.reset();
1✔
355
        requestConnection();
1✔
356
      }
1✔
357
    }
358

359
    // Just allow the previous one to trigger when ready if we're already in backoff
360
    if (reconnectTask != null) {
1✔
361
      return;
×
362
    }
363

364
    if (reconnectPolicy == null) {
1✔
365
      reconnectPolicy = bkoffPolProvider.get();
1✔
366
    }
367
    long delayNanos = reconnectPolicy.nextBackoffNanos();
1✔
368
    reconnectTask = helper.getSynchronizationContext().schedule(
1✔
369
        new EndOfCurrentBackoff(),
370
        delayNanos,
371
        TimeUnit.NANOSECONDS,
372
        helper.getScheduledExecutorService());
1✔
373
  }
1✔
374

375
  private void updateHealthCheckedState(SubchannelData subchannelData) {
376
    if (subchannelData.state != READY) {
1✔
377
      return;
1✔
378
    }
379

380
    if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
1✔
381
      updateBalancingState(READY,
1✔
382
          new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
1✔
383
    } else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
1✔
384
      updateBalancingState(TRANSIENT_FAILURE, new Picker(PickResult.withError(
1✔
385
          subchannelData.healthStateInfo.getStatus())));
1✔
386
    } else if (concludedState != TRANSIENT_FAILURE) {
1✔
387
      updateBalancingState(subchannelData.getHealthState(),
1✔
388
          new Picker(PickResult.withNoResult()));
1✔
389
    }
390
  }
1✔
391

392
  private void updateBalancingState(ConnectivityState state, SubchannelPicker picker) {
393
    // an optimization: de-dup IDLE or CONNECTING notification.
394
    if (state == concludedState && (state == IDLE || state == CONNECTING)) {
1✔
395
      return;
1✔
396
    }
397
    concludedState = state;
1✔
398
    helper.updateBalancingState(state, picker);
1✔
399
  }
1✔
400

401
  @Override
402
  public void shutdown() {
403
    log.log(Level.FINE,
1✔
404
        "Shutting down, currently have {} subchannels created", subchannels.size());
1✔
405
    rawConnectivityState = SHUTDOWN;
1✔
406
    concludedState = SHUTDOWN;
1✔
407
    cancelScheduleTask();
1✔
408
    if (reconnectTask != null) {
1✔
409
      reconnectTask.cancel();
1✔
410
      reconnectTask = null;
1✔
411
    }
412
    reconnectPolicy = null;
1✔
413

414
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
415
      subchannelData.getSubchannel().shutdown();
1✔
416
    }
1✔
417

418
    subchannels.clear();
1✔
419
  }
1✔
420

421
  /**
422
  * Shuts down remaining subchannels. Called when a subchannel becomes ready, which means
423
  * that all other subchannels must be shutdown.
424
  */
425
  private void shutdownRemaining(SubchannelData activeSubchannelData) {
426
    if (reconnectTask != null) {
1✔
427
      reconnectTask.cancel();
1✔
428
      reconnectTask = null;
1✔
429
    }
430
    reconnectPolicy = null;
1✔
431

432
    cancelScheduleTask();
1✔
433
    for (SubchannelData subchannelData : subchannels.values()) {
1✔
434
      if (!subchannelData.getSubchannel().equals(activeSubchannelData.subchannel)) {
1✔
435
        subchannelData.getSubchannel().shutdown();
1✔
436
      }
437
    }
1✔
438
    subchannels.clear();
1✔
439
    activeSubchannelData.updateState(READY);
1✔
440
    subchannels.put(getAddress(activeSubchannelData.subchannel), activeSubchannelData);
1✔
441
  }
1✔
442

443
  /**
444
   * Requests a connection to the next applicable address' subchannel, creating one if necessary.
445
   * Schedules a connection to next address in list as well.
446
   * If the current channel has already attempted a connection, we attempt a connection
447
   * to the next address/subchannel in our list.  We assume that createNewSubchannel will never
448
   * return null.
449
   */
450
  @Override
451
  public void requestConnection() {
452
    if (!addressIndex.isValid() || rawConnectivityState == SHUTDOWN) {
1✔
453
      return;
1✔
454
    }
455

456
    SocketAddress currentAddress = addressIndex.getCurrentAddress();
1✔
457
    SubchannelData subchannelData = subchannels.get(currentAddress);
1✔
458
    if (subchannelData == null) {
1✔
459
      subchannelData = createNewSubchannel(currentAddress, addressIndex.getCurrentEagAttributes());
1✔
460
    }
461

462
    ConnectivityState subchannelState = subchannelData.getState();
1✔
463
    switch (subchannelState) {
1✔
464
      case IDLE:
465
        subchannelData.subchannel.requestConnection();
1✔
466
        subchannelData.updateState(CONNECTING);
1✔
467
        scheduleNextConnection();
1✔
468
        break;
1✔
469
      case CONNECTING:
470
        scheduleNextConnection();
1✔
471
        break;
1✔
472
      case TRANSIENT_FAILURE:
473
        if (!serializingRetries) {
1✔
474
          addressIndex.increment();
1✔
475
          requestConnection();
1✔
476
        } else {
477
          if (!addressIndex.isValid()) {
1✔
478
            scheduleBackoff();
×
479
          } else {
480
            subchannelData.subchannel.requestConnection();
1✔
481
            subchannelData.updateState(CONNECTING);
1✔
482
          }
483
        }
484
        break;
1✔
485
      default:
486
        // Wait for current subchannel to change state
487
    }
488
  }
1✔
489

490

491
  /**
492
  * Happy Eyeballs
493
  * Schedules connection attempt to happen after a delay to the next available address.
494
  */
495
  private void scheduleNextConnection() {
496
    if (!enableHappyEyeballs
1✔
497
        || (scheduleConnectionTask != null && scheduleConnectionTask.isPending())) {
1✔
498
      return;
1✔
499
    }
500

501
    class StartNextConnection implements Runnable {
1✔
502
      @Override
503
      public void run() {
504
        scheduleConnectionTask = null;
1✔
505
        if (addressIndex.increment()) {
1✔
506
          requestConnection();
1✔
507
        }
508
      }
1✔
509
    }
510

511
    scheduleConnectionTask = helper.getSynchronizationContext().schedule(
1✔
512
        new StartNextConnection(),
513
        CONNECTION_DELAY_INTERVAL_MS,
514
        TimeUnit.MILLISECONDS,
515
        helper.getScheduledExecutorService());
1✔
516
  }
1✔
517

518
  private void cancelScheduleTask() {
519
    if (scheduleConnectionTask != null) {
1✔
520
      scheduleConnectionTask.cancel();
1✔
521
      scheduleConnectionTask = null;
1✔
522
    }
523
  }
1✔
524

525
  private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs) {
526
    HealthListener hcListener = new HealthListener();
1✔
527
    final Subchannel subchannel = helper.createSubchannel(
1✔
528
        CreateSubchannelArgs.newBuilder()
1✔
529
            .setAddresses(Lists.newArrayList(
1✔
530
                new EquivalentAddressGroup(addr, attrs)))
531
            .addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcListener)
1✔
532
            .addOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY, serializingRetries)
1✔
533
            .build());
1✔
534
    if (subchannel == null) {
1✔
535
      log.warning("Was not able to create subchannel for " + addr);
×
536
      throw new IllegalStateException("Can't create subchannel");
×
537
    }
538
    SubchannelData subchannelData = new SubchannelData(subchannel, IDLE);
1✔
539
    hcListener.subchannelData = subchannelData;
1✔
540
    subchannels.put(addr, subchannelData);
1✔
541
    Attributes scAttrs = subchannel.getAttributes();
1✔
542
    if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
1✔
543
      subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
1✔
544
    }
545
    subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
1✔
546
    return subchannelData;
1✔
547
  }
548

549
  private boolean isPassComplete() {
550
    if (subchannels.size() < addressIndex.size()) {
1✔
551
      return false;
1✔
552
    }
553
    for (SubchannelData sc : subchannels.values()) {
1✔
554
      if (!sc.isCompletedConnectivityAttempt() ) {
1✔
555
        return false;
1✔
556
      }
557
    }
1✔
558
    return true;
1✔
559
  }
560

561
  private final class HealthListener implements SubchannelStateListener {
1✔
562
    private SubchannelData subchannelData;
563

564
    @Override
565
    public void onSubchannelState(ConnectivityStateInfo newState) {
566
      if (notAPetiolePolicy) {
1✔
567
        log.log(Level.WARNING,
1✔
568
            "Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
569
            new Object[]{newState, subchannelData.subchannel});
1✔
570
        return;
1✔
571
      }
572

573
      log.log(Level.FINE, "Received health status {0} for subchannel {1}",
1✔
574
          new Object[]{newState, subchannelData.subchannel});
1✔
575
      subchannelData.healthStateInfo = newState;
1✔
576
      if (addressIndex.isValid()
1✔
577
          && subchannelData == subchannels.get(addressIndex.getCurrentAddress())) {
1✔
578
        updateHealthCheckedState(subchannelData);
1✔
579
      }
580
    }
1✔
581
  }
582

583
  private SocketAddress getAddress(Subchannel subchannel) {
584
    return subchannel.getAddresses().getAddresses().get(0);
1✔
585
  }
586

587
  @VisibleForTesting
588
  ConnectivityState getConcludedConnectivityState() {
589
    return this.concludedState;
1✔
590
  }
591

592
  /**
593
   * No-op picker which doesn't add any custom picking logic. It just passes already known result
594
   * received in constructor.
595
   */
596
  private static final class Picker extends SubchannelPicker {
597
    private final PickResult result;
598

599
    Picker(PickResult result) {
1✔
600
      this.result = checkNotNull(result, "result");
1✔
601
    }
1✔
602

603
    @Override
604
    public PickResult pickSubchannel(PickSubchannelArgs args) {
605
      return result;
1✔
606
    }
607

608
    @Override
609
    public String toString() {
610
      return MoreObjects.toStringHelper(Picker.class).add("result", result).toString();
×
611
    }
612
  }
613

614
  /**
615
   * Picker that requests connection during the first pick, and returns noResult.
616
   */
617
  private final class RequestConnectionPicker extends SubchannelPicker {
618
    private final PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer;
619
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
620

621
    RequestConnectionPicker(PickFirstLeafLoadBalancer pickFirstLeafLoadBalancer) {
1✔
622
      this.pickFirstLeafLoadBalancer =
1✔
623
          checkNotNull(pickFirstLeafLoadBalancer, "pickFirstLeafLoadBalancer");
1✔
624
    }
1✔
625

626
    @Override
627
    public PickResult pickSubchannel(PickSubchannelArgs args) {
628
      if (connectionRequested.compareAndSet(false, true)) {
1✔
629
        helper.getSynchronizationContext().execute(pickFirstLeafLoadBalancer::requestConnection);
1✔
630
      }
631
      return PickResult.withNoResult();
1✔
632
    }
633
  }
634

635
  /**
636
   * This contains both an ordered list of addresses and a pointer(i.e. index) to the current entry.
637
   * All updates should be done in a synchronization context.
638
   */
639
  @VisibleForTesting
640
  static final class Index {
641
    private List<UnwrappedEag> orderedAddresses;
642
    private int activeElement = 0;
1✔
643
    private boolean enableHappyEyeballs;
644

645
    Index(List<EquivalentAddressGroup> groups, boolean enableHappyEyeballs) {
1✔
646
      this.enableHappyEyeballs = enableHappyEyeballs;
1✔
647
      updateGroups(groups);
1✔
648
    }
1✔
649

650
    public boolean isValid() {
651
      return activeElement < orderedAddresses.size();
1✔
652
    }
653

654
    public boolean isAtBeginning() {
655
      return activeElement == 0;
1✔
656
    }
657

658
    /**
659
     * Move to next address in group.  If last address in group move to first address of next group.
660
     * @return false if went off end of the list, otherwise true
661
     */
662
    public boolean increment() {
663
      if (!isValid()) {
1✔
664
        return false;
1✔
665
      }
666

667
      activeElement++;
1✔
668

669
      return isValid();
1✔
670
    }
671

672
    public void reset() {
673
      activeElement = 0;
1✔
674
    }
1✔
675

676
    public SocketAddress getCurrentAddress() {
677
      if (!isValid()) {
1✔
678
        throw new IllegalStateException("Index is past the end of the address group list");
1✔
679
      }
680
      return orderedAddresses.get(activeElement).address;
1✔
681
    }
682

683
    public Attributes getCurrentEagAttributes() {
684
      if (!isValid()) {
1✔
685
        throw new IllegalStateException("Index is off the end of the address group list");
×
686
      }
687
      return orderedAddresses.get(activeElement).attributes;
1✔
688
    }
689

690
    public List<EquivalentAddressGroup> getCurrentEagAsList() {
691
      return Collections.singletonList(getCurrentEag());
1✔
692
    }
693

694
    private EquivalentAddressGroup getCurrentEag() {
695
      if (!isValid()) {
1✔
696
        throw new IllegalStateException("Index is past the end of the address group list");
×
697
      }
698
      return orderedAddresses.get(activeElement).asEag();
1✔
699
    }
700

701
    /**
702
     * Update to new groups, resetting the current index.
703
     */
704
    public void updateGroups(List<EquivalentAddressGroup> newGroups) {
705
      checkNotNull(newGroups, "newGroups");
1✔
706
      orderedAddresses = enableHappyEyeballs
1✔
707
                             ? updateGroupsHE(newGroups)
1✔
708
                             : updateGroupsNonHE(newGroups);
1✔
709
      reset();
1✔
710
    }
1✔
711

712
    /**
713
     * Returns false if the needle was not found and the current index was left unchanged.
714
     */
715
    public boolean seekTo(SocketAddress needle) {
716
      checkNotNull(needle, "needle");
1✔
717
      for (int i = 0; i < orderedAddresses.size(); i++) {
1✔
718
        if (orderedAddresses.get(i).address.equals(needle)) {
1✔
719
          this.activeElement = i;
1✔
720
          return true;
1✔
721
        }
722
      }
723
      return false;
1✔
724
    }
725

726
    public int size() {
727
      return orderedAddresses.size();
1✔
728
    }
729

730
    private List<UnwrappedEag> updateGroupsNonHE(List<EquivalentAddressGroup> newGroups) {
731
      List<UnwrappedEag> entries = new ArrayList<>();
1✔
732
      for (int g = 0; g < newGroups.size(); g++) {
1✔
733
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
734
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
735
          SocketAddress addr = eag.getAddresses().get(a);
1✔
736
          entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
737
        }
738
      }
739

740
      return entries;
1✔
741
    }
742

743
    private List<UnwrappedEag> updateGroupsHE(List<EquivalentAddressGroup> newGroups) {
744
      Boolean firstIsV6 = null;
1✔
745
      List<UnwrappedEag> v4Entries = new ArrayList<>();
1✔
746
      List<UnwrappedEag> v6Entries = new ArrayList<>();
1✔
747
      for (int g = 0; g <  newGroups.size(); g++) {
1✔
748
        EquivalentAddressGroup eag = newGroups.get(g);
1✔
749
        for (int a = 0; a < eag.getAddresses().size(); a++) {
1✔
750
          SocketAddress addr = eag.getAddresses().get(a);
1✔
751
          boolean isIpV4 = addr instanceof InetSocketAddress
1✔
752
              && ((InetSocketAddress) addr).getAddress() instanceof Inet4Address;
1✔
753
          if (isIpV4) {
1✔
754
            if (firstIsV6 == null) {
1✔
755
              firstIsV6 = false;
1✔
756
            }
757
            v4Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
758
          } else {
759
            if (firstIsV6 == null) {
1✔
760
              firstIsV6 = true;
1✔
761
            }
762
            v6Entries.add(new UnwrappedEag(eag.getAttributes(), addr));
1✔
763
          }
764
        }
765
      }
766

767
      return firstIsV6 != null && firstIsV6
1✔
768
          ? interleave(v6Entries, v4Entries)
1✔
769
          : interleave(v4Entries, v6Entries);
1✔
770
    }
771

772
    private List<UnwrappedEag> interleave(List<UnwrappedEag> firstFamily,
773
                                          List<UnwrappedEag> secondFamily) {
774
      if (firstFamily.isEmpty()) {
1✔
775
        return secondFamily;
1✔
776
      }
777
      if (secondFamily.isEmpty()) {
1✔
778
        return firstFamily;
1✔
779
      }
780

781
      List<UnwrappedEag> result = new ArrayList<>(firstFamily.size() + secondFamily.size());
1✔
782
      for (int i = 0; i < Math.max(firstFamily.size(), secondFamily.size()); i++) {
1✔
783
        if (i < firstFamily.size()) {
1✔
784
          result.add(firstFamily.get(i));
1✔
785
        }
786
        if (i < secondFamily.size()) {
1✔
787
          result.add(secondFamily.get(i));
1✔
788
        }
789
      }
790
      return result;
1✔
791
    }
792

793
    private static final class UnwrappedEag {
794
      private final Attributes attributes;
795
      private final SocketAddress address;
796

797
      public UnwrappedEag(Attributes attributes, SocketAddress address) {
1✔
798
        this.attributes = attributes;
1✔
799
        this.address = address;
1✔
800
      }
1✔
801

802
      private EquivalentAddressGroup asEag() {
803
        return new EquivalentAddressGroup(address, attributes);
1✔
804
      }
805
    }
806
  }
807

808
  @VisibleForTesting
809
  int getIndexLocation() {
810
    return addressIndex.activeElement;
1✔
811
  }
812

813
  @VisibleForTesting
814
  boolean isIndexValid() {
815
    return addressIndex.isValid();
1✔
816
  }
817

818
  private static final class SubchannelData {
819
    private final Subchannel subchannel;
820
    private ConnectivityState state;
821
    private boolean completedConnectivityAttempt = false;
1✔
822
    private ConnectivityStateInfo healthStateInfo = ConnectivityStateInfo.forNonError(IDLE);
1✔
823

824
    public SubchannelData(Subchannel subchannel, ConnectivityState state) {
1✔
825
      this.subchannel = subchannel;
1✔
826
      this.state = state;
1✔
827
    }
1✔
828

829
    public Subchannel getSubchannel() {
830
      return this.subchannel;
1✔
831
    }
832

833
    public ConnectivityState getState() {
834
      return this.state;
1✔
835
    }
836

837
    public boolean isCompletedConnectivityAttempt() {
838
      return completedConnectivityAttempt;
1✔
839
    }
840

841
    private void updateState(ConnectivityState newState) {
842
      this.state = newState;
1✔
843
      if (newState == READY || newState == TRANSIENT_FAILURE) {
1✔
844
        completedConnectivityAttempt = true;
1✔
845
      } else if (newState == IDLE) {
1✔
846
        completedConnectivityAttempt = false;
1✔
847
      }
848
    }
1✔
849

850
    private ConnectivityState getHealthState() {
851
      return healthStateInfo.getState();
1✔
852
    }
853
  }
854

855
  public static final class PickFirstLeafLoadBalancerConfig {
856

857
    @Nullable
858
    public final Boolean shuffleAddressList;
859

860
    // For testing purposes only, not meant to be parsed from a real config.
861
    @Nullable
862
    final Long randomSeed;
863

864
    public PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList) {
865
      this(shuffleAddressList, null);
1✔
866
    }
1✔
867

868
    PickFirstLeafLoadBalancerConfig(@Nullable Boolean shuffleAddressList,
869
        @Nullable Long randomSeed) {
1✔
870
      this.shuffleAddressList = shuffleAddressList;
1✔
871
      this.randomSeed = randomSeed;
1✔
872
    }
1✔
873
  }
874

875
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc