• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20144

09 Jan 2026 02:16PM UTC coverage: 88.672% (+0.02%) from 88.656%
#20144

push

github

web-flow
opentelemetry: Add target attribute filter for metrics (#12587)

Introduce an optional Predicate<String> targetAttributeFilter to control how grpc.target is recorded in OpenTelemetry client metrics.

When a filter is provided, targets rejected by the predicate are normalized to "other" to reduce grpc.target metric cardinality, while accepted targets are recorded as-is. If no filter is set, existing behavior is preserved.

This change adds a new Builder API on GrpcOpenTelemetry to allow applications to configure the filter. Tests verify both the Builder
wiring and the target normalization behavior.

This is an optional API; annotation (e.g., experimental) can be added
per maintainer guidance.

Refs #12322
Related: gRFC A109 – Target Attribute Filter for OpenTelemetry Metrics
https://github.com/grpc/proposal/pull/528

35350 of 39866 relevant lines covered (88.67%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.97
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
24
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
25
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
26

27
import com.google.common.annotations.VisibleForTesting;
28
import com.google.common.base.Stopwatch;
29
import com.google.common.base.Supplier;
30
import com.google.common.collect.ImmutableList;
31
import com.google.common.collect.ImmutableSet;
32
import com.google.errorprone.annotations.concurrent.GuardedBy;
33
import io.grpc.CallOptions;
34
import io.grpc.Channel;
35
import io.grpc.ClientCall;
36
import io.grpc.ClientInterceptor;
37
import io.grpc.ClientStreamTracer;
38
import io.grpc.ClientStreamTracer.StreamInfo;
39
import io.grpc.Deadline;
40
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
41
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
42
import io.grpc.Metadata;
43
import io.grpc.MethodDescriptor;
44
import io.grpc.ServerStreamTracer;
45
import io.grpc.Status;
46
import io.grpc.Status.Code;
47
import io.grpc.StreamTracer;
48
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
49
import io.opentelemetry.api.baggage.Baggage;
50
import io.opentelemetry.api.common.AttributesBuilder;
51
import io.opentelemetry.context.Context;
52
import java.util.ArrayList;
53
import java.util.Collection;
54
import java.util.Collections;
55
import java.util.List;
56
import java.util.concurrent.TimeUnit;
57
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
58
import java.util.concurrent.atomic.AtomicLong;
59
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
60
import java.util.logging.Level;
61
import java.util.logging.Logger;
62
import javax.annotation.Nullable;
63

64
/**
65
 * Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
66
 *
67
 * <p>On the client-side, a factory is created for each call, and the factory creates a stream
68
 * tracer for each attempt. If there is no stream created when the call is ended, we still create a
69
 * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
70
 * of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
71
 *
72
 * <p>This module optionally applies a target attribute filter to limit the cardinality of
73
 * the {@code grpc.target} attribute in client-side metrics by mapping disallowed targets
74
 * to a stable placeholder value.
75
 *
76
 * <p>On the server-side, there is only one ServerStream per ServerCall, and ServerStream
77
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
78
 * it's the tracer that reports the summary to OpenTelemetry.
79
 */
80
final class OpenTelemetryMetricsModule {
81
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
82
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
83
      ImmutableSet.of(
1✔
84
          "grpc.client.attempt.started",
85
          "grpc.client.attempt.duration",
86
          "grpc.client.attempt.sent_total_compressed_message_size",
87
          "grpc.client.attempt.rcvd_total_compressed_message_size",
88
          "grpc.client.call.duration",
89
          "grpc.server.call.started",
90
          "grpc.server.call.duration",
91
          "grpc.server.call.sent_total_compressed_message_size",
92
          "grpc.server.call.rcvd_total_compressed_message_size");
93

94
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
95
  // fractional seconds.
96
  private static final double SECONDS_PER_NANO = 1e-9;
97

98
  private final OpenTelemetryMetricsResource resource;
99
  private final Supplier<Stopwatch> stopwatchSupplier;
100
  private final boolean localityEnabled;
101
  private final boolean backendServiceEnabled;
102
  private final ImmutableList<OpenTelemetryPlugin> plugins;
103
  @Nullable
104
  private final TargetFilter targetAttributeFilter;
105

106
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
107
                             OpenTelemetryMetricsResource resource,
108
                             Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
109
    this(stopwatchSupplier, resource, optionalLabels, plugins, null);
1✔
110
  }
1✔
111

112
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
113
      OpenTelemetryMetricsResource resource,
114
      Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
115
      @Nullable TargetFilter targetAttributeFilter) {
1✔
116
    this.resource = checkNotNull(resource, "resource");
1✔
117
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
118
    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
1✔
119
    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
1✔
120
    this.plugins = ImmutableList.copyOf(plugins);
1✔
121
    this.targetAttributeFilter = targetAttributeFilter;
1✔
122
  }
1✔
123

124
  @VisibleForTesting
125
  TargetFilter getTargetAttributeFilter() {
126
    return targetAttributeFilter;
1✔
127
  }
128

129
  /**
130
   * Returns the server tracer factory.
131
   */
132
  ServerStreamTracer.Factory getServerTracerFactory() {
133
    return new ServerTracerFactory();
1✔
134
  }
135

136
  /**
137
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
138
   */
139
  ClientInterceptor getClientInterceptor(String target) {
140
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
141
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
142
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
143
      if (plugin.enablePluginForChannel(target)) {
1✔
144
        pluginBuilder.add(plugin);
1✔
145
      }
146
    }
1✔
147
    String filteredTarget = recordTarget(target);
1✔
148
    return new MetricsClientInterceptor(filteredTarget, pluginBuilder.build());
1✔
149
  }
150

151
  String recordTarget(String target) {
152
    if (targetAttributeFilter == null || target == null) {
1✔
153
      return target;
1✔
154
    }
155
    return targetAttributeFilter.test(target) ? target : "other";
1✔
156
  }
157

158
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
159
    return isGeneratedMethod ? fullMethodName : "other";
1✔
160
  }
161

162
  private static Context otelContextWithBaggage() {
163
    Baggage baggage = BAGGAGE_KEY.get();
1✔
164
    if (baggage == null) {
1✔
165
      return Context.current();
1✔
166
    }
167
    return Context.current().with(baggage);
1✔
168
  }
169

170
  private static final class ClientTracer extends ClientStreamTracer {
171
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
172
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
173

174
    /*
175
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
176
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
177
     * (potentially racy) direct updates of the volatile variables.
178
     */
179
    static {
180
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
181
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
182
      try {
183
        tmpOutboundWireSizeUpdater =
1✔
184
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
185
        tmpInboundWireSizeUpdater =
1✔
186
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
187
      } catch (Throwable t) {
×
188
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
189
        tmpOutboundWireSizeUpdater = null;
×
190
        tmpInboundWireSizeUpdater = null;
×
191
      }
1✔
192
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
193
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
194
    }
1✔
195

196
    final Stopwatch stopwatch;
197
    final CallAttemptsTracerFactory attemptsState;
198
    final OpenTelemetryMetricsModule module;
199
    final StreamInfo info;
200
    final String target;
201
    final String fullMethodName;
202
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
203
    volatile long outboundWireSize;
204
    volatile long inboundWireSize;
205
    volatile String locality;
206
    volatile String backendService;
207
    long attemptNanos;
208
    Code statusCode;
209

210
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
211
        StreamInfo info, String target, String fullMethodName,
212
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
213
      this.attemptsState = attemptsState;
1✔
214
      this.module = module;
1✔
215
      this.info = info;
1✔
216
      this.target = target;
1✔
217
      this.fullMethodName = fullMethodName;
1✔
218
      this.streamPlugins = streamPlugins;
1✔
219
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
220
    }
1✔
221

222
    @Override
223
    public void inboundHeaders(Metadata headers) {
224
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
225
        plugin.inboundHeaders(headers);
1✔
226
      }
1✔
227
    }
1✔
228

229
    @Override
230
    @SuppressWarnings("NonAtomicVolatileUpdate")
231
    public void outboundWireSize(long bytes) {
232
      if (outboundWireSizeUpdater != null) {
1✔
233
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
234
      } else {
235
        outboundWireSize += bytes;
×
236
      }
237
    }
1✔
238

239
    @Override
240
    @SuppressWarnings("NonAtomicVolatileUpdate")
241
    public void inboundWireSize(long bytes) {
242
      if (inboundWireSizeUpdater != null) {
1✔
243
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
244
      } else {
245
        inboundWireSize += bytes;
×
246
      }
247
    }
1✔
248

249
    @Override
250
    public void addOptionalLabel(String key, String value) {
251
      if ("grpc.lb.locality".equals(key)) {
1✔
252
        locality = value;
1✔
253
      }
254
      if ("grpc.lb.backend_service".equals(key)) {
1✔
255
        backendService = value;
1✔
256
      }
257
    }
1✔
258

259
    @Override
260
    public void inboundTrailers(Metadata trailers) {
261
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
262
        plugin.inboundTrailers(trailers);
1✔
263
      }
1✔
264
    }
1✔
265

266
    @Override
267
    public void streamClosed(Status status) {
268
      stopwatch.stop();
1✔
269
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
270
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
271
      statusCode = status.getCode();
1✔
272
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
273
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
274
        // description. Since our timer may be delayed in firing, we double-check the deadline and
275
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
276
        if (deadline.isExpired()) {
1✔
277
          statusCode = Code.DEADLINE_EXCEEDED;
1✔
278
        }
279
      }
280
      attemptsState.attemptEnded();
1✔
281
      recordFinishedAttempt();
1✔
282
    }
1✔
283

284
    void recordFinishedAttempt() {
285
      Context otelContext = otelContextWithBaggage();
1✔
286
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
287
          .put(METHOD_KEY, fullMethodName)
1✔
288
          .put(TARGET_KEY, target)
1✔
289
          .put(STATUS_KEY, statusCode.toString());
1✔
290
      if (module.localityEnabled) {
1✔
291
        String savedLocality = locality;
1✔
292
        if (savedLocality == null) {
1✔
293
          savedLocality = "";
1✔
294
        }
295
        builder.put(LOCALITY_KEY, savedLocality);
1✔
296
      }
297
      if (module.backendServiceEnabled) {
1✔
298
        String savedBackendService = backendService;
1✔
299
        if (savedBackendService == null) {
1✔
300
          savedBackendService = "";
1✔
301
        }
302
        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
1✔
303
      }
304
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
305
        plugin.addLabels(builder);
1✔
306
      }
1✔
307
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
308

309
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
310
        module.resource.clientAttemptDurationCounter()
1✔
311
            .record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
1✔
312
      }
313
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
314
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
315
            .record(outboundWireSize, attribute, otelContext);
1✔
316
      }
317
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
318
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
319
            .record(inboundWireSize, attribute, otelContext);
1✔
320
      }
321
    }
1✔
322
  }
323

324
  @VisibleForTesting
325
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
326
    private final OpenTelemetryMetricsModule module;
327
    private final String target;
328
    private final Stopwatch attemptDelayStopwatch;
329
    private final Stopwatch callStopWatch;
330
    @GuardedBy("lock")
331
    private boolean callEnded;
332
    private final String fullMethodName;
333
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
334
    private Status status;
335
    private long retryDelayNanos;
336
    private long callLatencyNanos;
337
    private final Object lock = new Object();
1✔
338
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
339
    private final AtomicLong hedgedAttemptsPerCall = new AtomicLong();
1✔
340
    private final AtomicLong transparentRetriesPerCall = new AtomicLong();
1✔
341
    @GuardedBy("lock")
342
    private int activeStreams;
343
    @GuardedBy("lock")
344
    private boolean finishedCallToBeRecorded;
345

346
    CallAttemptsTracerFactory(
347
        OpenTelemetryMetricsModule module,
348
        String target,
349
        String fullMethodName,
350
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
1✔
351
      this.module = checkNotNull(module, "module");
1✔
352
      this.target = checkNotNull(target, "target");
1✔
353
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
354
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
355
      this.attemptDelayStopwatch = module.stopwatchSupplier.get();
1✔
356
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
357

358
      io.opentelemetry.api.common.Attributes attribute = io.opentelemetry.api.common.Attributes.of(
1✔
359
          METHOD_KEY, fullMethodName,
360
          TARGET_KEY, target);
361

362
      // Record here in case newClientStreamTracer() is never called.
363
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
364
        module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
365
      }
366
    }
1✔
367

368
    @Override
369
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
370
      synchronized (lock) {
1✔
371
        if (finishedCallToBeRecorded) {
1✔
372
          // This can be the case when the call is cancelled but a retry attempt is created.
373
          return new ClientStreamTracer() {};
×
374
        }
375
        if (++activeStreams == 1 && attemptDelayStopwatch.isRunning()) {
1✔
376
          attemptDelayStopwatch.stop();
1✔
377
          retryDelayNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
378
        }
379
      }
1✔
380
      // Skip recording for the first time, since it is already recorded in
381
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
382
      // attempt, as first attempt cannot be a transparent retry.
383
      if (attemptsPerCall.get() > 0) {
1✔
384
        io.opentelemetry.api.common.Attributes attribute =
1✔
385
            io.opentelemetry.api.common.Attributes.of(METHOD_KEY, fullMethodName,
1✔
386
                TARGET_KEY, target);
387
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
388
          module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
389
        }
390
      }
391
      if (info.isTransparentRetry()) {
1✔
392
        transparentRetriesPerCall.incrementAndGet();
1✔
393
      } else if (info.isHedging()) {
1✔
394
        hedgedAttemptsPerCall.incrementAndGet();
1✔
395
      } else {
396
        attemptsPerCall.incrementAndGet();
1✔
397
      }
398
      return newClientTracer(info);
1✔
399
    }
400

401
    private ClientTracer newClientTracer(StreamInfo info) {
402
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
403
      if (!callPlugins.isEmpty()) {
1✔
404
        streamPlugins = new ArrayList<>(callPlugins.size());
1✔
405
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
406
          streamPlugins.add(plugin.newClientStreamPlugin());
1✔
407
        }
1✔
408
        streamPlugins = Collections.unmodifiableList(streamPlugins);
1✔
409
      }
410
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
411
    }
412

413
    // Called whenever each attempt is ended.
414
    void attemptEnded() {
415
      boolean shouldRecordFinishedCall = false;
1✔
416
      synchronized (lock) {
1✔
417
        if (--activeStreams == 0) {
1✔
418
          attemptDelayStopwatch.start();
1✔
419
          if (callEnded && !finishedCallToBeRecorded) {
1✔
420
            shouldRecordFinishedCall = true;
×
421
            finishedCallToBeRecorded = true;
×
422
          }
423
        }
424
      }
1✔
425
      if (shouldRecordFinishedCall) {
1✔
426
        recordFinishedCall();
×
427
      }
428
    }
1✔
429

430
    void callEnded(Status status) {
431
      callStopWatch.stop();
1✔
432
      this.status = status;
1✔
433
      boolean shouldRecordFinishedCall = false;
1✔
434
      synchronized (lock) {
1✔
435
        if (callEnded) {
1✔
436
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
437
          return;
×
438
        }
439
        callEnded = true;
1✔
440
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
441
          shouldRecordFinishedCall = true;
1✔
442
          finishedCallToBeRecorded = true;
1✔
443
        }
444
      }
1✔
445
      if (shouldRecordFinishedCall) {
1✔
446
        recordFinishedCall();
1✔
447
      }
448
    }
1✔
449

450
    void recordFinishedCall() {
451
      Context otelContext = otelContextWithBaggage();
1✔
452
      if (attemptsPerCall.get() == 0) {
1✔
453
        ClientTracer tracer = newClientTracer(null);
1✔
454
        tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
455
        tracer.statusCode = status.getCode();
1✔
456
        tracer.recordFinishedAttempt();
1✔
457
      }
458
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
459

460
      // Base attributes
461
      io.opentelemetry.api.common.Attributes baseAttributes =
1✔
462
          io.opentelemetry.api.common.Attributes.of(
1✔
463
              METHOD_KEY, fullMethodName,
464
              TARGET_KEY, target
465
          );
466

467
      // Duration
468
      if (module.resource.clientCallDurationCounter() != null) {
1✔
469
        module.resource.clientCallDurationCounter().record(
1✔
470
            callLatencyNanos * SECONDS_PER_NANO,
471
            baseAttributes.toBuilder()
1✔
472
                .put(STATUS_KEY, status.getCode().toString())
1✔
473
                .build(),
1✔
474
            otelContext
475
        );
476
      }
477

478
      // Retry counts
479
      if (module.resource.clientCallRetriesCounter() != null) {
1✔
480
        long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
1✔
481
        if (retriesPerCall > 0) {
1✔
482
          module.resource.clientCallRetriesCounter()
1✔
483
              .record(retriesPerCall, baseAttributes, otelContext);
1✔
484
        }
485
      }
486

487
      // Hedge counts
488
      if (module.resource.clientCallHedgesCounter() != null) {
1✔
489
        long hedges = hedgedAttemptsPerCall.get();
1✔
490
        if (hedges > 0) {
1✔
491
          module.resource.clientCallHedgesCounter()
1✔
492
              .record(hedges, baseAttributes, otelContext);
1✔
493
        }
494
      }
495

496
      // Transparent Retry counts
497
      if (module.resource.clientCallTransparentRetriesCounter() != null) {
1✔
498
        long transparentRetries = transparentRetriesPerCall.get();
1✔
499
        if (transparentRetries > 0) {
1✔
500
          module.resource.clientCallTransparentRetriesCounter()
1✔
501
              .record(transparentRetries, baseAttributes, otelContext);
1✔
502
        }
503
      }
504

505
      // Retry delay
506
      if (module.resource.clientCallRetryDelayCounter() != null) {
1✔
507
        module.resource.clientCallRetryDelayCounter().record(
1✔
508
            retryDelayNanos * SECONDS_PER_NANO,
509
            baseAttributes,
510
            otelContext
511
        );
512
      }
513
    }
1✔
514
  }
515

516
  private static final class ServerTracer extends ServerStreamTracer {
517
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
518
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
519
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
520

521
    /*
522
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
523
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
524
     * (potentially racy) direct updates of the volatile variables.
525
     */
526
    static {
527
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
528
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
529
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
530
      try {
531
        tmpStreamClosedUpdater =
1✔
532
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
533
        tmpOutboundWireSizeUpdater =
1✔
534
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
535
        tmpInboundWireSizeUpdater =
1✔
536
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
537
      } catch (Throwable t) {
×
538
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
539
        tmpStreamClosedUpdater = null;
×
540
        tmpOutboundWireSizeUpdater = null;
×
541
        tmpInboundWireSizeUpdater = null;
×
542
      }
1✔
543
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
544
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
545
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
546
    }
1✔
547

548
    private final OpenTelemetryMetricsModule module;
549
    private final String fullMethodName;
550
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
551
    private volatile boolean isGeneratedMethod;
552
    private volatile int streamClosed;
553
    private final Stopwatch stopwatch;
554
    private volatile long outboundWireSize;
555
    private volatile long inboundWireSize;
556

557
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
558
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
559
      this.module = checkNotNull(module, "module");
1✔
560
      this.fullMethodName = fullMethodName;
1✔
561
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
562
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
563
    }
1✔
564

565
    @Override
566
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
567
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
568
      // which is true for all generated methods. Otherwise, programmatically
569
      // created methods result in high cardinality metrics.
570
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
571
      isGeneratedMethod = isSampledToLocalTracing;
1✔
572
      io.opentelemetry.api.common.Attributes attribute =
1✔
573
          io.opentelemetry.api.common.Attributes.of(
1✔
574
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
575

576
      if (module.resource.serverCallCountCounter() != null) {
1✔
577
        module.resource.serverCallCountCounter().add(1, attribute);
1✔
578
      }
579
    }
1✔
580

581
    @Override
582
    @SuppressWarnings("NonAtomicVolatileUpdate")
583
    public void outboundWireSize(long bytes) {
584
      if (outboundWireSizeUpdater != null) {
1✔
585
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
586
      } else {
587
        outboundWireSize += bytes;
×
588
      }
589
    }
1✔
590

591
    @Override
592
    @SuppressWarnings("NonAtomicVolatileUpdate")
593
    public void inboundWireSize(long bytes) {
594
      if (inboundWireSizeUpdater != null) {
1✔
595
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
596
      } else {
597
        inboundWireSize += bytes;
×
598
      }
599
    }
1✔
600

601
    /**
602
     * Record a finished stream and mark the current time as the end time.
603
     *
604
     * <p>Can be called from any thread without synchronization. Calling it more than once
605
     * is a no-op.
606
     */
607
    @Override
608
    public void streamClosed(Status status) {
609
      Context otelContext = otelContextWithBaggage();
1✔
610
      if (streamClosedUpdater != null) {
1✔
611
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
612
          return;
×
613
        }
614
      } else {
615
        if (streamClosed != 0) {
×
616
          return;
×
617
        }
618
        streamClosed = 1;
×
619
      }
620
      stopwatch.stop();
1✔
621
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
622
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
623
          .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
1✔
624
          .put(STATUS_KEY, status.getCode().toString());
1✔
625
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
626
        plugin.addLabels(builder);
1✔
627
      }
1✔
628
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
629

630
      if (module.resource.serverCallDurationCounter() != null) {
1✔
631
        module.resource.serverCallDurationCounter()
1✔
632
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
1✔
633
      }
634
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
635
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
636
            .record(outboundWireSize, attributes, otelContext);
1✔
637
      }
638
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
639
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
640
            .record(inboundWireSize, attributes, otelContext);
1✔
641
      }
642
    }
1✔
643
  }
644

645
  @VisibleForTesting
646
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
647
    @Override
648
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
649
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
650
      if (plugins.isEmpty()) {
1✔
651
        streamPlugins = Collections.emptyList();
1✔
652
      } else {
653
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
1✔
654
            new ArrayList<>(plugins.size());
1✔
655
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
656
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
1✔
657
        }
1✔
658
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
1✔
659
      }
660
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
1✔
661
    }
662
  }
663

664
  @VisibleForTesting
665
  final class MetricsClientInterceptor implements ClientInterceptor {
666
    private final String target;
667
    private final ImmutableList<OpenTelemetryPlugin> plugins;
668

669
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
670
      this.target = checkNotNull(target, "target");
1✔
671
      this.plugins = checkNotNull(plugins, "plugins");
1✔
672
    }
1✔
673

674
    @Override
675
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
676
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
677
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
678
      if (plugins.isEmpty()) {
1✔
679
        callPlugins = Collections.emptyList();
1✔
680
      } else {
681
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
1✔
682
            new ArrayList<>(plugins.size());
1✔
683
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
684
          callPluginsMutable.add(plugin.newClientCallPlugin());
1✔
685
        }
1✔
686
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
1✔
687
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
688
          callOptions = plugin.filterCallOptions(callOptions);
1✔
689
        }
1✔
690
      }
691
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
692
      // which is true for all generated methods. Otherwise, programmatically
693
      // created methods result in high cardinality metrics.
694
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
695
          OpenTelemetryMetricsModule.this, target,
696
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
697
          callPlugins);
698
      ClientCall<ReqT, RespT> call =
1✔
699
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
700
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
701
        @Override
702
        public void start(Listener<RespT> responseListener, Metadata headers) {
703
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
704
            plugin.addMetadata(headers);
1✔
705
          }
1✔
706
          delegate().start(
1✔
707
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
708
                @Override
709
                public void onClose(Status status, Metadata trailers) {
710
                  tracerFactory.callEnded(status);
1✔
711
                  super.onClose(status, trailers);
1✔
712
                }
1✔
713
              },
714
              headers);
715
        }
1✔
716
      };
717
    }
718
  }
719
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc