• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19441

29 Aug 2024 08:13PM UTC coverage: 84.505% (+0.01%) from 84.494%
#19441

push

github

ejona86
Focus MultiChildLB updates around ResolvedAddresses of children

This makes ClusterManagerLB more straight-forward, focusing on just the
things that are relevant to it, and it avoids specialized map key
handling in updateChildrenWithResolvedAddresses().

33387 of 39509 relevant lines covered (84.5%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.15
/../xds/src/main/java/io/grpc/xds/ClusterManagerLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
21

22
import com.google.common.annotations.VisibleForTesting;
23
import com.google.common.base.MoreObjects;
24
import io.grpc.ConnectivityState;
25
import io.grpc.InternalLogId;
26
import io.grpc.LoadBalancer;
27
import io.grpc.Status;
28
import io.grpc.SynchronizationContext;
29
import io.grpc.SynchronizationContext.ScheduledHandle;
30
import io.grpc.util.GracefulSwitchLoadBalancer;
31
import io.grpc.util.MultiChildLoadBalancer;
32
import io.grpc.xds.ClusterManagerLoadBalancerProvider.ClusterManagerConfig;
33
import io.grpc.xds.client.XdsLogger;
34
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
35
import java.util.HashMap;
36
import java.util.Map;
37
import java.util.Map.Entry;
38
import java.util.concurrent.ScheduledExecutorService;
39
import java.util.concurrent.TimeUnit;
40
import javax.annotation.Nullable;
41

42
/**
43
 * The top-level load balancing policy for use in XDS.
44
 * This policy does not immediately delete its children.  Instead, it marks them deactivated
45
 * and starts a timer for deletion.  If a subsequent address update restores the child, then it is
46
 * simply reactivated instead of built from scratch.  This is necessary because XDS can frequently
47
 * remove and then add back a server as machines are rebooted or repurposed for load management.
48
 *
49
 * <p>Note that this LB does not automatically reconnect children who go into IDLE status
50
 */
51
class ClusterManagerLoadBalancer extends MultiChildLoadBalancer {
52

53
  // 15 minutes is long enough for a reboot and the services to restart while not so long that
54
  // many children are waiting for cleanup.
55
  @VisibleForTesting
56
  public static final int DELAYED_CHILD_DELETION_TIME_MINUTES = 15;
57
  protected final SynchronizationContext syncContext;
58
  private final ScheduledExecutorService timeService;
59
  private final XdsLogger logger;
60
  private ResolvedAddresses lastResolvedAddresses;
61

62
  ClusterManagerLoadBalancer(Helper helper) {
63
    super(helper);
1✔
64
    this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
1✔
65
    this.timeService = checkNotNull(helper.getScheduledExecutorService(), "timeService");
1✔
66
    logger = XdsLogger.withLogId(
1✔
67
        InternalLogId.allocate("cluster_manager-lb", helper.getAuthority()));
1✔
68

69
    logger.log(XdsLogLevel.INFO, "Created");
1✔
70
  }
1✔
71

72
  @Override
73
  protected ChildLbState createChildLbState(Object key) {
74
    return new ClusterManagerLbState(key, GracefulSwitchLoadBalancerFactory.INSTANCE);
1✔
75
  }
76

77
  @Override
78
  protected Map<Object, ResolvedAddresses> createChildAddressesMap(
79
      ResolvedAddresses resolvedAddresses) {
80
    ClusterManagerConfig config = (ClusterManagerConfig)
1✔
81
        resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
82
    Map<Object, ResolvedAddresses> childAddresses = new HashMap<>();
1✔
83
    if (config != null) {
1✔
84
      for (Map.Entry<String, Object> childPolicy : config.childPolicies.entrySet()) {
1✔
85
        ResolvedAddresses addresses = resolvedAddresses.toBuilder()
1✔
86
            .setLoadBalancingPolicyConfig(childPolicy.getValue())
1✔
87
            .build();
1✔
88
        childAddresses.put(childPolicy.getKey(), addresses);
1✔
89
      }
1✔
90
    }
91
    logger.log(
1✔
92
        XdsLogLevel.INFO,
93
        "Received cluster_manager lb config: child names={0}", childAddresses.keySet());
1✔
94
    return childAddresses;
1✔
95
  }
96

97
  /**
98
   * This is like the parent except that it doesn't shutdown the removed children since we want that
99
   * to be done by the timer.
100
   */
101
  @Override
102
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
103
    if (lastResolvedAddresses != null) {
1✔
104
      // Handle deactivated children
105
      ClusterManagerConfig config = (ClusterManagerConfig)
1✔
106
          resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
107
      ClusterManagerConfig lastConfig = (ClusterManagerConfig)
1✔
108
          lastResolvedAddresses.getLoadBalancingPolicyConfig();
1✔
109
      Map<String, Object> adjChildPolicies = new HashMap<>(config.childPolicies);
1✔
110
      for (Entry<String, Object> entry : lastConfig.childPolicies.entrySet()) {
1✔
111
        ClusterManagerLbState state = (ClusterManagerLbState) getChildLbState(entry.getKey());
1✔
112
        if (adjChildPolicies.containsKey(entry.getKey())) {
1✔
113
          if (state.deletionTimer != null) {
1✔
114
            state.reactivateChild();
1✔
115
          }
116
        } else if (state != null) {
1✔
117
          adjChildPolicies.put(entry.getKey(), entry.getValue());
1✔
118
          if (state.deletionTimer == null) {
1✔
119
            state.deactivateChild();
1✔
120
          }
121
        }
122
      }
1✔
123
      config = new ClusterManagerConfig(adjChildPolicies);
1✔
124
      resolvedAddresses =
1✔
125
          resolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build();
1✔
126
    }
127
    lastResolvedAddresses = resolvedAddresses;
1✔
128
    return super.acceptResolvedAddresses(resolvedAddresses);
1✔
129
  }
130

131
  /**
132
   * Using the state of all children will calculate the current connectivity state,
133
   * update currentConnectivityState, generate a picker and then call
134
   * {@link Helper#updateBalancingState(ConnectivityState, SubchannelPicker)}.
135
   */
136
  @Override
137
  protected void updateOverallBalancingState() {
138
    ConnectivityState overallState = null;
1✔
139
    final Map<Object, SubchannelPicker> childPickers = new HashMap<>();
1✔
140
    for (ChildLbState childLbState : getChildLbStates()) {
1✔
141
      if (((ClusterManagerLbState) childLbState).deletionTimer != null) {
1✔
142
        continue;
1✔
143
      }
144
      childPickers.put(childLbState.getKey(), childLbState.getCurrentPicker());
1✔
145
      overallState = aggregateState(overallState, childLbState.getCurrentState());
1✔
146
    }
1✔
147

148
    if (overallState != null) {
1✔
149
      getHelper().updateBalancingState(overallState, getSubchannelPicker(childPickers));
1✔
150
      currentConnectivityState = overallState;
1✔
151
    }
152
  }
1✔
153

154
  protected SubchannelPicker getSubchannelPicker(Map<Object, SubchannelPicker> childPickers) {
155
    return new SubchannelPicker() {
1✔
156
      @Override
157
      public PickResult pickSubchannel(PickSubchannelArgs args) {
158
        String clusterName =
1✔
159
            args.getCallOptions().getOption(XdsNameResolver.CLUSTER_SELECTION_KEY);
1✔
160
        SubchannelPicker childPicker = childPickers.get(clusterName);
1✔
161
        if (childPicker == null) {
1✔
162
          return
1✔
163
              PickResult.withError(
1✔
164
                  Status.UNAVAILABLE.withDescription("CDS encountered error: unable to find "
1✔
165
                      + "available subchannel for cluster " + clusterName));
166
        }
167
        return childPicker.pickSubchannel(args);
1✔
168
      }
169

170
      @Override
171
      public String toString() {
172
        return MoreObjects.toStringHelper(this).add("pickers", childPickers).toString();
×
173
      }
174
    };
175
  }
176

177
  @Override
178
  public void handleNameResolutionError(Status error) {
179
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
180
    boolean gotoTransientFailure = true;
1✔
181
    for (ChildLbState state : getChildLbStates()) {
1✔
182
      if (((ClusterManagerLbState) state).deletionTimer == null) {
1✔
183
        gotoTransientFailure = false;
1✔
184
        state.getLb().handleNameResolutionError(error);
1✔
185
      }
186
    }
1✔
187
    if (gotoTransientFailure) {
1✔
188
      getHelper().updateBalancingState(
1✔
189
          TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
1✔
190
    }
191
  }
1✔
192

193
  /**
194
   * This differs from the base class in the use of the deletion timer.  When it is deactivated,
195
   * rather than immediately calling shutdown it starts a timer.  If shutdown or reactivate
196
   * are called before the timer fires, the timer is canceled.  Otherwise, time timer calls shutdown
197
   * and removes the child from the petiole policy when it is triggered.
198
   */
199
  private class ClusterManagerLbState extends ChildLbState {
1✔
200
    @Nullable
201
    ScheduledHandle deletionTimer;
202

203
    public ClusterManagerLbState(Object key, LoadBalancer.Factory policyFactory) {
1✔
204
      super(key, policyFactory);
1✔
205
    }
1✔
206

207
    @Override
208
    protected ChildLbStateHelper createChildHelper() {
209
      return new ClusterManagerChildHelper();
1✔
210
    }
211

212
    @Override
213
    protected void shutdown() {
214
      if (deletionTimer != null) {
1✔
215
        deletionTimer.cancel();
1✔
216
        deletionTimer = null;
1✔
217
      }
218
      super.shutdown();
1✔
219
    }
1✔
220

221
    void reactivateChild() {
222
      assert deletionTimer != null;
1✔
223
      deletionTimer.cancel();
1✔
224
      deletionTimer = null;
1✔
225
      logger.log(XdsLogLevel.DEBUG, "Child balancer {0} reactivated", getKey());
1✔
226
    }
1✔
227

228
    void deactivateChild() {
229
      assert deletionTimer == null;
1✔
230

231
      class DeletionTask implements Runnable {
1✔
232

233
        @Override
234
        public void run() {
235
          ClusterManagerConfig config = (ClusterManagerConfig)
1✔
236
              lastResolvedAddresses.getLoadBalancingPolicyConfig();
1✔
237
          Map<String, Object> childPolicies = new HashMap<>(config.childPolicies);
1✔
238
          Object removed = childPolicies.remove(getKey());
1✔
239
          assert removed != null;
1✔
240
          config = new ClusterManagerConfig(childPolicies);
1✔
241
          lastResolvedAddresses =
1✔
242
              lastResolvedAddresses.toBuilder().setLoadBalancingPolicyConfig(config).build();
1✔
243
          acceptResolvedAddresses(lastResolvedAddresses);
1✔
244
        }
1✔
245
      }
246

247
      deletionTimer =
1✔
248
          syncContext.schedule(
1✔
249
              new DeletionTask(),
250
              DELAYED_CHILD_DELETION_TIME_MINUTES,
251
              TimeUnit.MINUTES,
252
              timeService);
1✔
253
      logger.log(XdsLogLevel.DEBUG, "Child balancer {0} deactivated", getKey());
1✔
254
    }
1✔
255

256
    private class ClusterManagerChildHelper extends ChildLbStateHelper {
1✔
257
      @Override
258
      public void updateBalancingState(final ConnectivityState newState,
259
                                       final SubchannelPicker newPicker) {
260
        if (getCurrentState() == ConnectivityState.SHUTDOWN) {
1✔
261
          return;
1✔
262
        }
263

264
        // Subchannel picker and state are saved, but will only be propagated to the channel
265
        // when the child instance exits deactivated state.
266
        setCurrentState(newState);
1✔
267
        setCurrentPicker(newPicker);
1✔
268
        // If we are already in the process of resolving addresses, the overall balancing state
269
        // will be updated at the end of it, and we don't need to trigger that update here.
270
        if (deletionTimer == null && !resolvingAddresses) {
1✔
271
          updateOverallBalancingState();
1✔
272
        }
273
      }
1✔
274
    }
275
  }
276

277
  static final class GracefulSwitchLoadBalancerFactory extends LoadBalancer.Factory {
1✔
278
    static final LoadBalancer.Factory INSTANCE = new GracefulSwitchLoadBalancerFactory();
1✔
279

280
    @Override
281
    public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
282
      return new GracefulSwitchLoadBalancer(helper);
1✔
283
    }
284
  }
285
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc