• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20136

06 Jan 2026 05:27AM UTC coverage: 88.693% (+0.01%) from 88.681%
#20136

push

github

web-flow
core: Implement oobChannel with resolvingOobChannel

The most important part of this change is to ensure that CallCredentials
are not propagated to the OOB channel. Because the authority of the OOB
channel doesn't match that of the parent channel, we must ensure that any
bearer tokens are not sent to a different server. Fortunately, this was
not a problem, because resolvingOobChannel already has the same constraint.
(RLS has a different constraint, but we were able to let RLS manage that
itself.)

This commit does change the behavior of channelz, shutdown, and metrics
for the OOB channel. Previously the OOB channel was registered with
channelz, but channelz registration is only a TODO for the resolving OOB
channel. Shutting down the parent channel no longer shuts down the OOB
channel, and the parent no longer waits for the OOB channel to terminate
before becoming terminated itself. That is also a pre-existing TODO. Since
ManagedChannelImplBuilder is now being used, global configurators and
census are enabled. The proper behavior here is still being determined,
but we would want it to be the same for the resolving OOB channel and the
OOB channel.

The OOB channel used to refresh name resolution when its subchannel went
IDLE or TRANSIENT_FAILURE. That is an older behavior from back when regular
subchannels would also cause the name resolver to refresh. Nowadays that
goes through the LB tree. gRPC-LB already refreshes name resolution when
its RPC closes, so no longer doing it automatically should be fine.

balancerRpcExecutorPool no longer has its lifetime managed by the child.
It would be easiest not to use it at all from the OOB channel, which
wouldn't actually change the regular behavior, as channels already use the
same executor by default. However, the tests make use of the executor
being injected, so some propagation needs to be preserved.

Lots of OOB channel tests were deleted, but these were either testing
OobChannel, which is now gone, or things like channelz, which are known
to no longer work like before.

35361 of 39869 relevant lines covered (88.69%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.49
/../grpclb/src/main/java/io/grpc/grpclb/GrpclbState.java
1
/*
2
 * Copyright 2017 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.grpclb;
18

19
import static com.google.common.base.Preconditions.checkArgument;
20
import static com.google.common.base.Preconditions.checkNotNull;
21
import static com.google.common.base.Preconditions.checkState;
22
import static io.grpc.ConnectivityState.CONNECTING;
23
import static io.grpc.ConnectivityState.IDLE;
24
import static io.grpc.ConnectivityState.READY;
25
import static io.grpc.ConnectivityState.SHUTDOWN;
26
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.MoreObjects;
30
import com.google.common.base.Objects;
31
import com.google.common.base.Stopwatch;
32
import com.google.protobuf.util.Durations;
33
import io.grpc.Attributes;
34
import io.grpc.ChannelLogger;
35
import io.grpc.ChannelLogger.ChannelLogLevel;
36
import io.grpc.ConnectivityState;
37
import io.grpc.ConnectivityStateInfo;
38
import io.grpc.Context;
39
import io.grpc.EquivalentAddressGroup;
40
import io.grpc.LoadBalancer;
41
import io.grpc.LoadBalancer.FixedResultPicker;
42
import io.grpc.LoadBalancer.Helper;
43
import io.grpc.LoadBalancer.PickResult;
44
import io.grpc.LoadBalancer.PickSubchannelArgs;
45
import io.grpc.LoadBalancer.ResolvedAddresses;
46
import io.grpc.LoadBalancer.Subchannel;
47
import io.grpc.LoadBalancer.SubchannelPicker;
48
import io.grpc.LoadBalancerProvider;
49
import io.grpc.LoadBalancerRegistry;
50
import io.grpc.ManagedChannel;
51
import io.grpc.Metadata;
52
import io.grpc.Status;
53
import io.grpc.SynchronizationContext;
54
import io.grpc.SynchronizationContext.ScheduledHandle;
55
import io.grpc.grpclb.SubchannelPool.PooledSubchannelStateListener;
56
import io.grpc.internal.BackoffPolicy;
57
import io.grpc.internal.TimeProvider;
58
import io.grpc.lb.v1.ClientStats;
59
import io.grpc.lb.v1.InitialLoadBalanceRequest;
60
import io.grpc.lb.v1.InitialLoadBalanceResponse;
61
import io.grpc.lb.v1.LoadBalanceRequest;
62
import io.grpc.lb.v1.LoadBalanceResponse;
63
import io.grpc.lb.v1.LoadBalanceResponse.LoadBalanceResponseTypeCase;
64
import io.grpc.lb.v1.LoadBalancerGrpc;
65
import io.grpc.lb.v1.Server;
66
import io.grpc.lb.v1.ServerList;
67
import io.grpc.stub.StreamObserver;
68
import io.grpc.util.ForwardingLoadBalancerHelper;
69
import java.net.InetAddress;
70
import java.net.InetSocketAddress;
71
import java.net.UnknownHostException;
72
import java.util.ArrayList;
73
import java.util.Arrays;
74
import java.util.Collections;
75
import java.util.HashMap;
76
import java.util.List;
77
import java.util.Map;
78
import java.util.concurrent.ScheduledExecutorService;
79
import java.util.concurrent.TimeUnit;
80
import java.util.concurrent.atomic.AtomicBoolean;
81
import java.util.concurrent.atomic.AtomicReference;
82
import javax.annotation.Nullable;
83
import javax.annotation.concurrent.NotThreadSafe;
84

85
/**
86
 * The states of a GRPCLB working session of {@link GrpclbLoadBalancer}.  Created when
87
 * GrpclbLoadBalancer switches to GRPCLB mode.  Closed and discarded when GrpclbLoadBalancer
88
 * switches away from GRPCLB mode.
89
 */
90
@NotThreadSafe
91
final class GrpclbState {
92
  static final long FALLBACK_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
1✔
93
  private static final Attributes LB_PROVIDED_BACKEND_ATTRS =
94
      Attributes.newBuilder().set(GrpclbConstants.ATTR_LB_PROVIDED_BACKEND, true).build();
1✔
95

96
  // Temporary workaround to reduce log spam for a grpclb server that incessantly sends updates
97
  // Tracked by b/198440401
98
  static final boolean SHOULD_LOG_SERVER_LISTS =
1✔
99
      Boolean.parseBoolean(System.getProperty("io.grpc.grpclb.LogServerLists", "true"));
1✔
100

101
  @VisibleForTesting
102
  static final PickResult DROP_PICK_RESULT =
1✔
103
      PickResult.withDrop(Status.UNAVAILABLE.withDescription("Dropped as requested by balancer"));
1✔
104
  @VisibleForTesting
105
  static final Status NO_AVAILABLE_BACKENDS_STATUS =
1✔
106
      Status.UNAVAILABLE.withDescription("LoadBalancer responded without any backends");
1✔
107
  @VisibleForTesting
108
  static final Status BALANCER_TIMEOUT_STATUS =
1✔
109
      Status.UNAVAILABLE.withDescription("Timeout waiting for remote balancer");
1✔
110
  @VisibleForTesting
111
  static final Status BALANCER_REQUESTED_FALLBACK_STATUS =
1✔
112
      Status.UNAVAILABLE.withDescription("Fallback requested by balancer");
1✔
113
  @VisibleForTesting
114
  static final Status NO_FALLBACK_BACKENDS_STATUS =
1✔
115
      Status.UNAVAILABLE.withDescription("Unable to fallback, no fallback addresses found");
1✔
116
  // This error status should never be propagated to RPC failures, as "no backend or balancer
117
  // addresses found" should be directly handled as a name resolution error. So in cases of no
118
  // balancer address, fallback should never fail.
119
  private static final Status NO_LB_ADDRESS_PROVIDED_STATUS =
1✔
120
      Status.UNAVAILABLE.withDescription("No balancer address found");
1✔
121

122

123
  @VisibleForTesting
124
  static final RoundRobinEntry BUFFER_ENTRY = new RoundRobinEntry() {
1✔
125
      @Override
126
      public PickResult picked(PickSubchannelArgs args) {
127
        return PickResult.withNoResult();
×
128
      }
129

130
      @Override
131
      public String toString() {
132
        return "BUFFER_ENTRY";
×
133
      }
134
    };
135
  @VisibleForTesting
136
  static final String NO_USE_AUTHORITY_SUFFIX = "-notIntendedToBeUsed";
137

138
  /**
   * How balancer-provided backends are used: ROUND_ROBIN keeps one subchannel per backend in this
   * class, while PICK_FIRST hands the backend list to a child pick_first load balancer.
   */
  enum Mode { ROUND_ROBIN, PICK_FIRST }
142

143
  private final String serviceName;
144
  private final long fallbackTimeoutMs;
145
  private final Helper helper;
146
  private final Context context;
147
  private final SynchronizationContext syncContext;
148
  @Nullable
149
  private final SubchannelPool subchannelPool;
150
  private final TimeProvider time;
151
  private final Stopwatch stopwatch;
152
  private final ScheduledExecutorService timerService;
153

154
  private static final Attributes.Key<AtomicReference<ConnectivityStateInfo>> STATE_INFO =
1✔
155
      Attributes.Key.create("io.grpc.grpclb.GrpclbLoadBalancer.stateInfo");
1✔
156
  private final BackoffPolicy.Provider backoffPolicyProvider;
157
  private final ChannelLogger logger;
158

159
  // Scheduled only once.  Never reset.
160
  @Nullable
161
  private ScheduledHandle fallbackTimer;
162
  private List<EquivalentAddressGroup> fallbackBackendList = Collections.emptyList();
1✔
163
  private boolean usingFallbackBackends;
164
  // Reason to fallback, will be used as RPC's error message if fail to fallback (e.g., no
165
  // fallback addresses found).
166
  @Nullable
167
  private Status fallbackReason;
168
  // True if the current balancer has returned a serverlist.  Will be reset to false when lost
169
  // connection to a balancer.
170
  private boolean balancerWorking;
171
  @Nullable
172
  private BackoffPolicy lbRpcRetryPolicy;
173
  @Nullable
174
  private ScheduledHandle lbRpcRetryTimer;
175

176
  @Nullable
177
  private ManagedChannel lbCommChannel;
178

179
  @Nullable
180
  private LbStream lbStream;
181
  private Map<List<EquivalentAddressGroup>, Subchannel> subchannels = Collections.emptyMap();
1✔
182
  private final GrpclbConfig config;
183

184
  // Has the same size as the round-robin list from the balancer.
185
  // A drop entry from the round-robin list becomes a DropEntry here.
186
  // A backend entry from the robin-robin list becomes a null here.
187
  private List<DropEntry> dropList = Collections.emptyList();
1✔
188
  // Contains only non-drop, i.e., backends from the round-robin list from the balancer.
189
  private List<BackendEntry> backendList = Collections.emptyList();
1✔
190
  private ConnectivityState currentState = ConnectivityState.CONNECTING;
1✔
191
  private RoundRobinPicker currentPicker =
1✔
192
      new RoundRobinPicker(Collections.<DropEntry>emptyList(), Arrays.asList(BUFFER_ENTRY));
1✔
193
  private boolean requestConnectionPending;
194

195
  // Child LoadBalancer and state for PICK_FIRST mode delegation.
196
  private final LoadBalancerProvider pickFirstLbProvider;
197
  @Nullable
198
  private LoadBalancer pickFirstLb;
199
  private ConnectivityState pickFirstLbState = CONNECTING;
1✔
200
  private SubchannelPicker pickFirstLbPicker = new FixedResultPicker(PickResult.withNoResult());
1✔
201
  @Nullable
202
  private GrpclbClientLoadRecorder currentPickFirstLoadRecorder;
203

204
  /**
   * Creates the state for one GRPCLB working session and logs its creation.
   *
   * @param config grpclb configuration; its mode decides whether {@code subchannelPool} is used
   * @param helper parent helper; also supplies the sync context, timer service, authority and
   *     channel logger
   * @param context context attached while the balancer RPC is started
   * @param subchannelPool backend subchannel pool; required in ROUND_ROBIN mode, ignored otherwise
   * @param time time source for per-stream load recorders
   * @param stopwatch restarted whenever a balancer RPC is started
   * @param backoffPolicyProvider provider of backoff policies
   * @throws NullPointerException if a required argument or helper-provided value is null, or if
   *     no "pick_first" provider is registered
   */
  GrpclbState(
      GrpclbConfig config,
      Helper helper,
      Context context,
      SubchannelPool subchannelPool,
      TimeProvider time,
      Stopwatch stopwatch,
      BackoffPolicy.Provider backoffPolicyProvider) {
    this.config = checkNotNull(config, "config");
    this.helper = checkNotNull(helper, "helper");
    this.context = checkNotNull(context, "context");
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");

    boolean roundRobin = config.getMode() == Mode.ROUND_ROBIN;
    this.subchannelPool = roundRobin ? checkNotNull(subchannelPool, "subchannelPool") : null;
    if (roundRobin) {
      // Pool state changes feed straight into handleSubchannelState.
      subchannelPool.registerListener(
          new PooledSubchannelStateListener() {
            @Override
            public void onSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
              handleSubchannelState(subchannel, newState);
            }
          });
    }

    LoadBalancerProvider pickFirstProvider =
        LoadBalancerRegistry.getDefaultRegistry().getProvider("pick_first");
    this.pickFirstLbProvider =
        checkNotNull(pickFirstProvider, "pick_first balancer not available");
    this.time = checkNotNull(time, "time provider");
    this.stopwatch = checkNotNull(stopwatch, "stopwatch");
    this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService");
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");

    String configuredServiceName = config.getServiceName();
    this.serviceName = configuredServiceName != null
        ? configuredServiceName
        : checkNotNull(helper.getAuthority(), "helper returns null authority");
    this.fallbackTimeoutMs = config.getFallbackTimeoutMs();
    this.logger = checkNotNull(helper.getChannelLogger(), "logger");
    logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Created", serviceName);
  }
245

246
  void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo newState) {
247
    if (newState.getState() == SHUTDOWN || !subchannels.containsValue(subchannel)) {
1✔
248
      return;
1✔
249
    }
250
    if (config.getMode() == Mode.ROUND_ROBIN && newState.getState() == IDLE) {
1✔
251
      subchannel.requestConnection();
1✔
252
    }
253
    if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1✔
254
      helper.refreshNameResolution();
1✔
255
    }
256

257
    AtomicReference<ConnectivityStateInfo> stateInfoRef =
1✔
258
        subchannel.getAttributes().get(STATE_INFO);
1✔
259
    // If all RR servers are unhealthy, it's possible that at least one connection is CONNECTING at
260
    // every moment which causes RR to stay in CONNECTING. It's better to keep the TRANSIENT_FAILURE
261
    // state in that case so that fail-fast RPCs can fail fast.
262
    boolean keepState =
1✔
263
        config.getMode() == Mode.ROUND_ROBIN
1✔
264
        && stateInfoRef.get().getState() == TRANSIENT_FAILURE
1✔
265
        && (newState.getState() == CONNECTING || newState.getState() == IDLE);
1✔
266
    if (!keepState) {
1✔
267
      stateInfoRef.set(newState);
1✔
268
      maybeUseFallbackBackends();
1✔
269
      maybeUpdatePicker();
1✔
270
    }
271
  }
1✔
272

273
  /**
274
   * Handle new addresses of the balancer and backends from the resolver, and create connection if
275
   * not yet connected.
276
   */
277
  void handleAddresses(
278
      List<EquivalentAddressGroup> newLbAddressGroups,
279
      List<EquivalentAddressGroup> newBackendServers) {
280
    logger.log(
1✔
281
        ChannelLogLevel.DEBUG,
282
        "[grpclb-<{0}>] Resolved addresses: lb addresses {1}, backends: {2}",
283
        serviceName,
284
        newLbAddressGroups,
285
        newBackendServers);
286
    fallbackBackendList = newBackendServers;
1✔
287
    if (newLbAddressGroups.isEmpty()) {
1✔
288
      // No balancer address: close existing balancer connection and prepare to enter fallback
289
      // mode. If there is no successful backend connection, it enters fallback mode immediately.
290
      // Otherwise, fallback does not happen until backend connections are lost. This behavior
291
      // might be different from other languages (e.g., existing balancer connection is not
292
      // closed in C-core), but we aren't changing it at this time.
293
      shutdownLbComm();
1✔
294
      if (!usingFallbackBackends) {
1✔
295
        fallbackReason = NO_LB_ADDRESS_PROVIDED_STATUS;
1✔
296
        cancelFallbackTimer();
1✔
297
        maybeUseFallbackBackends();
1✔
298
      }
299
    } else {
300
      startLbComm(newLbAddressGroups);
1✔
301
      // Avoid creating a new RPC just because the addresses were updated, as it can cause a
302
      // stampeding herd. The current RPC may be on a connection to an address not present in
303
      // newLbAddressGroups, but we're considering that "okay". If we detected the RPC is to an
304
      // outdated backend, we could choose to re-create the RPC.
305
      if (lbStream == null) {
1✔
306
        cancelLbRpcRetryTimer();
1✔
307
        startLbRpc();
1✔
308
      }
309
      // Start the fallback timer if it's never started and we are not already using fallback
310
      // backends.
311
      if (fallbackTimer == null && !usingFallbackBackends) {
1✔
312
        fallbackTimer =
1✔
313
            syncContext.schedule(
1✔
314
                new FallbackModeTask(BALANCER_TIMEOUT_STATUS),
315
                fallbackTimeoutMs,
316
                TimeUnit.MILLISECONDS,
317
                timerService);
318
      }
319
    }
320
    if (usingFallbackBackends) {
1✔
321
      // Populate the new fallback backends to round-robin list.
322
      useFallbackBackends();
1✔
323
    }
324
    maybeUpdatePicker();
1✔
325
  }
1✔
326

327
  void requestConnection() {
328
    requestConnectionPending = true;
1✔
329
    // For PICK_FIRST mode with delegation, forward to the child LB.
330
    if (config.getMode() == Mode.PICK_FIRST && pickFirstLb != null) {
1✔
331
      pickFirstLb.requestConnection();
1✔
332
      requestConnectionPending = false;
1✔
333
      return;
1✔
334
    }
335
    for (RoundRobinEntry entry : currentPicker.pickList) {
×
336
      if (entry instanceof IdleSubchannelEntry) {
×
337
        ((IdleSubchannelEntry) entry).subchannel.requestConnection();
×
338
        requestConnectionPending = false;
×
339
      }
340
    }
×
341
  }
×
342

343
  private void maybeUseFallbackBackends() {
344
    if (balancerWorking || usingFallbackBackends) {
1✔
345
      return;
1✔
346
    }
347
    // Balancer RPC should have either been broken or timed out.
348
    checkState(fallbackReason != null, "no reason to fallback");
1✔
349
    // For PICK_FIRST mode with delegation, check the child LB's state.
350
    if (config.getMode() == Mode.PICK_FIRST) {
1✔
351
      if (pickFirstLb != null && pickFirstLbState == READY) {
1✔
352
        return;
×
353
      }
354
      // For PICK_FIRST, we don't have individual subchannel states to use as fallback reason.
355
    } else {
356
      for (Subchannel subchannel : subchannels.values()) {
1✔
357
        ConnectivityStateInfo stateInfo = subchannel.getAttributes().get(STATE_INFO).get();
1✔
358
        if (stateInfo.getState() == READY) {
1✔
359
          return;
1✔
360
        }
361
        // If we do have balancer-provided backends, use one of its error in the error message if
362
        // fail to fallback.
363
        if (stateInfo.getState() == TRANSIENT_FAILURE) {
1✔
364
          fallbackReason = stateInfo.getStatus();
1✔
365
        }
366
      }
1✔
367
    }
368
    // Fallback conditions met
369
    useFallbackBackends();
1✔
370
  }
1✔
371

372
  /**
373
   * Populate backend servers to be used from the fallback backends.
374
   */
375
  private void useFallbackBackends() {
376
    usingFallbackBackends = true;
1✔
377
    logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Using fallback backends", serviceName);
1✔
378

379
    List<DropEntry> newDropList = new ArrayList<>();
1✔
380
    List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
1✔
381
    for (EquivalentAddressGroup eag : fallbackBackendList) {
1✔
382
      newDropList.add(null);
1✔
383
      newBackendAddrList.add(new BackendAddressGroup(eag, null));
1✔
384
    }
1✔
385
    updateServerList(newDropList, newBackendAddrList, null);
1✔
386
  }
1✔
387

388
  private void shutdownLbComm() {
389
    shutdownLbRpc();
1✔
390
    if (lbCommChannel != null) {
1✔
391
      // The channel should have no RPCs at this point
392
      lbCommChannel.shutdownNow();
1✔
393
      lbCommChannel = null;
1✔
394
    }
395
  }
1✔
396

397
  private void shutdownLbRpc() {
398
    if (lbStream != null) {
1✔
399
      lbStream.close(Status.CANCELLED.withDescription("balancer shutdown").asException());
1✔
400
      // lbStream will be set to null in LbStream.cleanup()
401
    }
402
  }
1✔
403

404
  private void startLbComm(List<EquivalentAddressGroup> overrideAuthorityEags) {
405
    checkNotNull(overrideAuthorityEags, "overrideAuthorityEags");
1✔
406
    assert !overrideAuthorityEags.isEmpty();
1✔
407
    String doNotUseAuthority = overrideAuthorityEags.get(0).getAttributes()
1✔
408
        .get(EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE) + NO_USE_AUTHORITY_SUFFIX;
1✔
409
    if (lbCommChannel == null) {
1✔
410
      lbCommChannel = helper.createOobChannel(overrideAuthorityEags, doNotUseAuthority);
1✔
411
      logger.log(
1✔
412
          ChannelLogLevel.DEBUG,
413
          "[grpclb-<{0}>] Created grpclb channel: EAG={1}",
414
          serviceName,
415
          overrideAuthorityEags);
416
    } else {
417
      helper.updateOobChannelAddresses(lbCommChannel, overrideAuthorityEags);
1✔
418
    }
419
  }
1✔
420

421
  private void startLbRpc() {
422
    checkState(lbStream == null, "previous lbStream has not been cleared yet");
1✔
423
    LoadBalancerGrpc.LoadBalancerStub stub = LoadBalancerGrpc.newStub(lbCommChannel);
1✔
424
    lbStream = new LbStream(stub);
1✔
425
    Context prevContext = context.attach();
1✔
426
    try {
427
      lbStream.start();
1✔
428
    } finally {
429
      context.detach(prevContext);
1✔
430
    }
431
    stopwatch.reset().start();
1✔
432

433
    LoadBalanceRequest initRequest = LoadBalanceRequest.newBuilder()
1✔
434
        .setInitialRequest(InitialLoadBalanceRequest.newBuilder()
1✔
435
            .setName(serviceName).build())
1✔
436
        .build();
1✔
437
    logger.log(
1✔
438
        ChannelLogLevel.DEBUG,
439
        "[grpclb-<{0}>] Sent initial grpclb request {1}", serviceName, initRequest);
440
    try {
441
      lbStream.lbRequestWriter.onNext(initRequest);
1✔
442
    } catch (Exception e) {
×
443
      lbStream.close(e);
×
444
    }
1✔
445
  }
1✔
446

447
  private void cancelFallbackTimer() {
448
    if (fallbackTimer != null) {
1✔
449
      fallbackTimer.cancel();
1✔
450
    }
451
  }
1✔
452

453
  private void cancelLbRpcRetryTimer() {
454
    if (lbRpcRetryTimer != null) {
1✔
455
      lbRpcRetryTimer.cancel();
1✔
456
      lbRpcRetryTimer = null;
1✔
457
    }
458
  }
1✔
459

460
  void shutdown() {
461
    logger.log(ChannelLogLevel.INFO, "[grpclb-<{0}>] Shutdown", serviceName);
1✔
462
    shutdownLbComm();
1✔
463
    switch (config.getMode()) {
1✔
464
      case ROUND_ROBIN:
465
        // We close the subchannels through subchannelPool instead of helper just for convenience of
466
        // testing.
467
        for (Subchannel subchannel : subchannels.values()) {
1✔
468
          returnSubchannelToPool(subchannel);
1✔
469
        }
1✔
470
        subchannelPool.clear();
1✔
471
        break;
1✔
472
      case PICK_FIRST:
473
        // Shutdown the child pick_first LB which manages its own subchannels.
474
        if (pickFirstLb != null) {
1✔
475
          pickFirstLb.shutdown();
1✔
476
          pickFirstLb = null;
1✔
477
        }
478
        break;
479
      default:
480
        throw new AssertionError("Missing case for " + config.getMode());
×
481
    }
482
    subchannels = Collections.emptyMap();
1✔
483
    cancelFallbackTimer();
1✔
484
    cancelLbRpcRetryTimer();
1✔
485
  }
1✔
486

487
  void propagateError(Status status) {
488
    logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Error: {1}", serviceName, status);
1✔
489
    if (backendList.isEmpty()) {
1✔
490
      Status error =
1✔
491
          Status.UNAVAILABLE.withCause(status.getCause()).withDescription(status.getDescription());
1✔
492
      maybeUpdatePicker(
1✔
493
          TRANSIENT_FAILURE, new RoundRobinPicker(dropList, Arrays.asList(new ErrorEntry(error))));
1✔
494
    }
495
  }
1✔
496

497
  private void returnSubchannelToPool(Subchannel subchannel) {
498
    subchannelPool.returnSubchannel(subchannel, subchannel.getAttributes().get(STATE_INFO).get());
1✔
499
  }
1✔
500

501
  @VisibleForTesting
502
  @Nullable
503
  GrpclbClientLoadRecorder getLoadRecorder() {
504
    if (lbStream == null) {
1✔
505
      return null;
×
506
    }
507
    return lbStream.loadRecorder;
1✔
508
  }
509

510
  /**
511
   * Populate backend servers to be used based on the given list of addresses.
512
   */
513
  private void updateServerList(
514
      List<DropEntry> newDropList, List<BackendAddressGroup> newBackendAddrList,
515
      @Nullable GrpclbClientLoadRecorder loadRecorder) {
516
    HashMap<List<EquivalentAddressGroup>, Subchannel> newSubchannelMap =
1✔
517
        new HashMap<>();
518
    List<BackendEntry> newBackendList = new ArrayList<>();
1✔
519

520
    switch (config.getMode()) {
1✔
521
      case ROUND_ROBIN:
522
        for (BackendAddressGroup backendAddr : newBackendAddrList) {
1✔
523
          EquivalentAddressGroup eag = backendAddr.getAddresses();
1✔
524
          List<EquivalentAddressGroup> eagAsList = Collections.singletonList(eag);
1✔
525
          Subchannel subchannel = newSubchannelMap.get(eagAsList);
1✔
526
          if (subchannel == null) {
1✔
527
            subchannel = subchannels.get(eagAsList);
1✔
528
            if (subchannel == null) {
1✔
529
              subchannel = subchannelPool.takeOrCreateSubchannel(eag, createSubchannelAttrs());
1✔
530
              subchannel.requestConnection();
1✔
531
            }
532
            newSubchannelMap.put(eagAsList, subchannel);
1✔
533
          }
534
          BackendEntry entry;
535
          // Only picks with tokens are reported to LoadRecorder
536
          if (backendAddr.getToken() == null) {
1✔
537
            entry = new BackendEntry(subchannel);
1✔
538
          } else {
539
            entry = new BackendEntry(subchannel, loadRecorder, backendAddr.getToken());
1✔
540
          }
541
          newBackendList.add(entry);
1✔
542
        }
1✔
543
        // Close Subchannels whose addresses have been delisted
544
        for (Map.Entry<List<EquivalentAddressGroup>, Subchannel> entry : subchannels.entrySet()) {
1✔
545
          List<EquivalentAddressGroup> eagList = entry.getKey();
1✔
546
          if (!newSubchannelMap.containsKey(eagList)) {
1✔
547
            returnSubchannelToPool(entry.getValue());
1✔
548
          }
549
        }
1✔
550
        subchannels = Collections.unmodifiableMap(newSubchannelMap);
1✔
551
        break;
1✔
552
      case PICK_FIRST:
553
        // Delegate to child pick_first LB for address management.
554
        // Shutdown existing child LB if addresses become empty.
555
        if (newBackendAddrList.isEmpty()) {
1✔
556
          if (pickFirstLb != null) {
1✔
557
            pickFirstLb.shutdown();
1✔
558
            pickFirstLb = null;
1✔
559
          }
560
          break;
561
        }
562
        List<EquivalentAddressGroup> eagList = new ArrayList<>();
1✔
563
        // Attach tokens to EAG attributes for TokenAttachingTracerFactory to retrieve.
564
        for (BackendAddressGroup bag : newBackendAddrList) {
1✔
565
          EquivalentAddressGroup origEag = bag.getAddresses();
1✔
566
          Attributes eagAttrs = origEag.getAttributes();
1✔
567
          if (bag.getToken() != null) {
1✔
568
            eagAttrs = eagAttrs.toBuilder()
1✔
569
                .set(GrpclbConstants.TOKEN_ATTRIBUTE_KEY, bag.getToken()).build();
1✔
570
          }
571
          eagList.add(new EquivalentAddressGroup(origEag.getAddresses(), eagAttrs));
1✔
572
        }
1✔
573

574
        if (pickFirstLb == null) {
1✔
575
          pickFirstLb = pickFirstLbProvider.newLoadBalancer(new PickFirstLbHelper());
1✔
576
        }
577

578
        // Pass addresses to child LB.
579
        pickFirstLb.acceptResolvedAddresses(
1✔
580
            ResolvedAddresses.newBuilder()
1✔
581
                .setAddresses(eagList)
1✔
582
                .build());
1✔
583
        if (requestConnectionPending) {
1✔
584
          pickFirstLb.requestConnection();
×
585
          requestConnectionPending = false;
×
586
        }
587
        // Store the load recorder for token attachment.
588
        currentPickFirstLoadRecorder = loadRecorder;
1✔
589
        break;
1✔
590
      default:
591
        throw new AssertionError("Missing case for " + config.getMode());
×
592
    }
593

594
    dropList = Collections.unmodifiableList(newDropList);
1✔
595
    backendList = Collections.unmodifiableList(newBackendList);
1✔
596
  }
1✔
597

598
  @VisibleForTesting
599
  class FallbackModeTask implements Runnable {
600
    private final Status reason;
601

602
    private FallbackModeTask(Status reason) {
1✔
603
      this.reason = reason;
1✔
604
    }
1✔
605

606
    @Override
607
    public void run() {
608
      // Timer should have been cancelled if entered fallback early.
609
      checkState(!usingFallbackBackends, "already in fallback");
1✔
610
      fallbackReason = reason;
1✔
611
      maybeUseFallbackBackends();
1✔
612
      maybeUpdatePicker();
1✔
613
    }
1✔
614
  }
615

616
  @VisibleForTesting
617
  class LbRpcRetryTask implements Runnable {
1✔
618
    @Override
619
    public void run() {
620
      startLbRpc();
1✔
621
    }
1✔
622
  }
623

624
  @VisibleForTesting
625
  static class LoadReportingTask implements Runnable {
626
    private final LbStream stream;
627

628
    LoadReportingTask(LbStream stream) {
1✔
629
      this.stream = stream;
1✔
630
    }
1✔
631

632
    @Override
633
    public void run() {
634
      stream.loadReportTimer = null;
1✔
635
      stream.sendLoadReport();
1✔
636
    }
1✔
637
  }
638

639
  private class LbStream implements StreamObserver<LoadBalanceResponse> {
640
    final GrpclbClientLoadRecorder loadRecorder;
641
    final LoadBalancerGrpc.LoadBalancerStub stub;
642
    StreamObserver<LoadBalanceRequest> lbRequestWriter;
643

644
    // These fields are only accessed from helper.runSerialized()
645
    boolean initialResponseReceived;
646
    boolean closed;
647
    long loadReportIntervalMillis = -1;
1✔
648
    ScheduledHandle loadReportTimer;
649

650
    /**
     * Wraps {@code stub} for a single balancer stream.
     *
     * @throws NullPointerException if {@code stub} is null
     */
    LbStream(LoadBalancerGrpc.LoadBalancerStub stub) {
      this.stub = checkNotNull(stub, "stub");
      // Stats are only valid for this stream; nothing is carried over from a previous one.
      this.loadRecorder = new GrpclbClientLoadRecorder(GrpclbState.this.time);
    }
656

657
    void start() {
658
      lbRequestWriter = stub.withWaitForReady().balanceLoad(this);
1✔
659
    }
1✔
660

661
    @Override public void onNext(final LoadBalanceResponse response) {
662
      syncContext.execute(new Runnable() {
1✔
663
          @Override
664
          public void run() {
665
            handleResponse(response);
1✔
666
          }
1✔
667
        });
668
    }
1✔
669

670
    @Override public void onError(final Throwable error) {
671
      syncContext.execute(new Runnable() {
1✔
672
          @Override
673
          public void run() {
674
            handleStreamClosed(Status.fromThrowable(error)
1✔
675
                .augmentDescription("Stream to GRPCLB LoadBalancer had an error"));
1✔
676
          }
1✔
677
        });
678
    }
1✔
679

680
    @Override public void onCompleted() {
681
      syncContext.execute(new Runnable() {
1✔
682
          @Override
683
          public void run() {
684
            handleStreamClosed(
1✔
685
                Status.UNAVAILABLE.withDescription("Stream to GRPCLB LoadBalancer was closed"));
1✔
686
          }
1✔
687
        });
688
    }
1✔
689

690
    // The following methods must be run in syncContext.
691

692
    private void sendLoadReport() {
693
      if (closed) {
1✔
694
        return;
×
695
      }
696
      ClientStats stats = loadRecorder.generateLoadReport();
1✔
697
      // TODO(zhangkun83): flow control?
698
      try {
699
        lbRequestWriter.onNext(LoadBalanceRequest.newBuilder().setClientStats(stats).build());
1✔
700
        scheduleNextLoadReport();
1✔
701
      } catch (Exception e) {
×
702
        close(e);
×
703
      }
1✔
704
    }
1✔
705

706
    private void scheduleNextLoadReport() {
707
      if (loadReportIntervalMillis > 0) {
1✔
708
        loadReportTimer = syncContext.schedule(
1✔
709
            new LoadReportingTask(this), loadReportIntervalMillis, TimeUnit.MILLISECONDS,
710
            timerService);
1✔
711
      }
712
    }
1✔
713

714
    private void handleResponse(LoadBalanceResponse response) {
715
      if (closed) {
1✔
716
        return;
×
717
      }
718

719
      LoadBalanceResponseTypeCase typeCase = response.getLoadBalanceResponseTypeCase();
1✔
720
      if (!initialResponseReceived) {
1✔
721
        logger.log(
1✔
722
            ChannelLogLevel.INFO,
723
            "[grpclb-<{0}>] Got an LB initial response: {1}", serviceName, response);
1✔
724
        if (typeCase != LoadBalanceResponseTypeCase.INITIAL_RESPONSE) {
1✔
725
          logger.log(
×
726
              ChannelLogLevel.WARNING,
727
              "[grpclb-<{0}>] Received a response without initial response",
728
              serviceName);
×
729
          return;
×
730
        }
731
        initialResponseReceived = true;
1✔
732
        InitialLoadBalanceResponse initialResponse = response.getInitialResponse();
1✔
733
        loadReportIntervalMillis =
1✔
734
            Durations.toMillis(initialResponse.getClientStatsReportInterval());
1✔
735
        scheduleNextLoadReport();
1✔
736
        return;
1✔
737
      }
738
      if (SHOULD_LOG_SERVER_LISTS) {
1✔
739
        logger.log(
1✔
740
            ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response: {1}", serviceName, response);
1✔
741
      } else {
742
        logger.log(ChannelLogLevel.DEBUG, "[grpclb-<{0}>] Got an LB response", serviceName);
×
743
      }
744

745
      if (typeCase == LoadBalanceResponseTypeCase.FALLBACK_RESPONSE) {
1✔
746
        // Force entering fallback requested by balancer.
747
        cancelFallbackTimer();
1✔
748
        fallbackReason = BALANCER_REQUESTED_FALLBACK_STATUS;
1✔
749
        useFallbackBackends();
1✔
750
        maybeUpdatePicker();
1✔
751
        return;
1✔
752
      } else if (typeCase != LoadBalanceResponseTypeCase.SERVER_LIST) {
1✔
753
        logger.log(
1✔
754
            ChannelLogLevel.WARNING,
755
            "[grpclb-<{0}>] Ignoring unexpected response type: {1}",
756
            serviceName,
1✔
757
            typeCase);
758
        return;
1✔
759
      }
760

761
      balancerWorking = true;
1✔
762
      // TODO(zhangkun83): handle delegate from initialResponse
763
      ServerList serverList = response.getServerList();
1✔
764
      List<DropEntry> newDropList = new ArrayList<>();
1✔
765
      List<BackendAddressGroup> newBackendAddrList = new ArrayList<>();
1✔
766
      // Construct the new collections. Create new Subchannels when necessary.
767
      for (Server server : serverList.getServersList()) {
1✔
768
        String token = server.getLoadBalanceToken();
1✔
769
        if (server.getDrop()) {
1✔
770
          newDropList.add(new DropEntry(loadRecorder, token));
1✔
771
        } else {
772
          newDropList.add(null);
1✔
773
          InetSocketAddress address;
774
          try {
775
            address = new InetSocketAddress(
1✔
776
                InetAddress.getByAddress(server.getIpAddress().toByteArray()), server.getPort());
1✔
777
          } catch (UnknownHostException e) {
×
778
            propagateError(
×
779
                Status.UNAVAILABLE
780
                    .withDescription("Invalid backend address: " + server)
×
781
                    .withCause(e));
×
782
            continue;
×
783
          }
1✔
784
          // ALTS code can use the presence of ATTR_LB_PROVIDED_BACKEND to select ALTS instead of
785
          // TLS, with Netty.
786
          EquivalentAddressGroup eag =
1✔
787
              new EquivalentAddressGroup(address, LB_PROVIDED_BACKEND_ATTRS);
1✔
788
          newBackendAddrList.add(new BackendAddressGroup(eag, token));
1✔
789
        }
790
      }
1✔
791
      // Exit fallback as soon as a new server list is received from the balancer.
792
      usingFallbackBackends = false;
1✔
793
      fallbackReason = null;
1✔
794
      cancelFallbackTimer();
1✔
795
      updateServerList(newDropList, newBackendAddrList, loadRecorder);
1✔
796
      maybeUpdatePicker();
1✔
797
    }
1✔
798

799
    private void handleStreamClosed(Status error) {
800
      checkArgument(!error.isOk(), "unexpected OK status");
1✔
801
      if (closed) {
1✔
802
        return;
1✔
803
      }
804
      closed = true;
1✔
805
      cleanUp();
1✔
806
      propagateError(error);
1✔
807
      balancerWorking = false;
1✔
808
      fallbackReason = error;
1✔
809
      cancelFallbackTimer();
1✔
810
      maybeUseFallbackBackends();
1✔
811
      maybeUpdatePicker();
1✔
812

813
      long delayNanos = 0;
1✔
814
      if (initialResponseReceived || lbRpcRetryPolicy == null) {
1✔
815
        // Reset the backoff sequence if balancer has sent the initial response, or backoff sequence
816
        // has never been initialized.
817
        lbRpcRetryPolicy = backoffPolicyProvider.get();
1✔
818
      }
819
      // Backoff only when balancer wasn't working previously.
820
      if (!initialResponseReceived) {
1✔
821
        // The back-off policy determines the interval between consecutive RPC upstarts, thus the
822
        // actual delay may be smaller than the value from the back-off policy, or even negative,
823
        // depending how much time was spent in the previous RPC.
824
        delayNanos =
1✔
825
            lbRpcRetryPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
826
      }
827
      if (delayNanos <= 0) {
1✔
828
        startLbRpc();
1✔
829
      } else {
830
        lbRpcRetryTimer =
1✔
831
            syncContext.schedule(new LbRpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS,
1✔
832
                timerService);
1✔
833
      }
834

835
      helper.refreshNameResolution();
1✔
836
    }
1✔
837

838
    void close(Exception error) {
839
      if (closed) {
1✔
840
        return;
×
841
      }
842
      closed = true;
1✔
843
      cleanUp();
1✔
844
      lbRequestWriter.onError(error);
1✔
845
    }
1✔
846

847
    private void cleanUp() {
848
      if (loadReportTimer != null) {
1✔
849
        loadReportTimer.cancel();
1✔
850
        loadReportTimer = null;
1✔
851
      }
852
      if (lbStream == this) {
1✔
853
        lbStream = null;
1✔
854
      }
855
    }
1✔
856
  }
857

858
  /**
859
   * Make and use a picker out of the current lists and the states of subchannels if they have
860
   * changed since the last picker created.
861
   */
862
  private void maybeUpdatePicker() {
863
    List<RoundRobinEntry> pickList;
864
    ConnectivityState state;
865
    // For PICK_FIRST mode with delegation, check if child LB exists instead of backendList.
866
    boolean hasBackends = config.getMode() == Mode.PICK_FIRST
1✔
867
        ? pickFirstLb != null
1✔
868
        : !backendList.isEmpty();
1✔
869
    if (!hasBackends) {
1✔
870
      // Note balancer (is working) may enforce using fallback backends, and that fallback may
871
      // fail. So we should check if currently in fallback first.
872
      if (usingFallbackBackends) {
1✔
873
        Status error =
1✔
874
            NO_FALLBACK_BACKENDS_STATUS
875
                .withCause(fallbackReason.getCause())
1✔
876
                .augmentDescription(fallbackReason.getDescription());
1✔
877
        pickList = Collections.<RoundRobinEntry>singletonList(new ErrorEntry(error));
1✔
878
        state = TRANSIENT_FAILURE;
1✔
879
      } else if (balancerWorking)  {
1✔
880
        pickList =
1✔
881
            Collections.<RoundRobinEntry>singletonList(
1✔
882
                new ErrorEntry(NO_AVAILABLE_BACKENDS_STATUS));
883
        state = TRANSIENT_FAILURE;
1✔
884
      } else {  // still waiting for balancer
885
        pickList = Collections.singletonList(BUFFER_ENTRY);
1✔
886
        state = CONNECTING;
1✔
887
      }
888
      maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
1✔
889
      return;
1✔
890
    }
891
    switch (config.getMode()) {
1✔
892
      case ROUND_ROBIN:
893
        pickList = new ArrayList<>(backendList.size());
1✔
894
        Status error = null;
1✔
895
        boolean hasPending = false;
1✔
896
        for (BackendEntry entry : backendList) {
1✔
897
          Subchannel subchannel = entry.subchannel;
1✔
898
          Attributes attrs = subchannel.getAttributes();
1✔
899
          ConnectivityStateInfo stateInfo = attrs.get(STATE_INFO).get();
1✔
900
          if (stateInfo.getState() == READY) {
1✔
901
            pickList.add(entry);
1✔
902
          } else if (stateInfo.getState() == TRANSIENT_FAILURE) {
1✔
903
            error = stateInfo.getStatus();
1✔
904
          } else {
905
            hasPending = true;
1✔
906
          }
907
        }
1✔
908
        if (pickList.isEmpty()) {
1✔
909
          if (hasPending) {
1✔
910
            pickList.add(BUFFER_ENTRY);
1✔
911
            state = CONNECTING;
1✔
912
          } else {
913
            pickList.add(new ErrorEntry(error));
1✔
914
            state = TRANSIENT_FAILURE;
1✔
915
          }
916
        } else {
917
          state = READY;
1✔
918
        }
919
        break;
1✔
920
      case PICK_FIRST: {
921
        // Use child LB's state and picker. Wrap the picker for token attachment.
922
        state = pickFirstLbState;
1✔
923
        TokenAttachingTracerFactory tracerFactory =
1✔
924
            new TokenAttachingTracerFactory(currentPickFirstLoadRecorder);
925
        pickList = Collections.<RoundRobinEntry>singletonList(
1✔
926
            new ChildLbPickerEntry(pickFirstLbPicker, tracerFactory));
927
        break;
1✔
928
      }
929
      default:
930
        throw new AssertionError("Missing case for " + config.getMode());
×
931
    }
932
    maybeUpdatePicker(state, new RoundRobinPicker(dropList, pickList));
1✔
933
  }
1✔
934

935
  /**
936
   * Update the given picker to the helper if it's different from the current one.
937
   */
938
  private void maybeUpdatePicker(ConnectivityState state, RoundRobinPicker picker) {
939
    // Discard the new picker if we are sure it won't make any difference, in order to save
940
    // re-processing pending streams, and avoid unnecessary resetting of the pointer in
941
    // RoundRobinPicker.
942
    if (state.equals(currentState)
1✔
943
        && picker.dropList.equals(currentPicker.dropList)
1✔
944
        && picker.pickList.equals(currentPicker.pickList)) {
1✔
945
      return;
1✔
946
    }
947
    currentState = state;
1✔
948
    currentPicker = picker;
1✔
949
    helper.updateBalancingState(state, picker);
1✔
950
  }
1✔
951

952
  private static Attributes createSubchannelAttrs() {
953
    return Attributes.newBuilder()
1✔
954
        .set(STATE_INFO,
1✔
955
            new AtomicReference<>(
956
                ConnectivityStateInfo.forNonError(IDLE)))
1✔
957
        .build();
1✔
958
  }
959

960
  @VisibleForTesting
961
  static final class DropEntry {
962
    private final GrpclbClientLoadRecorder loadRecorder;
963
    private final String token;
964

965
    DropEntry(GrpclbClientLoadRecorder loadRecorder, String token) {
1✔
966
      this.loadRecorder = checkNotNull(loadRecorder, "loadRecorder");
1✔
967
      this.token = checkNotNull(token, "token");
1✔
968
    }
1✔
969

970
    PickResult picked() {
971
      loadRecorder.recordDroppedRequest(token);
1✔
972
      return DROP_PICK_RESULT;
1✔
973
    }
974

975
    @Override
976
    public String toString() {
977
      // This is printed in logs.  Only include useful information.
978
      return "drop(" + token + ")";
×
979
    }
980

981
    @Override
982
    public int hashCode() {
983
      return Objects.hashCode(loadRecorder, token);
×
984
    }
985

986
    @Override
987
    public boolean equals(Object other) {
988
      if (!(other instanceof DropEntry)) {
1✔
989
        return false;
1✔
990
      }
991
      DropEntry that = (DropEntry) other;
1✔
992
      return Objects.equal(loadRecorder, that.loadRecorder) && Objects.equal(token, that.token);
1✔
993
    }
994
  }
995

996
  @VisibleForTesting
997
  interface RoundRobinEntry {
998
    PickResult picked(PickSubchannelArgs args);
999
  }
1000

1001
  @VisibleForTesting
1002
  static final class BackendEntry implements RoundRobinEntry {
1003
    final Subchannel subchannel;
1004
    @VisibleForTesting
1005
    final PickResult result;
1006
    @Nullable
1007
    private final String token;
1008

1009
    /**
1010
     * For ROUND_ROBIN: creates a BackendEntry whose usage will be reported to load recorder.
1011
     */
1012
    BackendEntry(Subchannel subchannel, GrpclbClientLoadRecorder loadRecorder, String token) {
1✔
1013
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
1014
      this.result =
1✔
1015
          PickResult.withSubchannel(subchannel, checkNotNull(loadRecorder, "loadRecorder"));
1✔
1016
      this.token = checkNotNull(token, "token");
1✔
1017
    }
1✔
1018

1019
    /**
1020
     * For ROUND_ROBIN/PICK_FIRST: creates a BackendEntry whose usage will not be reported.
1021
     */
1022
    BackendEntry(Subchannel subchannel) {
1✔
1023
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
1024
      this.result = PickResult.withSubchannel(subchannel);
1✔
1025
      this.token = null;
1✔
1026
    }
1✔
1027

1028
    /**
1029
     * For PICK_FIRST: creates a BackendEntry that includes all addresses.
1030
     */
1031
    BackendEntry(Subchannel subchannel, TokenAttachingTracerFactory tracerFactory) {
×
1032
      this.subchannel = checkNotNull(subchannel, "subchannel");
×
1033
      this.result =
×
1034
          PickResult.withSubchannel(subchannel, checkNotNull(tracerFactory, "tracerFactory"));
×
1035
      this.token = null;
×
1036
    }
×
1037

1038
    @Override
1039
    public PickResult picked(PickSubchannelArgs args) {
1040
      Metadata headers = args.getHeaders();
1✔
1041
      headers.discardAll(GrpclbConstants.TOKEN_METADATA_KEY);
1✔
1042
      if (token != null) {
1✔
1043
        headers.put(GrpclbConstants.TOKEN_METADATA_KEY, token);
1✔
1044
      }
1045
      return result;
1✔
1046
    }
1047

1048
    @Override
1049
    public String toString() {
1050
      // This is printed in logs.  Only give out useful information.
1051
      return "[" + subchannel.getAllAddresses().toString() + "(" + token + ")]";
×
1052
    }
1053

1054
    @Override
1055
    public int hashCode() {
1056
      return Objects.hashCode(result, token);
×
1057
    }
1058

1059
    @Override
1060
    public boolean equals(Object other) {
1061
      if (!(other instanceof BackendEntry)) {
1✔
1062
        return false;
×
1063
      }
1064
      BackendEntry that = (BackendEntry) other;
1✔
1065
      return Objects.equal(result, that.result) && Objects.equal(token, that.token);
1✔
1066
    }
1067
  }
1068

1069
  @VisibleForTesting
1070
  static final class IdleSubchannelEntry implements RoundRobinEntry {
1071
    private final SynchronizationContext syncContext;
1072
    private final Subchannel subchannel;
1073
    private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
1✔
1074

1075
    IdleSubchannelEntry(Subchannel subchannel, SynchronizationContext syncContext) {
1✔
1076
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
1077
      this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
1078
    }
1✔
1079

1080
    @Override
1081
    public PickResult picked(PickSubchannelArgs args) {
1082
      if (connectionRequested.compareAndSet(false, true)) {
1✔
1083
        syncContext.execute(new Runnable() {
1✔
1084
            @Override
1085
            public void run() {
1086
              subchannel.requestConnection();
1✔
1087
            }
1✔
1088
          });
1089
      }
1090
      return PickResult.withNoResult();
1✔
1091
    }
1092

1093
    @Override
1094
    public String toString() {
1095
      // This is printed in logs.  Only give out useful information.
1096
      return "(idle)[" + subchannel.getAllAddresses().toString() + "]";
×
1097
    }
1098

1099
    @Override
1100
    public int hashCode() {
1101
      return Objects.hashCode(subchannel, syncContext);
×
1102
    }
1103

1104
    @Override
1105
    public boolean equals(Object other) {
1106
      if (!(other instanceof IdleSubchannelEntry)) {
×
1107
        return false;
×
1108
      }
1109
      IdleSubchannelEntry that = (IdleSubchannelEntry) other;
×
1110
      return Objects.equal(subchannel, that.subchannel)
×
1111
          && Objects.equal(syncContext, that.syncContext);
×
1112
    }
1113
  }
1114

1115
  @VisibleForTesting
1116
  static final class ErrorEntry implements RoundRobinEntry {
1117
    final PickResult result;
1118

1119
    ErrorEntry(Status status) {
1✔
1120
      result = PickResult.withError(status);
1✔
1121
    }
1✔
1122

1123
    @Override
1124
    public PickResult picked(PickSubchannelArgs args) {
1125
      return result;
1✔
1126
    }
1127

1128
    @Override
1129
    public int hashCode() {
1130
      return Objects.hashCode(result);
×
1131
    }
1132

1133
    @Override
1134
    public boolean equals(Object other) {
1135
      if (!(other instanceof ErrorEntry)) {
1✔
1136
        return false;
×
1137
      }
1138
      return Objects.equal(result, ((ErrorEntry) other).result);
1✔
1139
    }
1140

1141
    @Override
1142
    public String toString() {
1143
      // This is printed in logs.  Only include useful information.
1144
      return result.getStatus().toString();
×
1145
    }
1146
  }
1147

1148
  /**
1149
   * Entry that wraps a child LB's picker for PICK_FIRST mode delegation.
1150
   * Attaches TokenAttachingTracerFactory to the pick result for token propagation.
1151
   */
1152
  @VisibleForTesting
1153
  static final class ChildLbPickerEntry implements RoundRobinEntry {
1154
    private final SubchannelPicker childPicker;
1155
    private final TokenAttachingTracerFactory tracerFactory;
1156

1157
    ChildLbPickerEntry(SubchannelPicker childPicker, TokenAttachingTracerFactory tracerFactory) {
1✔
1158
      this.childPicker = checkNotNull(childPicker, "childPicker");
1✔
1159
      this.tracerFactory = checkNotNull(tracerFactory, "tracerFactory");
1✔
1160
    }
1✔
1161

1162
    @Override
1163
    public PickResult picked(PickSubchannelArgs args) {
1164
      PickResult childResult = childPicker.pickSubchannel(args);
1✔
1165
      if (childResult.getSubchannel() == null) {
1✔
1166
        // No subchannel (e.g., buffer, error), return as-is.
1167
        return childResult;
1✔
1168
      }
1169
      // Wrap the pick result to attach tokens via the tracer factory.
1170
      return PickResult.withSubchannel(
1✔
1171
          childResult.getSubchannel(), tracerFactory, childResult.getAuthorityOverride());
1✔
1172
    }
1173

1174
    @Override
1175
    public int hashCode() {
1176
      return Objects.hashCode(childPicker, tracerFactory);
×
1177
    }
1178

1179
    @Override
1180
    public boolean equals(Object other) {
1181
      if (!(other instanceof ChildLbPickerEntry)) {
1✔
1182
        return false;
1✔
1183
      }
1184
      ChildLbPickerEntry that = (ChildLbPickerEntry) other;
1✔
1185
      return Objects.equal(childPicker, that.childPicker)
1✔
1186
          && Objects.equal(tracerFactory, that.tracerFactory);
1✔
1187
    }
1188

1189
    @Override
1190
    public String toString() {
1191
      return "ChildLbPickerEntry(" + childPicker + ")";
×
1192
    }
1193

1194
    @VisibleForTesting
1195
    SubchannelPicker getChildPicker() {
1196
      return childPicker;
1✔
1197
    }
1198
  }
1199

1200
  @VisibleForTesting
1201
  static final class RoundRobinPicker extends SubchannelPicker {
1202
    @VisibleForTesting
1203
    final List<DropEntry> dropList;
1204
    private int dropIndex;
1205

1206
    @VisibleForTesting
1207
    final List<? extends RoundRobinEntry> pickList;
1208
    private int pickIndex;
1209

1210
    // dropList can be empty, which means no drop.
1211
    // pickList must not be empty.
1212
    RoundRobinPicker(List<DropEntry> dropList, List<? extends RoundRobinEntry> pickList) {
1✔
1213
      this.dropList = checkNotNull(dropList, "dropList");
1✔
1214
      this.pickList = checkNotNull(pickList, "pickList");
1✔
1215
      checkArgument(!pickList.isEmpty(), "pickList is empty");
1✔
1216
    }
1✔
1217

1218
    @Override
1219
    public PickResult pickSubchannel(PickSubchannelArgs args) {
1220
      synchronized (pickList) {
1✔
1221
        // Two-level round-robin.
1222
        // First round-robin on dropList. If a drop entry is selected, request will be dropped.  If
1223
        // a non-drop entry is selected, then round-robin on pickList.  This makes sure requests are
1224
        // dropped at the same proportion as the drop entries appear on the round-robin list from
1225
        // the balancer, while only backends from pickList are selected for the non-drop cases.
1226
        if (!dropList.isEmpty()) {
1✔
1227
          DropEntry drop = dropList.get(dropIndex);
1✔
1228
          dropIndex++;
1✔
1229
          if (dropIndex == dropList.size()) {
1✔
1230
            dropIndex = 0;
1✔
1231
          }
1232
          if (drop != null) {
1✔
1233
            return drop.picked();
1✔
1234
          }
1235
        }
1236

1237
        RoundRobinEntry pick = pickList.get(pickIndex);
1✔
1238
        pickIndex++;
1✔
1239
        if (pickIndex == pickList.size()) {
1✔
1240
          pickIndex = 0;
1✔
1241
        }
1242
        return pick.picked(args);
1✔
1243
      }
1244
    }
1245

1246
    @Override
1247
    public String toString() {
1248
      if (SHOULD_LOG_SERVER_LISTS) {
×
1249
        return MoreObjects.toStringHelper(RoundRobinPicker.class)
×
1250
            .add("dropList", dropList)
×
1251
            .add("pickList", pickList)
×
1252
            .toString();
×
1253
      }
1254
      return MoreObjects.toStringHelper(RoundRobinPicker.class).toString();
×
1255
    }
1256
  }
1257

1258
  /**
1259
   * Helper for the child pick_first LB in PICK_FIRST mode. Intercepts updateBalancingState()
1260
   * to store state and trigger the grpclb picker update with drops and token attachment.
1261
   */
1262
  private final class PickFirstLbHelper extends ForwardingLoadBalancerHelper {
1✔
1263

1264
    @Override
1265
    protected Helper delegate() {
1266
      return helper;
1✔
1267
    }
1268

1269
    @Override
1270
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
1271
      pickFirstLbState = newState;
1✔
1272
      pickFirstLbPicker = newPicker;
1✔
1273
      // Trigger name resolution refresh on TRANSIENT_FAILURE or IDLE, similar to ROUND_ROBIN.
1274
      if (newState == TRANSIENT_FAILURE || newState == IDLE) {
1✔
1275
        helper.refreshNameResolution();
1✔
1276
      }
1277
      maybeUseFallbackBackends();
1✔
1278
      maybeUpdatePicker();
1✔
1279
    }
1✔
1280
  }
1281
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc