• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19653

18 Jan 2025 01:16AM CUT coverage: 88.576% (+0.006%) from 88.57%
#19653

push

github

ejona86
xds: Rename grpc.xds.cluster to grpc.lb.backend_service

The name is being changed to allow the value to be used in more metrics
where xds-specifics are awkward.

33719 of 38068 relevant lines covered (88.58%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.25
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import com.google.errorprone.annotations.concurrent.GuardedBy;
23
import io.grpc.CallOptions;
24
import io.grpc.ClientStreamTracer;
25
import io.grpc.Context;
26
import io.grpc.InternalChannelz.SocketStats;
27
import io.grpc.InternalLogId;
28
import io.grpc.LoadBalancer.PickResult;
29
import io.grpc.LoadBalancer.PickSubchannelArgs;
30
import io.grpc.LoadBalancer.SubchannelPicker;
31
import io.grpc.Metadata;
32
import io.grpc.MethodDescriptor;
33
import io.grpc.Status;
34
import io.grpc.SynchronizationContext;
35
import io.grpc.internal.ClientStreamListener.RpcProgress;
36
import java.util.ArrayList;
37
import java.util.Collection;
38
import java.util.Collections;
39
import java.util.LinkedHashSet;
40
import java.util.concurrent.Executor;
41
import javax.annotation.Nonnull;
42
import javax.annotation.Nullable;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // lazily allocated, since it is infrequently used.
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /** Immutable state needed for picking. 'lock' must be held for writing. */
73
  private volatile PickerState pickerState = new PickerState(null, null);
1✔
74

75
  /**
76
   * Creates a new delayed transport.
77
   *
78
   * @param defaultAppExecutor pending streams will create real streams and run buffered operations
79
   *        in an application executor, which will be this executor, unless there is on provided in
80
   *        {@link CallOptions}.
81
   * @param syncContext all listener callbacks of the delayed transport will be run from this
82
   *        SynchronizationContext.
83
   */
84
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
1✔
85
    this.defaultAppExecutor = defaultAppExecutor;
1✔
86
    this.syncContext = syncContext;
1✔
87
  }
1✔
88

89
  @Override
90
  public final Runnable start(final Listener listener) {
91
    this.listener = listener;
1✔
92
    reportTransportInUse = new Runnable() {
1✔
93
        @Override
94
        public void run() {
95
          listener.transportInUse(true);
1✔
96
        }
1✔
97
      };
98
    reportTransportNotInUse = new Runnable() {
1✔
99
        @Override
100
        public void run() {
101
          listener.transportInUse(false);
1✔
102
        }
1✔
103
      };
104
    reportTransportTerminated = new Runnable() {
1✔
105
        @Override
106
        public void run() {
107
          listener.transportTerminated();
1✔
108
        }
1✔
109
      };
110
    return null;
1✔
111
  }
112

113
  /**
114
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
115
   * picker will be consulted.
116
   *
117
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
118
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
119
   */
120
  @Override
121
  public final ClientStream newStream(
122
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
123
      ClientStreamTracer[] tracers) {
124
    try {
125
      PickSubchannelArgs args = new PickSubchannelArgsImpl(
1✔
126
          method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
127
      PickerState state = pickerState;
1✔
128
      while (true) {
129
        if (state.shutdownStatus != null) {
1✔
130
          return new FailingClientStream(state.shutdownStatus, tracers);
1✔
131
        }
132
        if (state.lastPicker != null) {
1✔
133
          PickResult pickResult = state.lastPicker.pickSubchannel(args);
1✔
134
          callOptions = args.getCallOptions();
1✔
135
          // User code provided authority takes precedence over the LB provided one.
136
          if (callOptions.getAuthority() == null
1✔
137
              && pickResult.getAuthorityOverride() != null) {
1✔
138
            callOptions = callOptions.withAuthority(pickResult.getAuthorityOverride());
1✔
139
          }
140
          ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
141
              callOptions.isWaitForReady());
1✔
142
          if (transport != null) {
1✔
143
            return transport.newStream(
1✔
144
                args.getMethodDescriptor(), args.getHeaders(), callOptions,
1✔
145
                tracers);
146
          }
147
        }
148
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
149
        // race with reprocess()), we will buffer the RPC.  Otherwise, will try with the new picker.
150
        synchronized (lock) {
1✔
151
          PickerState newerState = pickerState;
1✔
152
          if (state == newerState) {
1✔
153
            return createPendingStream(args, tracers);
1✔
154
          }
155
          state = newerState;
1✔
156
        }
1✔
157
      }
158
    } finally {
159
      syncContext.drain();
1✔
160
    }
161
  }
162

163
  /**
164
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
165
   * schedule tasks on syncContext.
166
   */
167
  @GuardedBy("lock")
168
  private PendingStream createPendingStream(
169
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
170
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
171
    pendingStreams.add(pendingStream);
1✔
172
    if (getPendingStreamsCount() == 1) {
1✔
173
      syncContext.executeLater(reportTransportInUse);
1✔
174
    }
175
    for (ClientStreamTracer streamTracer : tracers) {
1✔
176
      streamTracer.createPendingStream();
1✔
177
    }
178
    return pendingStream;
1✔
179
  }
180

181
  @Override
182
  public final void ping(final PingCallback callback, Executor executor) {
183
    throw new UnsupportedOperationException("This method is not expected to be called");
×
184
  }
185

186
  @Override
187
  public ListenableFuture<SocketStats> getStats() {
188
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
189
    ret.set(null);
×
190
    return ret;
×
191
  }
192

193
  /**
194
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
195
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
196
   * more buffered streams.
197
   */
198
  @Override
199
  public final void shutdown(final Status status) {
200
    synchronized (lock) {
1✔
201
      if (pickerState.shutdownStatus != null) {
1✔
202
        return;
1✔
203
      }
204
      pickerState = pickerState.withShutdownStatus(status);
1✔
205
      syncContext.executeLater(new Runnable() {
1✔
206
          @Override
207
          public void run() {
208
            listener.transportShutdown(status);
1✔
209
          }
1✔
210
        });
211
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
212
        syncContext.executeLater(reportTransportTerminated);
1✔
213
        reportTransportTerminated = null;
1✔
214
      }
215
    }
1✔
216
    syncContext.drain();
1✔
217
  }
1✔
218

219
  /**
220
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
221
   * this transport.
222
   */
223
  @Override
224
  public final void shutdownNow(Status status) {
225
    shutdown(status);
1✔
226
    Collection<PendingStream> savedPendingStreams;
227
    Runnable savedReportTransportTerminated;
228
    synchronized (lock) {
1✔
229
      savedPendingStreams = pendingStreams;
1✔
230
      savedReportTransportTerminated = reportTransportTerminated;
1✔
231
      reportTransportTerminated = null;
1✔
232
      if (!pendingStreams.isEmpty()) {
1✔
233
        pendingStreams = Collections.emptyList();
1✔
234
      }
235
    }
1✔
236
    if (savedReportTransportTerminated != null) {
1✔
237
      for (PendingStream stream : savedPendingStreams) {
1✔
238
        Runnable runnable = stream.setStream(
1✔
239
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
240
        if (runnable != null) {
1✔
241
          // Drain in-line instead of using an executor as failing stream just throws everything
242
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
243
          // before stream.start().
244
          runnable.run();
1✔
245
        }
246
      }
1✔
247
      syncContext.execute(savedReportTransportTerminated);
1✔
248
    }
249
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
250
    // shutdown().
251
  }
1✔
252

253
  public final boolean hasPendingStreams() {
254
    synchronized (lock) {
1✔
255
      return !pendingStreams.isEmpty();
1✔
256
    }
257
  }
258

259
  @VisibleForTesting
260
  final int getPendingStreamsCount() {
261
    synchronized (lock) {
1✔
262
      return pendingStreams.size();
1✔
263
    }
264
  }
265

266
  /**
267
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
268
   * pick is successful, otherwise keep it pending.
269
   *
270
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
271
   * streams will be served by the latest picker (if a same picker is given more than once, they are
272
   * considered different pickers) as soon as possible.
273
   *
274
   * <p>This method <strong>must not</strong> be called concurrently with itself.
275
   */
276
  final void reprocess(@Nullable SubchannelPicker picker) {
277
    ArrayList<PendingStream> toProcess;
278
    synchronized (lock) {
1✔
279
      pickerState = pickerState.withPicker(picker);
1✔
280
      if (picker == null || !hasPendingStreams()) {
1✔
281
        return;
1✔
282
      }
283
      toProcess = new ArrayList<>(pendingStreams);
1✔
284
    }
1✔
285
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
286

287
    for (final PendingStream stream : toProcess) {
1✔
288
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
289
      CallOptions callOptions = stream.args.getCallOptions();
1✔
290
      // User code provided authority takes precedence over the LB provided one.
291
      if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
1✔
292
        stream.setAuthority(pickResult.getAuthorityOverride());
1✔
293
      }
294
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
295
          callOptions.isWaitForReady());
1✔
296
      if (transport != null) {
1✔
297
        Executor executor = defaultAppExecutor;
1✔
298
        // createRealStream may be expensive. It will start real streams on the transport. If
299
        // there are pending requests, they will be serialized too, which may be expensive. Since
300
        // we are now on transport thread, we need to offload the work to an executor.
301
        if (callOptions.getExecutor() != null) {
1✔
302
          executor = callOptions.getExecutor();
1✔
303
        }
304
        Runnable runnable = stream.createRealStream(transport);
1✔
305
        if (runnable != null) {
1✔
306
          executor.execute(runnable);
1✔
307
        }
308
        toRemove.add(stream);
1✔
309
      }  // else: stay pending
310
    }
1✔
311

312
    synchronized (lock) {
1✔
313
      // Between this synchronized and the previous one:
314
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
315
      //   - shutdown() may be called, which may turn pendingStreams into null.
316
      if (!hasPendingStreams()) {
1✔
317
        return;
×
318
      }
319
      pendingStreams.removeAll(toRemove);
1✔
320
      // Because delayed transport is long-lived, we take this opportunity to down-size the
321
      // hashmap.
322
      if (pendingStreams.isEmpty()) {
1✔
323
        pendingStreams = new LinkedHashSet<>();
1✔
324
      }
325
      if (!hasPendingStreams()) {
1✔
326
        // There may be a brief gap between delayed transport clearing in-use state, and first real
327
        // transport starting streams and setting in-use state.  During the gap the whole channel's
328
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
329
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
330
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
331
        syncContext.executeLater(reportTransportNotInUse);
1✔
332
        if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
1✔
333
          syncContext.executeLater(reportTransportTerminated);
1✔
334
          reportTransportTerminated = null;
1✔
335
        }
336
      }
337
    }
1✔
338
    syncContext.drain();
1✔
339
  }
1✔
340

341
  @Override
342
  public InternalLogId getLogId() {
343
    return logId;
×
344
  }
345

346
  private class PendingStream extends DelayedStream {
347
    private final PickSubchannelArgs args;
348
    private final Context context = Context.current();
1✔
349
    private final ClientStreamTracer[] tracers;
350

351
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
352
      this.args = args;
1✔
353
      this.tracers = tracers;
1✔
354
    }
1✔
355

356
    /** Runnable may be null. */
357
    private Runnable createRealStream(ClientTransport transport) {
358
      ClientStream realStream;
359
      Context origContext = context.attach();
1✔
360
      try {
361
        realStream = transport.newStream(
1✔
362
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
363
            tracers);
364
      } finally {
365
        context.detach(origContext);
1✔
366
      }
367
      return setStream(realStream);
1✔
368
    }
369

370
    @Override
371
    public void cancel(Status reason) {
372
      super.cancel(reason);
1✔
373
      synchronized (lock) {
1✔
374
        if (reportTransportTerminated != null) {
1✔
375
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
376
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
377
            syncContext.executeLater(reportTransportNotInUse);
1✔
378
            if (pickerState.shutdownStatus != null) {
1✔
379
              syncContext.executeLater(reportTransportTerminated);
1✔
380
              reportTransportTerminated = null;
1✔
381
            }
382
          }
383
        }
384
      }
1✔
385
      syncContext.drain();
1✔
386
    }
1✔
387

388
    @Override
389
    protected void onEarlyCancellation(Status reason) {
390
      for (ClientStreamTracer tracer : tracers) {
1✔
391
        tracer.streamClosed(reason);
1✔
392
      }
393
    }
1✔
394

395
    @Override
396
    public void appendTimeoutInsight(InsightBuilder insight) {
397
      if (args.getCallOptions().isWaitForReady()) {
1✔
398
        insight.append("wait_for_ready");
1✔
399
      }
400
      super.appendTimeoutInsight(insight);
1✔
401
    }
1✔
402
  }
403

404
  static final class PickerState {
405
    /**
406
     * The last picker that {@link #reprocess} has used. May be set to null when the channel has
407
     * moved to idle.
408
     */
409
    @Nullable
410
    final SubchannelPicker lastPicker;
411
    /**
412
     * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
413
     * terminated.
414
     */
415
    @Nullable
416
    final Status shutdownStatus;
417

418
    private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
1✔
419
      this.lastPicker = lastPicker;
1✔
420
      this.shutdownStatus = shutdownStatus;
1✔
421
    }
1✔
422

423
    public PickerState withPicker(SubchannelPicker newPicker) {
424
      return new PickerState(newPicker, this.shutdownStatus);
1✔
425
    }
426

427
    public PickerState withShutdownStatus(Status newShutdownStatus) {
428
      return new PickerState(this.lastPicker, newShutdownStatus);
1✔
429
    }
430
  }
431
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc