• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19707

21 Feb 2025 12:35AM UTC coverage: 88.595% (-0.006%) from 88.601%
#19707

push

github

ejona86
xds: Use acceptResolvedAddresses() for PriorityLb children

PriorityLb should propagate config problems up to the name resolver so
it can refresh.

34296 of 38711 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.62
/../xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java
1
/*
2
 * Copyright 2020 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ConnectivityState.CONNECTING;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
24

25
import io.grpc.ConnectivityState;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer;
28
import io.grpc.Status;
29
import io.grpc.SynchronizationContext;
30
import io.grpc.SynchronizationContext.ScheduledHandle;
31
import io.grpc.util.ForwardingLoadBalancerHelper;
32
import io.grpc.util.GracefulSwitchLoadBalancer;
33
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
34
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig;
35
import io.grpc.xds.client.XdsLogger;
36
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
37
import java.util.ArrayList;
38
import java.util.Collection;
39
import java.util.HashMap;
40
import java.util.HashSet;
41
import java.util.List;
42
import java.util.Map;
43
import java.util.Objects;
44
import java.util.Set;
45
import java.util.concurrent.ScheduledExecutorService;
46
import java.util.concurrent.TimeUnit;
47
import javax.annotation.Nullable;
48

49
/**
50
 * Load balancer for priority policy. A <em>priority</em> represents a logical entity within a
51
 * cluster for load balancing purposes.
52
 */
53
final class PriorityLoadBalancer extends LoadBalancer {
54
  private final Helper helper;
55
  private final SynchronizationContext syncContext;
56
  private final ScheduledExecutorService executor;
57
  private final XdsLogger logger;
58

59
  // Includes all active and deactivated children. Mutable. New entries are only added from priority
60
  // 0 up to the selected priority. An entry is only deleted 15 minutes after its deactivation.
61
  // Note that calling into a child can cause the child to call back into the LB policy and modify
62
  // the map.  Therefore copy values before looping over them.
63
  private final Map<String, ChildLbState> children = new HashMap<>();
1✔
64

65
  // Following fields are only null initially.
66
  private ResolvedAddresses resolvedAddresses;
67
  // List of priority names in order.
68
  private List<String> priorityNames;
69
  // Config for each priority.
70
  private Map<String, PriorityChildConfig> priorityConfigs;
71
  @Nullable private String currentPriority;
72
  private ConnectivityState currentConnectivityState;
73
  private SubchannelPicker currentPicker;
74
  // Set to true if currently in the process of handling resolved addresses.
75
  private boolean handlingResolvedAddresses;
76

77
  PriorityLoadBalancer(Helper helper) {
1✔
78
    this.helper = checkNotNull(helper, "helper");
1✔
79
    syncContext = helper.getSynchronizationContext();
1✔
80
    executor = helper.getScheduledExecutorService();
1✔
81
    InternalLogId logId = InternalLogId.allocate("priority-lb", helper.getAuthority());
1✔
82
    logger = XdsLogger.withLogId(logId);
1✔
83
    logger.log(XdsLogLevel.INFO, "Created");
1✔
84
  }
1✔
85

86
  @Override
87
  public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
88
    logger.log(XdsLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
1✔
89
    this.resolvedAddresses = resolvedAddresses;
1✔
90
    PriorityLbConfig config = (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
91
    checkNotNull(config, "missing priority lb config");
1✔
92
    priorityNames = config.priorities;
1✔
93
    priorityConfigs = config.childConfigs;
1✔
94
    Status status = Status.OK;
1✔
95
    Set<String> prioritySet = new HashSet<>(config.priorities);
1✔
96
    ArrayList<String> childKeys = new ArrayList<>(children.keySet());
1✔
97
    for (String priority : childKeys) {
1✔
98
      if (!prioritySet.contains(priority)) {
1✔
99
        ChildLbState childLbState = children.get(priority);
1✔
100
        if (childLbState != null) {
1✔
101
          childLbState.deactivate();
1✔
102
        }
103
      }
104
    }
1✔
105
    handlingResolvedAddresses = true;
1✔
106
    for (String priority : priorityNames) {
1✔
107
      ChildLbState childLbState = children.get(priority);
1✔
108
      if (childLbState != null) {
1✔
109
        Status newStatus = childLbState.updateResolvedAddresses();
1✔
110
        if (!newStatus.isOk()) {
1✔
111
          status = newStatus;
1✔
112
        }
113
      }
114
    }
1✔
115
    handlingResolvedAddresses = false;
1✔
116
    Status newStatus = tryNextPriority();
1✔
117
    if (!newStatus.isOk()) {
1✔
118
      status = newStatus;
1✔
119
    }
120
    return status;
1✔
121
  }
122

123
  @Override
124
  public void handleNameResolutionError(Status error) {
125
    logger.log(XdsLogLevel.WARNING, "Received name resolution error: {0}", error);
1✔
126
    boolean gotoTransientFailure = true;
1✔
127
    Collection<ChildLbState> childValues = new ArrayList<>(children.values());
1✔
128
    for (ChildLbState child : childValues) {
1✔
129
      if (priorityNames.contains(child.priority)) {
1✔
130
        child.lb.handleNameResolutionError(error);
1✔
131
        gotoTransientFailure = false;
1✔
132
      }
133
    }
1✔
134
    if (gotoTransientFailure) {
1✔
135
      updateOverallState(
×
136
          null, TRANSIENT_FAILURE, new FixedResultPicker(PickResult.withError(error)));
×
137
    }
138
  }
1✔
139

140
  @Override
141
  public void shutdown() {
142
    logger.log(XdsLogLevel.INFO, "Shutdown");
1✔
143
    Collection<ChildLbState> childValues = new ArrayList<>(children.values());
1✔
144
    for (ChildLbState child : childValues) {
1✔
145
      child.tearDown();
1✔
146
    }
1✔
147
    children.clear();
1✔
148
  }
1✔
149

150
  private Status tryNextPriority() {
151
    for (int i = 0; i < priorityNames.size(); i++) {
1✔
152
      String priority = priorityNames.get(i);
1✔
153
      if (!children.containsKey(priority)) {
1✔
154
        ChildLbState child =
1✔
155
            new ChildLbState(priority, priorityConfigs.get(priority).ignoreReresolution);
1✔
156
        children.put(priority, child);
1✔
157
        updateOverallState(priority, CONNECTING, new FixedResultPicker(PickResult.withNoResult()));
1✔
158
        // Calling the child's updateResolvedAddresses() can result in tryNextPriority() being
159
        // called recursively. We need to be sure to be done with processing here before it is
160
        // called.
161
        return child.updateResolvedAddresses(); // Give priority i time to connect.
1✔
162
      }
163
      ChildLbState child = children.get(priority);
1✔
164
      child.reactivate();
1✔
165
      if (child.connectivityState.equals(READY) || child.connectivityState.equals(IDLE)) {
1✔
166
        logger.log(XdsLogLevel.DEBUG, "Shifted to priority {0}", priority);
1✔
167
        updateOverallState(priority, child.connectivityState, child.picker);
1✔
168
        for (int j = i + 1; j < priorityNames.size(); j++) {
1✔
169
          String p = priorityNames.get(j);
1✔
170
          if (children.containsKey(p)) {
1✔
171
            children.get(p).deactivate();
1✔
172
          }
173
        }
174
        return Status.OK;
1✔
175
      }
176
      if (child.failOverTimer != null && child.failOverTimer.isPending()) {
1✔
177
        updateOverallState(priority, child.connectivityState, child.picker);
1✔
178
        return Status.OK; // Give priority i time to connect.
1✔
179
      }
180
      if (priority.equals(currentPriority) && child.connectivityState != TRANSIENT_FAILURE) {
1✔
181
        // If the current priority is not changed into TRANSIENT_FAILURE, keep using it.
182
        updateOverallState(priority, child.connectivityState, child.picker);
×
183
        return Status.OK;
×
184
      }
185
    }
186
    // TODO(zdapeng): Include error details of each priority.
187
    logger.log(XdsLogLevel.DEBUG, "All priority failed");
1✔
188
    String lastPriority = priorityNames.get(priorityNames.size() - 1);
1✔
189
    SubchannelPicker errorPicker = children.get(lastPriority).picker;
1✔
190
    updateOverallState(lastPriority, TRANSIENT_FAILURE, errorPicker);
1✔
191
    return Status.OK;
1✔
192
  }
193

194
  private void updateOverallState(
195
      @Nullable String priority, ConnectivityState state, SubchannelPicker picker) {
196
    if (!Objects.equals(priority, currentPriority) || !state.equals(currentConnectivityState)
1✔
197
        || !picker.equals(currentPicker)) {
1✔
198
      currentPriority = priority;
1✔
199
      currentConnectivityState = state;
1✔
200
      currentPicker = picker;
1✔
201
      helper.updateBalancingState(state, picker);
1✔
202
    }
203
  }
1✔
204

205
  private final class ChildLbState {
206
    final String priority;
207
    final ChildHelper childHelper;
208
    final GracefulSwitchLoadBalancer lb;
209
    // Timer to fail over to the next priority if not connected in 10 sec. Scheduled only once at
210
    // child initialization.
211
    ScheduledHandle failOverTimer;
212
    boolean seenReadyOrIdleSinceTransientFailure = false;
1✔
213
    // Timer to delay shutdown and deletion of the priority. Scheduled whenever the child is
214
    // deactivated.
215
    @Nullable ScheduledHandle deletionTimer;
216
    ConnectivityState connectivityState = CONNECTING;
1✔
217
    SubchannelPicker picker = new FixedResultPicker(PickResult.withNoResult());
1✔
218

219
    ChildLbState(final String priority, boolean ignoreReresolution) {
1✔
220
      this.priority = priority;
1✔
221
      childHelper = new ChildHelper(ignoreReresolution);
1✔
222
      lb = new GracefulSwitchLoadBalancer(childHelper);
1✔
223
      failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS, executor);
1✔
224
      logger.log(XdsLogLevel.DEBUG, "Priority created: {0}", priority);
1✔
225
    }
1✔
226

227
    final class FailOverTask implements Runnable {
1✔
228
      @Override
229
      public void run() {
230
        if (deletionTimer != null && deletionTimer.isPending()) {
1✔
231
          // The child is deactivated.
232
          return;
1✔
233
        }
234
        picker = new FixedResultPicker(PickResult.withError(
1✔
235
            Status.UNAVAILABLE.withDescription("Connection timeout for priority " + priority)));
1✔
236
        logger.log(XdsLogLevel.DEBUG, "Priority {0} failed over to next", priority);
1✔
237
        currentPriority = null; // reset currentPriority to guarantee failover happen
1✔
238
        Status status = tryNextPriority();
1✔
239
        if (!status.isOk()) {
1✔
240
          // A child had a problem with the addresses/config. Request it to be refreshed
241
          helper.refreshNameResolution();
1✔
242
        }
243
      }
1✔
244
    }
245

246
    /**
247
     * Called when the child becomes a priority that is or appears before the first READY one in the
248
     * {@code priorities} list, due to either config update or balancing state update.
249
     */
250
    void reactivate() {
251
      if (deletionTimer != null && deletionTimer.isPending()) {
1✔
252
        deletionTimer.cancel();
1✔
253
        logger.log(XdsLogLevel.DEBUG, "Priority reactivated: {0}", priority);
1✔
254
      }
255
    }
1✔
256

257
    /**
258
     * Called when either the child is removed by config update, or a higher priority becomes READY.
259
     */
260
    void deactivate() {
261
      if (deletionTimer != null && deletionTimer.isPending()) {
1✔
262
        return;
1✔
263
      }
264

265
      class DeletionTask implements Runnable {
1✔
266
        @Override
267
        public void run() {
268
          tearDown();
1✔
269
          children.remove(priority);
1✔
270
        }
1✔
271
      }
272

273
      deletionTimer = syncContext.schedule(new DeletionTask(), 15, TimeUnit.MINUTES, executor);
1✔
274
      logger.log(XdsLogLevel.DEBUG, "Priority deactivated: {0}", priority);
1✔
275
    }
1✔
276

277
    void tearDown() {
278
      if (failOverTimer.isPending()) {
1✔
279
        failOverTimer.cancel();
1✔
280
      }
281
      if (deletionTimer != null && deletionTimer.isPending()) {
1✔
282
        deletionTimer.cancel();
1✔
283
      }
284
      lb.shutdown();
1✔
285
      logger.log(XdsLogLevel.DEBUG, "Priority deleted: {0}", priority);
1✔
286
    }
1✔
287

288
    /**
289
     * Called either when the child is just created and in this case updated with the cached {@code
290
     * resolvedAddresses}, or when priority lb receives a new resolved addresses while the child
291
     * already exists.
292
     */
293
    Status updateResolvedAddresses() {
294
      PriorityLbConfig config =
1✔
295
          (PriorityLbConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
1✔
296
      return lb.acceptResolvedAddresses(
1✔
297
          resolvedAddresses.toBuilder()
1✔
298
              .setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), priority))
1✔
299
              .setLoadBalancingPolicyConfig(config.childConfigs.get(priority).childConfig)
1✔
300
              .build());
1✔
301
    }
302

303
    final class ChildHelper extends ForwardingLoadBalancerHelper {
304
      private final boolean ignoreReresolution;
305

306
      ChildHelper(boolean ignoreReresolution) {
1✔
307
        this.ignoreReresolution = ignoreReresolution;
1✔
308
      }
1✔
309

310
      @Override
311
      public void refreshNameResolution() {
312
        if (!ignoreReresolution) {
1✔
313
          delegate().refreshNameResolution();
1✔
314
        }
315
      }
1✔
316

317
      @Override
318
      public void updateBalancingState(final ConnectivityState newState,
319
          final SubchannelPicker newPicker) {
320
        if (!children.containsKey(priority)) {
1✔
321
          return;
1✔
322
        }
323
        connectivityState = newState;
1✔
324
        picker = newPicker;
1✔
325

326
        if (deletionTimer != null && deletionTimer.isPending()) {
1✔
327
          return;
1✔
328
        }
329
        if (newState.equals(CONNECTING)) {
1✔
330
          if (!failOverTimer.isPending() && seenReadyOrIdleSinceTransientFailure) {
1✔
331
            failOverTimer = syncContext.schedule(new FailOverTask(), 10, TimeUnit.SECONDS,
1✔
332
                executor);
1✔
333
          }
334
        } else if (newState.equals(READY) || newState.equals(IDLE)) {
1✔
335
          seenReadyOrIdleSinceTransientFailure = true;
1✔
336
          failOverTimer.cancel();
1✔
337
        } else if (newState.equals(TRANSIENT_FAILURE)) {
1✔
338
          seenReadyOrIdleSinceTransientFailure = false;
1✔
339
          failOverTimer.cancel();
1✔
340
        }
341

342
        // If we are currently handling newly resolved addresses, let's not try to reconfigure as
343
        // the address handling process will take care of that to provide an atomic config update.
344
        if (!handlingResolvedAddresses) {
1✔
345
          Status status = tryNextPriority();
1✔
346
          if (!status.isOk()) {
1✔
347
            // A child had a problem with the addresses/config. Request it to be refreshed
348
            helper.refreshNameResolution();
1✔
349
          }
350
        }
351
      }
1✔
352

353
      @Override
354
      protected Helper delegate() {
355
        return helper;
1✔
356
      }
357
    }
358
  }
359
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc