• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20231

31 Mar 2026 10:03AM UTC coverage: 88.755% (+0.02%) from 88.734%
#20231

push

github

web-flow
openTelemetry: fix baggage prop (#12665)

35706 of 40230 relevant lines covered (88.75%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.41
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.CUSTOM_LABEL_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
24
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
25
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
26
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Stopwatch;
30
import com.google.common.base.Supplier;
31
import com.google.common.collect.ImmutableList;
32
import com.google.common.collect.ImmutableSet;
33
import com.google.errorprone.annotations.concurrent.GuardedBy;
34
import io.grpc.CallOptions;
35
import io.grpc.Channel;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientInterceptor;
38
import io.grpc.ClientStreamTracer;
39
import io.grpc.ClientStreamTracer.StreamInfo;
40
import io.grpc.Deadline;
41
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
42
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
43
import io.grpc.Grpc;
44
import io.grpc.Metadata;
45
import io.grpc.MethodDescriptor;
46
import io.grpc.ServerStreamTracer;
47
import io.grpc.Status;
48
import io.grpc.Status.Code;
49
import io.grpc.StreamTracer;
50
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
51
import io.opentelemetry.api.baggage.Baggage;
52
import io.opentelemetry.api.common.AttributesBuilder;
53
import io.opentelemetry.context.Context;
54
import java.util.ArrayList;
55
import java.util.Collection;
56
import java.util.Collections;
57
import java.util.List;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
60
import java.util.concurrent.atomic.AtomicLong;
61
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
62
import java.util.logging.Level;
63
import java.util.logging.Logger;
64
import javax.annotation.Nullable;
65

66
/**
67
 * Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
68
 *
69
 * <p>On the client-side, a factory is created for each call, and the factory creates a stream
70
 * tracer for each attempt. If there is no stream created when the call is ended, we still create a
71
 * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
72
 * of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
73
 *
74
 * <p>This module optionally applies a target attribute filter to limit the cardinality of
75
 * the {@code grpc.target} attribute in client-side metrics by mapping disallowed targets
76
 * to a stable placeholder value.
77
 *
78
 * <p>On the server-side, there is only one ServerStream per ServerCall, and ServerStream
79
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
80
 * it's the tracer that reports the summary to OpenTelemetry.
81
 */
82
final class OpenTelemetryMetricsModule {
83
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
84
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
85
      ImmutableSet.of(
1✔
86
          "grpc.client.attempt.started",
87
          "grpc.client.attempt.duration",
88
          "grpc.client.attempt.sent_total_compressed_message_size",
89
          "grpc.client.attempt.rcvd_total_compressed_message_size",
90
          "grpc.client.call.duration",
91
          "grpc.server.call.started",
92
          "grpc.server.call.duration",
93
          "grpc.server.call.sent_total_compressed_message_size",
94
          "grpc.server.call.rcvd_total_compressed_message_size");
95

96
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
97
  // fractional seconds.
98
  private static final double SECONDS_PER_NANO = 1e-9;
99

100
  private final OpenTelemetryMetricsResource resource;
101
  private final Supplier<Stopwatch> stopwatchSupplier;
102
  private final boolean localityEnabled;
103
  private final boolean backendServiceEnabled;
104
  private final boolean customLabelEnabled;
105
  private final ImmutableList<OpenTelemetryPlugin> plugins;
106
  @Nullable
107
  private final TargetFilter targetAttributeFilter;
108

109
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
110
                             OpenTelemetryMetricsResource resource,
111
                             Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
112
    this(stopwatchSupplier, resource, optionalLabels, plugins, null);
1✔
113
  }
1✔
114

115
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
116
      OpenTelemetryMetricsResource resource,
117
      Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
118
      @Nullable TargetFilter targetAttributeFilter) {
1✔
119
    this.resource = checkNotNull(resource, "resource");
1✔
120
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
121
    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
1✔
122
    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
1✔
123
    this.customLabelEnabled = optionalLabels.contains(CUSTOM_LABEL_KEY.getKey());
1✔
124
    this.plugins = ImmutableList.copyOf(plugins);
1✔
125
    this.targetAttributeFilter = targetAttributeFilter;
1✔
126
  }
1✔
127

128
  @VisibleForTesting
129
  TargetFilter getTargetAttributeFilter() {
130
    return targetAttributeFilter;
1✔
131
  }
132

133
  /**
134
   * Returns the server tracer factory.
135
   */
136
  ServerStreamTracer.Factory getServerTracerFactory() {
137
    return new ServerTracerFactory();
1✔
138
  }
139

140
  /**
141
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
142
   */
143
  ClientInterceptor getClientInterceptor(String target) {
144
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
145
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
146
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
147
      if (plugin.enablePluginForChannel(target)) {
1✔
148
        pluginBuilder.add(plugin);
1✔
149
      }
150
    }
1✔
151
    String filteredTarget = recordTarget(target);
1✔
152
    return new MetricsClientInterceptor(filteredTarget, pluginBuilder.build());
1✔
153
  }
154

155
  String recordTarget(String target) {
156
    if (targetAttributeFilter == null || target == null) {
1✔
157
      return target;
1✔
158
    }
159
    return targetAttributeFilter.test(target) ? target : "other";
1✔
160
  }
161

162
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
163
    return isGeneratedMethod ? fullMethodName : "other";
1✔
164
  }
165

166
  private static final class ClientTracer extends ClientStreamTracer {
167
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
168
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
169

170
    /*
171
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
172
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
173
     * (potentially racy) direct updates of the volatile variables.
174
     */
175
    static {
176
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
177
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
178
      try {
179
        tmpOutboundWireSizeUpdater =
1✔
180
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
181
        tmpInboundWireSizeUpdater =
1✔
182
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
183
      } catch (Throwable t) {
×
184
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
185
        tmpOutboundWireSizeUpdater = null;
×
186
        tmpInboundWireSizeUpdater = null;
×
187
      }
1✔
188
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
189
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
190
    }
1✔
191

192
    final Stopwatch stopwatch;
193
    final CallAttemptsTracerFactory attemptsState;
194
    final OpenTelemetryMetricsModule module;
195
    final StreamInfo info;
196
    final String target;
197
    final String fullMethodName;
198
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
199
    volatile long outboundWireSize;
200
    volatile long inboundWireSize;
201
    volatile String locality;
202
    volatile String backendService;
203
    long attemptNanos;
204
    Code statusCode;
205

206
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
207
        StreamInfo info, String target, String fullMethodName,
208
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
209
      this.attemptsState = attemptsState;
1✔
210
      this.module = module;
1✔
211
      this.info = info;
1✔
212
      this.target = target;
1✔
213
      this.fullMethodName = fullMethodName;
1✔
214
      this.streamPlugins = streamPlugins;
1✔
215
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
216
    }
1✔
217

218
    @Override
219
    public void inboundHeaders(Metadata headers) {
220
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
221
        plugin.inboundHeaders(headers);
1✔
222
      }
1✔
223
    }
1✔
224

225
    @Override
226
    @SuppressWarnings("NonAtomicVolatileUpdate")
227
    public void outboundWireSize(long bytes) {
228
      if (outboundWireSizeUpdater != null) {
1✔
229
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
230
      } else {
231
        outboundWireSize += bytes;
×
232
      }
233
    }
1✔
234

235
    @Override
236
    @SuppressWarnings("NonAtomicVolatileUpdate")
237
    public void inboundWireSize(long bytes) {
238
      if (inboundWireSizeUpdater != null) {
1✔
239
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
240
      } else {
241
        inboundWireSize += bytes;
×
242
      }
243
    }
1✔
244

245
    @Override
246
    public void addOptionalLabel(String key, String value) {
247
      if ("grpc.lb.locality".equals(key)) {
1✔
248
        locality = value;
1✔
249
      }
250
      if ("grpc.lb.backend_service".equals(key)) {
1✔
251
        backendService = value;
1✔
252
      }
253
    }
1✔
254

255
    @Override
256
    public void inboundTrailers(Metadata trailers) {
257
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
258
        plugin.inboundTrailers(trailers);
1✔
259
      }
1✔
260
    }
1✔
261

262
    @Override
263
    public void streamClosed(Status status) {
264
      stopwatch.stop();
1✔
265
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
266
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
267
      statusCode = status.getCode();
1✔
268
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
269
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
270
        // description. Since our timer may be delayed in firing, we double-check the deadline and
271
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
272
        if (deadline.isExpired()) {
1✔
273
          statusCode = Code.DEADLINE_EXCEEDED;
1✔
274
        }
275
      }
276
      attemptsState.attemptEnded(info.getCallOptions());
1✔
277
      recordFinishedAttempt();
1✔
278
    }
1✔
279

280
    void recordFinishedAttempt() {
281
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
282
          .put(METHOD_KEY, fullMethodName)
1✔
283
          .put(TARGET_KEY, target)
1✔
284
          .put(STATUS_KEY, statusCode.toString());
1✔
285
      if (module.localityEnabled) {
1✔
286
        String savedLocality = locality;
1✔
287
        if (savedLocality == null) {
1✔
288
          savedLocality = "";
1✔
289
        }
290
        builder.put(LOCALITY_KEY, savedLocality);
1✔
291
      }
292
      if (module.backendServiceEnabled) {
1✔
293
        String savedBackendService = backendService;
1✔
294
        if (savedBackendService == null) {
1✔
295
          savedBackendService = "";
1✔
296
        }
297
        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
1✔
298
      }
299
      if (module.customLabelEnabled) {
1✔
300
        builder.put(
1✔
301
            CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
302
      }
303
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
304
        plugin.addLabels(builder);
1✔
305
      }
1✔
306
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
307

308
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
309
        module.resource.clientAttemptDurationCounter()
1✔
310
            .record(attemptNanos * SECONDS_PER_NANO, attribute, attemptsState.otelContext);
1✔
311
      }
312
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
313
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
314
            .record(outboundWireSize, attribute, attemptsState.otelContext);
1✔
315
      }
316
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
317
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
318
            .record(inboundWireSize, attribute, attemptsState.otelContext);
1✔
319
      }
320
    }
1✔
321
  }
322

323
  @VisibleForTesting
324
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
325
    private final OpenTelemetryMetricsModule module;
326
    private final String target;
327
    private final Stopwatch attemptDelayStopwatch;
328
    private final Stopwatch callStopWatch;
329
    @GuardedBy("lock")
330
    private boolean callEnded;
331
    private final String fullMethodName;
332
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
333
    private final Context otelContext;
334
    private Status status;
335
    private long retryDelayNanos;
336
    private long callLatencyNanos;
337
    private final Object lock = new Object();
1✔
338
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
339
    private final AtomicLong hedgedAttemptsPerCall = new AtomicLong();
1✔
340
    private final AtomicLong transparentRetriesPerCall = new AtomicLong();
1✔
341
    @GuardedBy("lock")
342
    private int activeStreams;
343
    @GuardedBy("lock")
344
    private boolean finishedCallToBeRecorded;
345

346
    CallAttemptsTracerFactory(
347
        OpenTelemetryMetricsModule module,
348
        String target,
349
        CallOptions callOptions,
350
        String fullMethodName,
351
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins, Context otelContext) {
1✔
352
      this.module = checkNotNull(module, "module");
1✔
353
      this.target = checkNotNull(target, "target");
1✔
354
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
355
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
356
      this.otelContext = checkNotNull(otelContext, "otelContext");
1✔
357
      this.attemptDelayStopwatch = module.stopwatchSupplier.get();
1✔
358
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
359

360
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
361
          .put(METHOD_KEY, fullMethodName)
1✔
362
          .put(TARGET_KEY, target);
1✔
363
      if (module.customLabelEnabled) {
1✔
364
        builder.put(
1✔
365
            CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
366
      }
367
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
368

369
      // Record here in case newClientStreamTracer() would never be called.
370
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
371
        module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
1✔
372
      }
373
    }
1✔
374

375
    @Override
376
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
377
      synchronized (lock) {
1✔
378
        if (finishedCallToBeRecorded) {
1✔
379
          // This can be the case when the call is cancelled but a retry attempt is created.
380
          return new ClientStreamTracer() {};
×
381
        }
382
        if (++activeStreams == 1 && attemptDelayStopwatch.isRunning()) {
1✔
383
          attemptDelayStopwatch.stop();
1✔
384
          retryDelayNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
385
        }
386
      }
1✔
387
      // Skip recording for the first time, since it is already recorded in
388
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
389
      // attempt, as first attempt cannot be a transparent retry.
390
      if (attemptsPerCall.get() > 0) {
1✔
391
        AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
392
            .put(METHOD_KEY, fullMethodName)
1✔
393
            .put(TARGET_KEY, target);
1✔
394
        if (module.customLabelEnabled) {
1✔
395
          builder.put(
1✔
396
              CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
397
        }
398
        io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
399
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
400
          module.resource.clientAttemptCountCounter().add(1, attribute, otelContext);
1✔
401
        }
402
      }
403
      if (info.isTransparentRetry()) {
1✔
404
        transparentRetriesPerCall.incrementAndGet();
1✔
405
      } else if (info.isHedging()) {
1✔
406
        hedgedAttemptsPerCall.incrementAndGet();
1✔
407
      } else {
408
        attemptsPerCall.incrementAndGet();
1✔
409
      }
410
      return newClientTracer(info);
1✔
411
    }
412

413
    private ClientTracer newClientTracer(StreamInfo info) {
414
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
415
      if (!callPlugins.isEmpty()) {
1✔
416
        streamPlugins = new ArrayList<>(callPlugins.size());
1✔
417
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
418
          streamPlugins.add(plugin.newClientStreamPlugin());
1✔
419
        }
1✔
420
        streamPlugins = Collections.unmodifiableList(streamPlugins);
1✔
421
      }
422
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
423
    }
424

425
    // Called whenever each attempt is ended.
426
    void attemptEnded(CallOptions callOptions) {
427
      boolean shouldRecordFinishedCall = false;
1✔
428
      synchronized (lock) {
1✔
429
        if (--activeStreams == 0) {
1✔
430
          attemptDelayStopwatch.start();
1✔
431
          if (callEnded && !finishedCallToBeRecorded) {
1✔
432
            shouldRecordFinishedCall = true;
×
433
            finishedCallToBeRecorded = true;
×
434
          }
435
        }
436
      }
1✔
437
      if (shouldRecordFinishedCall) {
1✔
438
        recordFinishedCall(callOptions);
×
439
      }
440
    }
1✔
441

442
    void callEnded(Status status, CallOptions callOptions) {
443
      callStopWatch.stop();
1✔
444
      this.status = status;
1✔
445
      boolean shouldRecordFinishedCall = false;
1✔
446
      synchronized (lock) {
1✔
447
        if (callEnded) {
1✔
448
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
449
          return;
×
450
        }
451
        callEnded = true;
1✔
452
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
453
          shouldRecordFinishedCall = true;
1✔
454
          finishedCallToBeRecorded = true;
1✔
455
        }
456
      }
1✔
457
      if (shouldRecordFinishedCall) {
1✔
458
        recordFinishedCall(callOptions);
1✔
459
      }
460
    }
1✔
461

462
    void recordFinishedCall(CallOptions callOptions) {
463
      if (attemptsPerCall.get() == 0) {
1✔
464
        ClientTracer tracer = newClientTracer(null);
1✔
465
        tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
466
        tracer.statusCode = status.getCode();
1✔
467
        tracer.recordFinishedAttempt();
1✔
468
      }
469
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
470

471
      // Base attributes
472
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
473
          .put(METHOD_KEY, fullMethodName)
1✔
474
          .put(TARGET_KEY, target);
1✔
475
      if (module.customLabelEnabled) {
1✔
476
        builder.put(CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
477
      }
478
      io.opentelemetry.api.common.Attributes baseAttributes = builder.build();
1✔
479

480
      // Duration
481
      if (module.resource.clientCallDurationCounter() != null) {
1✔
482
        module.resource.clientCallDurationCounter().record(
1✔
483
            callLatencyNanos * SECONDS_PER_NANO,
484
            baseAttributes.toBuilder()
1✔
485
                .put(STATUS_KEY, status.getCode().toString())
1✔
486
                .build(),
1✔
487
            otelContext
488
        );
489
      }
490

491
      // Retry counts
492
      if (module.resource.clientCallRetriesCounter() != null) {
1✔
493
        long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
1✔
494
        if (retriesPerCall > 0) {
1✔
495
          module.resource.clientCallRetriesCounter()
1✔
496
              .record(retriesPerCall, baseAttributes, otelContext);
1✔
497
        }
498
      }
499

500
      // Hedge counts
501
      if (module.resource.clientCallHedgesCounter() != null) {
1✔
502
        long hedges = hedgedAttemptsPerCall.get();
1✔
503
        if (hedges > 0) {
1✔
504
          module.resource.clientCallHedgesCounter()
1✔
505
              .record(hedges, baseAttributes, otelContext);
1✔
506
        }
507
      }
508

509
      // Transparent Retry counts
510
      if (module.resource.clientCallTransparentRetriesCounter() != null) {
1✔
511
        long transparentRetries = transparentRetriesPerCall.get();
1✔
512
        if (transparentRetries > 0) {
1✔
513
          module.resource.clientCallTransparentRetriesCounter()
1✔
514
              .record(transparentRetries, baseAttributes, otelContext);
1✔
515
        }
516
      }
517

518
      // Retry delay
519
      if (module.resource.clientCallRetryDelayCounter() != null) {
1✔
520
        module.resource.clientCallRetryDelayCounter().record(
1✔
521
            retryDelayNanos * SECONDS_PER_NANO,
522
            baseAttributes,
523
            otelContext
524
        );
525
      }
526
    }
1✔
527
  }
528

529
  private static final class ServerTracer extends ServerStreamTracer {
530
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
531
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
532
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
533

534
    /*
535
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
536
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
537
     * (potentially racy) direct updates of the volatile variables.
538
     */
539
    static {
540
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
541
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
542
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
543
      try {
544
        tmpStreamClosedUpdater =
1✔
545
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
546
        tmpOutboundWireSizeUpdater =
1✔
547
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
548
        tmpInboundWireSizeUpdater =
1✔
549
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
550
      } catch (Throwable t) {
×
551
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
552
        tmpStreamClosedUpdater = null;
×
553
        tmpOutboundWireSizeUpdater = null;
×
554
        tmpInboundWireSizeUpdater = null;
×
555
      }
1✔
556
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
557
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
558
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
559
    }
1✔
560

561
    private final OpenTelemetryMetricsModule module;
562
    private final String fullMethodName;
563
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
564
    private Context otelContext = Context.root();
1✔
565
    private volatile boolean isGeneratedMethod;
566
    private volatile int streamClosed;
567
    private final Stopwatch stopwatch;
568
    private volatile long outboundWireSize;
569
    private volatile long inboundWireSize;
570

571
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
572
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
573
      this.module = checkNotNull(module, "module");
1✔
574
      this.fullMethodName = fullMethodName;
1✔
575
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
576
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
577
    }
1✔
578

579
    @Override
580
    public io.grpc.Context filterContext(io.grpc.Context context) {
581
      Baggage baggage = BAGGAGE_KEY.get(context);
1✔
582
      if (baggage != null) {
1✔
583
        otelContext = Context.current().with(baggage);
1✔
584
      } else {
585
        otelContext = Context.current();
1✔
586
      }
587
      return context;
1✔
588
    }
589

590
    @Override
591
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
592
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
593
      // which is true for all generated methods. Otherwise, programmatically
594
      // created methods result in high cardinality metrics.
595
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
596
      isGeneratedMethod = isSampledToLocalTracing;
1✔
597

598
      io.opentelemetry.api.common.Attributes attribute =
1✔
599
          io.opentelemetry.api.common.Attributes.of(
1✔
600
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
601

602
      if (module.resource.serverCallCountCounter() != null) {
1✔
603
        module.resource.serverCallCountCounter().add(1, attribute, otelContext);
1✔
604
      }
605
    }
1✔
606

607
    @Override
608
    @SuppressWarnings("NonAtomicVolatileUpdate")
609
    public void outboundWireSize(long bytes) {
610
      if (outboundWireSizeUpdater != null) {
1✔
611
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
612
      } else {
613
        outboundWireSize += bytes;
×
614
      }
615
    }
1✔
616

617
    @Override
618
    @SuppressWarnings("NonAtomicVolatileUpdate")
619
    public void inboundWireSize(long bytes) {
620
      if (inboundWireSizeUpdater != null) {
1✔
621
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
622
      } else {
623
        inboundWireSize += bytes;
×
624
      }
625
    }
1✔
626

627
    /**
628
     * Record a finished stream and mark the current time as the end time.
629
     *
630
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
631
     * is a no-op.
632
     */
633
    @Override
634
    public void streamClosed(Status status) {
635
      if (streamClosedUpdater != null) {
1✔
636
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
637
          return;
×
638
        }
639
      } else {
640
        if (streamClosed != 0) {
×
641
          return;
×
642
        }
643
        streamClosed = 1;
×
644
      }
645
      stopwatch.stop();
1✔
646
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
647
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
648
          .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
1✔
649
          .put(STATUS_KEY, status.getCode().toString());
1✔
650
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
651
        plugin.addLabels(builder);
1✔
652
      }
1✔
653
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
654

655
      if (module.resource.serverCallDurationCounter() != null) {
1✔
656
        module.resource.serverCallDurationCounter()
1✔
657
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
1✔
658
      }
659
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
660
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
661
            .record(outboundWireSize, attributes, otelContext);
1✔
662
      }
663
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
664
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
665
            .record(inboundWireSize, attributes, otelContext);
1✔
666
      }
667
    }
1✔
668
  }
669

670
  @VisibleForTesting
671
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
672
    @Override
673
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
674
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
675
      if (plugins.isEmpty()) {
1✔
676
        streamPlugins = Collections.emptyList();
1✔
677
      } else {
678
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
1✔
679
            new ArrayList<>(plugins.size());
1✔
680
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
681
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
1✔
682
        }
1✔
683
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
1✔
684
      }
685
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName,
1✔
686
          streamPlugins);
687
    }
688
  }
689

690
  @VisibleForTesting
691
  final class MetricsClientInterceptor implements ClientInterceptor {
692
    private final String target;
693
    private final ImmutableList<OpenTelemetryPlugin> plugins;
694

695
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
696
      this.target = checkNotNull(target, "target");
1✔
697
      this.plugins = checkNotNull(plugins, "plugins");
1✔
698
    }
1✔
699

700
    @Override
701
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
702
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
703
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
704
      if (plugins.isEmpty()) {
1✔
705
        callPlugins = Collections.emptyList();
1✔
706
      } else {
707
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
1✔
708
            new ArrayList<>(plugins.size());
1✔
709
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
710
          callPluginsMutable.add(plugin.newClientCallPlugin());
1✔
711
        }
1✔
712
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
1✔
713
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
714
          callOptions = plugin.filterCallOptions(callOptions);
1✔
715
        }
1✔
716
      }
717
      final CallOptions finalCallOptions = callOptions;
1✔
718
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
719
      // which is true for all generated methods. Otherwise, programmatically
720
      // created methods result in high cardinality metrics.
721
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
722
          OpenTelemetryMetricsModule.this, target, callOptions,
723
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
724
          callPlugins, Context.current());
1✔
725
      ClientCall<ReqT, RespT> call =
1✔
726
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
727
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
728
        @Override
729
        public void start(Listener<RespT> responseListener, Metadata headers) {
730
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
731
            plugin.addMetadata(headers);
1✔
732
          }
1✔
733
          delegate().start(
1✔
734
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
735
                @Override
736
                public void onClose(Status status, Metadata trailers) {
737
                  tracerFactory.callEnded(status, finalCallOptions);
1✔
738
                  super.onClose(status, trailers);
1✔
739
                }
1✔
740
              },
741
              headers);
742
        }
1✔
743
      };
744
    }
745
  }
746
}
747

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc