• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19459

16 Sep 2024 09:43PM UTC coverage: 84.566% (+0.01%) from 84.555%
#19459

push

github

web-flow
Otel server context interceptor (#11500)

Add OpenTelemetry tracing API, guarded by an environment variable (disabled by default).
Use server interceptor to explicitly propagate span to the application thread.

33627 of 39764 relevant lines covered (84.57%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.3
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
21
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.Attributes;
25
import io.grpc.CallOptions;
26
import io.grpc.Channel;
27
import io.grpc.ClientCall;
28
import io.grpc.ClientInterceptor;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
31
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
32
import io.grpc.ForwardingServerCallListener;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.ServerCall;
36
import io.grpc.ServerCallHandler;
37
import io.grpc.ServerInterceptor;
38
import io.grpc.ServerStreamTracer;
39
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
40
import io.opentelemetry.api.OpenTelemetry;
41
import io.opentelemetry.api.common.AttributesBuilder;
42
import io.opentelemetry.api.trace.Span;
43
import io.opentelemetry.api.trace.StatusCode;
44
import io.opentelemetry.api.trace.Tracer;
45
import io.opentelemetry.context.Context;
46
import io.opentelemetry.context.Scope;
47
import io.opentelemetry.context.propagation.ContextPropagators;
48
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49
import java.util.logging.Level;
50
import java.util.logging.Logger;
51
import javax.annotation.Nullable;
52

53
/**
54
 * Provides factories for {@link io.grpc.StreamTracer} that record tracing to OpenTelemetry.
55
 */
56
final class OpenTelemetryTracingModule {
57
  private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName());

  // gRPC context key under which ServerTracer#filterContext stores the server span and from
  // which TracingServerSpanPropagationInterceptor reads it back.
  @VisibleForTesting
  final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");
  // Atomic accessor for CallAttemptsTracerFactory.callEnded; null when it could not be created.
  @Nullable
  private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
  // Atomic accessor for ServerTracer.streamClosed; null when it could not be created.
  @Nullable
  private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;

  /*
   * Some Samsung Android 5.0.x devices have a bug in their JDK reflection API that makes
   * Atomic*FieldUpdater creation fail with a NoSuchFieldException. If creation fails for any
   * reason, both updaters are left null and callers fall back to (potentially racy) direct
   * updates of the volatile fields.
   */
  static {
    AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedCandidate = null;
    AtomicIntegerFieldUpdater<ServerTracer> streamClosedCandidate = null;
    try {
      callEndedCandidate =
          AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
      streamClosedCandidate =
          AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
    } catch (Throwable t) {
      logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
      // All or nothing: drop the first updater too if only the second one failed.
      callEndedCandidate = null;
      streamClosedCandidate = null;
    }
    callEndedUpdater = callEndedCandidate;
    streamClosedUpdater = streamClosedCandidate;
  }

  // Both are assigned in the constructor from the supplied OpenTelemetry instance.
  private final Tracer otelTracer;
  private final ContextPropagators contextPropagators;
  // Carrier adapters handed to the text-map propagator for reading/writing gRPC Metadata.
  private final MetadataGetter metadataGetter = MetadataGetter.getInstance();
  private final MetadataSetter metadataSetter = MetadataSetter.getInstance();
  // One shared instance of each interceptor/factory per module, returned by the getters below.
  private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
  private final ServerInterceptor serverSpanPropagationInterceptor =
      new TracingServerSpanPropagationInterceptor();
  private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
1✔
96

97
  /**
   * Creates a tracing module backed by the given OpenTelemetry instance.
   *
   * <p>The tracer is built with the gRPC instrumentation scope and the gRPC implementation
   * version; the propagators are kept for injecting and extracting trace context from metadata.
   *
   * @param openTelemetry source of the tracer provider and the context propagators
   * @throws NullPointerException if {@code openTelemetry}, its tracer provider, or its
   *     propagators are {@code null}
   */
  OpenTelemetryTracingModule(OpenTelemetry openTelemetry) {
    // Fail with a labeled NPE instead of an anonymous one from the dereference below.
    checkNotNull(openTelemetry, "openTelemetry");
    this.otelTracer = checkNotNull(openTelemetry.getTracerProvider(), "tracerProvider")
        .tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
        .setInstrumentationVersion(IMPLEMENTATION_VERSION)
        .build();
    this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators");
  }
1✔
104

105
  @VisibleForTesting
106
  Tracer getTracer() {
107
    return otelTracer;
1✔
108
  }
109

110
  /**
111
   * Creates a {@link CallAttemptsTracerFactory} for a new call.
112
   */
113
  @VisibleForTesting
114
  CallAttemptsTracerFactory newClientCallTracer(Span clientSpan, MethodDescriptor<?, ?> method) {
115
    return new CallAttemptsTracerFactory(clientSpan, method);
1✔
116
  }
117

118
  /**
119
   * Returns the server tracer factory.
120
   */
121
  ServerStreamTracer.Factory getServerTracerFactory() {
122
    return serverTracerFactory;
1✔
123
  }
124

125
  /**
126
   * Returns the client interceptor that facilitates otel tracing reporting.
127
   */
128
  ClientInterceptor getClientInterceptor() {
129
    return clientInterceptor;
1✔
130
  }
131

132
  ServerInterceptor getServerSpanPropagationInterceptor() {
133
    return serverSpanPropagationInterceptor;
1✔
134
  }
135

136
  @VisibleForTesting
137
  final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
138
    volatile int callEnded;
139
    private final Span clientSpan;
140
    private final String fullMethodName;
141

142
    CallAttemptsTracerFactory(Span clientSpan, MethodDescriptor<?, ?> method) {
1✔
143
      checkNotNull(method, "method");
1✔
144
      this.fullMethodName = checkNotNull(method.getFullMethodName(), "fullMethodName");
1✔
145
      this.clientSpan = checkNotNull(clientSpan, "clientSpan");
1✔
146
    }
1✔
147

148
    @Override
149
    public ClientStreamTracer newClientStreamTracer(
150
        ClientStreamTracer.StreamInfo info, Metadata headers) {
151
      Span attemptSpan = otelTracer.spanBuilder(
1✔
152
              "Attempt." + fullMethodName.replace('/', '.'))
1✔
153
          .setParent(Context.current().with(clientSpan))
1✔
154
          .startSpan();
1✔
155
      attemptSpan.setAttribute(
1✔
156
          "previous-rpc-attempts", info.getPreviousAttempts());
1✔
157
      attemptSpan.setAttribute(
1✔
158
          "transparent-retry",info.isTransparentRetry());
1✔
159
      if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED) != null) {
1✔
160
        clientSpan.addEvent("Delayed name resolution complete");
1✔
161
      }
162
      return new ClientTracer(attemptSpan, clientSpan);
1✔
163
    }
164

165
    /**
166
     * Record a finished call and mark the current time as the end time.
167
     *
168
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
169
     * is a no-op.
170
     */
171
    void callEnded(io.grpc.Status status) {
172
      if (callEndedUpdater != null) {
1✔
173
        if (callEndedUpdater.getAndSet(this, 1) != 0) {
1✔
174
          return;
×
175
        }
176
      } else {
177
        if (callEnded != 0) {
×
178
          return;
×
179
        }
180
        callEnded = 1;
×
181
      }
182
      endSpanWithStatus(clientSpan, status);
1✔
183
    }
1✔
184
  }
185

186
  private final class ClientTracer extends ClientStreamTracer {
187
    private final Span span;
188
    private final Span parentSpan;
189
    volatile int seqNo;
190
    boolean isPendingStream;
191

192
    ClientTracer(Span span, Span parentSpan) {
1✔
193
      this.span = checkNotNull(span, "span");
1✔
194
      this.parentSpan = checkNotNull(parentSpan, "parent span");
1✔
195
    }
1✔
196

197
    @Override
198
    public void streamCreated(Attributes transportAtts, Metadata headers) {
199
      contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers,
1✔
200
          metadataSetter);
1✔
201
      if (isPendingStream) {
1✔
202
        span.addEvent("Delayed LB pick complete");
1✔
203
      }
204
    }
1✔
205

206
    @Override
207
    public void createPendingStream() {
208
      isPendingStream = true;
1✔
209
    }
1✔
210

211
    @Override
212
    public void outboundMessageSent(
213
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
214
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
215
    }
1✔
216

217
    @Override
218
    public void inboundMessageRead(
219
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
220
      //TODO(yifeizhuang): needs support from message deframer.
221
      if (optionalWireSize != optionalUncompressedSize) {
1✔
222
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
223
      }
224
    }
1✔
225

226
    @Override
227
    public void inboundMessage(int seqNo) {
228
      this.seqNo = seqNo;
1✔
229
    }
1✔
230

231
    @Override
232
    public void inboundUncompressedSize(long bytes) {
233
      recordInboundMessageSize(parentSpan, seqNo, bytes);
1✔
234
    }
1✔
235

236
    @Override
237
    public void streamClosed(io.grpc.Status status) {
238
      endSpanWithStatus(span, status);
1✔
239
    }
1✔
240
  }
241

242
  private final class ServerTracer extends ServerStreamTracer {
243
    private final Span span;
244
    volatile int streamClosed;
245
    private int seqNo;
246

247
    ServerTracer(String fullMethodName, @Nullable Span remoteSpan) {
1✔
248
      checkNotNull(fullMethodName, "fullMethodName");
1✔
249
      this.span =
1✔
250
          otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName))
1✔
251
              .setParent(remoteSpan == null ? null : Context.current().with(remoteSpan))
1✔
252
              .startSpan();
1✔
253
    }
1✔
254

255
    /**
256
     * Record a finished stream and mark the current time as the end time.
257
     *
258
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
259
     * is a no-op.
260
     */
261
    @Override
262
    public void streamClosed(io.grpc.Status status) {
263
      if (streamClosedUpdater != null) {
1✔
264
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
265
          return;
×
266
        }
267
      } else {
268
        if (streamClosed != 0) {
×
269
          return;
×
270
        }
271
        streamClosed = 1;
×
272
      }
273
      endSpanWithStatus(span, status);
1✔
274
    }
1✔
275

276
    @Override
277
    public io.grpc.Context filterContext(io.grpc.Context context) {
278
      return context.withValue(otelSpan, span);
1✔
279
    }
280

281
    @Override
282
    public void outboundMessageSent(
283
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
284
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
285
    }
1✔
286

287
    @Override
288
    public void inboundMessageRead(
289
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
290
      if (optionalWireSize != optionalUncompressedSize) {
1✔
291
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
292
      }
293
    }
1✔
294

295
    @Override
296
    public void inboundMessage(int seqNo) {
297
      this.seqNo = seqNo;
1✔
298
    }
1✔
299

300
    @Override
301
    public void inboundUncompressedSize(long bytes) {
302
      recordInboundMessageSize(span, seqNo, bytes);
1✔
303
    }
1✔
304
  }
305

306
  @VisibleForTesting
307
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
308
    @SuppressWarnings("ReferenceEquality")
309
    @Override
310
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
311
      Context context = contextPropagators.getTextMapPropagator().extract(
1✔
312
          Context.current(), headers, metadataGetter
1✔
313
      );
314
      Span remoteSpan = Span.fromContext(context);
1✔
315
      if (remoteSpan == Span.getInvalid()) {
1✔
316
        remoteSpan = null;
1✔
317
      }
318
      return new ServerTracer(fullMethodName, remoteSpan);
1✔
319
    }
320
  }
321

322
  @VisibleForTesting
323
  final class TracingServerSpanPropagationInterceptor implements ServerInterceptor {
1✔
324
    @Override
325
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
326
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
327
      Span span = otelSpan.get(io.grpc.Context.current());
1✔
328
      if (span == null) {
1✔
329
        logger.log(Level.FINE, "Server span not found. ServerTracerFactory for server "
1✔
330
            + "tracing must be set.");
331
        return next.startCall(call, headers);
1✔
332
      }
333
      Context serverCallContext = Context.current().with(span);
1✔
334
      try (Scope scope = serverCallContext.makeCurrent()) {
1✔
335
        return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
1✔
336
      }
337
    }
338
  }
339

340
  private static class ContextServerCallListener<ReqT> extends
341
      ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
342
    private final Context context;
343

344
    protected ContextServerCallListener(ServerCall.Listener<ReqT> delegate, Context context) {
345
      super(delegate);
1✔
346
      this.context = checkNotNull(context, "context");
1✔
347
    }
1✔
348

349
    @Override
350
    public void onMessage(ReqT message) {
351
      try (Scope scope = context.makeCurrent()) {
1✔
352
        delegate().onMessage(message);
1✔
353
      }
354
    }
1✔
355

356
    @Override
357
    public void onHalfClose() {
358
      try (Scope scope = context.makeCurrent()) {
1✔
359
        delegate().onHalfClose();
1✔
360
      }
361
    }
1✔
362

363
    @Override
364
    public void onCancel() {
365
      try (Scope scope = context.makeCurrent()) {
1✔
366
        delegate().onCancel();
1✔
367
      }
368
    }
1✔
369

370
    @Override
371
    public void onComplete() {
372
      try (Scope scope = context.makeCurrent()) {
1✔
373
        delegate().onComplete();
1✔
374
      }
375
    }
1✔
376

377
    @Override
378
    public void onReady() {
379
      try (Scope scope = context.makeCurrent()) {
1✔
380
        delegate().onReady();
1✔
381
      }
382
    }
1✔
383
  }
384

385
  @VisibleForTesting
386
  final class TracingClientInterceptor implements ClientInterceptor {
1✔
387

388
    @Override
389
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
390
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
391
      Span clientSpan = otelTracer.spanBuilder(
1✔
392
          generateTraceSpanName(false, method.getFullMethodName()))
1✔
393
          .startSpan();
1✔
394

395
      final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
1✔
396
      ClientCall<ReqT, RespT> call =
1✔
397
          next.newCall(
1✔
398
              method,
399
              callOptions.withStreamTracerFactory(tracerFactory));
1✔
400
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
401
        @Override
402
        public void start(Listener<RespT> responseListener, Metadata headers) {
403
          delegate().start(
1✔
404
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
405
                @Override
406
                public void onClose(io.grpc.Status status, Metadata trailers) {
407
                  tracerFactory.callEnded(status);
1✔
408
                  super.onClose(status, trailers);
1✔
409
                }
1✔
410
              },
411
              headers);
412
        }
1✔
413
      };
414
    }
415
  }
416

417
  // Attribute named "message-size" always means the message size the application sees.
418
  // If there was compression, additional event reports "message-size-compressed".
419
  //
420
  // An example trace with message compression:
421
  //
422
  // Sending:
423
  // |-- Event 'Outbound message sent', attributes('sequence-number' = 0, 'message-size' = 7854,
424
  //                                               'message-size-compressed' = 5493) ----|
425
  //
426
  // Receiving:
427
  // |-- Event 'Inbound compressed message', attributes('sequence-number' = 0,
428
  //                                                    'message-size-compressed' = 5493 ) ----|
429
  // |-- Event 'Inbound message received', attributes('sequence-number' = 0,
430
  //                                                  'message-size' = 7854) ----|
431
  //
432
  // An example trace with no message compression:
433
  //
434
  // Sending:
435
  // |-- Event 'Outbound message sent', attributes('sequence-number' = 0, 'message-size' = 7854) --|
436
  //
437
  // Receiving:
438
  // |-- Event 'Inbound message received', attributes('sequence-number' = 0,
439
  //                                                  'message-size' = 7854) ----|
440
  private void recordOutboundMessageSentEvent(Span span,
441
      int seqNo, long optionalWireSize, long optionalUncompressedSize) {
442
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
443
    attributesBuilder.put("sequence-number", seqNo);
1✔
444
    if (optionalUncompressedSize != -1) {
1✔
445
      attributesBuilder.put("message-size", optionalUncompressedSize);
1✔
446
    }
447
    if (optionalWireSize != -1 && optionalWireSize != optionalUncompressedSize) {
1✔
448
      attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
449
    }
450
    span.addEvent("Outbound message sent", attributesBuilder.build());
1✔
451
  }
1✔
452

453
  private void recordInboundCompressedMessage(Span span, int seqNo, long optionalWireSize) {
454
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
455
    attributesBuilder.put("sequence-number", seqNo);
1✔
456
    attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
457
    span.addEvent("Inbound compressed message", attributesBuilder.build());
1✔
458
  }
1✔
459

460
  private void recordInboundMessageSize(Span span, int seqNo, long bytes) {
461
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
462
    attributesBuilder.put("sequence-number", seqNo);
1✔
463
    attributesBuilder.put("message-size", bytes);
1✔
464
    span.addEvent("Inbound message received", attributesBuilder.build());
1✔
465
  }
1✔
466

467
  private String generateErrorStatusDescription(io.grpc.Status status) {
468
    if (status.getDescription() != null) {
1✔
469
      return status.getCode() + ": " + status.getDescription();
1✔
470
    } else {
471
      return status.getCode().toString();
1✔
472
    }
473
  }
474

475
  private void endSpanWithStatus(Span span, io.grpc.Status status) {
476
    if (status.isOk()) {
1✔
477
      span.setStatus(StatusCode.OK);
1✔
478
    } else {
479
      span.setStatus(StatusCode.ERROR, generateErrorStatusDescription(status));
1✔
480
    }
481
    span.end();
1✔
482
  }
1✔
483

484
  /**
485
   * Convert a full method name to a tracing span name.
486
   *
487
   * @param isServer {@code false} if the span is on the client-side, {@code true} if on the
488
   *                 server-side
489
   * @param fullMethodName the method name as returned by
490
   *        {@link MethodDescriptor#getFullMethodName}.
491
   */
492
  @VisibleForTesting
493
  static String generateTraceSpanName(boolean isServer, String fullMethodName) {
494
    String prefix = isServer ? "Recv" : "Sent";
1✔
495
    return prefix + "." + fullMethodName.replace('/', '.');
1✔
496
  }
497
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc