• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19293

21 Jun 2024 09:35PM UTC coverage: 88.324% (+0.005%) from 88.319%
#19293

push

github

web-flow
examples: Add gRPC OpenTelemetry example (#11299)

Add gRPC OpenTelemetry example. The example uses Prometheus exporter to export metrics and can be verified locally.
It also provides an example using LoggingMetricExporter to export and log the metrics using java.util.logging.

32075 of 36315 relevant lines covered (88.32%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.1
/../core/src/main/java/io/grpc/internal/DelayedClientTransport.java
1
/*
2
 * Copyright 2015 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.internal;
18

19
import com.google.common.annotations.VisibleForTesting;
20
import com.google.common.util.concurrent.ListenableFuture;
21
import com.google.common.util.concurrent.SettableFuture;
22
import io.grpc.CallOptions;
23
import io.grpc.ClientStreamTracer;
24
import io.grpc.Context;
25
import io.grpc.InternalChannelz.SocketStats;
26
import io.grpc.InternalLogId;
27
import io.grpc.LoadBalancer.PickResult;
28
import io.grpc.LoadBalancer.PickSubchannelArgs;
29
import io.grpc.LoadBalancer.SubchannelPicker;
30
import io.grpc.Metadata;
31
import io.grpc.MethodDescriptor;
32
import io.grpc.Status;
33
import io.grpc.SynchronizationContext;
34
import io.grpc.internal.ClientStreamListener.RpcProgress;
35
import java.util.ArrayList;
36
import java.util.Collection;
37
import java.util.Collections;
38
import java.util.LinkedHashSet;
39
import java.util.concurrent.Executor;
40
import javax.annotation.Nonnull;
41
import javax.annotation.Nullable;
42
import javax.annotation.concurrent.GuardedBy;
43

44
/**
45
 * A client transport that queues requests before a real transport is available. When {@link
46
 * #reprocess} is called, this class applies the provided {@link SubchannelPicker} to pick a
47
 * transport for each pending stream.
48
 *
49
 * <p>This transport owns every stream that it has created until a real transport has been picked
50
 * for that stream, at which point the ownership of the stream is transferred to the real transport,
51
 * thus the delayed transport stops owning the stream.
52
 */
53
final class DelayedClientTransport implements ManagedClientTransport {
54
  // lazily allocated, since it is infrequently used.
55
  private final InternalLogId logId =
1✔
56
      InternalLogId.allocate(DelayedClientTransport.class, /*details=*/ null);
1✔
57

58
  private final Object lock = new Object();
1✔
59

60
  private final Executor defaultAppExecutor;
61
  private final SynchronizationContext syncContext;
62

63
  private Runnable reportTransportInUse;
64
  private Runnable reportTransportNotInUse;
65
  private Runnable reportTransportTerminated;
66
  private Listener listener;
67

68
  @Nonnull
1✔
69
  @GuardedBy("lock")
70
  private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
71

72
  /** Immutable state needed for picking. 'lock' must be held for writing. */
73
  private volatile PickerState pickerState = new PickerState(null, null);
1✔
74

75
  /**
76
   * Creates a new delayed transport.
77
   *
78
   * @param defaultAppExecutor pending streams will create real streams and run buffered operations
79
   *        in an application executor, which will be this executor, unless there is on provided in
80
   *        {@link CallOptions}.
81
   * @param syncContext all listener callbacks of the delayed transport will be run from this
82
   *        SynchronizationContext.
83
   */
84
  DelayedClientTransport(Executor defaultAppExecutor, SynchronizationContext syncContext) {
1✔
85
    this.defaultAppExecutor = defaultAppExecutor;
1✔
86
    this.syncContext = syncContext;
1✔
87
  }
1✔
88

89
  @Override
90
  public final Runnable start(final Listener listener) {
91
    this.listener = listener;
1✔
92
    reportTransportInUse = new Runnable() {
1✔
93
        @Override
94
        public void run() {
95
          listener.transportInUse(true);
1✔
96
        }
1✔
97
      };
98
    reportTransportNotInUse = new Runnable() {
1✔
99
        @Override
100
        public void run() {
101
          listener.transportInUse(false);
1✔
102
        }
1✔
103
      };
104
    reportTransportTerminated = new Runnable() {
1✔
105
        @Override
106
        public void run() {
107
          listener.transportTerminated();
1✔
108
        }
1✔
109
      };
110
    return null;
1✔
111
  }
112

113
  /**
114
   * If a {@link SubchannelPicker} is being, or has been provided via {@link #reprocess}, the last
115
   * picker will be consulted.
116
   *
117
   * <p>Otherwise, if the delayed transport is not shutdown, then a {@link PendingStream} is
118
   * returned; if the transport is shutdown, then a {@link FailingClientStream} is returned.
119
   */
120
  @Override
121
  public final ClientStream newStream(
122
      MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
123
      ClientStreamTracer[] tracers) {
124
    try {
125
      PickSubchannelArgs args = new PickSubchannelArgsImpl(
1✔
126
          method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
127
      PickerState state = pickerState;
1✔
128
      while (true) {
129
        if (state.shutdownStatus != null) {
1✔
130
          return new FailingClientStream(state.shutdownStatus, tracers);
1✔
131
        }
132
        if (state.lastPicker != null) {
1✔
133
          PickResult pickResult = state.lastPicker.pickSubchannel(args);
1✔
134
          ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
135
              callOptions.isWaitForReady());
1✔
136
          if (transport != null) {
1✔
137
            return transport.newStream(
1✔
138
                args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
139
                tracers);
140
          }
141
        }
142
        // This picker's conclusion is "buffer".  If there hasn't been a newer picker set (possible
143
        // race with reprocess()), we will buffer the RPC.  Otherwise, will try with the new picker.
144
        synchronized (lock) {
1✔
145
          PickerState newerState = pickerState;
1✔
146
          if (state == newerState) {
1✔
147
            return createPendingStream(args, tracers);
1✔
148
          }
149
          state = newerState;
1✔
150
        }
1✔
151
      }
152
    } finally {
153
      syncContext.drain();
1✔
154
    }
155
  }
156

157
  /**
158
   * Caller must call {@code syncContext.drain()} outside of lock because this method may
159
   * schedule tasks on syncContext.
160
   */
161
  @GuardedBy("lock")
162
  private PendingStream createPendingStream(
163
      PickSubchannelArgs args, ClientStreamTracer[] tracers) {
164
    PendingStream pendingStream = new PendingStream(args, tracers);
1✔
165
    pendingStreams.add(pendingStream);
1✔
166
    if (getPendingStreamsCount() == 1) {
1✔
167
      syncContext.executeLater(reportTransportInUse);
1✔
168
    }
169
    for (ClientStreamTracer streamTracer : tracers) {
1✔
170
      streamTracer.createPendingStream();
1✔
171
    }
172
    return pendingStream;
1✔
173
  }
174

175
  @Override
176
  public final void ping(final PingCallback callback, Executor executor) {
177
    throw new UnsupportedOperationException("This method is not expected to be called");
×
178
  }
179

180
  @Override
181
  public ListenableFuture<SocketStats> getStats() {
182
    SettableFuture<SocketStats> ret = SettableFuture.create();
×
183
    ret.set(null);
×
184
    return ret;
×
185
  }
186

187
  /**
188
   * Prevents creating any new streams.  Buffered streams are not failed and may still proceed
189
   * when {@link #reprocess} is called.  The delayed transport will be terminated when there is no
190
   * more buffered streams.
191
   */
192
  @Override
193
  public final void shutdown(final Status status) {
194
    synchronized (lock) {
1✔
195
      if (pickerState.shutdownStatus != null) {
1✔
196
        return;
1✔
197
      }
198
      pickerState = pickerState.withShutdownStatus(status);
1✔
199
      syncContext.executeLater(new Runnable() {
1✔
200
          @Override
201
          public void run() {
202
            listener.transportShutdown(status);
1✔
203
          }
1✔
204
        });
205
      if (!hasPendingStreams() && reportTransportTerminated != null) {
1✔
206
        syncContext.executeLater(reportTransportTerminated);
1✔
207
        reportTransportTerminated = null;
1✔
208
      }
209
    }
1✔
210
    syncContext.drain();
1✔
211
  }
1✔
212

213
  /**
214
   * Shuts down this transport and cancels all streams that it owns, hence immediately terminates
215
   * this transport.
216
   */
217
  @Override
218
  public final void shutdownNow(Status status) {
219
    shutdown(status);
1✔
220
    Collection<PendingStream> savedPendingStreams;
221
    Runnable savedReportTransportTerminated;
222
    synchronized (lock) {
1✔
223
      savedPendingStreams = pendingStreams;
1✔
224
      savedReportTransportTerminated = reportTransportTerminated;
1✔
225
      reportTransportTerminated = null;
1✔
226
      if (!pendingStreams.isEmpty()) {
1✔
227
        pendingStreams = Collections.emptyList();
1✔
228
      }
229
    }
1✔
230
    if (savedReportTransportTerminated != null) {
1✔
231
      for (PendingStream stream : savedPendingStreams) {
1✔
232
        Runnable runnable = stream.setStream(
1✔
233
            new FailingClientStream(status, RpcProgress.REFUSED, stream.tracers));
1✔
234
        if (runnable != null) {
1✔
235
          // Drain in-line instead of using an executor as failing stream just throws everything
236
          // away. This is essentially the same behavior as DelayedStream.cancel() but can be done
237
          // before stream.start().
238
          runnable.run();
1✔
239
        }
240
      }
1✔
241
      syncContext.execute(savedReportTransportTerminated);
1✔
242
    }
243
    // If savedReportTransportTerminated == null, transportTerminated() has already been called in
244
    // shutdown().
245
  }
1✔
246

247
  public final boolean hasPendingStreams() {
248
    synchronized (lock) {
1✔
249
      return !pendingStreams.isEmpty();
1✔
250
    }
251
  }
252

253
  @VisibleForTesting
254
  final int getPendingStreamsCount() {
255
    synchronized (lock) {
1✔
256
      return pendingStreams.size();
1✔
257
    }
258
  }
259

260
  /**
261
   * Use the picker to try picking a transport for every pending stream, proceed the stream if the
262
   * pick is successful, otherwise keep it pending.
263
   *
264
   * <p>This method may be called concurrently with {@code newStream()}, and it's safe.  All pending
265
   * streams will be served by the latest picker (if a same picker is given more than once, they are
266
   * considered different pickers) as soon as possible.
267
   *
268
   * <p>This method <strong>must not</strong> be called concurrently with itself.
269
   */
270
  final void reprocess(@Nullable SubchannelPicker picker) {
271
    ArrayList<PendingStream> toProcess;
272
    synchronized (lock) {
1✔
273
      pickerState = pickerState.withPicker(picker);
1✔
274
      if (picker == null || !hasPendingStreams()) {
1✔
275
        return;
1✔
276
      }
277
      toProcess = new ArrayList<>(pendingStreams);
1✔
278
    }
1✔
279
    ArrayList<PendingStream> toRemove = new ArrayList<>();
1✔
280

281
    for (final PendingStream stream : toProcess) {
1✔
282
      PickResult pickResult = picker.pickSubchannel(stream.args);
1✔
283
      CallOptions callOptions = stream.args.getCallOptions();
1✔
284
      final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
1✔
285
          callOptions.isWaitForReady());
1✔
286
      if (transport != null) {
1✔
287
        Executor executor = defaultAppExecutor;
1✔
288
        // createRealStream may be expensive. It will start real streams on the transport. If
289
        // there are pending requests, they will be serialized too, which may be expensive. Since
290
        // we are now on transport thread, we need to offload the work to an executor.
291
        if (callOptions.getExecutor() != null) {
1✔
292
          executor = callOptions.getExecutor();
1✔
293
        }
294
        Runnable runnable = stream.createRealStream(transport);
1✔
295
        if (runnable != null) {
1✔
296
          executor.execute(runnable);
1✔
297
        }
298
        toRemove.add(stream);
1✔
299
      }  // else: stay pending
300
    }
1✔
301

302
    synchronized (lock) {
1✔
303
      // Between this synchronized and the previous one:
304
      //   - Streams may have been cancelled, which may turn pendingStreams into emptiness.
305
      //   - shutdown() may be called, which may turn pendingStreams into null.
306
      if (!hasPendingStreams()) {
1✔
307
        return;
×
308
      }
309
      pendingStreams.removeAll(toRemove);
1✔
310
      // Because delayed transport is long-lived, we take this opportunity to down-size the
311
      // hashmap.
312
      if (pendingStreams.isEmpty()) {
1✔
313
        pendingStreams = new LinkedHashSet<>();
1✔
314
      }
315
      if (!hasPendingStreams()) {
1✔
316
        // There may be a brief gap between delayed transport clearing in-use state, and first real
317
        // transport starting streams and setting in-use state.  During the gap the whole channel's
318
        // in-use state may be false. However, it shouldn't cause spurious switching to idleness
319
        // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
320
        // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
321
        syncContext.executeLater(reportTransportNotInUse);
1✔
322
        if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
1✔
323
          syncContext.executeLater(reportTransportTerminated);
1✔
324
          reportTransportTerminated = null;
1✔
325
        }
326
      }
327
    }
1✔
328
    syncContext.drain();
1✔
329
  }
1✔
330

331
  @Override
332
  public InternalLogId getLogId() {
333
    return logId;
×
334
  }
335

336
  private class PendingStream extends DelayedStream {
337
    private final PickSubchannelArgs args;
338
    private final Context context = Context.current();
1✔
339
    private final ClientStreamTracer[] tracers;
340

341
    private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
1✔
342
      this.args = args;
1✔
343
      this.tracers = tracers;
1✔
344
    }
1✔
345

346
    /** Runnable may be null. */
347
    private Runnable createRealStream(ClientTransport transport) {
348
      ClientStream realStream;
349
      Context origContext = context.attach();
1✔
350
      try {
351
        realStream = transport.newStream(
1✔
352
            args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
1✔
353
            tracers);
354
      } finally {
355
        context.detach(origContext);
1✔
356
      }
357
      return setStream(realStream);
1✔
358
    }
359

360
    @Override
361
    public void cancel(Status reason) {
362
      super.cancel(reason);
1✔
363
      synchronized (lock) {
1✔
364
        if (reportTransportTerminated != null) {
1✔
365
          boolean justRemovedAnElement = pendingStreams.remove(this);
1✔
366
          if (!hasPendingStreams() && justRemovedAnElement) {
1✔
367
            syncContext.executeLater(reportTransportNotInUse);
1✔
368
            if (pickerState.shutdownStatus != null) {
1✔
369
              syncContext.executeLater(reportTransportTerminated);
1✔
370
              reportTransportTerminated = null;
1✔
371
            }
372
          }
373
        }
374
      }
1✔
375
      syncContext.drain();
1✔
376
    }
1✔
377

378
    @Override
379
    protected void onEarlyCancellation(Status reason) {
380
      for (ClientStreamTracer tracer : tracers) {
1✔
381
        tracer.streamClosed(reason);
1✔
382
      }
383
    }
1✔
384

385
    @Override
386
    public void appendTimeoutInsight(InsightBuilder insight) {
387
      if (args.getCallOptions().isWaitForReady()) {
1✔
388
        insight.append("wait_for_ready");
1✔
389
      }
390
      super.appendTimeoutInsight(insight);
1✔
391
    }
1✔
392
  }
393

394
  static final class PickerState {
395
    /**
396
     * The last picker that {@link #reprocess} has used. May be set to null when the channel has
397
     * moved to idle.
398
     */
399
    @Nullable
400
    final SubchannelPicker lastPicker;
401
    /**
402
     * When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
403
     * terminated.
404
     */
405
    @Nullable
406
    final Status shutdownStatus;
407

408
    private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
1✔
409
      this.lastPicker = lastPicker;
1✔
410
      this.shutdownStatus = shutdownStatus;
1✔
411
    }
1✔
412

413
    public PickerState withPicker(SubchannelPicker newPicker) {
414
      return new PickerState(newPicker, this.shutdownStatus);
1✔
415
    }
416

417
    public PickerState withShutdownStatus(Status newShutdownStatus) {
418
      return new PickerState(this.lastPicker, newShutdownStatus);
1✔
419
    }
420
  }
421
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc