• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20125

23 Dec 2025 04:32AM UTC coverage: 88.706% (-0.01%) from 88.72%
#20125

push

github

web-flow
grpclb: pick_first delegation (#12568)

**Summary of Changes**
This pull request refactors the grpclb load balancer's PICK_FIRST mode
to delegate its logic to a standard pick_first load balancing policy.

The key changes are as follows:
1. **`grpclb/build.gradle`**

Added dependency on `grpc-util` module to access
`ForwardingLoadBalancerHelper`

2. **`grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java`**
- New imports:
LoadBalancer, LoadBalancerProvider, LoadBalancerRegistry,
ResolvedAddresses, FixedResultPicker, ForwardingLoadBalancerHelper
- New fields for PICK_FIRST delegation:
    - pickFirstLbProvider - Provider for creating child pick_first LB
    - pickFirstLb - The child LoadBalancer instance
    - pickFirstLbState / pickFirstLbPicker - Track child LB's state and
      picker
    - currentPickFirstLoadRecorder - Load recorder for token attachment
- Key behavioral changes:
    - updateServerList() PICK_FIRST case: Instead of creating a single
      subchannel, it now:
        - Creates the child pick_first LB once and then updates it with new
          addresses on subsequent updates.
        - Passes addresses to child LB via acceptResolvedAddresses()
    - maybeUpdatePicker() PICK_FIRST case: Uses child LB's state and picker
      wrapped with ChildLbPickerEntry
    - RoundRobinEntry.picked() signature change: Changed from
      picked(Metadata) to picked(PickSubchannelArgs) to allow child picker
      delegation
    - New ChildLbPickerEntry class: Wraps child LB's picker and attaches
      TokenAttachingTracerFactory for token propagation
    - New PickFirstLbHelper class: Forwarding helper that intercepts
      updateBalancingState() to store child state and trigger grpclb picker
      updates
    - Updated shutdown(), requestConnection(), maybeUseFallbackBackends():
      Handle the new child LB delegation model

3. **`grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java`**

- Updated tests to reflect the new delegation behavior:
- Initial state is now CONNECTING (not IDLE) since standard pick_first
eagerly connects
- Tests ... (continued)

35463 of 39978 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.79
/../grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.grpclb;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ConnectivityState.CONNECTING;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.READY;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Objects;
31
import com.google.common.base.Stopwatch;
32
import com.google.protobuf.util.Durations;
33
import io.grpc.Attributes;
34
import io.grpc.ChannelLogger;
35
import io.grpc.ChannelLogger.ChannelLogLevel;
36
import io.grpc.ConnectivityState;
37
import io.grpc.ConnectivityStateInfo;
38
import io.grpc.Context;
39
import io.grpc.EquivalentAddressGroup;
40
import io.grpc.LoadBalancer;
41
import io.grpc.LoadBalancer.FixedResultPicker;
42
import io.grpc.LoadBalancer.Helper;
43
import io.grpc.LoadBalancer.PickResult;
44
import io.grpc.LoadBalancer.PickSubchannelArgs;
45
import io.grpc.LoadBalancer.ResolvedAddresses;
46
import io.grpc.LoadBalancer.Subchannel;
47
import io.grpc.LoadBalancer.SubchannelPicker;
48
import io.grpc.LoadBalancerProvider;
49
import io.grpc.LoadBalancerRegistry;
50
import io.grpc.ManagedChannel;
51
import io.grpc.Metadata;
52
import io.grpc.Status;
53
import io.grpc.SynchronizationContext;
54
import io.grpc.SynchronizationContext.ScheduledHandle;
55
import io.grpc.grpclb.SubchannelPool.PooledSubchannelStateListener;
56
import io.grpc.internal.BackoffPolicy;
57
import io.grpc.internal.TimeProvider;
58
import io.grpc.lb.v1.ClientStats;
59
import io.grpc.lb.v1.InitialLoadBalanceRequest;
60
import io.grpc.lb.v1.InitialLoadBalanceResponse;
61
import io.grpc.lb.v1.LoadBalanceRequest;
62
import io.grpc.lb.v1.LoadBalanceResponse;
63
import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase;
64
import io.grpc.lb.v1.LoadBalancerGrpc;
65
import io.grpc.lb.v1.Server;
66
import io.grpc.lb.v1.ServerList;
67
import io.grpc.stub.StreamObserver;
68
import io.grpc.util.ForwardingLoadBalancerHelper;
69
import java.net.InetAddress;
70
import java.net.InetSocketAddress;
71
import java.net.UnknownHostException;
72
import java.util.ArrayList;
73
import java.util.Arrays;
74
import java.util.Collections;
75
import java.util.HashMap;
76
import java.util.List;
77
import java.util.Map;
78
import java.util.concurrent.ScheduledExecutorService;
79
import java.util.concurrent.TimeUnit;
80
import java.util.concurrent.atomic.AtomicBoolean;
81
import java.util.concurrent.atomic.AtomicReference;
82
import javax.annotation.Nullable;
83
import javax.annotation.concurrent.NotThreadSafe;
84

85
/**
86
 * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}.  Created when
87
 * GrpclbLoadBalancer switches to GRPCLB mode.  Closed and discarded when GrpclbLoadBalancer
88
 * switches away from GRPCLB mode.
89
 */
90
@NotThreadSafe
91
final class GrpclbState {
92
  static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
1✔
93
  private static final Attributes LB_PROVIDED_BACKEND_ATTRS =
94
      Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build();
1✔
95

96
  // Temporary workaround to reduce log spam for a grpclb server that incessantly sends updates
97
  // Tracked by b/198440401
98
  static final boolean SHOULD_LOG_SERVER_LISTS =
1✔
99
      Boolean.parseBoolean(System.getProperty("io.grpc.grpclb.LogServerLists", "true"));
1✔
100

101
  @VisibleForTesting
102
  static final PickResult DROP_PICK_RESULT =
1✔
103
      PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
1✔
104
  @VisibleForTesting
105
  static final Status NO_AVAILABLE_BACKENDS_STATUS =
1✔
106
      Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends");
1✔
107
  @VisibleForTesting
108
  static final Status BALANCER_TIMEOUT_STATUS =
1✔
109
      Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer");
1✔
110
  @VisibleForTesting
111
  static final Status BALANCER_REQUESTED_FALLBACK_STATUS =
1✔
112
      Status.UNAVAILABLE.withDescription("Fallback requested by balancer");
1✔
113
  @VisibleForTesting
114
  static final Status NO_FALLBACK_BACKENDS_STATUS =
1✔
115
      Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found");
1✔
116
  // This error status should never be propagated to RPC failures, as "no backend or balancer
117
  // addresses found" should be directly handled as a name resolution error. So in cases of no
118
  // balancer address, fallback should never fail.
119
  private static final Status NO_LB_ADDRESS_PROVIDED_STATUS =
1✔
120
      Status.UNAVAILABLE.withDescription("No balancer address found");
1✔
121

122

123
  @VisibleForTesting
124
  static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
1✔
125
      @Override
126
      public PickResult picked(PickSubchannelArgs args) {
127
        return PickResult.withNoResult();
×
128
      }
129

130
      @Override
131
      public String toString() {
132
        return "BUFFER_ENTRY";
×
133
      }
134
    };
135
  @VisibleForTesting
136
  static final String NO_USE_AUTHORITY_SUFFIX = "-notIntendedToBeUsed";
137

138
  /** Backend selection strategy of a grpclb session, as read from {@code config.getMode()}. */
  enum Mode {
    /** Backends are held as individual subchannels taken from the subchannel pool. */
    ROUND_ROBIN,
    /** Backends are handed as one address list to a child pick_first load balancer. */
    PICK_FIRST
  }
142

143
  private final String serviceName;
144
  private final long fallbackTimeoutMs;
145
  private final Helper helper;
146
  private final Context context;
147
  private final SynchronizationContext syncContext;
148
  @Nullable
149
  private final SubchannelPool subchannelPool;
150
  private final TimeProvider time;
151
  private final Stopwatch stopwatch;
152
  private final ScheduledExecutorService timerService;
153

154
  private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
1✔
155
      Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
1✔
156
  private final BackoffPolicy.Provider backoffPolicyProvider;
157
  private final ChannelLogger logger;
158

159
  // Scheduled only once.  Never reset.
160
  @Nullable
161
  private ScheduledHandle fallbackTimer;
162
  private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
1✔
163
  private boolean usingFallbackBackends;
164
  // Reason to fallback, will be used as RPC's error message if fail to fallback (e.g., no
165
  // fallback addresses found).
166
  @Nullable
167
  private Status fallbackReason;
168
  // True if the current balancer has returned a serverlist.  Will be reset to false when lost
169
  // connection to a balancer.
170
  private boolean balancerWorking;
171
  @Nullable
172
  private BackoffPolicy lbRpcRetryPolicy;
173
  @Nullable
174
  private ScheduledHandle lbRpcRetryTimer;
175

176
  @Nullable
177
  private ManagedChannel lbCommChannel;
178

179
  @Nullable
180
  private LbStream lbStream;
181
  private Map<List<EquivalentAddressGroup>, Subchannel> subchannels = Collections.emptyMap();
1✔
182
  private final GrpclbConfig config;
183

184
  // Has the same size as the round-robin list from the balancer.
185
  // A drop entry from the round-robin list becomes a DropEntry here.
186
  // A backend entry from the robin-robin list becomes a null here.
187
  private List<DropEntry> dropList = Collections.emptyList();
1✔
188
  // Contains only non-drop, i.e., backends from the round-robin list from the balancer.
189
  private List<BackendEntry> backendList = Collections.emptyList();
1✔
190
  private RoundRobinPicker currentPicker =
1✔
191
      new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
1✔
192
  private boolean requestConnectionPending;
193

194
  // Child LoadBalancer and state for PICK_FIRST mode delegation.
195
  private final LoadBalancerProvider pickFirstLbProvider;
196
  @Nullable
197
  private LoadBalancer pickFirstLb;
198
  private ConnectivityState pickFirstLbState = CONNECTING;
1✔
199
  private SubchannelPicker pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
200
  @Nullable
201
  private GrpclbClientLoadRecorder currentPickFirstLoadRecorder;
202

203
  /**
   * Creates the state of one grpclb session.
   *
   * <p>The subchannel pool is only required (and only retained) in ROUND_ROBIN mode; in any other
   * mode the {@code subchannelPool} argument is ignored and the field is left {@code null}.
   * The service name comes from the config when present, otherwise from the helper's authority.
   *
   * @throws NullPointerException if a required argument, a required helper-provided object, or the
   *     registered "pick_first" provider is {@code null}
   */
  GrpclbState(
      GrpclbConfig config,
      Helper helper,
      Context context,
      SubchannelPool subchannelPool,
      TimeProvider time,
      Stopwatch stopwatch,
      BackoffPolicy.Provider backoffPolicyProvider) {
    this.config = checkNotNull(config, "config");
    this.helper = checkNotNull(helper, "helper");
    this.context = checkNotNull(context, "context");
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");

    final boolean roundRobin = config.getMode() == Mode.ROUND_ROBIN;
    this.subchannelPool = roundRobin ? checkNotNull(subchannelPool, "subchannelPool") : null;
    if (roundRobin) {
      // Route pooled subchannel state changes into this session.
      PooledSubchannelStateListener poolListener =
          new PooledSubchannelStateListener() {
            @Override
            public void onSubchannelState(
                Subchannel subchannel, ConnectivityStateInfo newState) {
              handleSubchannelState(subchannel, newState);
            }
          };
      subchannelPool.registerListener(poolListener);
    }

    LoadBalancerProvider pickFirstProvider =
        LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first");
    this.pickFirstLbProvider =
        checkNotNull(pickFirstProvider, "pick_first balancer not available");

    this.time = checkNotNull(time, "time provider");
    this.stopwatch = checkNotNull(stopwatch, "stopwatch");
    this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");

    String configuredServiceName = config.getServiceName();
    this.serviceName =
        configuredServiceName != null
            ? configuredServiceName
            : checkNotNull(helper.getAuthority(), "helper returns null authority");

    this.fallbackTimeoutMs = config.getFallbackTimeoutMs();
    this.logger = checkNotNull(helper.getChannelLogger(), "logger");
    logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Created", serviceName);
  }
1✔
244

245
  void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
246
    if (newState.getState() == SHUTDOWN || !subchannels.containsValue(subchannel)) {
1✔
247
      return;
1✔
248
    }
249
    if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
1✔
250
      subchannel.requestConnection();
1✔
251
    }
252
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
253
      helper.refreshNameResolution();
1✔
254
    }
255

256
    AtomicReference<ConnectivityStateInfo> stateInfoRef =
1✔
257
        subchannel.getAttributes().get(STATE_INFO);
1✔
258
    // If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at
259
    // every moment which causes RR to stay in CONNECTING. It's better to keep the TRANSIENT_FAILURE
260
    // state in that case so that fail-fast RPCs can fail fast.
261
    boolean keepState =
1✔
262
        config.getMode() == Mode.ROUND_ROBIN
1✔
263
        && stateInfoRef.get().getState() == TRANSIENT_FAILURE
1✔
264
        && (newState.getState() == CONNECTING || newState.getState() == IDLE);
1✔
265
    if (!keepState) {
1✔
266
      stateInfoRef.set(newState);
1✔
267
      maybeUseFallbackBackends();
1✔
268
      maybeUpdatePicker();
1✔
269
    }
270
  }
1✔
271

272
  /**
273
   * Handle new addresses of the balancer and backends from the resolver, and create connection if
274
   * not yet connected.
275
   */
276
  void handleAddresses(
277
      List<EquivalentAddressGroup> newLbAddressGroups,
278
      List<EquivalentAddressGroup> newBackendServers) {
279
    logger.log(
1✔
280
        ChannelLogLevel.DEBUG,
281
        "[grpclb-<{0}>] Resolved addresses: lb addresses {1}, backends: {2}",
282
        serviceName,
283
        newLbAddressGroups,
284
        newBackendServers);
285
    fallbackBackendList = newBackendServers;
1✔
286
    if (newLbAddressGroups.isEmpty()) {
1✔
287
      // No balancer address: close existing balancer connection and prepare to enter fallback
288
      // mode. If there is no successful backend connection, it enters fallback mode immediately.
289
      // Otherwise, fallback does not happen until backend connections are lost. This behavior
290
      // might be different from other languages (e.g., existing balancer connection is not
291
      // closed in C-core), but we aren't changing it at this time.
292
      shutdownLbComm();
1✔
293
      if (!usingFallbackBackends) {
1✔
294
        fallbackReason = NO_LB_ADDRESS_PROVIDED_STATUS;
1✔
295
        cancelFallbackTimer();
1✔
296
        maybeUseFallbackBackends();
1✔
297
      }
298
    } else {
299
      startLbComm(newLbAddressGroups);
1✔
300
      // Avoid creating a new RPC just because the addresses were updated, as it can cause a
301
      // stampeding herd. The current RPC may be on a connection to an address not present in
302
      // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an
303
      // outdated backend, we could choose to re-create the RPC.
304
      if (lbStream == null) {
1✔
305
        cancelLbRpcRetryTimer();
1✔
306
        startLbRpc();
1✔
307
      }
308
      // Start the fallback timer if it's never started and we are not already using fallback
309
      // backends.
310
      if (fallbackTimer == null && !usingFallbackBackends) {
1✔
311
        fallbackTimer =
1✔
312
            syncContext.schedule(
1✔
313
                new FallbackModeTask(BALANCER_TIMEOUT_STATUS),
314
                fallbackTimeoutMs,
315
                TimeUnit.MILLISECONDS,
316
                timerService);
317
      }
318
    }
319
    if (usingFallbackBackends) {
1✔
320
      // Populate the new fallback backends to round-robin list.
321
      useFallbackBackends();
1✔
322
    }
323
    maybeUpdatePicker();
1✔
324
  }
1✔
325

326
  void requestConnection() {
327
    requestConnectionPending = true;
1✔
328
    // For PICK_FIRST mode with delegation, forward to the child LB.
329
    if (config.getMode() == Mode.PICK_FIRST && pickFirstLb != null) {
1✔
330
      pickFirstLb.requestConnection();
1✔
331
      requestConnectionPending = false;
1✔
332
      return;
1✔
333
    }
334
    for (RoundRobinEntry entry : currentPicker.pickList) {
×
335
      if (entry instanceof IdleSubchannelEntry) {
×
336
        ((IdleSubchannelEntry) entry).subchannel.requestConnection();
×
337
        requestConnectionPending = false;
×
338
      }
339
    }
×
340
  }
×
341

342
  private void maybeUseFallbackBackends() {
343
    if (balancerWorking || usingFallbackBackends) {
1✔
344
      return;
1✔
345
    }
346
    // Balancer RPC should have either been broken or timed out.
347
    checkState(fallbackReason != null, "no reason to fallback");
1✔
348
    // For PICK_FIRST mode with delegation, check the child LB's state.
349
    if (config.getMode() == Mode.PICK_FIRST) {
1✔
350
      if (pickFirstLb != null && pickFirstLbState == READY) {
1✔
351
        return;
×
352
      }
353
      // For PICK_FIRST, we don't have individual subchannel states to use as fallback reason.
354
    } else {
355
      for (Subchannel subchannel : subchannels.values()) {
1✔
356
        ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
1✔
357
        if (stateInfo.getState() == READY) {
1✔
358
          return;
1✔
359
        }
360
        // If we do have balancer-provided backends, use one of its error in the error message if
361
        // fail to fallback.
362
        if (stateInfo.getState() == TRANSIENT_FAILURE) {
1✔
363
          fallbackReason = stateInfo.getStatus();
1✔
364
        }
365
      }
1✔
366
    }
367
    // Fallback conditions met
368
    useFallbackBackends();
1✔
369
  }
1✔
370

371
  /**
372
   * Populate backend servers to be used from the fallback backends.
373
   */
374
  private void useFallbackBackends() {
375
    usingFallbackBackends = true;
1✔
376
    logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Using fallback backends", serviceName);
1✔
377

378
    List<DropEntry> newDropList = new ArrayList<>();
1✔
379
    List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
1✔
380
    for (EquivalentAddressGroup eag : fallbackBackendList) {
1✔
381
      newDropList.add(null);
1✔
382
      newBackendAddrList.add(new BackendAddressGroup(eag, null));
1✔
383
    }
1✔
384
    updateServerList(newDropList, newBackendAddrList, null);
1✔
385
  }
1✔
386

387
  private void shutdownLbComm() {
388
    if (lbCommChannel != null) {
1✔
389
      lbCommChannel.shutdown();
1✔
390
      lbCommChannel = null;
1✔
391
    }
392
    shutdownLbRpc();
1✔
393
  }
1✔
394

395
  private void shutdownLbRpc() {
396
    if (lbStream != null) {
1✔
397
      lbStream.close(Status.CANCELLED.withDescription("balancer shutdown").asException());
1✔
398
      // lbStream will be set to null in LbStream.cleanup()
399
    }
400
  }
1✔
401

402
  private void startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags) {
403
    checkNotNull(overrideAuthorityEags, "overrideAuthorityEags");
1✔
404
    assert !overrideAuthorityEags.isEmpty();
1✔
405
    String doNotUseAuthority = overrideAuthorityEags.get(0).getAttributes()
1✔
406
        .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE) + NO_USE_AUTHORITY_SUFFIX;
1✔
407
    if (lbCommChannel == null) {
1✔
408
      lbCommChannel = helper.createOobChannel(overrideAuthorityEags, doNotUseAuthority);
1✔
409
      logger.log(
1✔
410
          ChannelLogLevel.DEBUG,
411
          "[grpclb-<{0}>] Created grpclb channel: EAG={1}",
412
          serviceName,
413
          overrideAuthorityEags);
414
    } else {
415
      helper.updateOobChannelAddresses(lbCommChannel, overrideAuthorityEags);
1✔
416
    }
417
  }
1✔
418

419
  private void startLbRpc() {
420
    checkState(lbStream == null, "previous lbStream has not been cleared yet");
1✔
421
    LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
1✔
422
    lbStream = new LbStream(stub);
1✔
423
    Context prevContext = context.attach();
1✔
424
    try {
425
      lbStream.start();
1✔
426
    } finally {
427
      context.detach(prevContext);
1✔
428
    }
429
    stopwatch.reset().start();
1✔
430

431
    LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
1✔
432
        .setInitialRequest(InitialLoadBalanceRequest.newBuilder()
1✔
433
            .setName(serviceName).build())
1✔
434
        .build();
1✔
435
    logger.log(
1✔
436
        ChannelLogLevel.DEBUG,
437
        "[grpclb-<{0}>] Sent initial grpclb request {1}", serviceName, initRequest);
438
    try {
439
      lbStream.lbRequestWriter.onNext(initRequest);
1✔
440
    } catch (Exception e) {
×
441
      lbStream.close(e);
×
442
    }
1✔
443
  }
1✔
444

445
  private void cancelFallbackTimer() {
446
    if (fallbackTimer != null) {
1✔
447
      fallbackTimer.cancel();
1✔
448
    }
449
  }
1✔
450

451
  private void cancelLbRpcRetryTimer() {
452
    if (lbRpcRetryTimer != null) {
1✔
453
      lbRpcRetryTimer.cancel();
1✔
454
      lbRpcRetryTimer = null;
1✔
455
    }
456
  }
1✔
457

458
  void shutdown() {
459
    logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Shutdown", serviceName);
1✔
460
    shutdownLbComm();
1✔
461
    switch (config.getMode()) {
1✔
462
      case ROUND_ROBIN:
463
        // We close the subchannels through subchannelPool instead of helper just for convenience of
464
        // testing.
465
        for (Subchannel subchannel : subchannels.values()) {
1✔
466
          returnSubchannelToPool(subchannel);
1✔
467
        }
1✔
468
        subchannelPool.clear();
1✔
469
        break;
1✔
470
      case PICK_FIRST:
471
        // Shutdown the child pick_first LB which manages its own subchannels.
472
        if (pickFirstLb != null) {
1✔
473
          pickFirstLb.shutdown();
1✔
474
          pickFirstLb = null;
1✔
475
        }
476
        break;
477
      default:
478
        throw new AssertionError("Missing case for " + config.getMode());
×
479
    }
480
    subchannels = Collections.emptyMap();
1✔
481
    cancelFallbackTimer();
1✔
482
    cancelLbRpcRetryTimer();
1✔
483
  }
1✔
484

485
  void propagateError(Status status) {
486
    logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Error: {1}", serviceName, status);
1✔
487
    if (backendList.isEmpty()) {
1✔
488
      Status error =
1✔
489
          Status.UNAVAILABLE.withCause(status.getCause()).withDescription(status.getDescription());
1✔
490
      maybeUpdatePicker(
1✔
491
          TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(error))));
1✔
492
    }
493
  }
1✔
494

495
  private void returnSubchannelToPool(Subchannel subchannel) {
496
    subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get());
1✔
497
  }
1✔
498

499
  @VisibleForTesting
500
  @Nullable
501
  GrpclbClientLoadRecorder getLoadRecorder() {
502
    if (lbStream == null) {
1✔
503
      return null;
×
504
    }
505
    return lbStream.loadRecorder;
1✔
506
  }
507

508
  /**
509
   * Populate backend servers to be used based on the given list of addresses.
510
   */
511
  private void updateServerList(
512
      List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
513
      @Nullable GrpclbClientLoadRecorder loadRecorder) {
514
    HashMap<List<EquivalentAddressGroup>, Subchannel> newSubchannelMap =
1✔
515
        new HashMap<>();
516
    List<BackendEntry> newBackendList = new ArrayList<>();
1✔
517

518
    switch (config.getMode()) {
1✔
519
      case ROUND_ROBIN:
520
        for (BackendAddressGroup backendAddr : newBackendAddrList) {
1✔
521
          EquivalentAddressGroup eag = backendAddr.getAddresses();
1✔
522
          List<EquivalentAddressGroup> eagAsList = Collections.singletonList(eag);
1✔
523
          Subchannel subchannel = newSubchannelMap.get(eagAsList);
1✔
524
          if (subchannel == null) {
1✔
525
            subchannel = subchannels.get(eagAsList);
1✔
526
            if (subchannel == null) {
1✔
527
              subchannel = subchannelPool.takeOrCreateSubchannel(eag, createSubchannelAttrs());
1✔
528
              subchannel.requestConnection();
1✔
529
            }
530
            newSubchannelMap.put(eagAsList, subchannel);
1✔
531
          }
532
          BackendEntry entry;
533
          // Only picks with tokens are reported to LoadRecorder
534
          if (backendAddr.getToken() == null) {
1✔
535
            entry = new BackendEntry(subchannel);
1✔
536
          } else {
537
            entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken());
1✔
538
          }
539
          newBackendList.add(entry);
1✔
540
        }
1✔
541
        // Close Subchannels whose addresses have been delisted
542
        for (Map.Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) {
1✔
543
          List<EquivalentAddressGroup> eagList = entry.getKey();
1✔
544
          if (!newSubchannelMap.containsKey(eagList)) {
1✔
545
            returnSubchannelToPool(entry.getValue());
1✔
546
          }
547
        }
1✔
548
        subchannels = Collections.unmodifiableMap(newSubchannelMap);
1✔
549
        break;
1✔
550
      case PICK_FIRST:
551
        // Delegate to child pick_first LB for address management.
552
        // Shutdown existing child LB if addresses become empty.
553
        if (newBackendAddrList.isEmpty()) {
1✔
554
          if (pickFirstLb != null) {
1✔
555
            pickFirstLb.shutdown();
1✔
556
            pickFirstLb = null;
1✔
557
          }
558
          break;
559
        }
560
        List<EquivalentAddressGroup> eagList = new ArrayList<>();
1✔
561
        // Attach tokens to EAG attributes for TokenAttachingTracerFactory to retrieve.
562
        for (BackendAddressGroup bag : newBackendAddrList) {
1✔
563
          EquivalentAddressGroup origEag = bag.getAddresses();
1✔
564
          Attributes eagAttrs = origEag.getAttributes();
1✔
565
          if (bag.getToken() != null) {
1✔
566
            eagAttrs = eagAttrs.toBuilder()
1✔
567
                .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, bag.getToken()).build();
1✔
568
          }
569
          eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
1✔
570
        }
1✔
571

572
        if (pickFirstLb == null) {
1✔
573
          pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper());
1✔
574
        }
575

576
        // Pass addresses to child LB.
577
        pickFirstLb.acceptResolvedAddresses(
1✔
578
            ResolvedAddresses.newBuilder()
1✔
579
                .setAddresses(eagList)
1✔
580
                .build());
1✔
581
        if (requestConnectionPending) {
1✔
582
          pickFirstLb.requestConnection();
×
583
          requestConnectionPending = false;
×
584
        }
585
        // Store the load recorder for token attachment.
586
        currentPickFirstLoadRecorder = loadRecorder;
1✔
587
        break;
1✔
588
      default:
589
        throw new AssertionError("Missing case for " + config.getMode());
×
590
    }
591

592
    dropList = Collections.unmodifiableList(newDropList);
1✔
593
    backendList = Collections.unmodifiableList(newBackendList);
1✔
594
  }
1✔
595

596
  @VisibleForTesting
597
  class FallbackModeTask implements Runnable {
598
    private final Status reason;
599

600
    private FallbackModeTask(Status reason) {
1✔
601
      this.reason = reason;
1✔
602
    }
1✔
603

604
    @Override
605
    public void run() {
606
      // Timer should have been cancelled if entered fallback early.
607
      checkState(!usingFallbackBackends, "already in fallback");
1✔
608
      fallbackReason = reason;
1✔
609
      maybeUseFallbackBackends();
1✔
610
      maybeUpdatePicker();
1✔
611
    }
1✔
612
  }
613

614
  @VisibleForTesting
615
  class LbRpcRetryTask implements Runnable {
1✔
616
    @Override
617
    public void run() {
618
      startLbRpc();
1✔
619
    }
1✔
620
  }
621

622
  @VisibleForTesting
623
  static class LoadReportingTask implements Runnable {
624
    private final LbStream stream;
625

626
    LoadReportingTask(LbStream stream) {
1✔
627
      this.stream = stream;
1✔
628
    }
1✔
629

630
    @Override
631
    public void run() {
632
      stream.loadReportTimer = null;
1✔
633
      stream.sendLoadReport();
1✔
634
    }
1✔
635
  }
636

637
  private class LbStream implements StreamObserver<LoadBalanceResponse> {
638
    final GrpclbClientLoadRecorder loadRecorder;
639
    final LoadBalancerGrpc.LoadBalancerStub stub;
640
    StreamObserver<LoadBalanceRequest> lbRequestWriter;
641

642
    // These fields are only accessed from helper.runSerialized()
643
    boolean initialResponseReceived;
644
    boolean closed;
645
    long loadReportIntervalMillis = -1;
1✔
646
    ScheduledHandle loadReportTimer;
647

648
    /**
     * Creates a stream bound to the given stub, with a fresh load recorder.
     *
     * @throws NullPointerException if {@code stub} is {@code null}
     */
    LbStream(LoadBalancerGrpc.LoadBalancerStub stub) {
      // Stats are scoped to this LbStream; nothing is carried over from a previous stream.
      this.loadRecorder = new GrpclbClientLoadRecorder(time);
      this.stub = checkNotNull(stub, "stub");
    }
1✔
654

655
    void start() {
656
      lbRequestWriter = stub.withWaitForReady().balanceLoad(this);
1✔
657
    }
1✔
658

659
    @Override public void onNext(final LoadBalanceResponse response) {
660
      syncContext.execute(new Runnable() {
1✔
661
          @Override
662
          public void run() {
663
            handleResponse(response);
1✔
664
          }
1✔
665
        });
666
    }
1✔
667

668
    @Override public void onError(final Throwable error) {
669
      syncContext.execute(new Runnable() {
1✔
670
          @Override
671
          public void run() {
672
            handleStreamClosed(Status.fromThrowable(error)
1✔
673
                .augmentDescription("Stream to GRPCLB LoadBalancer had an error"));
1✔
674
          }
1✔
675
        });
676
    }
1✔
677

678
    @Override public void onCompleted() {
679
      syncContext.execute(new Runnable() {
1✔
680
          @Override
681
          public void run() {
682
            handleStreamClosed(
1✔
683
                Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed"));
1✔
684
          }
1✔
685
        });
686
    }
1✔
687

688
    // Following methods must be run in helper.runSerialized()
689

690
    private void sendLoadReport() {
691
      if (closed) {
1✔
692
        return;
×
693
      }
694
      ClientStats stats = loadRecorder.generateLoadReport();
1✔
695
      // TODO(zhangkun83): flow control?
696
      try {
697
        lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build());
1✔
698
        scheduleNextLoadReport();
1✔
699
      } catch (Exception e) {
×
700
        close(e);
×
701
      }
1✔
702
    }
1✔
703

704
    private void scheduleNextLoadReport() {
705
      if (loadReportIntervalMillis > 0) {
1✔
706
        loadReportTimer = syncContext.schedule(
1✔
707
            new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS,
708
            timerService);
1✔
709
      }
710
    }
1✔
711

712
    private void handleResponse(LoadBalanceResponse response) {
713
      if (closed) {
1✔
714
        return;
×
715
      }
716

717
      LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase();
1✔
718
      if (!initialResponseReceived) {
1✔
719
        logger.log(
1✔
720
            ChannelLogLevel.INFO,
721
            "[grpclb-<{0}>] Got an LB initial response: {1}", serviceName, response);
1✔
722
        if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) {
1✔
723
          logger.log(
×
724
              ChannelLogLevel.WARNING,
725
              "[grpclb-<{0}>] Received a response without initial response",
726
              serviceName);
×
727
          return;
×
728
        }
729
        initialResponseReceived = true;
1✔
730
        InitialLoadBalanceResponse initialResponse = response.getInitialResponse();
1✔
731
        loadReportIntervalMillis =
1✔
732
            Durations.toMillis(initialResponse.getClientStatsReportInterval());
1✔
733
        scheduleNextLoadReport();
1✔
734
        return;
1✔
735
      }
736
      if (SHOULD_LOG_SERVER_LISTS) {
1✔
737
        logger.log(
1✔
738
            ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response: {1}", serviceName, response);
1✔
739
      } else {
740
        logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response", serviceName);
×
741
      }
742

743
      if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) {
1✔
744
        // Force entering fallback requested by balancer.
745
        cancelFallbackTimer();
1✔
746
        fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS;
1✔
747
        useFallbackBackends();
1✔
748
        maybeUpdatePicker();
1✔
749
        return;
1✔
750
      } else if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) {
1✔
751
        logger.log(
1✔
752
            ChannelLogLevel.WARNING,
753
            "[grpclb-<{0}>] Ignoring unexpected response type: {1}",
754
            serviceName,
1✔
755
            typeCase);
756
        return;
1✔
757
      }
758

759
      balancerWorking = true;
1✔
760
      // TODO(zhangkun83): handle delegate from initialResponse
761
      ServerList serverList = response.getServerList();
1✔
762
      List<DropEntry> newDropList = new ArrayList<>();
1✔
763
      List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
1✔
764
      // Construct the new collections. Create new Subchannels when necessary.
765
      for (Server server : serverList.getServersList()) {
1✔
766
        String token = server.getLoadBalanceToken();
1✔
767
        if (server.getDrop()) {
1✔
768
          newDropList.add(new DropEntry(loadRecorder, token));
1✔
769
        } else {
770
          newDropList.add(null);
1✔
771
          InetSocketAddress address;
772
          try {
773
            address = new InetSocketAddress(
1✔
774
                InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
1✔
775
          } catch (UnknownHostException e) {
×
776
            propagateError(
×
777
                Status.UNAVAILABLE
778
                    .withDescription("Invalid backend address: " + server)
×
779
                    .withCause(e));
×
780
            continue;
×
781
          }
1✔
782
          // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of
783
          // TLS, with Netty.
784
          EquivalentAddressGroup eag =
1✔
785
              new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS);
1✔
786
          newBackendAddrList.add(new BackendAddressGroup(eag, token));
1✔
787
        }
788
      }
1✔
789
      // Exit fallback as soon as a new server list is received from the balancer.
790
      usingFallbackBackends = false;
1✔
791
      fallbackReason = null;
1✔
792
      cancelFallbackTimer();
1✔
793
      updateServerList(newDropList, newBackendAddrList, loadRecorder);
1✔
794
      maybeUpdatePicker();
1✔
795
    }
1✔
796

797
    private void handleStreamClosed(Status error) {
798
      checkArgument(!error.isOk(), "unexpected OK status");
1✔
799
      if (closed) {
1✔
800
        return;
1✔
801
      }
802
      closed = true;
1✔
803
      cleanUp();
1✔
804
      propagateError(error);
1✔
805
      balancerWorking = false;
1✔
806
      fallbackReason = error;
1✔
807
      cancelFallbackTimer();
1✔
808
      maybeUseFallbackBackends();
1✔
809
      maybeUpdatePicker();
1✔
810

811
      long delayNanos = 0;
1✔
812
      if (initialResponseReceived || lbRpcRetryPolicy == null) {
1✔
813
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
814
        // has never been initialized.
815
        lbRpcRetryPolicy = backoffPolicyProvider.get();
1✔
816
      }
817
      // Backoff only when balancer wasn't working previously.
818
      if (!initialResponseReceived) {
1✔
819
        // The back-off policy determines the interval between consecutive RPC upstarts, thus the
820
        // actual delay may be smaller than the value from the back-off policy, or even negative,
821
        // depending how much time was spent in the previous RPC.
822
        delayNanos =
1✔
823
            lbRpcRetryPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
824
      }
825
      if (delayNanos <= 0) {
1✔
826
        startLbRpc();
1✔
827
      } else {
828
        lbRpcRetryTimer =
1✔
829
            syncContext.schedule(new LbRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS,
1✔
830
                timerService);
1✔
831
      }
832

833
      helper.refreshNameResolution();
1✔
834
    }
1✔
835

836
    void close(Exception error) {
837
      if (closed) {
1✔
838
        return;
×
839
      }
840
      closed = true;
1✔
841
      cleanUp();
1✔
842
      lbRequestWriter.onError(error);
1✔
843
    }
1✔
844

845
    private void cleanUp() {
846
      if (loadReportTimer != null) {
1✔
847
        loadReportTimer.cancel();
1✔
848
        loadReportTimer = null;
1✔
849
      }
850
      if (lbStream == this) {
1✔
851
        lbStream = null;
1✔
852
      }
853
    }
1✔
854
  }
855

856
  /**
857
   * Make and use a picker out of the current lists and the states of subchannels if they have
858
   * changed since the last picker created.
859
   */
860
  private void maybeUpdatePicker() {
861
    List<RoundRobinEntry> pickList;
862
    ConnectivityState state;
863
    // For PICK_FIRST mode with delegation, check if child LB exists instead of backendList.
864
    boolean hasBackends = config.getMode() == Mode.PICK_FIRST
1✔
865
        ? pickFirstLb != null
1✔
866
        : !backendList.isEmpty();
1✔
867
    if (!hasBackends) {
1✔
868
      // Note balancer (is working) may enforce using fallback backends, and that fallback may
869
      // fail. So we should check if currently in fallback first.
870
      if (usingFallbackBackends) {
1✔
871
        Status error =
1✔
872
            NO_FALLBACK_BACKENDS_STATUS
873
                .withCause(fallbackReason.getCause())
1✔
874
                .augmentDescription(fallbackReason.getDescription());
1✔
875
        pickList = Collections.<RoundRobinEntry>singletonList(new ErrorEntry(error));
1✔
876
        state = TRANSIENT_FAILURE;
1✔
877
      } else if (balancerWorking)  {
1✔
878
        pickList =
1✔
879
            Collections.<RoundRobinEntry>singletonList(
1✔
880
                new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS));
881
        state = TRANSIENT_FAILURE;
1✔
882
      } else {  // still waiting for balancer
883
        pickList = Collections.singletonList(BUFFER_ENTRY);
1✔
884
        state = CONNECTING;
1✔
885
      }
886
      maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
1✔
887
      return;
1✔
888
    }
889
    switch (config.getMode()) {
1✔
890
      case ROUND_ROBIN:
891
        pickList = new ArrayList<>(backendList.size());
1✔
892
        Status error = null;
1✔
893
        boolean hasPending = false;
1✔
894
        for (BackendEntry entry : backendList) {
1✔
895
          Subchannel subchannel = entry.subchannel;
1✔
896
          Attributes attrs = subchannel.getAttributes();
1✔
897
          ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
1✔
898
          if (stateInfo.getState() == READY) {
1✔
899
            pickList.add(entry);
1✔
900
          } else if (stateInfo.getState() == TRANSIENT_FAILURE) {
1✔
901
            error = stateInfo.getStatus();
1✔
902
          } else {
903
            hasPending = true;
1✔
904
          }
905
        }
1✔
906
        if (pickList.isEmpty()) {
1✔
907
          if (hasPending) {
1✔
908
            pickList.add(BUFFER_ENTRY);
1✔
909
            state = CONNECTING;
1✔
910
          } else {
911
            pickList.add(new ErrorEntry(error));
1✔
912
            state = TRANSIENT_FAILURE;
1✔
913
          }
914
        } else {
915
          state = READY;
1✔
916
        }
917
        break;
1✔
918
      case PICK_FIRST: {
919
        // Use child LB's state and picker. Wrap the picker for token attachment.
920
        state = pickFirstLbState;
1✔
921
        TokenAttachingTracerFactory tracerFactory =
1✔
922
            new TokenAttachingTracerFactory(currentPickFirstLoadRecorder);
923
        pickList = Collections.<RoundRobinEntry>singletonList(
1✔
924
            new ChildLbPickerEntry(pickFirstLbPicker, tracerFactory));
925
        break;
1✔
926
      }
927
      default:
928
        throw new AssertionError("Missing case for " + config.getMode());
×
929
    }
930
    maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
1✔
931
  }
1✔
932

933
  /**
934
   * Update the given picker to the helper if it's different from the current one.
935
   */
936
  private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) {
937
    // Discard the new picker if we are sure it won't make any difference, in order to save
938
    // re-processing pending streams, and avoid unnecessary resetting of the pointer in
939
    // RoundRobinPicker.
940
    if (picker.dropList.equals(currentPicker.dropList)
1✔
941
        && picker.pickList.equals(currentPicker.pickList)) {
1✔
942
      return;
1✔
943
    }
944
    currentPicker = picker;
1✔
945
    helper.updateBalancingState(state, picker);
1✔
946
  }
1✔
947

948
  private static Attributes createSubchannelAttrs() {
949
    return Attributes.newBuilder()
1✔
950
        .set(STATE_INFO,
1✔
951
            new AtomicReference<>(
952
                ConnectivityStateInfo.forNonError(IDLE)))
1✔
953
        .build();
1✔
954
  }
955

956
  @VisibleForTesting
957
  static final class DropEntry {
958
    private final GrpclbClientLoadRecorder loadRecorder;
959
    private final String token;
960

961
    DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) {
1✔
962
      this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
1✔
963
      this.token = checkNotNull(token, "token");
1✔
964
    }
1✔
965

966
    PickResult picked() {
967
      loadRecorder.recordDroppedRequest(token);
1✔
968
      return DROP_PICK_RESULT;
1✔
969
    }
970

971
    @Override
972
    public String toString() {
973
      // This is printed in logs.  Only include useful information.
974
      return "drop(" + token + ")";
×
975
    }
976

977
    @Override
978
    public int hashCode() {
979
      return Objects.hashCode(loadRecorder, token);
×
980
    }
981

982
    @Override
983
    public boolean equals(Object other) {
984
      if (!(other instanceof DropEntry)) {
1✔
985
        return false;
1✔
986
      }
987
      DropEntry that = (DropEntry) other;
1✔
988
      return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token);
1✔
989
    }
990
  }
991

992
  @VisibleForTesting
993
  interface RoundRobinEntry {
994
    PickResult picked(PickSubchannelArgs args);
995
  }
996

997
  @VisibleForTesting
998
  static final class BackendEntry implements RoundRobinEntry {
999
    final Subchannel subchannel;
1000
    @VisibleForTesting
1001
    final PickResult result;
1002
    @Nullable
1003
    private final String token;
1004

1005
    /**
1006
     * For ROUND_ROBIN: creates a BackendEntry whose usage will be reported to load recorder.
1007
     */
1008
    BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
1✔
1009
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
1010
      this.result =
1✔
1011
          PickResult.withSubchannel(subchannel, checkNotNull(loadRecorder, "loadRecorder"));
1✔
1012
      this.token = checkNotNull(token, "token");
1✔
1013
    }
1✔
1014

1015
    /**
1016
     * For ROUND_ROBIN/PICK_FIRST: creates a BackendEntry whose usage will not be reported.
1017
     */
1018
    BackendEntry(Subchannel subchannel) {
1✔
1019
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
1020
      this.result = PickResult.withSubchannel(subchannel);
1✔
1021
      this.token = null;
1✔
1022
    }
1✔
1023

1024
    /**
1025
     * For PICK_FIRST: creates a BackendEntry that includes all addresses.
1026
     */
1027
    BackendEntry(Subchannel subchannel, TokenAttachingTracerFactory tracerFactory) {
×
1028
      this.subchannel = checkNotNull(subchannel, "subchannel");
×
1029
      this.result =
×
1030
          PickResult.withSubchannel(subchannel, checkNotNull(tracerFactory, "tracerFactory"));
×
1031
      this.token = null;
×
1032
    }
×
1033

1034
    @Override
1035
    public PickResult picked(PickSubchannelArgs args) {
1036
      Metadata headers = args.getHeaders();
1✔
1037
      headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
1✔
1038
      if (token != null) {
1✔
1039
        headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
1✔
1040
      }
1041
      return result;
1✔
1042
    }
1043

1044
    @Override
1045
    public String toString() {
1046
      // This is printed in logs.  Only give out useful information.
1047
      return "[" + subchannel.getAllAddresses().toString() + "(" + token + ")]";
×
1048
    }
1049

1050
    @Override
1051
    public int hashCode() {
1052
      return Objects.hashCode(result, token);
×
1053
    }
1054

1055
    @Override
1056
    public boolean equals(Object other) {
1057
      if (!(other instanceof BackendEntry)) {
1✔
1058
        return false;
1✔
1059
      }
1060
      BackendEntry that = (BackendEntry) other;
1✔
1061
      return Objects.equal(result, that.result) && Objects.equal(token, that.token);
1✔
1062
    }
1063
  }
1064

1065
  @VisibleForTesting
1066
  static final class IdleSubchannelEntry implements RoundRobinEntry {
1067
    private final SynchronizationContext syncContext;
1068
    private final Subchannel subchannel;
1069
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
1070

1071
    IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) {
1✔
1072
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
1073
      this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
1074
    }
1✔
1075

1076
    @Override
1077
    public PickResult picked(PickSubchannelArgs args) {
1078
      if (connectionRequested.compareAndSet(false, true)) {
1✔
1079
        syncContext.execute(new Runnable() {
1✔
1080
            @Override
1081
            public void run() {
1082
              subchannel.requestConnection();
1✔
1083
            }
1✔
1084
          });
1085
      }
1086
      return PickResult.withNoResult();
1✔
1087
    }
1088

1089
    @Override
1090
    public String toString() {
1091
      // This is printed in logs.  Only give out useful information.
1092
      return "(idle)[" + subchannel.getAllAddresses().toString() + "]";
×
1093
    }
1094

1095
    @Override
1096
    public int hashCode() {
1097
      return Objects.hashCode(subchannel, syncContext);
×
1098
    }
1099

1100
    @Override
1101
    public boolean equals(Object other) {
1102
      if (!(other instanceof IdleSubchannelEntry)) {
×
1103
        return false;
×
1104
      }
1105
      IdleSubchannelEntry that = (IdleSubchannelEntry) other;
×
1106
      return Objects.equal(subchannel, that.subchannel)
×
1107
          && Objects.equal(syncContext, that.syncContext);
×
1108
    }
1109
  }
1110

1111
  @VisibleForTesting
1112
  static final class ErrorEntry implements RoundRobinEntry {
1113
    final PickResult result;
1114

1115
    ErrorEntry(Status status) {
1✔
1116
      result = PickResult.withError(status);
1✔
1117
    }
1✔
1118

1119
    @Override
1120
    public PickResult picked(PickSubchannelArgs args) {
1121
      return result;
1✔
1122
    }
1123

1124
    @Override
1125
    public int hashCode() {
1126
      return Objects.hashCode(result);
×
1127
    }
1128

1129
    @Override
1130
    public boolean equals(Object other) {
1131
      if (!(other instanceof ErrorEntry)) {
1✔
1132
        return false;
1✔
1133
      }
1134
      return Objects.equal(result, ((ErrorEntry) other).result);
1✔
1135
    }
1136

1137
    @Override
1138
    public String toString() {
1139
      // This is printed in logs.  Only include useful information.
1140
      return result.getStatus().toString();
×
1141
    }
1142
  }
1143

1144
  /**
1145
   * Entry that wraps a child LB's picker for PICK_FIRST mode delegation.
1146
   * Attaches TokenAttachingTracerFactory to the pick result for token propagation.
1147
   */
1148
  @VisibleForTesting
1149
  static final class ChildLbPickerEntry implements RoundRobinEntry {
1150
    private final SubchannelPicker childPicker;
1151
    private final TokenAttachingTracerFactory tracerFactory;
1152

1153
    ChildLbPickerEntry(SubchannelPicker childPicker, TokenAttachingTracerFactory tracerFactory) {
1✔
1154
      this.childPicker = checkNotNull(childPicker, "childPicker");
1✔
1155
      this.tracerFactory = checkNotNull(tracerFactory, "tracerFactory");
1✔
1156
    }
1✔
1157

1158
    @Override
1159
    public PickResult picked(PickSubchannelArgs args) {
1160
      PickResult childResult = childPicker.pickSubchannel(args);
1✔
1161
      if (childResult.getSubchannel() == null) {
1✔
1162
        // No subchannel (e.g., buffer, error), return as-is.
1163
        return childResult;
1✔
1164
      }
1165
      // Wrap the pick result to attach tokens via the tracer factory.
1166
      return PickResult.withSubchannel(
1✔
1167
          childResult.getSubchannel(), tracerFactory, childResult.getAuthorityOverride());
1✔
1168
    }
1169

1170
    @Override
1171
    public int hashCode() {
1172
      return Objects.hashCode(childPicker, tracerFactory);
×
1173
    }
1174

1175
    @Override
1176
    public boolean equals(Object other) {
1177
      if (!(other instanceof ChildLbPickerEntry)) {
1✔
1178
        return false;
1✔
1179
      }
1180
      ChildLbPickerEntry that = (ChildLbPickerEntry) other;
1✔
1181
      return Objects.equal(childPicker, that.childPicker)
1✔
1182
          && Objects.equal(tracerFactory, that.tracerFactory);
1✔
1183
    }
1184

1185
    @Override
1186
    public String toString() {
1187
      return "ChildLbPickerEntry(" + childPicker + ")";
×
1188
    }
1189

1190
    @VisibleForTesting
1191
    SubchannelPicker getChildPicker() {
1192
      return childPicker;
1✔
1193
    }
1194
  }
1195

1196
  @VisibleForTesting
1197
  static final class RoundRobinPicker extends SubchannelPicker {
1198
    @VisibleForTesting
1199
    final List<DropEntry> dropList;
1200
    private int dropIndex;
1201

1202
    @VisibleForTesting
1203
    final List<? extends RoundRobinEntry> pickList;
1204
    private int pickIndex;
1205

1206
    // dropList can be empty, which means no drop.
1207
    // pickList must not be empty.
1208
    RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) {
1✔
1209
      this.dropList = checkNotNull(dropList, "dropList");
1✔
1210
      this.pickList = checkNotNull(pickList, "pickList");
1✔
1211
      checkArgument(!pickList.isEmpty(), "pickList is empty");
1✔
1212
    }
1✔
1213

1214
    @Override
1215
    public PickResult pickSubchannel(PickSubchannelArgs args) {
1216
      synchronized (pickList) {
1✔
1217
        // Two-level round-robin.
1218
        // First round-robin on dropList. If a drop entry is selected, request will be dropped.  If
1219
        // a non-drop entry is selected, then round-robin on pickList.  This makes sure requests are
1220
        // dropped at the same proportion as the drop entries appear on the round-robin list from
1221
        // the balancer, while only backends from pickList are selected for the non-drop cases.
1222
        if (!dropList.isEmpty()) {
1✔
1223
          DropEntry drop = dropList.get(dropIndex);
1✔
1224
          dropIndex++;
1✔
1225
          if (dropIndex == dropList.size()) {
1✔
1226
            dropIndex = 0;
1✔
1227
          }
1228
          if (drop != null) {
1✔
1229
            return drop.picked();
1✔
1230
          }
1231
        }
1232

1233
        RoundRobinEntry pick = pickList.get(pickIndex);
1✔
1234
        pickIndex++;
1✔
1235
        if (pickIndex == pickList.size()) {
1✔
1236
          pickIndex = 0;
1✔
1237
        }
1238
        return pick.picked(args);
1✔
1239
      }
1240
    }
1241

1242
    @Override
1243
    public String toString() {
1244
      if (SHOULD_LOG_SERVER_LISTS) {
×
1245
        return MoreObjects.toStringHelper(RoundRobinPicker.class)
×
1246
            .add("dropList", dropList)
×
1247
            .add("pickList", pickList)
×
1248
            .toString();
×
1249
      }
1250
      return MoreObjects.toStringHelper(RoundRobinPicker.class).toString();
×
1251
    }
1252
  }
1253

1254
  /**
1255
   * Helper for the child pick_first LB in PICK_FIRST mode. Intercepts updateBalancingState()
1256
   * to store state and trigger the grpclb picker update with drops and token attachment.
1257
   */
1258
  private final class PickFirstLbHelper extends ForwardingLoadBalancerHelper {
1✔
1259

1260
    @Override
1261
    protected Helper delegate() {
1262
      return helper;
1✔
1263
    }
1264

1265
    @Override
1266
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
1267
      pickFirstLbState = newState;
1✔
1268
      pickFirstLbPicker = newPicker;
1✔
1269
      // Trigger name resolution refresh on TRANSIENT_FAILURE or IDLE, similar to ROUND_ROBIN.
1270
      if (newState == TRANSIENT_FAILURE || newState == IDLE) {
1✔
1271
        helper.refreshNameResolution();
1✔
1272
      }
1273
      maybeUseFallbackBackends();
1✔
1274
      maybeUpdatePicker();
1✔
1275
    }
1✔
1276
  }
1277
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc