• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19996

24 Sep 2025 12:08AM UTC coverage: 88.575% (+0.03%) from 88.543%
#19996

push

github

web-flow
Implement otel retry metrics (#12064)

implements [A96](https://github.com/grpc/proposal/pull/488/files)

34731 of 39211 relevant lines covered (88.57%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.77
/../core/src/main/java/io/grpc/internal/OobChannel.java
1
/*
2
 * Copyright 2016 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20

21
import com.google.common.annotations.VisibleForTesting;
22
import com.google.common.base.MoreObjects;
23
import com.google.common.base.Preconditions;
24
import com.google.common.util.concurrent.ListenableFuture;
25
import com.google.common.util.concurrent.SettableFuture;
26
import io.grpc.Attributes;
27
import io.grpc.CallOptions;
28
import io.grpc.ClientCall;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.ConnectivityState;
31
import io.grpc.ConnectivityStateInfo;
32
import io.grpc.Context;
33
import io.grpc.EquivalentAddressGroup;
34
import io.grpc.InternalChannelz;
35
import io.grpc.InternalChannelz.ChannelStats;
36
import io.grpc.InternalChannelz.ChannelTrace;
37
import io.grpc.InternalInstrumented;
38
import io.grpc.InternalLogId;
39
import io.grpc.InternalWithLogId;
40
import io.grpc.LoadBalancer;
41
import io.grpc.LoadBalancer.PickResult;
42
import io.grpc.LoadBalancer.PickSubchannelArgs;
43
import io.grpc.LoadBalancer.Subchannel;
44
import io.grpc.LoadBalancer.SubchannelPicker;
45
import io.grpc.ManagedChannel;
46
import io.grpc.Metadata;
47
import io.grpc.MethodDescriptor;
48
import io.grpc.Status;
49
import io.grpc.SynchronizationContext;
50
import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
51
import java.util.Collections;
52
import java.util.List;
53
import java.util.concurrent.CountDownLatch;
54
import java.util.concurrent.Executor;
55
import java.util.concurrent.ScheduledExecutorService;
56
import java.util.concurrent.TimeUnit;
57
import java.util.logging.Level;
58
import java.util.logging.Logger;
59
import javax.annotation.concurrent.ThreadSafe;
60

61
/**
62
 * A ManagedChannel backed by a single {@link InternalSubchannel} and used for {@link LoadBalancer}
63
 * to its own RPC needs.
64
 */
65
@ThreadSafe
66
final class OobChannel extends ManagedChannel implements InternalInstrumented<ChannelStats> {
67
  private static final Logger log = Logger.getLogger(OobChannel.class.getName());
1✔
68

69
  private InternalSubchannel subchannel;
70
  private AbstractSubchannel subchannelImpl;
71
  private SubchannelPicker subchannelPicker;
72

73
  private final InternalLogId logId;
74
  private final String authority;
75
  private final DelayedClientTransport delayedTransport;
76
  private final InternalChannelz channelz;
77
  private final ObjectPool<? extends Executor> executorPool;
78
  private final Executor executor;
79
  private final ScheduledExecutorService deadlineCancellationExecutor;
80
  private final CountDownLatch terminatedLatch = new CountDownLatch(1);
1✔
81
  private volatile boolean shutdown;
82
  private final CallTracer channelCallsTracer;
83
  private final ChannelTracer channelTracer;
84
  private final TimeProvider timeProvider;
85

86
  private final ClientStreamProvider transportProvider = new ClientStreamProvider() {
1✔
87
    @Override
88
    public ClientStream newStream(MethodDescriptor<?, ?> method,
89
        CallOptions callOptions, Metadata headers, Context context) {
90
      ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
1✔
91
          callOptions, headers, 0, /* isTransparentRetry= */ false,
92
          /* isHedging= */ false);
93
      Context origContext = context.attach();
1✔
94
      // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
95
      // matter here because OOB communication should be sparse, and it's not on application RPC's
96
      // critical path.
97
      try {
98
        return delayedTransport.newStream(method, headers, callOptions, tracers);
1✔
99
      } finally {
100
        context.detach(origContext);
1✔
101
      }
102
    }
103
  };
104

105
  OobChannel(
106
      String authority, ObjectPool<? extends Executor> executorPool,
107
      ScheduledExecutorService deadlineCancellationExecutor, SynchronizationContext syncContext,
108
      CallTracer callsTracer, ChannelTracer channelTracer, InternalChannelz channelz,
109
      TimeProvider timeProvider) {
1✔
110
    this.authority = checkNotNull(authority, "authority");
1✔
111
    this.logId = InternalLogId.allocate(getClass(), authority);
1✔
112
    this.executorPool = checkNotNull(executorPool, "executorPool");
1✔
113
    this.executor = checkNotNull(executorPool.getObject(), "executor");
1✔
114
    this.deadlineCancellationExecutor = checkNotNull(
1✔
115
        deadlineCancellationExecutor, "deadlineCancellationExecutor");
116
    this.delayedTransport = new DelayedClientTransport(executor, syncContext);
1✔
117
    this.channelz = Preconditions.checkNotNull(channelz);
1✔
118
    this.delayedTransport.start(new ManagedClientTransport.Listener() {
1✔
119
        @Override
120
        public void transportShutdown(Status s) {
121
          // Don't care
122
        }
1✔
123

124
        @Override
125
        public void transportTerminated() {
126
          subchannelImpl.shutdown();
1✔
127
        }
1✔
128

129
        @Override
130
        public void transportReady() {
131
          // Don't care
132
        }
×
133

134
        @Override
135
        public Attributes filterTransport(Attributes attributes) {
136
          return attributes;
×
137
        }
138

139
        @Override
140
        public void transportInUse(boolean inUse) {
141
          // Don't care
142
        }
1✔
143
      });
144
    this.channelCallsTracer = callsTracer;
1✔
145
    this.channelTracer = checkNotNull(channelTracer, "channelTracer");
1✔
146
    this.timeProvider = checkNotNull(timeProvider, "timeProvider");
1✔
147
  }
1✔
148

149
  // Must be called only once, right after the OobChannel is created.
150
  void setSubchannel(final InternalSubchannel subchannel) {
151
    log.log(Level.FINE, "[{0}] Created with [{1}]", new Object[] {this, subchannel});
1✔
152
    this.subchannel = subchannel;
1✔
153
    subchannelImpl = new AbstractSubchannel() {
1✔
154
        @Override
155
        public void shutdown() {
156
          subchannel.shutdown(Status.UNAVAILABLE.withDescription("OobChannel is shutdown"));
1✔
157
        }
1✔
158

159
        @Override
160
        InternalInstrumented<ChannelStats> getInstrumentedInternalSubchannel() {
161
          return subchannel;
1✔
162
        }
163

164
        @Override
165
        public void requestConnection() {
166
          subchannel.obtainActiveTransport();
1✔
167
        }
1✔
168

169
        @Override
170
        public List<EquivalentAddressGroup> getAllAddresses() {
171
          return subchannel.getAddressGroups();
×
172
        }
173

174
        @Override
175
        public Attributes getAttributes() {
176
          return Attributes.EMPTY;
×
177
        }
178

179
        @Override
180
        public Object getInternalSubchannel() {
181
          return subchannel;
1✔
182
        }
183
    };
184

185
    final class OobSubchannelPicker extends SubchannelPicker {
1✔
186
      final PickResult result = PickResult.withSubchannel(subchannelImpl);
1✔
187

188
      @Override
189
      public PickResult pickSubchannel(PickSubchannelArgs args) {
190
        return result;
1✔
191
      }
192

193
      @Override
194
      public String toString() {
195
        return MoreObjects.toStringHelper(OobSubchannelPicker.class)
×
196
            .add("result", result)
×
197
            .toString();
×
198
      }
199
    }
200

201
    subchannelPicker = new OobSubchannelPicker();
1✔
202
    delayedTransport.reprocess(subchannelPicker);
1✔
203
  }
1✔
204

205
  void updateAddresses(List<EquivalentAddressGroup> eag) {
206
    subchannel.updateAddresses(eag);
1✔
207
  }
1✔
208

209
  @Override
210
  public <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
211
      MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
212
    return new ClientCallImpl<>(methodDescriptor,
1✔
213
        callOptions.getExecutor() == null ? executor : callOptions.getExecutor(),
1✔
214
        callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer, null);
215
  }
216

217
  @Override
218
  public String authority() {
219
    return authority;
1✔
220
  }
221

222
  @Override
223
  public boolean isTerminated() {
224
    return terminatedLatch.getCount() == 0;
1✔
225
  }
226

227
  @Override
228
  public boolean awaitTermination(long time, TimeUnit unit) throws InterruptedException {
229
    return terminatedLatch.await(time, unit);
×
230
  }
231

232
  @Override
233
  public ConnectivityState getState(boolean requestConnectionIgnored) {
234
    if (subchannel == null) {
×
235
      return ConnectivityState.IDLE;
×
236
    }
237
    return subchannel.getState();
×
238
  }
239

240
  @Override
241
  public ManagedChannel shutdown() {
242
    shutdown = true;
1✔
243
    delayedTransport.shutdown(Status.UNAVAILABLE.withDescription("OobChannel.shutdown() called"));
1✔
244
    return this;
1✔
245
  }
246

247
  @Override
248
  public boolean isShutdown() {
249
    return shutdown;
1✔
250
  }
251

252
  @Override
253
  public ManagedChannel shutdownNow() {
254
    shutdown = true;
1✔
255
    delayedTransport.shutdownNow(
1✔
256
        Status.UNAVAILABLE.withDescription("OobChannel.shutdownNow() called"));
1✔
257
    return this;
1✔
258
  }
259

260
  void handleSubchannelStateChange(final ConnectivityStateInfo newState) {
261
    channelTracer.reportEvent(
1✔
262
        new ChannelTrace.Event.Builder()
263
            .setDescription("Entering " + newState.getState() + " state")
1✔
264
            .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1✔
265
            .setTimestampNanos(timeProvider.currentTimeNanos())
1✔
266
            .build());
1✔
267
    switch (newState.getState()) {
1✔
268
      case READY:
269
      case IDLE:
270
        delayedTransport.reprocess(subchannelPicker);
1✔
271
        break;
1✔
272
      case TRANSIENT_FAILURE:
273
        final class OobErrorPicker extends SubchannelPicker {
1✔
274
          final PickResult errorResult = PickResult.withError(newState.getStatus());
1✔
275

276
          @Override
277
          public PickResult pickSubchannel(PickSubchannelArgs args) {
278
            return errorResult;
1✔
279
          }
280

281
          @Override
282
          public String toString() {
283
            return MoreObjects.toStringHelper(OobErrorPicker.class)
×
284
                .add("errorResult", errorResult)
×
285
                .toString();
×
286
          }
287
        }
288

289
        delayedTransport.reprocess(new OobErrorPicker());
1✔
290
        break;
1✔
291
      default:
292
        // Do nothing
293
    }
294
  }
1✔
295

296
  // must be run from channel executor
297
  void handleSubchannelTerminated() {
298
    channelz.removeSubchannel(this);
1✔
299
    // When delayedTransport is terminated, it shuts down subchannel.  Therefore, at this point
300
    // both delayedTransport and subchannel have terminated.
301
    executorPool.returnObject(executor);
1✔
302
    terminatedLatch.countDown();
1✔
303
  }
1✔
304

305
  @VisibleForTesting
306
  Subchannel getSubchannel() {
307
    return subchannelImpl;
1✔
308
  }
309

310
  InternalSubchannel getInternalSubchannel() {
311
    return subchannel;
1✔
312
  }
313

314
  @Override
315
  public ListenableFuture<ChannelStats> getStats() {
316
    final SettableFuture<ChannelStats> ret = SettableFuture.create();
1✔
317
    final ChannelStats.Builder builder = new ChannelStats.Builder();
1✔
318
    channelCallsTracer.updateBuilder(builder);
1✔
319
    channelTracer.updateBuilder(builder);
1✔
320
    builder
1✔
321
        .setTarget(authority)
1✔
322
        .setState(subchannel.getState())
1✔
323
        .setSubchannels(Collections.<InternalWithLogId>singletonList(subchannel));
1✔
324
    ret.set(builder.build());
1✔
325
    return ret;
1✔
326
  }
327

328
  @Override
329
  public InternalLogId getLogId() {
330
    return logId;
1✔
331
  }
332

333
  @Override
334
  public String toString() {
335
    return MoreObjects.toStringHelper(this)
×
336
        .add("logId", logId.getId())
×
337
        .add("authority", authority)
×
338
        .toString();
×
339
  }
340

341
  @Override
342
  public void resetConnectBackoff() {
343
    subchannel.resetConnectBackoff();
×
344
  }
×
345
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc