• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20175

20 Feb 2026 07:37AM UTC coverage: 88.707% (+0.001%) from 88.706%
#20175

push

github

web-flow
unwrap ForwardingSubchannel during Picks (#12658)

This PR ensures that Load Balancing (LB) policies unwrap
`ForwardingSubchannel` instances before returning them in a
`PickResult`.

**Rationale:** Currently, the identity of a subchannel is "awkward"
because decorators break object identity. This forces the core channel
to use internal workarounds like `getInternalSubchannel()` to find the
underlying implementation. Removing these wrappers during the pick
process is a critical prerequisite for deleting Subchannel Attributes.

By enforcing unwrapping, `ManagedChannelImpl` can rely on the fact that
a returned subchannel is the same instance it originally created. This
allows the channel to use strongly-typed fields for state management
(via "blind casting") rather than abusing attributes to re-discover
information that should already be known. This also paves the way for
the eventual removal of the `getInternalSubchannel()` internal API.

**New APIs:** To ensure we don't "drop data on the floor" during the
unwrapping process, this PR adds two new non-static APIs to PickResult:
- copyWithSubchannel()
- copyWithStreamTracerFactory()

Unlike static factory methods, these instance methods follow a
"copy-and-update" pattern that preserves all existing pick-level
metadata (such as authority overrides or drop status) while only
swapping the specific field required.

35450 of 39963 relevant lines covered (88.71%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/../xds/src/main/java/io/grpc/xds/orca/OrcaOobUtil.java
1
/*
2
 * Copyright 2019 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.xds.orca;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static com.google.common.base.Preconditions.checkState;
21
import static io.grpc.ConnectivityState.IDLE;
22
import static io.grpc.ConnectivityState.READY;
23

24
import com.github.xds.data.orca.v3.OrcaLoadReport;
25
import com.github.xds.service.orca.v3.OpenRcaServiceGrpc;
26
import com.github.xds.service.orca.v3.OrcaLoadReportRequest;
27
import com.google.common.annotations.VisibleForTesting;
28
import com.google.common.base.MoreObjects;
29
import com.google.common.base.Objects;
30
import com.google.common.base.Stopwatch;
31
import com.google.common.base.Supplier;
32
import com.google.protobuf.util.Durations;
33
import io.grpc.Attributes;
34
import io.grpc.CallOptions;
35
import io.grpc.Channel;
36
import io.grpc.ChannelLogger;
37
import io.grpc.ChannelLogger.ChannelLogLevel;
38
import io.grpc.ClientCall;
39
import io.grpc.ConnectivityState;
40
import io.grpc.ConnectivityStateInfo;
41
import io.grpc.ExperimentalApi;
42
import io.grpc.LoadBalancer;
43
import io.grpc.LoadBalancer.CreateSubchannelArgs;
44
import io.grpc.LoadBalancer.Helper;
45
import io.grpc.LoadBalancer.PickResult;
46
import io.grpc.LoadBalancer.PickSubchannelArgs;
47
import io.grpc.LoadBalancer.Subchannel;
48
import io.grpc.LoadBalancer.SubchannelPicker;
49
import io.grpc.LoadBalancer.SubchannelStateListener;
50
import io.grpc.Metadata;
51
import io.grpc.Status;
52
import io.grpc.Status.Code;
53
import io.grpc.SynchronizationContext;
54
import io.grpc.SynchronizationContext.ScheduledHandle;
55
import io.grpc.internal.BackoffPolicy;
56
import io.grpc.internal.ExponentialBackoffPolicy;
57
import io.grpc.internal.GrpcUtil;
58
import io.grpc.services.MetricReport;
59
import io.grpc.util.ForwardingLoadBalancerHelper;
60
import io.grpc.util.ForwardingSubchannel;
61
import java.util.HashMap;
62
import java.util.Map;
63
import java.util.concurrent.ScheduledExecutorService;
64
import java.util.concurrent.TimeUnit;
65
import javax.annotation.Nullable;
66

67
/**
68
 * Utility class that provides methods for {@link LoadBalancer} to install listeners to receive
69
 * out-of-band backend metrics in the format of Open Request Cost Aggregation (ORCA).
70
 */
71
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9129")
72
public final class OrcaOobUtil {
73

74
  private OrcaOobUtil() {}
75

76
  /**
77
   * Creates a new {@link io.grpc.LoadBalancer.Helper} with provided
78
   * {@link OrcaOobReportListener} installed
79
   * to receive callback when an out-of-band ORCA report is received.
80
   *
81
   * <p>Example usages:
82
   *
83
   * <ul>
84
   *   <li> Leaf policy (e.g., WRR policy)
85
   *     <pre>
86
   *       {@code
87
   *       class WrrLoadbalancer extends LoadBalancer {
88
   *         private final Helper originHelper;  // the original Helper
89
   *
90
   *         public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
91
   *           // listener implements the logic for WRR's usage of backend metrics.
92
   *           LoadBalancer.Helper orcaHelper =
93
   *               OrcaOobUtil.newOrcaReportingHelper(originHelper);
94
   *           Subchannel subchannel =
95
   *               orcaHelper.createSubchannel(CreateSubchannelArgs.newBuilder()...);
96
   *           OrcaOobUtil.setListener(
97
   *              subchannel,
98
   *              listener,
99
   *              OrcaReportingConfig.newBuilder().setReportInterval(30, SECOND).build());
100
   *           ...
101
   *         }
102
   *       }
103
   *       }
104
   *     </pre>
105
   *   </li>
106
   *   <li> Delegating policy doing per-child-policy aggregation
107
   *     <pre>
108
   *       {@code
109
   *       class XdsLoadBalancer extends LoadBalancer {
110
   *         private final Helper orcaHelper;  // ORCA-reporting wrapper around the original Helper
111
   *
112
   *         public XdsLoadBalancer(LoadBalancer.Helper helper) {
113
   *           this.orcaHelper = OrcaOobUtil.newOrcaReportingHelper(helper);
114
   *         }
115
   *         private void createChildPolicy(
116
   *             Locality locality, LoadBalancerProvider childPolicyProvider) {
117
   *           // Each Locality has a child policy, and the parent does per-locality aggregation by
118
   *           // summing everything up.
119
   *
120
   *           // Create an OrcaReportingHelperWrapper for each Locality.
121
   *           // listener implements the logic for locality-level backend metric aggregation.
122
   *           LoadBalancer childLb = childPolicyProvider.newLoadBalancer(
123
   *             new ForwardingLoadBalancerHelper() {
124
   *               public Subchannel createSubchannel(CreateSubchannelArgs args) {
125
   *                 Subchannel subchannel = super.createSubchannel(args);
126
   *                 OrcaOobUtil.setListener(subchannel, listener,
127
   *                 OrcaReportingConfig.newBuilder().setReportInterval(30, SECOND).build());
128
   *                 return subchannel;
129
   *               }
130
   *               public LoadBalancer.Helper delegate() {
131
   *                 return orcaHelper;
132
   *               }
133
   *             });
134
   *         }
135
   *       }
136
   *       }
137
   *     </pre>
138
   *   </li>
139
   * </ul>
140
   *
141
   * @param delegate the delegate helper that provides essentials for establishing subchannels to
142
   *     backends.
143
   */
144
  public static LoadBalancer.Helper newOrcaReportingHelper(LoadBalancer.Helper delegate) {
145
    return newOrcaReportingHelper(
1✔
146
        delegate,
147
        new ExponentialBackoffPolicy.Provider(),
148
        GrpcUtil.STOPWATCH_SUPPLIER);
149
  }
150

151
  @VisibleForTesting
152
  static LoadBalancer.Helper newOrcaReportingHelper(
153
      LoadBalancer.Helper delegate,
154
      BackoffPolicy.Provider backoffPolicyProvider,
155
      Supplier<Stopwatch> stopwatchSupplier) {
156
    return new OrcaReportingHelper(delegate, backoffPolicyProvider, stopwatchSupplier);
1✔
157
  }
158

159
  /**
160
   * The listener interface for receiving out-of-band ORCA reports from backends. The class that is
161
   * interested in processing backend cost metrics implements this interface, and the object created
162
   * with that class is registered with a component, using methods in {@link OrcaOobUtil}.
163
   * When an ORCA report is received, that object's {@code onLoadReport} method is invoked.
164
   */
165
  public interface OrcaOobReportListener {
166

167
    /**
168
     * Invoked when an out-of-band ORCA report is received.
169
     *
170
     * <p>Note this callback will be invoked from the {@link SynchronizationContext} of the
171
     * delegated helper, implementations should not block.
172
     *
173
     * @param report load report in the format of grpc {@link MetricReport}.
174
     */
175
    void onLoadReport(MetricReport report);
176
  }
177

178
  static final Attributes.Key<SubchannelImpl> ORCA_REPORTING_STATE_KEY =
1✔
179
      Attributes.Key.create("internal-orca-reporting-state");
1✔
180

181
  /**
182
   *  Update {@link OrcaOobReportListener} to receive Out-of-Band metrics report for the
183
   *  particular subchannel connection, and set the configuration of receiving ORCA reports,
184
   *  such as the interval of receiving reports. Set listener to null to remove listener, and the
185
   *  config will have no effect.
186
   *
187
   * <p>This method needs to be called from the SynchronizationContext returned by the wrapped
188
   * helper's {@link Helper#getSynchronizationContext()}.
189
   *
190
   * <p>Each load balancing policy must call this method to configure the backend load reporting.
191
   * Otherwise, it will not receive ORCA reports.
192
   *
193
   * <p>If multiple load balancing policies configure reporting with different intervals, reports
194
   * come with the minimum of those intervals.
195
   *
196
   * @param subchannel the server connected by this subchannel to receive the metrics.
197
   *
198
   * @param listener the callback upon receiving backend metrics from the Out-Of-Band stream.
199
   *                 Setting it to null removes the listener from the subchannel.
200
   *
201
   * @param config the configuration to be set. It has no effect when listener is null.
202
   *
203
   */
204
  public static void setListener(Subchannel subchannel, OrcaOobReportListener listener,
205
                                 OrcaReportingConfig config) {
206
    Attributes attributes = subchannel.getAttributes();
1✔
207
    SubchannelImpl orcaSubchannel =
208
        (attributes == null) ? null : attributes.get(ORCA_REPORTING_STATE_KEY);
1✔
209
    if (orcaSubchannel == null) {
1✔
210
      throw new IllegalArgumentException("Subchannel does not have orca Out-Of-Band stream enabled."
1✔
211
          + " Try to use a subchannel created by OrcaOobUtil.OrcaHelper.");
212
    }
213
    orcaSubchannel.orcaState.setListener(orcaSubchannel, listener, config);
1✔
214
  }
1✔
215

216
  /**
217
   * An {@link OrcaReportingHelper} wraps a delegated {@link LoadBalancer.Helper} with additional
218
   * functionality to manage RPCs for out-of-band ORCA reporting for each backend it establishes
219
   * connection to. Subchannels created through it will retrieve ORCA load reports if the server
220
   * supports it.
221
   */
222
  static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper {
223
    private final LoadBalancer.Helper delegate;
224
    private final SynchronizationContext syncContext;
225
    private final BackoffPolicy.Provider backoffPolicyProvider;
226
    private final Supplier<Stopwatch> stopwatchSupplier;
227

228
    OrcaReportingHelper(
229
        LoadBalancer.Helper delegate,
230
        BackoffPolicy.Provider backoffPolicyProvider,
231
        Supplier<Stopwatch> stopwatchSupplier) {
1✔
232
      this.delegate = checkNotNull(delegate, "delegate");
1✔
233
      this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
1✔
234
      this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
235
      syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext");
1✔
236
    }
1✔
237

238
    @Override
239
    protected Helper delegate() {
240
      return delegate;
1✔
241
    }
242

243
    @Override
244
    public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
245
      delegate.updateBalancingState(newState, new OrcaOobPicker(newPicker));
1✔
246
    }
1✔
247

248
    @VisibleForTesting
249
    static final class OrcaOobPicker extends SubchannelPicker {
250
      final SubchannelPicker delegate;
251

252
      OrcaOobPicker(SubchannelPicker delegate) {
1✔
253
        this.delegate = delegate;
1✔
254
      }
1✔
255

256
      @Override
257
      public PickResult pickSubchannel(PickSubchannelArgs args) {
258
        PickResult result = delegate.pickSubchannel(args);
1✔
259
        Subchannel subchannel = result.getSubchannel();
1✔
260
        if (subchannel instanceof SubchannelImpl) {
1✔
261
          return result.copyWithSubchannel(((SubchannelImpl) subchannel).delegate());
1✔
262
        }
263
        return result;
1✔
264
      }
265
    }
266

267
    @Override
268
    public Subchannel createSubchannel(CreateSubchannelArgs args) {
269
      syncContext.throwIfNotInThisSynchronizationContext();
1✔
270
      Subchannel subchannel = super.createSubchannel(args);
1✔
271
      Attributes attributes = subchannel.getAttributes();
1✔
272
      SubchannelImpl orcaSubchannel =
273
          (attributes == null) ? null : attributes.get(ORCA_REPORTING_STATE_KEY);
1✔
274
      OrcaReportingState orcaState;
275
      if (orcaSubchannel == null) {
1✔
276
        // Only the first load balancing policy requesting ORCA reports instantiates an
277
        // OrcaReportingState.
278
        orcaState = new OrcaReportingState(syncContext, delegate().getScheduledExecutorService());
1✔
279
      } else {
280
        orcaState = orcaSubchannel.orcaState;
1✔
281
      }
282
      return new SubchannelImpl(subchannel, orcaState);
1✔
283
    }
284

285
    /**
286
     * An {@link OrcaReportingState} is a client of ORCA service running on a single backend.
287
     *
288
     * <p>All methods are run from {@code syncContext}.
289
     */
290
    private final class OrcaReportingState implements SubchannelStateListener {
291

292
      private final SynchronizationContext syncContext;
293
      private final ScheduledExecutorService timeService;
294
      private final Map<OrcaOobReportListener, OrcaReportingConfig> configs = new HashMap<>();
1✔
295
      @Nullable private Subchannel subchannel;
296
      @Nullable private ChannelLogger subchannelLogger;
297
      @Nullable
298
      private SubchannelStateListener stateListener;
299
      @Nullable private BackoffPolicy backoffPolicy;
300
      @Nullable private OrcaReportingStream orcaRpc;
301
      @Nullable private ScheduledHandle retryTimer;
302
      @Nullable private OrcaReportingConfig overallConfig;
303
      private final Runnable retryTask =
1✔
304
          new Runnable() {
1✔
305
            @Override
306
            public void run() {
307
              startRpc();
1✔
308
            }
1✔
309
          };
310
      private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
1✔
311
      // True if server returned UNIMPLEMENTED.
312
      private boolean disabled;
313
      private boolean started;
314

315
      OrcaReportingState(
316
          SynchronizationContext syncContext,
317
          ScheduledExecutorService timeService) {
1✔
318
        this.syncContext = checkNotNull(syncContext, "syncContext");
1✔
319
        this.timeService = checkNotNull(timeService, "timeService");
1✔
320
      }
1✔
321

322
      void init(Subchannel subchannel, SubchannelStateListener stateListener) {
323
        checkState(this.subchannel == null, "init() already called");
1✔
324
        this.subchannel = checkNotNull(subchannel, "subchannel");
1✔
325
        this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
1✔
326
        this.stateListener = checkNotNull(stateListener, "stateListener");
1✔
327
        started = true;
1✔
328
      }
1✔
329

330
      void setListener(SubchannelImpl orcaSubchannel, OrcaOobReportListener listener,
331
                       OrcaReportingConfig config) {
332
        syncContext.execute(new Runnable() {
1✔
333
          @Override
334
          public void run() {
335
            OrcaOobReportListener oldListener = orcaSubchannel.reportListener;
1✔
336
            if (oldListener != null) {
1✔
337
              configs.remove(oldListener);
1✔
338
            }
339
            if (listener != null) {
1✔
340
              configs.put(listener, config);
1✔
341
            }
342
            orcaSubchannel.reportListener = listener;
1✔
343
            setReportingConfig(config);
1✔
344
          }
1✔
345
        });
346
      }
1✔
347

348
      private void setReportingConfig(OrcaReportingConfig config) {
349
        boolean reconfigured = false;
1✔
350
        // Real reporting interval is the minimum of intervals requested by all participating
351
        // helpers.
352
        if (configs.isEmpty()) {
1✔
353
          overallConfig = null;
1✔
354
          reconfigured = true;
1✔
355
        } else if (overallConfig == null) {
1✔
356
          overallConfig = config.toBuilder().build();
1✔
357
          reconfigured = true;
1✔
358
        } else {
359
          long minInterval = Long.MAX_VALUE;
1✔
360
          for (OrcaReportingConfig c : configs.values()) {
1✔
361
            if (c.getReportIntervalNanos() < minInterval) {
1✔
362
              minInterval = c.getReportIntervalNanos();
1✔
363
            }
364
          }
1✔
365
          if (overallConfig.getReportIntervalNanos() != minInterval) {
1✔
366
            overallConfig = overallConfig.toBuilder()
1✔
367
                .setReportInterval(minInterval, TimeUnit.NANOSECONDS).build();
1✔
368
            reconfigured = true;
1✔
369
          }
370
        }
371
        if (reconfigured) {
1✔
372
          stopRpc("ORCA reporting reconfigured");
1✔
373
          adjustOrcaReporting();
1✔
374
        }
375
      }
1✔
376

377
      @Override
378
      public void onSubchannelState(ConnectivityStateInfo newState) {
379
        if (Objects.equal(state.getState(), READY) && !Objects.equal(newState.getState(), READY)) {
1✔
380
          // A connection was lost.  We will reset disabled flag because ORCA service
381
          // may be available on the new connection.
382
          disabled = false;
1✔
383
        }
384
        state = newState;
1✔
385
        adjustOrcaReporting();
1✔
386
        // Propagate subchannel state update to downstream listeners.
387
        stateListener.onSubchannelState(newState);
1✔
388
      }
1✔
389

390
      void adjustOrcaReporting() {
391
        if (!disabled && overallConfig != null && Objects.equal(state.getState(), READY)) {
1✔
392
          if (orcaRpc == null && !isRetryTimerPending()) {
1✔
393
            startRpc();
1✔
394
          }
395
        } else {
396
          stopRpc("Client stops ORCA reporting");
1✔
397
          backoffPolicy = null;
1✔
398
        }
399
      }
1✔
400

401
      void startRpc() {
402
        checkState(orcaRpc == null, "previous orca reporting RPC has not been cleaned up");
1✔
403
        checkState(subchannel != null, "init() not called");
1✔
404
        subchannelLogger.log(
1✔
405
            ChannelLogLevel.DEBUG, "Starting ORCA reporting for {0}", subchannel.getAllAddresses());
1✔
406
        orcaRpc = new OrcaReportingStream(subchannel.asChannel(), stopwatchSupplier.get());
1✔
407
        orcaRpc.start();
1✔
408
      }
1✔
409

410
      void stopRpc(String msg) {
411
        if (orcaRpc != null) {
1✔
412
          orcaRpc.cancel(msg);
1✔
413
          orcaRpc = null;
1✔
414
        }
415
        if (retryTimer != null) {
1✔
416
          retryTimer.cancel();
1✔
417
          retryTimer = null;
1✔
418
        }
419
      }
1✔
420

421
      boolean isRetryTimerPending() {
422
        return retryTimer != null && retryTimer.isPending();
1✔
423
      }
424

425
      @Override
426
      public String toString() {
427
        return MoreObjects.toStringHelper(this)
×
428
            .add("disabled", disabled)
×
429
            .add("orcaRpc", orcaRpc)
×
430
            .add("reportingConfig", overallConfig)
×
431
            .add("connectivityState", state)
×
432
            .toString();
×
433
      }
434

435
      private class OrcaReportingStream extends ClientCall.Listener<OrcaLoadReport> {
436

437
        private final ClientCall<OrcaLoadReportRequest, OrcaLoadReport> call;
438
        private final Stopwatch stopwatch;
439
        private boolean callHasResponded;
440

441
        OrcaReportingStream(Channel channel, Stopwatch stopwatch) {
1✔
442
          call =
1✔
443
              checkNotNull(channel, "channel")
1✔
444
                  .newCall(OpenRcaServiceGrpc.getStreamCoreMetricsMethod(), CallOptions.DEFAULT);
1✔
445
          this.stopwatch = checkNotNull(stopwatch, "stopwatch");
1✔
446
        }
1✔
447

448
        void start() {
449
          stopwatch.reset().start();
1✔
450
          call.start(this, new Metadata());
1✔
451
          call.sendMessage(
1✔
452
              OrcaLoadReportRequest.newBuilder()
1✔
453
                  .setReportInterval(Durations.fromNanos(overallConfig.getReportIntervalNanos()))
1✔
454
                  .build());
1✔
455
          call.halfClose();
1✔
456
          call.request(1);
1✔
457
        }
1✔
458

459
        @Override
460
        public void onMessage(final OrcaLoadReport response) {
461
          syncContext.execute(
1✔
462
              new Runnable() {
1✔
463
                @Override
464
                public void run() {
465
                  if (orcaRpc == OrcaReportingStream.this) {
1✔
466
                    handleResponse(response);
1✔
467
                  }
468
                }
1✔
469
              });
470
        }
1✔
471

472
        @Override
473
        public void onClose(final Status status, Metadata trailers) {
474
          syncContext.execute(
1✔
475
              new Runnable() {
1✔
476
                @Override
477
                public void run() {
478
                  if (orcaRpc == OrcaReportingStream.this) {
1✔
479
                    orcaRpc = null;
1✔
480
                    handleStreamClosed(status);
1✔
481
                  }
482
                }
1✔
483
              });
484
        }
1✔
485

486
        void handleResponse(OrcaLoadReport response) {
487
          callHasResponded = true;
1✔
488
          backoffPolicy = null;
1✔
489
          subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response);
1✔
490
          MetricReport metricReport = OrcaPerRequestUtil.fromOrcaLoadReport(response);
1✔
491
          for (OrcaOobReportListener listener : configs.keySet()) {
1✔
492
            listener.onLoadReport(metricReport);
1✔
493
          }
1✔
494
          call.request(1);
1✔
495
        }
1✔
496

497
        void handleStreamClosed(Status status) {
498
          if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) {
1✔
499
            disabled = true;
1✔
500
            subchannelLogger.log(
1✔
501
                ChannelLogLevel.ERROR,
502
                "Backend {0} OpenRcaService is disabled. Server returned: {1}",
503
                new Object[] {subchannel.getAllAddresses(), status});
1✔
504
            subchannelLogger.log(ChannelLogLevel.ERROR, "OpenRcaService disabled: {0}", status);
1✔
505
            return;
1✔
506
          }
507
          long delayNanos = 0;
1✔
508
          // Backoff only when no response has been received.
509
          if (!callHasResponded) {
1✔
510
            if (backoffPolicy == null) {
1✔
511
              backoffPolicy = backoffPolicyProvider.get();
1✔
512
            }
513
            delayNanos = backoffPolicy.nextBackoffNanos() - stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
514
          }
515
          subchannelLogger.log(
1✔
516
              ChannelLogLevel.DEBUG,
517
              "ORCA reporting stream closed with {0}, backoff in {1} ns",
518
              status,
519
              delayNanos <= 0 ? 0 : delayNanos);
1✔
520
          if (delayNanos <= 0) {
1✔
521
            startRpc();
1✔
522
          } else {
523
            checkState(!isRetryTimerPending(), "Retry double scheduled");
1✔
524
            retryTimer =
1✔
525
                syncContext.schedule(retryTask, delayNanos, TimeUnit.NANOSECONDS, timeService);
1✔
526
          }
527
        }
1✔
528

529
        void cancel(String msg) {
530
          call.cancel(msg, null);
1✔
531
        }
1✔
532

533
        @Override
534
        public String toString() {
535
          return MoreObjects.toStringHelper(this)
×
536
              .add("callStarted", call != null)
×
537
              .add("callHasResponded", callHasResponded)
×
538
              .toString();
×
539
        }
540
      }
541
    }
542
  }
543

544
  @VisibleForTesting
545
  static final class SubchannelImpl extends ForwardingSubchannel {
546
    private final Subchannel delegate;
547
    private final OrcaReportingHelper.OrcaReportingState orcaState;
548
    @Nullable private OrcaOobReportListener reportListener;
549

550
    SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState) {
1✔
551
      this.delegate = checkNotNull(delegate, "delegate");
1✔
552
      this.orcaState = checkNotNull(orcaState, "orcaState");
1✔
553
    }
1✔
554

555
    @Override
556
    protected Subchannel delegate() {
557
      return delegate;
1✔
558
    }
559

560
    @Override
561
    public void start(SubchannelStateListener listener) {
562
      if (!orcaState.started) {
1✔
563
        orcaState.init(this, listener);
1✔
564
        super.start(orcaState);
1✔
565
      } else {
566
        super.start(listener);
1✔
567
      }
568
    }
1✔
569

570
    @Override
571
    public Attributes getAttributes() {
572
      return super.getAttributes().toBuilder().set(ORCA_REPORTING_STATE_KEY, this).build();
1✔
573
    }
574
  }
575

576
  /** Configuration for out-of-band ORCA reporting service RPC. */
577
  public static final class OrcaReportingConfig {
578

579
    private final long reportIntervalNanos;
580

581
    private OrcaReportingConfig(long reportIntervalNanos) {
1✔
582
      this.reportIntervalNanos = reportIntervalNanos;
1✔
583
    }
1✔
584

585
    /** Creates a new builder. */
586
    public static Builder newBuilder() {
587
      return new Builder();
1✔
588
    }
589

590
    /** Returns the configured maximum interval of receiving out-of-band ORCA reports. */
591
    public long getReportIntervalNanos() {
592
      return reportIntervalNanos;
1✔
593
    }
594

595
    /** Returns a builder with the same initial values as this object. */
596
    public Builder toBuilder() {
597
      return newBuilder().setReportInterval(reportIntervalNanos, TimeUnit.NANOSECONDS);
1✔
598
    }
599

600
    @Override
601
    public String toString() {
602
      return MoreObjects.toStringHelper(this)
1✔
603
          .add("reportIntervalNanos", reportIntervalNanos)
1✔
604
          .toString();
1✔
605
    }
606

607
    public static final class Builder {
608

609
      private long reportIntervalNanos;
610

611
      Builder() {}
1✔
612

613
      /**
614
       * Sets the maximum expected interval of receiving out-of-band ORCA report. The actual
615
       * reporting interval might be smaller if there are other load balancing policies requesting
616
       * for more frequent cost metric report.
617
       *
618
       * @param reportInterval the maximum expected interval of receiving periodical ORCA reports.
619
       * @param unit time unit of {@code reportInterval} value.
620
       */
621
      public Builder setReportInterval(long reportInterval, TimeUnit unit) {
622
        reportIntervalNanos = unit.toNanos(reportInterval);
1✔
623
        return this;
1✔
624
      }
625

626
      /** Creates a new {@link OrcaReportingConfig} object. */
627
      public OrcaReportingConfig build() {
628
        return new OrcaReportingConfig(reportIntervalNanos);
1✔
629
      }
630
    }
631
  }
632
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc