• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #19498

07 Oct 2024 05:44PM UTC coverage: 84.654% (-0.002%) from 84.656%
#19498

push

github

web-flow
report uncompressed message size when it does not need compression (#11598)

33771 of 39893 relevant lines covered (84.65%)

0.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.3
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java
1
/*
2
 * Copyright 2024 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
21
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
22

23
import com.google.common.annotations.VisibleForTesting;
24
import io.grpc.Attributes;
25
import io.grpc.CallOptions;
26
import io.grpc.Channel;
27
import io.grpc.ClientCall;
28
import io.grpc.ClientInterceptor;
29
import io.grpc.ClientStreamTracer;
30
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
31
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
32
import io.grpc.ForwardingServerCallListener;
33
import io.grpc.Metadata;
34
import io.grpc.MethodDescriptor;
35
import io.grpc.ServerCall;
36
import io.grpc.ServerCallHandler;
37
import io.grpc.ServerInterceptor;
38
import io.grpc.ServerStreamTracer;
39
import io.grpc.opentelemetry.internal.OpenTelemetryConstants;
40
import io.opentelemetry.api.OpenTelemetry;
41
import io.opentelemetry.api.common.AttributesBuilder;
42
import io.opentelemetry.api.trace.Span;
43
import io.opentelemetry.api.trace.StatusCode;
44
import io.opentelemetry.api.trace.Tracer;
45
import io.opentelemetry.context.Context;
46
import io.opentelemetry.context.Scope;
47
import io.opentelemetry.context.propagation.ContextPropagators;
48
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
49
import java.util.logging.Level;
50
import java.util.logging.Logger;
51
import javax.annotation.Nullable;
52

53
/**
54
 * Provides factories for {@link io.grpc.StreamTracer} that records tracing to OpenTelemetry.
55
 */
56
final class OpenTelemetryTracingModule {
57
  private static final Logger logger = Logger.getLogger(OpenTelemetryTracingModule.class.getName());
1✔
58

59
  @VisibleForTesting
1✔
60
  final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");
1✔
61
  @Nullable
62
  private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
63
  @Nullable
64
  private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
65

66
  /*
67
   * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their JDK
68
   * reflection API that triggers a NoSuchFieldException. When this occurs, we fallback to
69
   * (potentially racy) direct updates of the volatile variables.
70
   */
71
  static {
72
    AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> tmpCallEndedUpdater;
73
    AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
74
    try {
75
      tmpCallEndedUpdater =
1✔
76
          AtomicIntegerFieldUpdater.newUpdater(CallAttemptsTracerFactory.class, "callEnded");
1✔
77
      tmpStreamClosedUpdater =
1✔
78
          AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
79
    } catch (Throwable t) {
×
80
      logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
81
      tmpCallEndedUpdater = null;
×
82
      tmpStreamClosedUpdater = null;
×
83
    }
1✔
84
    callEndedUpdater = tmpCallEndedUpdater;
1✔
85
    streamClosedUpdater = tmpStreamClosedUpdater;
1✔
86
  }
1✔
87

88
  private final Tracer otelTracer;
89
  private final ContextPropagators contextPropagators;
90
  private final MetadataGetter metadataGetter = MetadataGetter.getInstance();
1✔
91
  private final MetadataSetter metadataSetter = MetadataSetter.getInstance();
1✔
92
  private final TracingClientInterceptor clientInterceptor = new TracingClientInterceptor();
1✔
93
  private final ServerInterceptor serverSpanPropagationInterceptor =
1✔
94
      new TracingServerSpanPropagationInterceptor();
95
  private final ServerTracerFactory serverTracerFactory = new ServerTracerFactory();
1✔
96

97
  OpenTelemetryTracingModule(OpenTelemetry openTelemetry) {
1✔
98
    this.otelTracer = checkNotNull(openTelemetry.getTracerProvider(), "tracerProvider")
1✔
99
        .tracerBuilder(OpenTelemetryConstants.INSTRUMENTATION_SCOPE)
1✔
100
        .setInstrumentationVersion(IMPLEMENTATION_VERSION)
1✔
101
        .build();
1✔
102
    this.contextPropagators = checkNotNull(openTelemetry.getPropagators(), "contextPropagators");
1✔
103
  }
1✔
104

105
  @VisibleForTesting
106
  Tracer getTracer() {
107
    return otelTracer;
1✔
108
  }
109

110
  /**
111
   * Creates a {@link CallAttemptsTracerFactory} for a new call.
112
   */
113
  @VisibleForTesting
114
  CallAttemptsTracerFactory newClientCallTracer(Span clientSpan, MethodDescriptor<?, ?> method) {
115
    return new CallAttemptsTracerFactory(clientSpan, method);
1✔
116
  }
117

118
  /**
119
   * Returns the server tracer factory.
120
   */
121
  ServerStreamTracer.Factory getServerTracerFactory() {
122
    return serverTracerFactory;
1✔
123
  }
124

125
  /**
126
   * Returns the client interceptor that facilitates otel tracing reporting.
127
   */
128
  ClientInterceptor getClientInterceptor() {
129
    return clientInterceptor;
1✔
130
  }
131

132
  ServerInterceptor getServerSpanPropagationInterceptor() {
133
    return serverSpanPropagationInterceptor;
1✔
134
  }
135

136
  @VisibleForTesting
137
  final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
138
    volatile int callEnded;
139
    private final Span clientSpan;
140
    private final String fullMethodName;
141

142
    CallAttemptsTracerFactory(Span clientSpan, MethodDescriptor<?, ?> method) {
1✔
143
      checkNotNull(method, "method");
1✔
144
      this.fullMethodName = checkNotNull(method.getFullMethodName(), "fullMethodName");
1✔
145
      this.clientSpan = checkNotNull(clientSpan, "clientSpan");
1✔
146
    }
1✔
147

148
    @Override
149
    public ClientStreamTracer newClientStreamTracer(
150
        ClientStreamTracer.StreamInfo info, Metadata headers) {
151
      Span attemptSpan = otelTracer.spanBuilder(
1✔
152
              "Attempt." + fullMethodName.replace('/', '.'))
1✔
153
          .setParent(Context.current().with(clientSpan))
1✔
154
          .startSpan();
1✔
155
      attemptSpan.setAttribute(
1✔
156
          "previous-rpc-attempts", info.getPreviousAttempts());
1✔
157
      attemptSpan.setAttribute(
1✔
158
          "transparent-retry",info.isTransparentRetry());
1✔
159
      if (info.getCallOptions().getOption(NAME_RESOLUTION_DELAYED) != null) {
1✔
160
        clientSpan.addEvent("Delayed name resolution complete");
1✔
161
      }
162
      return new ClientTracer(attemptSpan, clientSpan);
1✔
163
    }
164

165
    /**
166
     * Record a finished call and mark the current time as the end time.
167
     *
168
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
169
     * is a no-op.
170
     */
171
    void callEnded(io.grpc.Status status) {
172
      if (callEndedUpdater != null) {
1✔
173
        if (callEndedUpdater.getAndSet(this, 1) != 0) {
1✔
174
          return;
×
175
        }
176
      } else {
177
        if (callEnded != 0) {
×
178
          return;
×
179
        }
180
        callEnded = 1;
×
181
      }
182
      endSpanWithStatus(clientSpan, status);
1✔
183
    }
1✔
184
  }
185

186
  private final class ClientTracer extends ClientStreamTracer {
187
    private final Span span;
188
    private final Span parentSpan;
189
    volatile int seqNo;
190
    boolean isPendingStream;
191

192
    ClientTracer(Span span, Span parentSpan) {
1✔
193
      this.span = checkNotNull(span, "span");
1✔
194
      this.parentSpan = checkNotNull(parentSpan, "parent span");
1✔
195
    }
1✔
196

197
    @Override
198
    public void streamCreated(Attributes transportAtts, Metadata headers) {
199
      contextPropagators.getTextMapPropagator().inject(Context.current().with(span), headers,
1✔
200
          metadataSetter);
1✔
201
      if (isPendingStream) {
1✔
202
        span.addEvent("Delayed LB pick complete");
1✔
203
      }
204
    }
1✔
205

206
    @Override
207
    public void createPendingStream() {
208
      isPendingStream = true;
1✔
209
    }
1✔
210

211
    @Override
212
    public void outboundMessageSent(
213
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
214
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
215
    }
1✔
216

217
    @Override
218
    public void inboundMessageRead(
219
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
220
      if (optionalWireSize != optionalUncompressedSize) {
1✔
221
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
222
      }
223
    }
1✔
224

225
    @Override
226
    public void inboundMessage(int seqNo) {
227
      this.seqNo = seqNo;
1✔
228
    }
1✔
229

230
    @Override
231
    public void inboundUncompressedSize(long bytes) {
232
      recordInboundMessageSize(parentSpan, seqNo, bytes);
1✔
233
    }
1✔
234

235
    @Override
236
    public void streamClosed(io.grpc.Status status) {
237
      endSpanWithStatus(span, status);
1✔
238
    }
1✔
239
  }
240

241
  private final class ServerTracer extends ServerStreamTracer {
242
    private final Span span;
243
    volatile int streamClosed;
244
    private int seqNo;
245

246
    ServerTracer(String fullMethodName, @Nullable Span remoteSpan) {
1✔
247
      checkNotNull(fullMethodName, "fullMethodName");
1✔
248
      this.span =
1✔
249
          otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName))
1✔
250
              .setParent(remoteSpan == null ? null : Context.current().with(remoteSpan))
1✔
251
              .startSpan();
1✔
252
    }
1✔
253

254
    /**
255
     * Record a finished stream and mark the current time as the end time.
256
     *
257
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
258
     * is a no-op.
259
     */
260
    @Override
261
    public void streamClosed(io.grpc.Status status) {
262
      if (streamClosedUpdater != null) {
1✔
263
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
264
          return;
×
265
        }
266
      } else {
267
        if (streamClosed != 0) {
×
268
          return;
×
269
        }
270
        streamClosed = 1;
×
271
      }
272
      endSpanWithStatus(span, status);
1✔
273
    }
1✔
274

275
    @Override
276
    public io.grpc.Context filterContext(io.grpc.Context context) {
277
      return context.withValue(otelSpan, span);
1✔
278
    }
279

280
    @Override
281
    public void outboundMessageSent(
282
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
283
      recordOutboundMessageSentEvent(span, seqNo, optionalWireSize, optionalUncompressedSize);
1✔
284
    }
1✔
285

286
    @Override
287
    public void inboundMessageRead(
288
        int seqNo, long optionalWireSize, long optionalUncompressedSize) {
289
      if (optionalWireSize != optionalUncompressedSize) {
1✔
290
        recordInboundCompressedMessage(span, seqNo, optionalWireSize);
1✔
291
      }
292
    }
1✔
293

294
    @Override
295
    public void inboundMessage(int seqNo) {
296
      this.seqNo = seqNo;
1✔
297
    }
1✔
298

299
    @Override
300
    public void inboundUncompressedSize(long bytes) {
301
      recordInboundMessageSize(span, seqNo, bytes);
1✔
302
    }
1✔
303
  }
304

305
  @VisibleForTesting
306
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
307
    @SuppressWarnings("ReferenceEquality")
308
    @Override
309
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
310
      Context context = contextPropagators.getTextMapPropagator().extract(
1✔
311
          Context.current(), headers, metadataGetter
1✔
312
      );
313
      Span remoteSpan = Span.fromContext(context);
1✔
314
      if (remoteSpan == Span.getInvalid()) {
1✔
315
        remoteSpan = null;
1✔
316
      }
317
      return new ServerTracer(fullMethodName, remoteSpan);
1✔
318
    }
319
  }
320

321
  @VisibleForTesting
322
  final class TracingServerSpanPropagationInterceptor implements ServerInterceptor {
1✔
323
    @Override
324
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
325
        Metadata headers, ServerCallHandler<ReqT, RespT> next) {
326
      Span span = otelSpan.get(io.grpc.Context.current());
1✔
327
      if (span == null) {
1✔
328
        logger.log(Level.FINE, "Server span not found. ServerTracerFactory for server "
1✔
329
            + "tracing must be set.");
330
        return next.startCall(call, headers);
1✔
331
      }
332
      Context serverCallContext = Context.current().with(span);
1✔
333
      try (Scope scope = serverCallContext.makeCurrent()) {
1✔
334
        return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
1✔
335
      }
336
    }
337
  }
338

339
  private static class ContextServerCallListener<ReqT> extends
340
      ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT> {
341
    private final Context context;
342

343
    protected ContextServerCallListener(ServerCall.Listener<ReqT> delegate, Context context) {
344
      super(delegate);
1✔
345
      this.context = checkNotNull(context, "context");
1✔
346
    }
1✔
347

348
    @Override
349
    public void onMessage(ReqT message) {
350
      try (Scope scope = context.makeCurrent()) {
1✔
351
        delegate().onMessage(message);
1✔
352
      }
353
    }
1✔
354

355
    @Override
356
    public void onHalfClose() {
357
      try (Scope scope = context.makeCurrent()) {
1✔
358
        delegate().onHalfClose();
1✔
359
      }
360
    }
1✔
361

362
    @Override
363
    public void onCancel() {
364
      try (Scope scope = context.makeCurrent()) {
1✔
365
        delegate().onCancel();
1✔
366
      }
367
    }
1✔
368

369
    @Override
370
    public void onComplete() {
371
      try (Scope scope = context.makeCurrent()) {
1✔
372
        delegate().onComplete();
1✔
373
      }
374
    }
1✔
375

376
    @Override
377
    public void onReady() {
378
      try (Scope scope = context.makeCurrent()) {
1✔
379
        delegate().onReady();
1✔
380
      }
381
    }
1✔
382
  }
383

384
  @VisibleForTesting
385
  final class TracingClientInterceptor implements ClientInterceptor {
1✔
386

387
    @Override
388
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
389
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
390
      Span clientSpan = otelTracer.spanBuilder(
1✔
391
          generateTraceSpanName(false, method.getFullMethodName()))
1✔
392
          .startSpan();
1✔
393

394
      final CallAttemptsTracerFactory tracerFactory = newClientCallTracer(clientSpan, method);
1✔
395
      ClientCall<ReqT, RespT> call =
1✔
396
          next.newCall(
1✔
397
              method,
398
              callOptions.withStreamTracerFactory(tracerFactory));
1✔
399
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
400
        @Override
401
        public void start(Listener<RespT> responseListener, Metadata headers) {
402
          delegate().start(
1✔
403
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
404
                @Override
405
                public void onClose(io.grpc.Status status, Metadata trailers) {
406
                  tracerFactory.callEnded(status);
1✔
407
                  super.onClose(status, trailers);
1✔
408
                }
1✔
409
              },
410
              headers);
411
        }
1✔
412
      };
413
    }
414
  }
415

416
  // Attribute named "message-size" always means the message size the application sees.
417
  // If there was compression, additional event reports "message-size-compressed".
418
  //
419
  // An example trace with message compression:
420
  //
421
  // Sending:
422
  // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854,
423
  //                                               'message-size-compressed' = 5493) ----|
424
  //
425
  // Receiving:
426
  // |-- Event 'Inbound compressed message', attributes('sequence-numer' = 0,
427
  //                                                    'message-size-compressed' = 5493 ) ----|
428
  // |-- Event 'Inbound message received', attributes('sequence-numer' = 0,
429
  //                                                  'message-size' = 7854) ----|
430
  //
431
  // An example trace with no message compression:
432
  //
433
  // Sending:
434
  // |-- Event 'Outbound message sent', attributes('sequence-numer' = 0, 'message-size' = 7854) ---|
435
  //
436
  // Receiving:
437
  // |-- Event 'Inbound message received', attributes('sequence-numer' = 0,
438
  //                                                  'message-size' = 7854) ----|
439
  private void recordOutboundMessageSentEvent(Span span,
440
      int seqNo, long optionalWireSize, long optionalUncompressedSize) {
441
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
442
    attributesBuilder.put("sequence-number", seqNo);
1✔
443
    if (optionalUncompressedSize != -1) {
1✔
444
      attributesBuilder.put("message-size", optionalUncompressedSize);
1✔
445
    }
446
    if (optionalWireSize != -1 && optionalWireSize != optionalUncompressedSize) {
1✔
447
      attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
448
    }
449
    span.addEvent("Outbound message sent", attributesBuilder.build());
1✔
450
  }
1✔
451

452
  private void recordInboundCompressedMessage(Span span, int seqNo, long optionalWireSize) {
453
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
454
    attributesBuilder.put("sequence-number", seqNo);
1✔
455
    attributesBuilder.put("message-size-compressed", optionalWireSize);
1✔
456
    span.addEvent("Inbound compressed message", attributesBuilder.build());
1✔
457
  }
1✔
458

459
  private void recordInboundMessageSize(Span span, int seqNo, long bytes) {
460
    AttributesBuilder attributesBuilder = io.opentelemetry.api.common.Attributes.builder();
1✔
461
    attributesBuilder.put("sequence-number", seqNo);
1✔
462
    attributesBuilder.put("message-size", bytes);
1✔
463
    span.addEvent("Inbound message received", attributesBuilder.build());
1✔
464
  }
1✔
465

466
  private String generateErrorStatusDescription(io.grpc.Status status) {
467
    if (status.getDescription() != null) {
1✔
468
      return status.getCode() + ": " + status.getDescription();
1✔
469
    } else {
470
      return status.getCode().toString();
1✔
471
    }
472
  }
473

474
  private void endSpanWithStatus(Span span, io.grpc.Status status) {
475
    if (status.isOk()) {
1✔
476
      span.setStatus(StatusCode.OK);
1✔
477
    } else {
478
      span.setStatus(StatusCode.ERROR, generateErrorStatusDescription(status));
1✔
479
    }
480
    span.end();
1✔
481
  }
1✔
482

483
  /**
484
   * Convert a full method name to a tracing span name.
485
   *
486
   * @param isServer {@code false} if the span is on the client-side, {@code true} if on the
487
   *                 server-side
488
   * @param fullMethodName the method name as returned by
489
   *        {@link MethodDescriptor#getFullMethodName}.
490
   */
491
  @VisibleForTesting
492
  static String generateTraceSpanName(boolean isServer, String fullMethodName) {
493
    String prefix = isServer ? "Recv" : "Sent";
1✔
494
    return prefix + "." + fullMethodName.replace('/', '.');
1✔
495
  }
496
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc