• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20085

14 Nov 2025 11:20PM UTC coverage: 88.511% (-0.05%) from 88.561%
#20085

push

github

ejona86
opentelemetry: propagate baggage to metrics for custom attributes, helps with b/406058193 (#12389)

34999 of 39542 relevant lines covered (88.51%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.51
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
21
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
23

24
import com.google.common.annotations.VisibleForTesting;
25
import io.grpc.Attributes;
26
import io.grpc.CallOptions;
27
import io.grpc.Channel;
28
import io.grpc.ClientCall;
29
import io.grpc.ClientInterceptor;
30
import io.grpc.ClientStreamTracer;
31
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
32
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
33
import io.grpc.ForwardingServerCallListener;
34
import io.grpc.Metadata;
35
import io.grpc.MethodDescriptor;
36
import io.grpc.ServerCall;
37
import io.grpc.ServerCallHandler;
38
import io.grpc.ServerInterceptor;
39
import io.grpc.ServerStreamTracer;
40
import io.grpc.internal.GrpcUtil;
41
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
42
import io.opentelemetry.api.OpenTelemetry;
43
import io.opentelemetry.api.baggage.Baggage;
44
import io.opentelemetry.api.common.AttributesBuilder;
45
import io.opentelemetry.api.trace.Span;
46
import io.opentelemetry.api.trace.StatusCode;
47
import io.opentelemetry.api.trace.Tracer;
48
import io.opentelemetry.context.Context;
49
import io.opentelemetry.context.Scope;
50
import io.opentelemetry.context.propagation.ContextPropagators;
51
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
52
import java.util.logging.Level;
53
import java.util.logging.Logger;
54
import javax.annotation.Nullable;
55

56
/**
57
 * Provides factories for {@link io.grpc.StreamTracer}s that record tracing data to OpenTelemetry.
58
 */
59
final class OpenTelemetryTracingModule {
60
  private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName());
1✔
61

62
  @VisibleForTesting
1✔
63
  final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");
1✔
64

65
  @Nullable
66
  private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
67
  @Nullable
68
  private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
69

70
  /*
71
   * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK
72
   * reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
73
   * (potentially racy) direct updates of the volatile variables.
74
   */
75
  static {
76
    AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
77
    AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
78
    try {
79
      tmpCallEndedUpdater =
1✔
80
          AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
1✔
81
      tmpStreamClosedUpdater =
1✔
82
          AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
83
    } catch (Throwable t) {
×
84
      logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
85
      tmpCallEndedUpdater = null;
×
86
      tmpStreamClosedUpdater = null;
×
87
    }
1✔
88
    callEndedUpdater = tmpCallEndedUpdater;
1✔
89
    streamClosedUpdater = tmpStreamClosedUpdater;
1✔
90
  }
1✔
91

92
  private final Tracer otelTracer;
93
  private final ContextPropagators contextPropagators;
94
  private final MetadataGetter metadataGetter = MetadataGetter.getInstance();
1✔
95
  private final MetadataSetter metadataSetter = MetadataSetter.getInstance();
1✔
96
  private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
1✔
97
  private final ServerInterceptor serverSpanPropagationInterceptor =
1✔
98
      new TracingServerSpanPropagationInterceptor();
99
  private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
1✔
100

101
  /**
   * Creates a tracing module backed by the given OpenTelemetry instance.
   *
   * <p>The tracer is built from the instance's tracer provider using
   * {@link OpenTelemetryConstants#INSTRUMENTATION_SCOPE} as the instrumentation scope and
   * {@code IMPLEMENTATION_VERSION} as the instrumentation version.
   *
   * @param openTelemetry supplies the tracer provider and the context propagators
   * @throws NullPointerException if {@code openTelemetry}, its tracer provider, or its propagators
   *     is {@code null}
   */
  OpenTelemetryTracingModule(OpenTelemetry openTelemetry) {
    // Fail with a labelled message instead of an anonymous NPE on the first dereference below.
    checkNotNull(openTelemetry, "openTelemetry");
    this.otelTracer = checkNotNull(openTelemetry.getTracerProvider(), "tracerProvider")
        .tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
        .setInstrumentationVersion(IMPLEMENTATION_VERSION)
        .build();
    this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators");
  }
1✔
108

109
  @VisibleForTesting
110
  Tracer getTracer() {
111
    return otelTracer;
1✔
112
  }
113

114
  /**
115
   * Creates a {@link CallAttemptsTracerFactory} for a new call.
116
   */
117
  @VisibleForTesting
118
  CallAttemptsTracerFactory newClientCallTracer(Span clientSpan, MethodDescriptor<?, ?> method) {
119
    return new CallAttemptsTracerFactory(clientSpan, method);
1✔
120
  }
121

122
  /**
123
   * Returns the server tracer factory.
124
   */
125
  ServerStreamTracer.Factory getServerTracerFactory() {
126
    return serverTracerFactory;
1✔
127
  }
128

129
  /**
130
   * Returns the client interceptor that facilitates otel tracing reporting.
131
   */
132
  ClientInterceptor getClientInterceptor() {
133
    return clientInterceptor;
1✔
134
  }
135

136
  ServerInterceptor getServerSpanPropagationInterceptor() {
137
    return serverSpanPropagationInterceptor;
1✔
138
  }
139

140
  @VisibleForTesting
141
  final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
142
    volatile int callEnded;
143
    private final Span clientSpan;
144
    private final String fullMethodName;
145

146
    CallAttemptsTracerFactory(Span clientSpan, MethodDescriptor<?, ?> method) {
1✔
147
      checkNotNull(method, "method");
1✔
148
      this.fullMethodName = checkNotNull(method.getFullMethodName(), "fullMethodName");
1✔
149
      this.clientSpan = checkNotNull(clientSpan, "clientSpan");
1✔
150
    }
1✔
151

152
    @Override
153
    public ClientStreamTracer newClientStreamTracer(
154
        ClientStreamTracer.StreamInfo info, Metadata headers) {
155
      Span attemptSpan = otelTracer.spanBuilder(
1✔
156
              "Attempt." + fullMethodName.replace('/', '.'))
1✔
157
          .setParent(Context.current().with(clientSpan))
1✔
158
          .startSpan();
1✔
159
      attemptSpan.setAttribute(
1✔
160
          "previous-rpc-attempts", info.getPreviousAttempts());
1✔
161
      attemptSpan.setAttribute(
1✔
162
          "transparent-retry",info.isTransparentRetry());
1✔
163
      if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED) != null) {
1✔
164
        clientSpan.addEvent("Delayed name resolution complete");
1✔
165
      }
166
      return new ClientTracer(attemptSpan, clientSpan);
1✔
167
    }
168

169
    /**
170
     * Record a finished call and mark the current time as the end time.
171
     *
172
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
173
     * is a no-op.
174
     */
175
    void callEnded(io.grpc.Status status) {
176
      if (callEndedUpdater != null) {
1✔
177
        if (callEndedUpdater.getAndSet(this, 1) != 0) {
1✔
178
          return;
×
179
        }
180
      } else {
181
        if (callEnded != 0) {
×
182
          return;
×
183
        }
184
        callEnded = 1;
×
185
      }
186
      endSpanWithStatus(clientSpan, status);
1✔
187
    }
1✔
188
  }
189

190
  private final class ClientTracer extends ClientStreamTracer {
191
    private final Span span;
192
    private final Span parentSpan;
193
    volatile int seqNo;
194
    boolean isPendingStream;
195

196
    ClientTracer(Span span, Span parentSpan) {
1✔
197
      this.span = checkNotNull(span, "span");
1✔
198
      this.parentSpan = checkNotNull(parentSpan, "parent span");
1✔
199
    }
1✔
200

201
    @Override
202
    public void streamCreated(Attributes transportAtts, Metadata headers) {
203
      contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers,
1✔
204
          metadataSetter);
1✔
205
      if (isPendingStream) {
1✔
206
        span.addEvent("Delayed LB pick complete");
1✔
207
      }
208
    }
1✔
209

210
    @Override
211
    public void createPendingStream() {
212
      isPendingStream = true;
1✔
213
    }
1✔
214

215
    @Override
216
    public void outboundMessageSent(
217
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
218
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
219
    }
1✔
220

221
    @Override
222
    public void inboundMessageRead(
223
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
224
      if (optionalWireSize != optionalUncompressedSize) {
1✔
225
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
226
      }
227
    }
1✔
228

229
    @Override
230
    public void inboundMessage(int seqNo) {
231
      this.seqNo = seqNo;
1✔
232
    }
1✔
233

234
    @Override
235
    public void inboundUncompressedSize(long bytes) {
236
      recordInboundMessageSize(parentSpan, seqNo, bytes);
1✔
237
    }
1✔
238

239
    @Override
240
    public void streamClosed(io.grpc.Status status) {
241
      endSpanWithStatus(span, status);
1✔
242
    }
1✔
243
  }
244

245
  private final class ServerTracer extends ServerStreamTracer {
246
    private final Span span;
247
    volatile int streamClosed;
248
    private int seqNo;
249
    private Baggage baggage;
250

251
    ServerTracer(String fullMethodName, @Nullable Span remoteSpan, Baggage baggage) {
1✔
252
      checkNotNull(fullMethodName, "fullMethodName");
1✔
253
      this.span =
1✔
254
          otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName))
1✔
255
              .setParent(remoteSpan == null ? null : Context.current().with(remoteSpan))
1✔
256
              .startSpan();
1✔
257
      this.baggage = baggage;
1✔
258
    }
1✔
259

260
    /**
261
     * Record a finished stream and mark the current time as the end time.
262
     *
263
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
264
     * is a no-op.
265
     */
266
    @Override
267
    public void streamClosed(io.grpc.Status status) {
268
      if (streamClosedUpdater != null) {
1✔
269
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
270
          return;
×
271
        }
272
      } else {
273
        if (streamClosed != 0) {
×
274
          return;
×
275
        }
276
        streamClosed = 1;
×
277
      }
278
      endSpanWithStatus(span, status);
1✔
279
    }
1✔
280

281
    @Override
282
    public io.grpc.Context filterContext(io.grpc.Context context) {
283
      return context
1✔
284
          .withValue(otelSpan, span)
1✔
285
          .withValue(BAGGAGE_KEY, baggage);
1✔
286
    }
287

288
    @Override
289
    public void outboundMessageSent(
290
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
291
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
292
    }
1✔
293

294
    @Override
295
    public void inboundMessageRead(
296
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
297
      if (optionalWireSize != optionalUncompressedSize) {
1✔
298
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
299
      }
300
    }
1✔
301

302
    @Override
303
    public void inboundMessage(int seqNo) {
304
      this.seqNo = seqNo;
1✔
305
    }
1✔
306

307
    @Override
308
    public void inboundUncompressedSize(long bytes) {
309
      recordInboundMessageSize(span, seqNo, bytes);
1✔
310
    }
1✔
311
  }
312

313
  @VisibleForTesting
314
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
315
    @SuppressWarnings("ReferenceEquality")
316
    @Override
317
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
318
      Context context = contextPropagators.getTextMapPropagator().extract(
1✔
319
          Context.current(), headers, metadataGetter
1✔
320
      );
321
      Span remoteSpan = Span.fromContext(context);
1✔
322
      if (remoteSpan == Span.getInvalid()) {
1✔
323
        remoteSpan = null;
1✔
324
      }
325
      Baggage baggage = Baggage.fromContext(context);
1✔
326
      return new ServerTracer(fullMethodName, remoteSpan, baggage);
1✔
327
    }
328
  }
329

330
  @VisibleForTesting
331
  final class TracingServerSpanPropagationInterceptor implements ServerInterceptor {
1✔
332
    @Override
333
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
334
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
335
      Span span = otelSpan.get(io.grpc.Context.current());
1✔
336
      if (span == null) {
1✔
337
        logger.log(Level.FINE, "Server span not found. ServerTracerFactory for server "
1✔
338
            + "tracing must be set.");
339
        return next.startCall(call, headers);
1✔
340
      }
341
      Context serverCallContext = Context.current();
1✔
342
      serverCallContext = serverCallContext.with(span);
1✔
343
      Baggage baggage = BAGGAGE_KEY.get();
1✔
344
      if (baggage != null) {
1✔
345
        serverCallContext = serverCallContext.with(baggage);
1✔
346
      } else {
347
        logger.log(Level.WARNING, "Server baggage not found which is unexpected, "
1✔
348
            + "as it is being added unconditionally in filterContext().");
349
      }
350
      try (Scope scope = serverCallContext.makeCurrent()) {
1✔
351
        return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
1✔
352
      }
353
    }
354
  }
355

356
  private static class ContextServerCallListener<ReqT> extends
357
      ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
358
    private final Context context;
359

360
    protected ContextServerCallListener(ServerCall.Listener<ReqT> delegate, Context context) {
361
      super(delegate);
1✔
362
      this.context = checkNotNull(context, "context");
1✔
363
    }
1✔
364

365
    @Override
366
    public void onMessage(ReqT message) {
367
      try (Scope scope = context.makeCurrent()) {
1✔
368
        delegate().onMessage(message);
1✔
369
      }
370
    }
1✔
371

372
    @Override
373
    public void onHalfClose() {
374
      try (Scope scope = context.makeCurrent()) {
1✔
375
        delegate().onHalfClose();
1✔
376
      }
377
    }
1✔
378

379
    @Override
380
    public void onCancel() {
381
      try (Scope scope = context.makeCurrent()) {
1✔
382
        delegate().onCancel();
1✔
383
      }
384
    }
1✔
385

386
    @Override
387
    public void onComplete() {
388
      try (Scope scope = context.makeCurrent()) {
1✔
389
        delegate().onComplete();
1✔
390
      }
391
    }
1✔
392

393
    @Override
394
    public void onReady() {
395
      try (Scope scope = context.makeCurrent()) {
1✔
396
        delegate().onReady();
1✔
397
      }
398
    }
1✔
399
  }
400

401
  @VisibleForTesting
402
  final class TracingClientInterceptor implements ClientInterceptor {
1✔
403

404
    @Override
405
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
406
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
407
      Span clientSpan = otelTracer.spanBuilder(
1✔
408
          generateTraceSpanName(false, method.getFullMethodName()))
1✔
409
          .startSpan();
1✔
410

411
      final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
1✔
412
      ClientCall<ReqT, RespT> call =
1✔
413
          next.newCall(
1✔
414
              method,
415
              callOptions.withStreamTracerFactory(tracerFactory));
1✔
416
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
417
        @Override
418
        public void start(Listener<RespT> responseListener, Metadata headers) {
419
          delegate().start(
1✔
420
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
421
                @Override
422
                public void onClose(io.grpc.Status status, Metadata trailers) {
423
                  tracerFactory.callEnded(status);
1✔
424
                  super.onClose(status, trailers);
1✔
425
                }
1✔
426
              },
427
              headers);
428
        }
1✔
429
      };
430
    }
431
  }
432

433
  // Attribute named "message-size" always means the message size the application sees.
434
  // If there was compression, additional event reports "message-size-compressed".
435
  //
436
  // An example trace with message compression:
437
  //
438
  // Sending:
439
  // |-- Event 'Outbound message', attributes('sequence-number' = 0, 'message-size' = 7854,
440
  //                                          'message-size-compressed' = 5493) ----|
441
  //
442
  // Receiving:
443
  // |-- Event 'Inbound compressed message', attributes('sequence-number' = 0,
444
  //                                                    'message-size-compressed' = 5493 ) ----|
445
  // |-- Event 'Inbound message', attributes('sequence-number' = 0,
446
  //                                         'message-size' = 7854) ----|
447
  //
448
  // An example trace with no message compression:
449
  //
450
  // Sending:
451
  // |-- Event 'Outbound message', attributes('sequence-number' = 0, 'message-size' = 7854) ---|
452
  //
453
  // Receiving:
454
  // |-- Event 'Inbound message', attributes('sequence-number' = 0,
455
  //                                         'message-size' = 7854) ----|
456
  private void recordOutboundMessageSentEvent(Span span,
457
      int seqNo, long optionalWireSize, long optionalUncompressedSize) {
458
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
459
    attributesBuilder.put("sequence-number", seqNo);
1✔
460
    if (optionalUncompressedSize != -1) {
1✔
461
      attributesBuilder.put("message-size", optionalUncompressedSize);
1✔
462
    }
463
    if (optionalWireSize != -1 && optionalWireSize != optionalUncompressedSize) {
1✔
464
      attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
465
    }
466
    span.addEvent("Outbound message", attributesBuilder.build());
1✔
467
  }
1✔
468

469
  private void recordInboundCompressedMessage(Span span, int seqNo, long optionalWireSize) {
470
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
471
    attributesBuilder.put("sequence-number", seqNo);
1✔
472
    attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
473
    span.addEvent("Inbound compressed message", attributesBuilder.build());
1✔
474
  }
1✔
475

476
  private void recordInboundMessageSize(Span span, int seqNo, long bytes) {
477
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
478
    attributesBuilder.put("sequence-number", seqNo);
1✔
479
    attributesBuilder.put("message-size", bytes);
1✔
480
    span.addEvent("Inbound message", attributesBuilder.build());
1✔
481
  }
1✔
482

483
  private void endSpanWithStatus(Span span, io.grpc.Status status) {
484
    if (status.isOk()) {
1✔
485
      span.setStatus(StatusCode.OK);
1✔
486
    } else {
487
      span.setStatus(StatusCode.ERROR, GrpcUtil.statusToPrettyString(status));
1✔
488
    }
489
    span.end();
1✔
490
  }
1✔
491

492
  /**
493
   * Convert a full method name to a tracing span name.
494
   *
495
   * @param isServer {@code false} if the span is on the client-side, {@code true} if on the
496
   *                 server-side
497
   * @param fullMethodName the method name as returned by
498
   *        {@link MethodDescriptor#getFullMethodName}.
499
   */
500
  @VisibleForTesting
501
  static String generateTraceSpanName(boolean isServer, String fullMethodName) {
502
    String prefix = isServer ? "Recv" : "Sent";
1✔
503
    return prefix + "." + fullMethodName.replace('/', '.');
1✔
504
  }
505
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc