• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20034

29 Oct 2025 05:05PM UTC coverage: 88.514% (-0.02%) from 88.533%
#20034

push

github

ejona86
Include causal status details in higher-level statuses

When an operation fails and we want to produce a new status at a higher
level, we commonly turn the first status into an exception and attach it
as the cause of the new one. We should instead prefer to keep as much
information as possible in the status description itself, as the cause
is not as reliably logged/propagated.

I do expect long-term we'll want to expose an API in grpc-api for this,
but for the moment let's keep it internal. In particular, we'd have to
figure out its name. I could also believe we might want different
formatting, which becomes a clearer discussion when we can see the
usages.

I'm pretty certain there are some other places that could benefit from
this utility, as I remember really wishing I had these functions a month
or two ago. But these are the places I could immediately find.

OutlierDetectionLoadBalancerConfig had its status code changed from
INTERNAL to UNAVAILABLE because the value comes externally, and so isn't
a gRPC bug or such. I didn't change the xds policies in the same way
because it's murkier as the configuration for those is largely generated
within xds itself.

34957 of 39493 relevant lines covered (88.51%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.18
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
21
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.Attributes;
25
import io.grpc.CallOptions;
26
import io.grpc.Channel;
27
import io.grpc.ClientCall;
28
import io.grpc.ClientInterceptor;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
31
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
32
import io.grpc.ForwardingServerCallListener;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.ServerCall;
36
import io.grpc.ServerCallHandler;
37
import io.grpc.ServerInterceptor;
38
import io.grpc.ServerStreamTracer;
39
import io.grpc.internal.GrpcUtil;
40
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
41
import io.opentelemetry.api.OpenTelemetry;
42
import io.opentelemetry.api.common.AttributesBuilder;
43
import io.opentelemetry.api.trace.Span;
44
import io.opentelemetry.api.trace.StatusCode;
45
import io.opentelemetry.api.trace.Tracer;
46
import io.opentelemetry.context.Context;
47
import io.opentelemetry.context.Scope;
48
import io.opentelemetry.context.propagation.ContextPropagators;
49
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
50
import java.util.logging.Level;
51
import java.util.logging.Logger;
52
import javax.annotation.Nullable;
53

54
/**
55
 * Provides factories for {@link io.grpc.StreamTracer}s that record tracing to OpenTelemetry.
56
 */
57
final class OpenTelemetryTracingModule {
58
  private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName());
1✔
59

60
  @VisibleForTesting
1✔
61
  final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");
1✔
62
  @Nullable
63
  private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
64
  @Nullable
65
  private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
66

67
  /*
68
   * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK
69
   * reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
70
   * (potentially racy) direct updates of the volatile variables.
71
   */
72
  static {
73
    AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
74
    AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
75
    try {
76
      tmpCallEndedUpdater =
1✔
77
          AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
1✔
78
      tmpStreamClosedUpdater =
1✔
79
          AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
80
    } catch (Throwable t) {
×
81
      logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
82
      tmpCallEndedUpdater = null;
×
83
      tmpStreamClosedUpdater = null;
×
84
    }
1✔
85
    callEndedUpdater = tmpCallEndedUpdater;
1✔
86
    streamClosedUpdater = tmpStreamClosedUpdater;
1✔
87
  }
1✔
88

89
  private final Tracer otelTracer;
90
  private final ContextPropagators contextPropagators;
91
  private final MetadataGetter metadataGetter = MetadataGetter.getInstance();
1✔
92
  private final MetadataSetter metadataSetter = MetadataSetter.getInstance();
1✔
93
  private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
1✔
94
  private final ServerInterceptor serverSpanPropagationInterceptor =
1✔
95
      new TracingServerSpanPropagationInterceptor();
96
  private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
1✔
97

98
  OpenTelemetryTracingModule(OpenTelemetry openTelemetry) {
1✔
99
    this.otelTracer = checkNotNull(openTelemetry.getTracerProvider(), "tracerProvider")
1✔
100
        .tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
1✔
101
        .setInstrumentationVersion(IMPLEMENTATION_VERSION)
1✔
102
        .build();
1✔
103
    this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators");
1✔
104
  }
1✔
105

106
  @VisibleForTesting
107
  Tracer getTracer() {
108
    return otelTracer;
1✔
109
  }
110

111
  /**
112
   * Creates a {@link CallAttemptsTracerFactory} for a new call.
113
   */
114
  @VisibleForTesting
115
  CallAttemptsTracerFactory newClientCallTracer(Span clientSpan, MethodDescriptor<?, ?> method) {
116
    return new CallAttemptsTracerFactory(clientSpan, method);
1✔
117
  }
118

119
  /**
120
   * Returns the server tracer factory.
121
   */
122
  ServerStreamTracer.Factory getServerTracerFactory() {
123
    return serverTracerFactory;
1✔
124
  }
125

126
  /**
127
   * Returns the client interceptor that facilitates otel tracing reporting.
128
   */
129
  ClientInterceptor getClientInterceptor() {
130
    return clientInterceptor;
1✔
131
  }
132

133
  ServerInterceptor getServerSpanPropagationInterceptor() {
134
    return serverSpanPropagationInterceptor;
1✔
135
  }
136

137
  @VisibleForTesting
138
  final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
139
    volatile int callEnded;
140
    private final Span clientSpan;
141
    private final String fullMethodName;
142

143
    CallAttemptsTracerFactory(Span clientSpan, MethodDescriptor<?, ?> method) {
1✔
144
      checkNotNull(method, "method");
1✔
145
      this.fullMethodName = checkNotNull(method.getFullMethodName(), "fullMethodName");
1✔
146
      this.clientSpan = checkNotNull(clientSpan, "clientSpan");
1✔
147
    }
1✔
148

149
    @Override
150
    public ClientStreamTracer newClientStreamTracer(
151
        ClientStreamTracer.StreamInfo info, Metadata headers) {
152
      Span attemptSpan = otelTracer.spanBuilder(
1✔
153
              "Attempt." + fullMethodName.replace('/', '.'))
1✔
154
          .setParent(Context.current().with(clientSpan))
1✔
155
          .startSpan();
1✔
156
      attemptSpan.setAttribute(
1✔
157
          "previous-rpc-attempts", info.getPreviousAttempts());
1✔
158
      attemptSpan.setAttribute(
1✔
159
          "transparent-retry",info.isTransparentRetry());
1✔
160
      if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED) != null) {
1✔
161
        clientSpan.addEvent("Delayed name resolution complete");
1✔
162
      }
163
      return new ClientTracer(attemptSpan, clientSpan);
1✔
164
    }
165

166
    /**
167
     * Record a finished call and mark the current time as the end time.
168
     *
169
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
170
     * is a no-op.
171
     */
172
    void callEnded(io.grpc.Status status) {
173
      if (callEndedUpdater != null) {
1✔
174
        if (callEndedUpdater.getAndSet(this, 1) != 0) {
1✔
175
          return;
×
176
        }
177
      } else {
178
        if (callEnded != 0) {
×
179
          return;
×
180
        }
181
        callEnded = 1;
×
182
      }
183
      endSpanWithStatus(clientSpan, status);
1✔
184
    }
1✔
185
  }
186

187
  private final class ClientTracer extends ClientStreamTracer {
188
    private final Span span;
189
    private final Span parentSpan;
190
    volatile int seqNo;
191
    boolean isPendingStream;
192

193
    ClientTracer(Span span, Span parentSpan) {
1✔
194
      this.span = checkNotNull(span, "span");
1✔
195
      this.parentSpan = checkNotNull(parentSpan, "parent span");
1✔
196
    }
1✔
197

198
    @Override
199
    public void streamCreated(Attributes transportAtts, Metadata headers) {
200
      contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers,
1✔
201
          metadataSetter);
1✔
202
      if (isPendingStream) {
1✔
203
        span.addEvent("Delayed LB pick complete");
1✔
204
      }
205
    }
1✔
206

207
    @Override
208
    public void createPendingStream() {
209
      isPendingStream = true;
1✔
210
    }
1✔
211

212
    @Override
213
    public void outboundMessageSent(
214
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
215
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
216
    }
1✔
217

218
    @Override
219
    public void inboundMessageRead(
220
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
221
      if (optionalWireSize != optionalUncompressedSize) {
1✔
222
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
223
      }
224
    }
1✔
225

226
    @Override
227
    public void inboundMessage(int seqNo) {
228
      this.seqNo = seqNo;
1✔
229
    }
1✔
230

231
    @Override
232
    public void inboundUncompressedSize(long bytes) {
233
      recordInboundMessageSize(parentSpan, seqNo, bytes);
1✔
234
    }
1✔
235

236
    @Override
237
    public void streamClosed(io.grpc.Status status) {
238
      endSpanWithStatus(span, status);
1✔
239
    }
1✔
240
  }
241

242
  private final class ServerTracer extends ServerStreamTracer {
243
    private final Span span;
244
    volatile int streamClosed;
245
    private int seqNo;
246

247
    ServerTracer(String fullMethodName, @Nullable Span remoteSpan) {
1✔
248
      checkNotNull(fullMethodName, "fullMethodName");
1✔
249
      this.span =
1✔
250
          otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName))
1✔
251
              .setParent(remoteSpan == null ? null : Context.current().with(remoteSpan))
1✔
252
              .startSpan();
1✔
253
    }
1✔
254

255
    /**
256
     * Record a finished stream and mark the current time as the end time.
257
     *
258
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
259
     * is a no-op.
260
     */
261
    @Override
262
    public void streamClosed(io.grpc.Status status) {
263
      if (streamClosedUpdater != null) {
1✔
264
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
265
          return;
×
266
        }
267
      } else {
268
        if (streamClosed != 0) {
×
269
          return;
×
270
        }
271
        streamClosed = 1;
×
272
      }
273
      endSpanWithStatus(span, status);
1✔
274
    }
1✔
275

276
    @Override
277
    public io.grpc.Context filterContext(io.grpc.Context context) {
278
      return context.withValue(otelSpan, span);
1✔
279
    }
280

281
    @Override
282
    public void outboundMessageSent(
283
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
284
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
285
    }
1✔
286

287
    @Override
288
    public void inboundMessageRead(
289
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
290
      if (optionalWireSize != optionalUncompressedSize) {
1✔
291
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
292
      }
293
    }
1✔
294

295
    @Override
296
    public void inboundMessage(int seqNo) {
297
      this.seqNo = seqNo;
1✔
298
    }
1✔
299

300
    @Override
301
    public void inboundUncompressedSize(long bytes) {
302
      recordInboundMessageSize(span, seqNo, bytes);
1✔
303
    }
1✔
304
  }
305

306
  @VisibleForTesting
307
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
308
    @SuppressWarnings("ReferenceEquality")
309
    @Override
310
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
311
      Context context = contextPropagators.getTextMapPropagator().extract(
1✔
312
          Context.current(), headers, metadataGetter
1✔
313
      );
314
      Span remoteSpan = Span.fromContext(context);
1✔
315
      if (remoteSpan == Span.getInvalid()) {
1✔
316
        remoteSpan = null;
1✔
317
      }
318
      return new ServerTracer(fullMethodName, remoteSpan);
1✔
319
    }
320
  }
321

322
  @VisibleForTesting
323
  final class TracingServerSpanPropagationInterceptor implements ServerInterceptor {
1✔
324
    @Override
325
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
326
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
327
      Span span = otelSpan.get(io.grpc.Context.current());
1✔
328
      if (span == null) {
1✔
329
        logger.log(Level.FINE, "Server span not found. ServerTracerFactory for server "
1✔
330
            + "tracing must be set.");
331
        return next.startCall(call, headers);
1✔
332
      }
333
      Context serverCallContext = Context.current().with(span);
1✔
334
      try (Scope scope = serverCallContext.makeCurrent()) {
1✔
335
        return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
1✔
336
      }
337
    }
338
  }
339

340
  private static class ContextServerCallListener<ReqT> extends
341
      ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
342
    private final Context context;
343

344
    protected ContextServerCallListener(ServerCall.Listener<ReqT> delegate, Context context) {
345
      super(delegate);
1✔
346
      this.context = checkNotNull(context, "context");
1✔
347
    }
1✔
348

349
    @Override
350
    public void onMessage(ReqT message) {
351
      try (Scope scope = context.makeCurrent()) {
1✔
352
        delegate().onMessage(message);
1✔
353
      }
354
    }
1✔
355

356
    @Override
357
    public void onHalfClose() {
358
      try (Scope scope = context.makeCurrent()) {
1✔
359
        delegate().onHalfClose();
1✔
360
      }
361
    }
1✔
362

363
    @Override
364
    public void onCancel() {
365
      try (Scope scope = context.makeCurrent()) {
1✔
366
        delegate().onCancel();
1✔
367
      }
368
    }
1✔
369

370
    @Override
371
    public void onComplete() {
372
      try (Scope scope = context.makeCurrent()) {
1✔
373
        delegate().onComplete();
1✔
374
      }
375
    }
1✔
376

377
    @Override
378
    public void onReady() {
379
      try (Scope scope = context.makeCurrent()) {
1✔
380
        delegate().onReady();
1✔
381
      }
382
    }
1✔
383
  }
384

385
  @VisibleForTesting
386
  final class TracingClientInterceptor implements ClientInterceptor {
1✔
387

388
    @Override
389
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
390
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
391
      Span clientSpan = otelTracer.spanBuilder(
1✔
392
          generateTraceSpanName(false, method.getFullMethodName()))
1✔
393
          .startSpan();
1✔
394

395
      final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
1✔
396
      ClientCall<ReqT, RespT> call =
1✔
397
          next.newCall(
1✔
398
              method,
399
              callOptions.withStreamTracerFactory(tracerFactory));
1✔
400
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
401
        @Override
402
        public void start(Listener<RespT> responseListener, Metadata headers) {
403
          delegate().start(
1✔
404
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
405
                @Override
406
                public void onClose(io.grpc.Status status, Metadata trailers) {
407
                  tracerFactory.callEnded(status);
1✔
408
                  super.onClose(status, trailers);
1✔
409
                }
1✔
410
              },
411
              headers);
412
        }
1✔
413
      };
414
    }
415
  }
416

417
  // Attribute named "message-size" always means the message size the application sees.
418
  // If there was compression, additional event reports "message-size-compressed".
419
  //
420
  // An example trace with message compression:
421
  //
422
  // Sending:
423
  // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854,
424
  //                                               'message-size-compressed' = 5493) ----|
425
  //
426
  // Receiving:
427
  // |-- Event 'Inbound compressed message', attributes('sequence-numer' = 0,
428
  //                                                    'message-size-compressed' = 5493 ) ----|
429
  // |-- Event 'Inbound message received', attributes('sequence-numer' = 0,
430
  //                                                  'message-size' = 7854) ----|
431
  //
432
  // An example trace with no message compression:
433
  //
434
  // Sending:
435
  // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854) ---|
436
  //
437
  // Receiving:
438
  // |-- Event 'Inbound message received', attributes('sequence-numer' = 0,
439
  //                                                  'message-size' = 7854) ----|
440
  private void recordOutboundMessageSentEvent(Span span,
441
      int seqNo, long optionalWireSize, long optionalUncompressedSize) {
442
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
443
    attributesBuilder.put("sequence-number", seqNo);
1✔
444
    if (optionalUncompressedSize != -1) {
1✔
445
      attributesBuilder.put("message-size", optionalUncompressedSize);
1✔
446
    }
447
    if (optionalWireSize != -1 && optionalWireSize != optionalUncompressedSize) {
1✔
448
      attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
449
    }
450
    span.addEvent("Outbound message", attributesBuilder.build());
1✔
451
  }
1✔
452

453
  private void recordInboundCompressedMessage(Span span, int seqNo, long optionalWireSize) {
454
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
455
    attributesBuilder.put("sequence-number", seqNo);
1✔
456
    attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
457
    span.addEvent("Inbound compressed message", attributesBuilder.build());
1✔
458
  }
1✔
459

460
  private void recordInboundMessageSize(Span span, int seqNo, long bytes) {
461
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
462
    attributesBuilder.put("sequence-number", seqNo);
1✔
463
    attributesBuilder.put("message-size", bytes);
1✔
464
    span.addEvent("Inbound message", attributesBuilder.build());
1✔
465
  }
1✔
466

467
  private void endSpanWithStatus(Span span, io.grpc.Status status) {
468
    if (status.isOk()) {
1✔
469
      span.setStatus(StatusCode.OK);
1✔
470
    } else {
471
      span.setStatus(StatusCode.ERROR, GrpcUtil.statusToPrettyString(status));
1✔
472
    }
473
    span.end();
1✔
474
  }
1✔
475

476
  /**
477
   * Convert a full method name to a tracing span name.
478
   *
479
   * @param isServer {@code false} if the span is on the client-side, {@code true} if on the
480
   *                 server-side
481
   * @param fullMethodName the method name as returned by
482
   *        {@link MethodDescriptor#getFullMethodName}.
483
   */
484
  @VisibleForTesting
485
  static String generateTraceSpanName(boolean isServer, String fullMethodName) {
486
    String prefix = isServer ? "Recv" : "Sent";
1✔
487
    return prefix + "." + fullMethodName.replace('/', '.');
1✔
488
  }
489
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc