• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19709

21 Feb 2025 05:25AM UTC coverage: 88.586% (-0.009%) from 88.595%
#19709

push

github

ejona86
Use acceptResolvedAddresses() in easy cases

We want to move away from handleResolvedAddresses(). These are "easy" in
that they need no logic. LBs extending ForwardingLoadBalancer had the
method duplicated from handleResolvedAddresses() and swapped away from
`super` because ForwardingLoadBalancer only forwards
handleResolvedAddresses() reliably today. Duplicating small methods was
less bug-prone than dealing with ForwardingLoadBalancer.

34296 of 38715 relevant lines covered (88.59%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.57
/../services/src/main/java/io/grpc/protobuf/services/HealthCheckingLoadBalancerFactory.java
1
/*
2
 * Copyright 2018 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.protobuf.services;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.ConnectivityState.CONNECTING;
22
import static io.grpc.ConnectivityState.IDLE;
23
import static io.grpc.ConnectivityState.READY;
24
import static io.grpc.ConnectivityState.SHUTDOWN;
25
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
26

27
import com.google.common.annotations.VisibleForTesting;
28
import com.google.common.base.MoreObjects;
29
import com.google.common.base.Objects;
30
import com.google.common.base.Stopwatch;
31
import com.google.common.base.Supplier;
32
import io.grpc.CallOptions;
33
import io.grpc.ChannelLogger;
34
import io.grpc.ChannelLogger.ChannelLogLevel;
35
import io.grpc.ClientCall;
36
import io.grpc.ConnectivityStateInfo;
37
import io.grpc.LoadBalancer;
38
import io.grpc.LoadBalancer.CreateSubchannelArgs;
39
import io.grpc.LoadBalancer.Helper;
40
import io.grpc.LoadBalancer.Subchannel;
41
import io.grpc.LoadBalancer.SubchannelStateListener;
42
import io.grpc.Metadata;
43
import io.grpc.Status;
44
import io.grpc.Status.Code;
45
import io.grpc.SynchronizationContext;
46
import io.grpc.SynchronizationContext.ScheduledHandle;
47
import io.grpc.health.v1.HealthCheckRequest;
48
import io.grpc.health.v1.HealthCheckResponse;
49
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
50
import io.grpc.health.v1.HealthGrpc;
51
import io.grpc.internal.BackoffPolicy;
52
import io.grpc.internal.ServiceConfigUtil;
53
import io.grpc.util.ForwardingLoadBalancer;
54
import io.grpc.util.ForwardingLoadBalancerHelper;
55
import io.grpc.util.ForwardingSubchannel;
56
import io.grpc.util.HealthProducerHelper;
57
import java.util.HashSet;
58
import java.util.Map;
59
import java.util.concurrent.ScheduledExecutorService;
60
import java.util.concurrent.TimeUnit;
61
import java.util.logging.Level;
62
import java.util.logging.Logger;
63
import javax.annotation.Nullable;
64

65
/**
66
 * Wraps a {@link LoadBalancer} and implements the client-side health-checking
67
 * (https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md).  The
68
 * Subchannel received by the states wrapped LoadBalancer will be determined by health-checking.
69
 *
70
 * <p>Note the original LoadBalancer must call {@code Helper.createSubchannel()} from the
71
 * SynchronizationContext, or it will throw.
72
 */
73
final class HealthCheckingLoadBalancerFactory extends LoadBalancer.Factory {
74
  private static final Logger logger =
1✔
75
      Logger.getLogger(HealthCheckingLoadBalancerFactory.class.getName());
1✔
76

77
  private final LoadBalancer.Factory delegateFactory;
78
  private final BackoffPolicy.Provider backoffPolicyProvider;
79
  private final Supplier<Stopwatch> stopwatchSupplier;
80

81
  public HealthCheckingLoadBalancerFactory(
82
      LoadBalancer.Factory delegateFactory, BackoffPolicy.Provider backoffPolicyProvider,
83
      Supplier<Stopwatch> stopwatchSupplier) {
1✔
84
    this.delegateFactory = checkNotNull(delegateFactory, "delegateFactory");
1✔
85
    this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
86
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
87
  }
1✔
88

89
  @Override
90
  public LoadBalancer newLoadBalancer(Helper helper) {
91
    HelperImpl wrappedHelper = new HelperImpl(helper);
1✔
92
    LoadBalancer delegateBalancer = delegateFactory.newLoadBalancer(wrappedHelper);
1✔
93
    return new HealthCheckingLoadBalancer(wrappedHelper, delegateBalancer);
1✔
94
  }
95

96
  private final class HelperImpl extends ForwardingLoadBalancerHelper {
97
    private final Helper delegate;
98
    private final SynchronizationContext syncContext;
99

100
    @Nullable String healthCheckedService;
101

102
    final HashSet<HealthCheckState> hcStates = new HashSet<>();
1✔
103

104
    HelperImpl(Helper delegate) {
1✔
105
      this.delegate = new HealthProducerHelper(checkNotNull(delegate, "delegate"));
1✔
106
      this.syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext");
1✔
107
    }
1✔
108

109
    @Override
110
    protected Helper delegate() {
111
      return delegate;
1✔
112
    }
113

114
    @Override
115
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
116
      // HealthCheckState is not thread-safe, we are requiring the original LoadBalancer calls
117
      // createSubchannel() from the SynchronizationContext.
118
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
119
      LoadBalancer.SubchannelStateListener healthConsumerListener =
1✔
120
          args.getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY);
1✔
121
      HealthCheckState hcState = new HealthCheckState(
1✔
122
          this, syncContext, delegate.getScheduledExecutorService(), healthConsumerListener);
1✔
123
      if (healthConsumerListener != null) {
1✔
124
        args = args.toBuilder().addOption(HEALTH_CONSUMER_LISTENER_ARG_KEY, hcState).build();
1✔
125
      }
126
      Subchannel delegate = super.createSubchannel(args);
1✔
127
      hcState.setSubchannel(delegate);
1✔
128
      hcStates.add(hcState);
1✔
129
      Subchannel subchannel = new SubchannelImpl(delegate, hcState);
1✔
130
      if (healthCheckedService != null) {
1✔
131
        hcState.setServiceName(healthCheckedService);
1✔
132
      }
133
      return subchannel;
1✔
134
    }
135

136
    void setHealthCheckedService(@Nullable String service) {
137
      healthCheckedService = service;
1✔
138
      for (HealthCheckState hcState : hcStates) {
1✔
139
        hcState.setServiceName(service);
1✔
140
      }
1✔
141
    }
1✔
142

143
    @Override
144
    public String toString() {
145
      return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
×
146
    }
147
  }
148

149
  @VisibleForTesting
150
  static final class SubchannelImpl extends ForwardingSubchannel {
151
    final Subchannel delegate;
152
    final HealthCheckState hcState;
153

154
    SubchannelImpl(Subchannel delegate, HealthCheckState hcState) {
1✔
155
      this.delegate = checkNotNull(delegate, "delegate");
1✔
156
      this.hcState = checkNotNull(hcState, "hcState");
1✔
157
    }
1✔
158

159
    @Override
160
    protected Subchannel delegate() {
161
      return delegate;
1✔
162
    }
163

164
    @Override
165
    public void start(final SubchannelStateListener listener) {
166
      if (hcState.stateListener == null) {
1✔
167
        hcState.init(listener);
1✔
168
        delegate().start(hcState);
1✔
169
      } else {
170
        delegate().start(listener);
1✔
171
      }
172
    }
1✔
173
  }
174

175
  private static final class HealthCheckingLoadBalancer extends ForwardingLoadBalancer {
176
    final LoadBalancer delegate;
177
    final HelperImpl helper;
178

179
    HealthCheckingLoadBalancer(HelperImpl helper, LoadBalancer delegate) {
1✔
180
      this.helper = checkNotNull(helper, "helper");
1✔
181
      this.delegate = checkNotNull(delegate, "delegate");
1✔
182
    }
1✔
183

184
    @Override
185
    protected LoadBalancer delegate() {
186
      return delegate;
1✔
187
    }
188

189
    @Override
190
    public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
191
      Map<String, ?> healthCheckingConfig =
1✔
192
          resolvedAddresses
193
              .getAttributes()
1✔
194
              .get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);
1✔
195
      String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);
1✔
196
      helper.setHealthCheckedService(serviceName);
1✔
197
      delegate.handleResolvedAddresses(resolvedAddresses);
1✔
198
    }
1✔
199

200
    @Override
201
    public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
202
      Map<String, ?> healthCheckingConfig =
1✔
203
          resolvedAddresses
204
              .getAttributes()
1✔
205
              .get(LoadBalancer.ATTR_HEALTH_CHECKING_CONFIG);
1✔
206
      String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(healthCheckingConfig);
1✔
207
      helper.setHealthCheckedService(serviceName);
1✔
208
      return delegate.acceptResolvedAddresses(resolvedAddresses);
1✔
209
    }
210

211
    @Override
212
    public String toString() {
213
      return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
×
214
    }
215
  }
216

217
  
218
  // All methods are run from syncContext
219
  private final class HealthCheckState implements SubchannelStateListener {
220
    private final Runnable retryTask = new Runnable() {
1✔
221
        @Override
222
        public void run() {
223
          startRpc();
1✔
224
        }
1✔
225
      };
226

227
    private final SynchronizationContext syncContext;
228
    private final ScheduledExecutorService timerService;
229
    private final HelperImpl helperImpl;
230
    private Subchannel subchannel;
231
    private ChannelLogger subchannelLogger;
232

233
    // In dual stack new pick first, this becomes health listener from create subchannel args.
234
    private SubchannelStateListener stateListener;
235

236
    // Set when RPC started. Cleared when the RPC has closed or abandoned.
237
    @Nullable
238
    private HcStream activeRpc;
239

240
    // The service name that should be used for health checking
241
    private String serviceName;
242
    private BackoffPolicy backoffPolicy;
243
    // The state from the underlying Subchannel
244
    private ConnectivityStateInfo rawState = ConnectivityStateInfo.forNonError(IDLE);
1✔
245
    // The state concluded from health checking
246
    private ConnectivityStateInfo concludedState = ConnectivityStateInfo.forNonError(IDLE);
1✔
247
    // true if a health check stream should be kept.  When true, either there is an active RPC, or a
248
    // retry is pending.
249
    private boolean running;
250
    // true if server returned UNIMPLEMENTED
251
    private boolean disabled;
252
    private ScheduledHandle retryTimer;
253

254
    HealthCheckState(
255
        HelperImpl helperImpl, SynchronizationContext syncContext,
256
        ScheduledExecutorService timerService,
257
        @Nullable SubchannelStateListener healthListener) {
1✔
258
      this.helperImpl = checkNotNull(helperImpl, "helperImpl");
1✔
259
      this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
260
      this.timerService = checkNotNull(timerService, "timerService");
1✔
261
      this.stateListener = healthListener;
1✔
262
    }
1✔
263

264
    void setSubchannel(Subchannel subchannel) {
265
      this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
266
      this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
1✔
267
    }
1✔
268

269
    // Only called in old pick first. Can be removed after migration.
270
    void init(SubchannelStateListener listener) {
271
      checkState(this.stateListener == null, "init() already called");
1✔
272
      this.stateListener = checkNotNull(listener, "listener");
1✔
273
    }
1✔
274

275
    void setServiceName(@Nullable String newServiceName) {
276
      if (Objects.equal(newServiceName, serviceName)) {
1✔
277
        return;
1✔
278
      }
279
      serviceName = newServiceName;
1✔
280
      // If service name has changed while there is active RPC, cancel it so that
281
      // a new call will be made with the new name.
282
      String cancelMsg =
283
          serviceName == null ? "Health check disabled by service config"
1✔
284
          : "Switching to new service name: " + newServiceName;
1✔
285
      stopRpc(cancelMsg);
1✔
286
      adjustHealthCheck();
1✔
287
    }
1✔
288

289
    @Override
290
    public void onSubchannelState(ConnectivityStateInfo rawState) {
291
      if (Objects.equal(this.rawState.getState(), READY)
1✔
292
          && !Objects.equal(rawState.getState(), READY)) {
1✔
293
        // A connection was lost.  We will reset disabled flag because health check
294
        // may be available on the new connection.
295
        disabled = false;
1✔
296
      }
297
      if (Objects.equal(rawState.getState(), SHUTDOWN)) {
1✔
298
        helperImpl.hcStates.remove(this);
1✔
299
      }
300
      this.rawState = rawState;
1✔
301
      adjustHealthCheck();
1✔
302
    }
1✔
303

304
    private boolean isRetryTimerPending() {
305
      return retryTimer != null && retryTimer.isPending();
1✔
306
    }
307

308
    // Start or stop health check according to the current states.
309
    private void adjustHealthCheck() {
310
      if (!disabled && serviceName != null && Objects.equal(rawState.getState(), READY)) {
1✔
311
        running = true;
1✔
312
        if (activeRpc == null && !isRetryTimerPending()) {
1✔
313
          startRpc();
1✔
314
        }
315
      } else {
316
        running = false;
1✔
317
        // Prerequisites for health checking not met.
318
        // Make sure it's stopped.
319
        stopRpc("Client stops health check");
1✔
320
        backoffPolicy = null;
1✔
321
        gotoState(rawState);
1✔
322
      }
323
    }
1✔
324

325
    private void startRpc() {
326
      checkState(serviceName != null, "serviceName is null");
1✔
327
      checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up");
1✔
328
      checkState(subchannel != null, "init() not called");
1✔
329
      // Optimization suggested by @markroth: if we are already READY and starting the health
330
      // checking RPC, either because health check is just enabled or has switched to a new service
331
      // name, we don't go to CONNECTING, otherwise there will be artificial delays on RPCs
332
      // waiting for the health check to respond.
333
      if (!Objects.equal(concludedState.getState(), READY)) {
1✔
334
        subchannelLogger.log(
1✔
335
            ChannelLogLevel.INFO, "CONNECTING: Starting health-check for \"{0}\"", serviceName);
336
        gotoState(ConnectivityStateInfo.forNonError(CONNECTING));
1✔
337
      }
338
      activeRpc = new HcStream();
1✔
339
      activeRpc.start();
1✔
340
    }
1✔
341

342
    private void stopRpc(String msg) {
343
      if (activeRpc != null) {
1✔
344
        activeRpc.cancel(msg);
1✔
345
        // Abandon this RPC.  We are not interested in anything from this RPC any more.
346
        activeRpc = null;
1✔
347
      }
348
      if (retryTimer != null) {
1✔
349
        retryTimer.cancel();
1✔
350
        retryTimer = null;
1✔
351
      }
352
    }
1✔
353

354
    private void gotoState(ConnectivityStateInfo newState) {
355
      checkState(subchannel != null, "init() not called");
1✔
356
      if (!Objects.equal(concludedState, newState)) {
1✔
357
        concludedState = newState;
1✔
358
        stateListener.onSubchannelState(concludedState);
1✔
359
      }
360
    }
1✔
361

362
    @Override
363
    public String toString() {
364
      return MoreObjects.toStringHelper(this)
×
365
          .add("running", running)
×
366
          .add("disabled", disabled)
×
367
          .add("activeRpc", activeRpc)
×
368
          .add("serviceName", serviceName)
×
369
          .add("rawState", rawState)
×
370
          .add("concludedState", concludedState)
×
371
          .toString();
×
372
    }
373

374
    private class HcStream extends ClientCall.Listener<HealthCheckResponse> {
375
      private final ClientCall<HealthCheckRequest, HealthCheckResponse> call;
376
      private final String callServiceName;
377
      private final Stopwatch stopwatch;
378
      private boolean callHasResponded;
379

380
      HcStream() {
1✔
381
        stopwatch = stopwatchSupplier.get().start();
1✔
382
        callServiceName = serviceName;
1✔
383
        call = subchannel.asChannel().newCall(HealthGrpc.getWatchMethod(), CallOptions.DEFAULT);
1✔
384
      }
1✔
385

386
      void start() {
387
        call.start(this, new Metadata());
1✔
388
        call.sendMessage(HealthCheckRequest.newBuilder().setService(serviceName).build());
1✔
389
        call.halfClose();
1✔
390
        call.request(1);
1✔
391
      }
1✔
392

393
      void cancel(String msg) {
394
        call.cancel(msg, null);
1✔
395
      }
1✔
396

397
      @Override
398
      public void onMessage(final HealthCheckResponse response) {
399
        syncContext.execute(new Runnable() {
1✔
400
            @Override
401
            public void run() {
402
              if (activeRpc == HcStream.this) {
1✔
403
                handleResponse(response);
1✔
404
              }
405
            }
1✔
406
          });
407
      }
1✔
408

409
      @Override
410
      public void onClose(final Status status, Metadata trailers) {
411
        syncContext.execute(new Runnable() {
1✔
412
            @Override
413
            public void run() {
414
              if (activeRpc == HcStream.this) {
1✔
415
                activeRpc = null;
1✔
416
                handleStreamClosed(status);
1✔
417
              }
418
            }
1✔
419
          });
420
      }
1✔
421

422
      void handleResponse(HealthCheckResponse response) {
423
        callHasResponded = true;
1✔
424
        backoffPolicy = null;
1✔
425
        ServingStatus status = response.getStatus();
1✔
426
        // running == true means the Subchannel's state (rawState) is READY
427
        if (Objects.equal(status, ServingStatus.SERVING)) {
1✔
428
          subchannelLogger.log(ChannelLogLevel.INFO, "READY: health-check responded SERVING");
1✔
429
          gotoState(ConnectivityStateInfo.forNonError(READY));
1✔
430
        } else {
431
          subchannelLogger.log(
1✔
432
              ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check responded {0}", status);
433
          String errorDescription =  "Health-check service responded "
1✔
434
              + status + " for '" + callServiceName + "'";
435
          gotoState(ConnectivityStateInfo.forTransientFailure(
1✔
436
              Status.UNAVAILABLE.withDescription(errorDescription)));
1✔
437
        }
438
        call.request(1);
1✔
439
      }
1✔
440

441
      void handleStreamClosed(Status status) {
442
        if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
1✔
443
          disabled = true;
1✔
444
          logger.log(
1✔
445
              Level.SEVERE, "Health-check with {0} is disabled. Server returned: {1}",
446
              new Object[] {subchannel.getAllAddresses(), status});
1✔
447
          subchannelLogger.log(ChannelLogLevel.ERROR, "Health-check disabled: {0}", status);
1✔
448
          subchannelLogger.log(ChannelLogLevel.INFO, "{0} (no health-check)", rawState);
1✔
449
          gotoState(rawState);
1✔
450
          return;
1✔
451
        }
452
        long delayNanos = 0;
1✔
453
        subchannelLogger.log(
1✔
454
            ChannelLogLevel.INFO, "TRANSIENT_FAILURE: health-check stream closed with {0}", status);
455
        String errorDescription = "Health-check stream unexpectedly closed with "
1✔
456
            + status + " for '" + callServiceName + "'";
457
        gotoState(ConnectivityStateInfo.forTransientFailure(
1✔
458
            Status.UNAVAILABLE.withDescription(errorDescription)));
1✔
459
        // Use backoff only when server has not responded for the previous call
460
        if (!callHasResponded) {
1✔
461
          if (backoffPolicy == null) {
1✔
462
            backoffPolicy = backoffPolicyProvider.get();
1✔
463
          }
464
          delayNanos = backoffPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
465
        }
466
        if (delayNanos <= 0) {
1✔
467
          startRpc();
1✔
468
        } else {
469
          checkState(!isRetryTimerPending(), "Retry double scheduled");
1✔
470
          subchannelLogger.log(
1✔
471
              ChannelLogLevel.DEBUG, "Will retry health-check after {0} ns", delayNanos);
1✔
472
          retryTimer = syncContext.schedule(
1✔
473
              retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService);
1✔
474
        }
475
      }
1✔
476

477
      @Override
478
      public String toString() {
479
        return MoreObjects.toStringHelper(this)
×
480
            .add("callStarted", call != null)
×
481
            .add("serviceName", callServiceName)
×
482
            .add("hasResponded", callHasResponded)
×
483
            .toString();
×
484
      }
485
    }
486
  }
487
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc