• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #18874

26 Oct 2023 06:34PM CUT coverage: 88.249% (-0.009%) from 88.258%
#18874

push

github-actions

web-flow
xds: Log ORCA UNIMPLEMENTED error to subchannel logger (#10625) (#10631)

Logging to the static instance would result in application logs filling
up if the Orca service is not available.
We'd like to have the logging on the subchannelLogger, so we make it
visible on demand.

Also succeed Orca logging test if log message present. Using
contains over containsExactly seems more reasonable.

Co-authored-by: Yannick Epstein <yannick.epstein@gmail.com>

30294 of 34328 relevant lines covered (88.25%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.03
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import io.grpc.CallOptions;
23
import io.grpc.ClientStreamTracer;
24
import io.grpc.Context;
25
import io.grpc.InternalChannelz.SocketStats;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer.PickResult;
28
import io.grpc.LoadBalancer.PickSubchannelArgs;
29
import io.grpc.LoadBalancer.SubchannelPicker;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import java.util.ArrayList;
36
import java.util.Collection;
37
import java.util.Collections;
38
import java.util.LinkedHashSet;
39
import java.util.concurrent.Executor;
40
import javax.annotation.Nonnull;
41
import javax.annotation.Nullable;
42
import javax.annotation.concurrent.GuardedBy;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // lazily allocated, since it is infrequently used.
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /**
73
   * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
74
   * terminated.
75
   */
76
  @GuardedBy("lock")
77
  private Status shutdownStatus;
78

79
  /**
80
   * The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
81
   * to idle.
82
   */
83
  @GuardedBy("lock")
84
  @Nullable
85
  private SubchannelPicker lastPicker;
86

87
  @GuardedBy("lock")
88
  private long lastPickerVersion;
89

90
  /**
91
   * Creates a new delayed transport.
92
   *
93
   * @param defaultAppExecutor pending streams will create real streams and run bufferred operations
94
   *        in an application executor, which will be this executor, unless there is on provided in
95
   *        {@link CallOptions}.
96
   * @param syncContext all listener callbacks of the delayed transport will be run from this
97
   *        SynchronizationContext.
98
   */
99
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
1✔
100
    this.defaultAppExecutor = defaultAppExecutor;
1✔
101
    this.syncContext = syncContext;
1✔
102
  }
1✔
103

104
  @Override
105
  public final Runnable start(final Listener listener) {
106
    this.listener = listener;
1✔
107
    reportTransportInUse = new Runnable() {
1✔
108
        @Override
109
        public void run() {
110
          listener.transportInUse(true);
1✔
111
        }
1✔
112
      };
113
    reportTransportNotInUse = new Runnable() {
1✔
114
        @Override
115
        public void run() {
116
          listener.transportInUse(false);
1✔
117
        }
1✔
118
      };
119
    reportTransportTerminated = new Runnable() {
1✔
120
        @Override
121
        public void run() {
122
          listener.transportTerminated();
1✔
123
        }
1✔
124
      };
125
    return null;
1✔
126
  }
127

128
  /**
129
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
130
   * picker will be consulted.
131
   *
132
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
133
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
134
   */
135
  @Override
136
  public final ClientStream newStream(
137
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
138
      ClientStreamTracer[] tracers) {
139
    try {
140
      PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
1✔
141
      SubchannelPicker picker = null;
1✔
142
      long pickerVersion = -1;
1✔
143
      while (true) {
144
        synchronized (lock) {
1✔
145
          if (shutdownStatus != null) {
1✔
146
            return new FailingClientStream(shutdownStatus, tracers);
1✔
147
          }
148
          if (lastPicker == null) {
1✔
149
            return createPendingStream(args, tracers);
1✔
150
          }
151
          // Check for second time through the loop, and whether anything changed
152
          if (picker != null && pickerVersion == lastPickerVersion) {
1✔
153
            return createPendingStream(args, tracers);
1✔
154
          }
155
          picker = lastPicker;
1✔
156
          pickerVersion = lastPickerVersion;
1✔
157
        }
1✔
158
        PickResult pickResult = picker.pickSubchannel(args);
1✔
159
        ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
160
            callOptions.isWaitForReady());
1✔
161
        if (transport != null) {
1✔
162
          return transport.newStream(
1✔
163
              args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
164
              tracers);
165
        }
166
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
167
        // race with reprocess()), we will buffer it.  Otherwise, will try with the new picker.
168
      }
1✔
169
    } finally {
170
      syncContext.drain();
1✔
171
    }
172
  }
173

174
  /**
175
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
176
   * schedule tasks on syncContext.
177
   */
178
  @GuardedBy("lock")
179
  private PendingStream createPendingStream(
180
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
181
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
182
    pendingStreams.add(pendingStream);
1✔
183
    if (getPendingStreamsCount() == 1) {
1✔
184
      syncContext.executeLater(reportTransportInUse);
1✔
185
    }
186
    for (ClientStreamTracer streamTracer : tracers) {
1✔
187
      streamTracer.createPendingStream();
1✔
188
    }
189
    return pendingStream;
1✔
190
  }
191

192
  @Override
193
  public final void ping(final PingCallback callback, Executor executor) {
194
    throw new UnsupportedOperationException("This method is not expected to be called");
×
195
  }
196

197
  @Override
198
  public ListenableFuture<SocketStats> getStats() {
199
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
200
    ret.set(null);
×
201
    return ret;
×
202
  }
203

204
  /**
205
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
206
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
207
   * more buffered streams.
208
   */
209
  @Override
210
  public final void shutdown(final Status status) {
211
    synchronized (lock) {
1✔
212
      if (shutdownStatus != null) {
1✔
213
        return;
1✔
214
      }
215
      shutdownStatus = status;
1✔
216
      syncContext.executeLater(new Runnable() {
1✔
217
          @Override
218
          public void run() {
219
            listener.transportShutdown(status);
1✔
220
          }
1✔
221
        });
222
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
223
        syncContext.executeLater(reportTransportTerminated);
1✔
224
        reportTransportTerminated = null;
1✔
225
      }
226
    }
1✔
227
    syncContext.drain();
1✔
228
  }
1✔
229

230
  /**
231
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
232
   * this transport.
233
   */
234
  @Override
235
  public final void shutdownNow(Status status) {
236
    shutdown(status);
1✔
237
    Collection<PendingStream> savedPendingStreams;
238
    Runnable savedReportTransportTerminated;
239
    synchronized (lock) {
1✔
240
      savedPendingStreams = pendingStreams;
1✔
241
      savedReportTransportTerminated = reportTransportTerminated;
1✔
242
      reportTransportTerminated = null;
1✔
243
      if (!pendingStreams.isEmpty()) {
1✔
244
        pendingStreams = Collections.emptyList();
1✔
245
      }
246
    }
1✔
247
    if (savedReportTransportTerminated != null) {
1✔
248
      for (PendingStream stream : savedPendingStreams) {
1✔
249
        Runnable runnable = stream.setStream(
1✔
250
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
251
        if (runnable != null) {
1✔
252
          // Drain in-line instead of using an executor as failing stream just throws everything
253
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
254
          // before stream.start().
255
          runnable.run();
1✔
256
        }
257
      }
1✔
258
      syncContext.execute(savedReportTransportTerminated);
1✔
259
    }
260
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
261
    // shutdown().
262
  }
1✔
263

264
  public final boolean hasPendingStreams() {
265
    synchronized (lock) {
1✔
266
      return !pendingStreams.isEmpty();
1✔
267
    }
268
  }
269

270
  @VisibleForTesting
271
  final int getPendingStreamsCount() {
272
    synchronized (lock) {
1✔
273
      return pendingStreams.size();
1✔
274
    }
275
  }
276

277
  /**
278
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
279
   * pick is successful, otherwise keep it pending.
280
   *
281
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
282
   * streams will be served by the latest picker (if a same picker is given more than once, they are
283
   * considered different pickers) as soon as possible.
284
   *
285
   * <p>This method <strong>must not</strong> be called concurrently with itself.
286
   */
287
  final void reprocess(@Nullable SubchannelPicker picker) {
288
    ArrayList<PendingStream> toProcess;
289
    synchronized (lock) {
1✔
290
      lastPicker = picker;
1✔
291
      lastPickerVersion++;
1✔
292
      if (picker == null || !hasPendingStreams()) {
1✔
293
        return;
1✔
294
      }
295
      toProcess = new ArrayList<>(pendingStreams);
1✔
296
    }
1✔
297
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
298

299
    for (final PendingStream stream : toProcess) {
1✔
300
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
301
      CallOptions callOptions = stream.args.getCallOptions();
1✔
302
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
303
          callOptions.isWaitForReady());
1✔
304
      if (transport != null) {
1✔
305
        Executor executor = defaultAppExecutor;
1✔
306
        // createRealStream may be expensive. It will start real streams on the transport. If
307
        // there are pending requests, they will be serialized too, which may be expensive. Since
308
        // we are now on transport thread, we need to offload the work to an executor.
309
        if (callOptions.getExecutor() != null) {
1✔
310
          executor = callOptions.getExecutor();
1✔
311
        }
312
        Runnable runnable = stream.createRealStream(transport);
1✔
313
        if (runnable != null) {
1✔
314
          executor.execute(runnable);
1✔
315
        }
316
        toRemove.add(stream);
1✔
317
      }  // else: stay pending
318
    }
1✔
319

320
    synchronized (lock) {
1✔
321
      // Between this synchronized and the previous one:
322
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
323
      //   - shutdown() may be called, which may turn pendingStreams into null.
324
      if (!hasPendingStreams()) {
1✔
325
        return;
×
326
      }
327
      pendingStreams.removeAll(toRemove);
1✔
328
      // Because delayed transport is long-lived, we take this opportunity to down-size the
329
      // hashmap.
330
      if (pendingStreams.isEmpty()) {
1✔
331
        pendingStreams = new LinkedHashSet<>();
1✔
332
      }
333
      if (!hasPendingStreams()) {
1✔
334
        // There may be a brief gap between delayed transport clearing in-use state, and first real
335
        // transport starting streams and setting in-use state.  During the gap the whole channel's
336
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
337
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
338
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
339
        syncContext.executeLater(reportTransportNotInUse);
1✔
340
        if (shutdownStatus != null && reportTransportTerminated != null) {
1✔
341
          syncContext.executeLater(reportTransportTerminated);
1✔
342
          reportTransportTerminated = null;
1✔
343
        }
344
      }
345
    }
1✔
346
    syncContext.drain();
1✔
347
  }
1✔
348

349
  @Override
350
  public InternalLogId getLogId() {
351
    return logId;
×
352
  }
353

354
  private class PendingStream extends DelayedStream {
355
    private final PickSubchannelArgs args;
356
    private final Context context = Context.current();
1✔
357
    private final ClientStreamTracer[] tracers;
358

359
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
360
      this.args = args;
1✔
361
      this.tracers = tracers;
1✔
362
    }
1✔
363

364
    /** Runnable may be null. */
365
    private Runnable createRealStream(ClientTransport transport) {
366
      ClientStream realStream;
367
      Context origContext = context.attach();
1✔
368
      try {
369
        realStream = transport.newStream(
1✔
370
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
371
            tracers);
372
      } finally {
373
        context.detach(origContext);
1✔
374
      }
375
      return setStream(realStream);
1✔
376
    }
377

378
    @Override
379
    public void cancel(Status reason) {
380
      super.cancel(reason);
1✔
381
      synchronized (lock) {
1✔
382
        if (reportTransportTerminated != null) {
1✔
383
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
384
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
385
            syncContext.executeLater(reportTransportNotInUse);
1✔
386
            if (shutdownStatus != null) {
1✔
387
              syncContext.executeLater(reportTransportTerminated);
1✔
388
              reportTransportTerminated = null;
1✔
389
            }
390
          }
391
        }
392
      }
1✔
393
      syncContext.drain();
1✔
394
    }
1✔
395

396
    @Override
397
    protected void onEarlyCancellation(Status reason) {
398
      for (ClientStreamTracer tracer : tracers) {
1✔
399
        tracer.streamClosed(reason);
1✔
400
      }
401
    }
1✔
402

403
    @Override
404
    public void appendTimeoutInsight(InsightBuilder insight) {
405
      if (args.getCallOptions().isWaitForReady()) {
1✔
406
        insight.append("wait_for_ready");
1✔
407
      }
408
      super.appendTimeoutInsight(insight);
1✔
409
    }
1✔
410
  }
411
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc