• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19224

09 May 2024 06:31PM UTC coverage: 88.38% (-0.03%) from 88.408%
#19224

push

github

ejona86
api: Hide internal metric APIs

Some APIs were marked experimental but had internal APIs in their
surface. These were all changed to internal. And then the internal APIs
were mostly hidden from generated documentation.

All these APIs will eventually become public and maybe even stable. But
they need some iteration before we're ready for others to start using
them.

31580 of 35732 relevant lines covered (88.38%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.03
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import io.grpc.CallOptions;
23
import io.grpc.ClientStreamTracer;
24
import io.grpc.Context;
25
import io.grpc.InternalChannelz.SocketStats;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer.PickResult;
28
import io.grpc.LoadBalancer.PickSubchannelArgs;
29
import io.grpc.LoadBalancer.SubchannelPicker;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import java.util.ArrayList;
36
import java.util.Collection;
37
import java.util.Collections;
38
import java.util.LinkedHashSet;
39
import java.util.concurrent.Executor;
40
import javax.annotation.Nonnull;
41
import javax.annotation.Nullable;
42
import javax.annotation.concurrent.GuardedBy;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // lazily allocated, since it is infrequently used.
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /**
73
   * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
74
   * terminated.
75
   */
76
  @GuardedBy("lock")
77
  private Status shutdownStatus;
78

79
  /**
80
   * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
81
   * to idle.
82
   */
83
  @GuardedBy("lock")
84
  @Nullable
85
  private SubchannelPicker lastPicker;
86

87
  @GuardedBy("lock")
88
  private long lastPickerVersion;
89

90
  /**
91
   * Creates a new delayed transport.
92
   *
93
   * @param defaultAppExecutor pending streams will create real streams and run bufferred operations
94
   *        in an application executor, which will be this executor, unless there is on provided in
95
   *        {@link CallOptions}.
96
   * @param syncContext all listener callbacks of the delayed transport will be run from this
97
   *        SynchronizationContext.
98
   */
99
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
1✔
100
    this.defaultAppExecutor = defaultAppExecutor;
1✔
101
    this.syncContext = syncContext;
1✔
102
  }
1✔
103

104
  @Override
105
  public final Runnable start(final Listener listener) {
106
    this.listener = listener;
1✔
107
    reportTransportInUse = new Runnable() {
1✔
108
        @Override
109
        public void run() {
110
          listener.transportInUse(true);
1✔
111
        }
1✔
112
      };
113
    reportTransportNotInUse = new Runnable() {
1✔
114
        @Override
115
        public void run() {
116
          listener.transportInUse(false);
1✔
117
        }
1✔
118
      };
119
    reportTransportTerminated = new Runnable() {
1✔
120
        @Override
121
        public void run() {
122
          listener.transportTerminated();
1✔
123
        }
1✔
124
      };
125
    return null;
1✔
126
  }
127

128
  /**
129
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
130
   * picker will be consulted.
131
   *
132
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
133
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
134
   */
135
  @Override
136
  public final ClientStream newStream(
137
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
138
      ClientStreamTracer[] tracers) {
139
    try {
140
      PickSubchannelArgs args = new PickSubchannelArgsImpl(
1✔
141
          method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
142
      SubchannelPicker picker = null;
1✔
143
      long pickerVersion = -1;
1✔
144
      while (true) {
145
        synchronized (lock) {
1✔
146
          if (shutdownStatus != null) {
1✔
147
            return new FailingClientStream(shutdownStatus, tracers);
1✔
148
          }
149
          if (lastPicker == null) {
1✔
150
            return createPendingStream(args, tracers);
1✔
151
          }
152
          // Check for second time through the loop, and whether anything changed
153
          if (picker != null && pickerVersion == lastPickerVersion) {
1✔
154
            return createPendingStream(args, tracers);
1✔
155
          }
156
          picker = lastPicker;
1✔
157
          pickerVersion = lastPickerVersion;
1✔
158
        }
1✔
159
        PickResult pickResult = picker.pickSubchannel(args);
1✔
160
        ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
161
            callOptions.isWaitForReady());
1✔
162
        if (transport != null) {
1✔
163
          return transport.newStream(
1✔
164
              args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
165
              tracers);
166
        }
167
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
168
        // race with reprocess()), we will buffer it.  Otherwise, will try with the new picker.
169
      }
1✔
170
    } finally {
171
      syncContext.drain();
1✔
172
    }
173
  }
174

175
  /**
176
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
177
   * schedule tasks on syncContext.
178
   */
179
  @GuardedBy("lock")
180
  private PendingStream createPendingStream(
181
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
182
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
183
    pendingStreams.add(pendingStream);
1✔
184
    if (getPendingStreamsCount() == 1) {
1✔
185
      syncContext.executeLater(reportTransportInUse);
1✔
186
    }
187
    for (ClientStreamTracer streamTracer : tracers) {
1✔
188
      streamTracer.createPendingStream();
1✔
189
    }
190
    return pendingStream;
1✔
191
  }
192

193
  @Override
194
  public final void ping(final PingCallback callback, Executor executor) {
195
    throw new UnsupportedOperationException("This method is not expected to be called");
×
196
  }
197

198
  @Override
199
  public ListenableFuture<SocketStats> getStats() {
200
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
201
    ret.set(null);
×
202
    return ret;
×
203
  }
204

205
  /**
206
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
207
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
208
   * more buffered streams.
209
   */
210
  @Override
211
  public final void shutdown(final Status status) {
212
    synchronized (lock) {
1✔
213
      if (shutdownStatus != null) {
1✔
214
        return;
1✔
215
      }
216
      shutdownStatus = status;
1✔
217
      syncContext.executeLater(new Runnable() {
1✔
218
          @Override
219
          public void run() {
220
            listener.transportShutdown(status);
1✔
221
          }
1✔
222
        });
223
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
224
        syncContext.executeLater(reportTransportTerminated);
1✔
225
        reportTransportTerminated = null;
1✔
226
      }
227
    }
1✔
228
    syncContext.drain();
1✔
229
  }
1✔
230

231
  /**
232
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
233
   * this transport.
234
   */
235
  @Override
236
  public final void shutdownNow(Status status) {
237
    shutdown(status);
1✔
238
    Collection<PendingStream> savedPendingStreams;
239
    Runnable savedReportTransportTerminated;
240
    synchronized (lock) {
1✔
241
      savedPendingStreams = pendingStreams;
1✔
242
      savedReportTransportTerminated = reportTransportTerminated;
1✔
243
      reportTransportTerminated = null;
1✔
244
      if (!pendingStreams.isEmpty()) {
1✔
245
        pendingStreams = Collections.emptyList();
1✔
246
      }
247
    }
1✔
248
    if (savedReportTransportTerminated != null) {
1✔
249
      for (PendingStream stream : savedPendingStreams) {
1✔
250
        Runnable runnable = stream.setStream(
1✔
251
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
252
        if (runnable != null) {
1✔
253
          // Drain in-line instead of using an executor as failing stream just throws everything
254
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
255
          // before stream.start().
256
          runnable.run();
1✔
257
        }
258
      }
1✔
259
      syncContext.execute(savedReportTransportTerminated);
1✔
260
    }
261
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
262
    // shutdown().
263
  }
1✔
264

265
  public final boolean hasPendingStreams() {
266
    synchronized (lock) {
1✔
267
      return !pendingStreams.isEmpty();
1✔
268
    }
269
  }
270

271
  @VisibleForTesting
272
  final int getPendingStreamsCount() {
273
    synchronized (lock) {
1✔
274
      return pendingStreams.size();
1✔
275
    }
276
  }
277

278
  /**
279
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
280
   * pick is successful, otherwise keep it pending.
281
   *
282
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
283
   * streams will be served by the latest picker (if a same picker is given more than once, they are
284
   * considered different pickers) as soon as possible.
285
   *
286
   * <p>This method <strong>must not</strong> be called concurrently with itself.
287
   */
288
  final void reprocess(@Nullable SubchannelPicker picker) {
289
    ArrayList<PendingStream> toProcess;
290
    synchronized (lock) {
1✔
291
      lastPicker = picker;
1✔
292
      lastPickerVersion++;
1✔
293
      if (picker == null || !hasPendingStreams()) {
1✔
294
        return;
1✔
295
      }
296
      toProcess = new ArrayList<>(pendingStreams);
1✔
297
    }
1✔
298
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
299

300
    for (final PendingStream stream : toProcess) {
1✔
301
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
302
      CallOptions callOptions = stream.args.getCallOptions();
1✔
303
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
304
          callOptions.isWaitForReady());
1✔
305
      if (transport != null) {
1✔
306
        Executor executor = defaultAppExecutor;
1✔
307
        // createRealStream may be expensive. It will start real streams on the transport. If
308
        // there are pending requests, they will be serialized too, which may be expensive. Since
309
        // we are now on transport thread, we need to offload the work to an executor.
310
        if (callOptions.getExecutor() != null) {
1✔
311
          executor = callOptions.getExecutor();
1✔
312
        }
313
        Runnable runnable = stream.createRealStream(transport);
1✔
314
        if (runnable != null) {
1✔
315
          executor.execute(runnable);
1✔
316
        }
317
        toRemove.add(stream);
1✔
318
      }  // else: stay pending
319
    }
1✔
320

321
    synchronized (lock) {
1✔
322
      // Between this synchronized and the previous one:
323
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
324
      //   - shutdown() may be called, which may turn pendingStreams into null.
325
      if (!hasPendingStreams()) {
1✔
326
        return;
×
327
      }
328
      pendingStreams.removeAll(toRemove);
1✔
329
      // Because delayed transport is long-lived, we take this opportunity to down-size the
330
      // hashmap.
331
      if (pendingStreams.isEmpty()) {
1✔
332
        pendingStreams = new LinkedHashSet<>();
1✔
333
      }
334
      if (!hasPendingStreams()) {
1✔
335
        // There may be a brief gap between delayed transport clearing in-use state, and first real
336
        // transport starting streams and setting in-use state.  During the gap the whole channel's
337
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
338
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
339
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
340
        syncContext.executeLater(reportTransportNotInUse);
1✔
341
        if (shutdownStatus != null && reportTransportTerminated != null) {
1✔
342
          syncContext.executeLater(reportTransportTerminated);
1✔
343
          reportTransportTerminated = null;
1✔
344
        }
345
      }
346
    }
1✔
347
    syncContext.drain();
1✔
348
  }
1✔
349

350
  @Override
351
  public InternalLogId getLogId() {
352
    return logId;
×
353
  }
354

355
  private class PendingStream extends DelayedStream {
356
    private final PickSubchannelArgs args;
357
    private final Context context = Context.current();
1✔
358
    private final ClientStreamTracer[] tracers;
359

360
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
361
      this.args = args;
1✔
362
      this.tracers = tracers;
1✔
363
    }
1✔
364

365
    /** Runnable may be null. */
366
    private Runnable createRealStream(ClientTransport transport) {
367
      ClientStream realStream;
368
      Context origContext = context.attach();
1✔
369
      try {
370
        realStream = transport.newStream(
1✔
371
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
372
            tracers);
373
      } finally {
374
        context.detach(origContext);
1✔
375
      }
376
      return setStream(realStream);
1✔
377
    }
378

379
    @Override
380
    public void cancel(Status reason) {
381
      super.cancel(reason);
1✔
382
      synchronized (lock) {
1✔
383
        if (reportTransportTerminated != null) {
1✔
384
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
385
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
386
            syncContext.executeLater(reportTransportNotInUse);
1✔
387
            if (shutdownStatus != null) {
1✔
388
              syncContext.executeLater(reportTransportTerminated);
1✔
389
              reportTransportTerminated = null;
1✔
390
            }
391
          }
392
        }
393
      }
1✔
394
      syncContext.drain();
1✔
395
    }
1✔
396

397
    @Override
398
    protected void onEarlyCancellation(Status reason) {
399
      for (ClientStreamTracer tracer : tracers) {
1✔
400
        tracer.streamClosed(reason);
1✔
401
      }
402
    }
1✔
403

404
    @Override
405
    public void appendTimeoutInsight(InsightBuilder insight) {
406
      if (args.getCallOptions().isWaitForReady()) {
1✔
407
        insight.append("wait_for_ready");
1✔
408
      }
409
      super.appendTimeoutInsight(insight);
1✔
410
    }
1✔
411
  }
412
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc