• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

grpc / grpc-java / #20227

27 Mar 2026 03:13PM UTC coverage: 88.702% (-0.005%) from 88.707%
#20227

push

github

web-flow
Add custom label for per-RPC metrics

Implements gRFC A108.

https://github.com/grpc/proposal/blob/master/A108-otel-custom-per-call-label.md

35518 of 40042 relevant lines covered (88.7%)

0.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.39
/../opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java
1
/*
2
 * Copyright 2023 The gRPC Authors
3
 *
4
 * Licensed under the Apache License, Version 2.0 (the "License");
5
 * you may not use this file except in compliance with the License.
6
 * You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * Unless required by applicable law or agreed to in writing, software
11
 * distributed under the License is distributed on an "AS IS" BASIS,
12
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
 * See the License for the specific language governing permissions and
14
 * limitations under the License.
15
 */
16

17
package io.grpc.opentelemetry;
18

19
import static com.google.common.base.Preconditions.checkNotNull;
20
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
22
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.CUSTOM_LABEL_KEY;
23
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
24
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
25
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
26
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.TARGET_KEY;
27

28
import com.google.common.annotations.VisibleForTesting;
29
import com.google.common.base.Stopwatch;
30
import com.google.common.base.Supplier;
31
import com.google.common.collect.ImmutableList;
32
import com.google.common.collect.ImmutableSet;
33
import com.google.errorprone.annotations.concurrent.GuardedBy;
34
import io.grpc.CallOptions;
35
import io.grpc.Channel;
36
import io.grpc.ClientCall;
37
import io.grpc.ClientInterceptor;
38
import io.grpc.ClientStreamTracer;
39
import io.grpc.ClientStreamTracer.StreamInfo;
40
import io.grpc.Deadline;
41
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
42
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
43
import io.grpc.Grpc;
44
import io.grpc.Metadata;
45
import io.grpc.MethodDescriptor;
46
import io.grpc.ServerStreamTracer;
47
import io.grpc.Status;
48
import io.grpc.Status.Code;
49
import io.grpc.StreamTracer;
50
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
51
import io.opentelemetry.api.baggage.Baggage;
52
import io.opentelemetry.api.common.AttributesBuilder;
53
import io.opentelemetry.context.Context;
54
import java.util.ArrayList;
55
import java.util.Collection;
56
import java.util.Collections;
57
import java.util.List;
58
import java.util.concurrent.TimeUnit;
59
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
60
import java.util.concurrent.atomic.AtomicLong;
61
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
62
import java.util.logging.Level;
63
import java.util.logging.Logger;
64
import javax.annotation.Nullable;
65

66
/**
67
 * Provides factories for {@link StreamTracer} that records metrics to OpenTelemetry.
68
 *
69
 * <p>On the client-side, a factory is created for each call, and the factory creates a stream
70
 * tracer for each attempt. If there is no stream created when the call is ended, we still create a
71
 * tracer. It's the tracer that reports per-attempt stats, and the factory that reports the stats
72
 * of the overall RPC, such as RETRIES_PER_CALL, to OpenTelemetry.
73
 *
74
 * <p>This module optionally applies a target attribute filter to limit the cardinality of
75
 * the {@code grpc.target} attribute in client-side metrics by mapping disallowed targets
76
 * to a stable placeholder value.
77
 *
78
 * <p>On the server-side, there is only one ServerStream per each ServerCall, and ServerStream
79
 * starts earlier than the ServerCall. Therefore, only one tracer is created per stream/call, and
80
 * it's the tracer that reports the summary to OpenTelemetry.
81
 */
82
final class OpenTelemetryMetricsModule {
83
  private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsModule.class.getName());
1✔
84
  public static final ImmutableSet<String> DEFAULT_PER_CALL_METRICS_SET =
1✔
85
      ImmutableSet.of(
1✔
86
          "grpc.client.attempt.started",
87
          "grpc.client.attempt.duration",
88
          "grpc.client.attempt.sent_total_compressed_message_size",
89
          "grpc.client.attempt.rcvd_total_compressed_message_size",
90
          "grpc.client.call.duration",
91
          "grpc.server.call.started",
92
          "grpc.server.call.duration",
93
          "grpc.server.call.sent_total_compressed_message_size",
94
          "grpc.server.call.rcvd_total_compressed_message_size");
95

96
  // Using floating point because TimeUnit.NANOSECONDS.toSeconds would discard
97
  // fractional seconds.
98
  private static final double SECONDS_PER_NANO = 1e-9;
99

100
  private final OpenTelemetryMetricsResource resource;
101
  private final Supplier<Stopwatch> stopwatchSupplier;
102
  private final boolean localityEnabled;
103
  private final boolean backendServiceEnabled;
104
  private final boolean customLabelEnabled;
105
  private final ImmutableList<OpenTelemetryPlugin> plugins;
106
  @Nullable
107
  private final TargetFilter targetAttributeFilter;
108

109
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
110
                             OpenTelemetryMetricsResource resource,
111
                             Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
112
    this(stopwatchSupplier, resource, optionalLabels, plugins, null);
1✔
113
  }
1✔
114

115
  OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
116
      OpenTelemetryMetricsResource resource,
117
      Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins,
118
      @Nullable TargetFilter targetAttributeFilter) {
1✔
119
    this.resource = checkNotNull(resource, "resource");
1✔
120
    this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
1✔
121
    this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
1✔
122
    this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey());
1✔
123
    this.customLabelEnabled = optionalLabels.contains(CUSTOM_LABEL_KEY.getKey());
1✔
124
    this.plugins = ImmutableList.copyOf(plugins);
1✔
125
    this.targetAttributeFilter = targetAttributeFilter;
1✔
126
  }
1✔
127

128
  @VisibleForTesting
129
  TargetFilter getTargetAttributeFilter() {
130
    return targetAttributeFilter;
1✔
131
  }
132

133
  /**
134
   * Returns the server tracer factory.
135
   */
136
  ServerStreamTracer.Factory getServerTracerFactory() {
137
    return new ServerTracerFactory();
1✔
138
  }
139

140
  /**
141
   * Returns the client interceptor that facilitates OpenTelemetry metrics reporting.
142
   */
143
  ClientInterceptor getClientInterceptor(String target) {
144
    ImmutableList.Builder<OpenTelemetryPlugin> pluginBuilder =
1✔
145
        ImmutableList.builderWithExpectedSize(plugins.size());
1✔
146
    for (OpenTelemetryPlugin plugin : plugins) {
1✔
147
      if (plugin.enablePluginForChannel(target)) {
1✔
148
        pluginBuilder.add(plugin);
1✔
149
      }
150
    }
1✔
151
    String filteredTarget = recordTarget(target);
1✔
152
    return new MetricsClientInterceptor(filteredTarget, pluginBuilder.build());
1✔
153
  }
154

155
  String recordTarget(String target) {
156
    if (targetAttributeFilter == null || target == null) {
1✔
157
      return target;
1✔
158
    }
159
    return targetAttributeFilter.test(target) ? target : "other";
1✔
160
  }
161

162
  static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) {
163
    return isGeneratedMethod ? fullMethodName : "other";
1✔
164
  }
165

166
  private static Context otelContextWithBaggage() {
167
    Baggage baggage = BAGGAGE_KEY.get();
1✔
168
    if (baggage == null) {
1✔
169
      return Context.current();
1✔
170
    }
171
    return Context.current().with(baggage);
1✔
172
  }
173

174
  private static final class ClientTracer extends ClientStreamTracer {
175
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
176
    @Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
177

178
    /*
179
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
180
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
181
     * (potentially racy) direct updates of the volatile variables.
182
     */
183
    static {
184
      AtomicLongFieldUpdater<ClientTracer> tmpOutboundWireSizeUpdater;
185
      AtomicLongFieldUpdater<ClientTracer> tmpInboundWireSizeUpdater;
186
      try {
187
        tmpOutboundWireSizeUpdater =
1✔
188
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "outboundWireSize");
1✔
189
        tmpInboundWireSizeUpdater =
1✔
190
            AtomicLongFieldUpdater.newUpdater(ClientTracer.class, "inboundWireSize");
1✔
191
      } catch (Throwable t) {
×
192
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
193
        tmpOutboundWireSizeUpdater = null;
×
194
        tmpInboundWireSizeUpdater = null;
×
195
      }
1✔
196
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
197
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
198
    }
1✔
199

200
    final Stopwatch stopwatch;
201
    final CallAttemptsTracerFactory attemptsState;
202
    final OpenTelemetryMetricsModule module;
203
    final StreamInfo info;
204
    final String target;
205
    final String fullMethodName;
206
    final List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins;
207
    volatile long outboundWireSize;
208
    volatile long inboundWireSize;
209
    volatile String locality;
210
    volatile String backendService;
211
    long attemptNanos;
212
    Code statusCode;
213

214
    ClientTracer(CallAttemptsTracerFactory attemptsState, OpenTelemetryMetricsModule module,
215
        StreamInfo info, String target, String fullMethodName,
216
        List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins) {
1✔
217
      this.attemptsState = attemptsState;
1✔
218
      this.module = module;
1✔
219
      this.info = info;
1✔
220
      this.target = target;
1✔
221
      this.fullMethodName = fullMethodName;
1✔
222
      this.streamPlugins = streamPlugins;
1✔
223
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
224
    }
1✔
225

226
    @Override
227
    public void inboundHeaders(Metadata headers) {
228
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
229
        plugin.inboundHeaders(headers);
1✔
230
      }
1✔
231
    }
1✔
232

233
    @Override
234
    @SuppressWarnings("NonAtomicVolatileUpdate")
235
    public void outboundWireSize(long bytes) {
236
      if (outboundWireSizeUpdater != null) {
1✔
237
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
238
      } else {
239
        outboundWireSize += bytes;
×
240
      }
241
    }
1✔
242

243
    @Override
244
    @SuppressWarnings("NonAtomicVolatileUpdate")
245
    public void inboundWireSize(long bytes) {
246
      if (inboundWireSizeUpdater != null) {
1✔
247
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
248
      } else {
249
        inboundWireSize += bytes;
×
250
      }
251
    }
1✔
252

253
    @Override
254
    public void addOptionalLabel(String key, String value) {
255
      if ("grpc.lb.locality".equals(key)) {
1✔
256
        locality = value;
1✔
257
      }
258
      if ("grpc.lb.backend_service".equals(key)) {
1✔
259
        backendService = value;
1✔
260
      }
261
    }
1✔
262

263
    @Override
264
    public void inboundTrailers(Metadata trailers) {
265
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
266
        plugin.inboundTrailers(trailers);
1✔
267
      }
1✔
268
    }
1✔
269

270
    @Override
271
    public void streamClosed(Status status) {
272
      stopwatch.stop();
1✔
273
      attemptNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
274
      Deadline deadline = info.getCallOptions().getDeadline();
1✔
275
      statusCode = status.getCode();
1✔
276
      if (statusCode == Code.CANCELLED && deadline != null) {
1✔
277
        // When the server's deadline expires, it can only reset the stream with CANCEL and no
278
        // description. Since our timer may be delayed in firing, we double-check the deadline and
279
        // turn the failure into the likely more helpful DEADLINE_EXCEEDED status.
280
        if (deadline.isExpired()) {
1✔
281
          statusCode = Code.DEADLINE_EXCEEDED;
1✔
282
        }
283
      }
284
      attemptsState.attemptEnded(info.getCallOptions());
1✔
285
      recordFinishedAttempt();
1✔
286
    }
1✔
287

288
    void recordFinishedAttempt() {
289
      Context otelContext = otelContextWithBaggage();
1✔
290
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
291
          .put(METHOD_KEY, fullMethodName)
1✔
292
          .put(TARGET_KEY, target)
1✔
293
          .put(STATUS_KEY, statusCode.toString());
1✔
294
      if (module.localityEnabled) {
1✔
295
        String savedLocality = locality;
1✔
296
        if (savedLocality == null) {
1✔
297
          savedLocality = "";
1✔
298
        }
299
        builder.put(LOCALITY_KEY, savedLocality);
1✔
300
      }
301
      if (module.backendServiceEnabled) {
1✔
302
        String savedBackendService = backendService;
1✔
303
        if (savedBackendService == null) {
1✔
304
          savedBackendService = "";
1✔
305
        }
306
        builder.put(BACKEND_SERVICE_KEY, savedBackendService);
1✔
307
      }
308
      if (module.customLabelEnabled) {
1✔
309
        builder.put(
1✔
310
            CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
311
      }
312
      for (OpenTelemetryPlugin.ClientStreamPlugin plugin : streamPlugins) {
1✔
313
        plugin.addLabels(builder);
1✔
314
      }
1✔
315
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
316

317
      if (module.resource.clientAttemptDurationCounter() != null ) {
1✔
318
        module.resource.clientAttemptDurationCounter()
1✔
319
            .record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
1✔
320
      }
321
      if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
1✔
322
        module.resource.clientTotalSentCompressedMessageSizeCounter()
1✔
323
            .record(outboundWireSize, attribute, otelContext);
1✔
324
      }
325
      if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
326
        module.resource.clientTotalReceivedCompressedMessageSizeCounter()
1✔
327
            .record(inboundWireSize, attribute, otelContext);
1✔
328
      }
329
    }
1✔
330
  }
331

332
  @VisibleForTesting
333
  static final class CallAttemptsTracerFactory extends ClientStreamTracer.Factory {
334
    private final OpenTelemetryMetricsModule module;
335
    private final String target;
336
    private final Stopwatch attemptDelayStopwatch;
337
    private final Stopwatch callStopWatch;
338
    @GuardedBy("lock")
339
    private boolean callEnded;
340
    private final String fullMethodName;
341
    private final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
342
    private Status status;
343
    private long retryDelayNanos;
344
    private long callLatencyNanos;
345
    private final Object lock = new Object();
1✔
346
    private final AtomicLong attemptsPerCall = new AtomicLong();
1✔
347
    private final AtomicLong hedgedAttemptsPerCall = new AtomicLong();
1✔
348
    private final AtomicLong transparentRetriesPerCall = new AtomicLong();
1✔
349
    @GuardedBy("lock")
350
    private int activeStreams;
351
    @GuardedBy("lock")
352
    private boolean finishedCallToBeRecorded;
353

354
    CallAttemptsTracerFactory(
355
        OpenTelemetryMetricsModule module,
356
        String target,
357
        CallOptions callOptions,
358
        String fullMethodName,
359
        List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins) {
1✔
360
      this.module = checkNotNull(module, "module");
1✔
361
      this.target = checkNotNull(target, "target");
1✔
362
      this.fullMethodName = checkNotNull(fullMethodName, "fullMethodName");
1✔
363
      this.callPlugins = checkNotNull(callPlugins, "callPlugins");
1✔
364
      this.attemptDelayStopwatch = module.stopwatchSupplier.get();
1✔
365
      this.callStopWatch = module.stopwatchSupplier.get().start();
1✔
366

367
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
368
          .put(METHOD_KEY, fullMethodName)
1✔
369
          .put(TARGET_KEY, target);
1✔
370
      if (module.customLabelEnabled) {
1✔
371
        builder.put(
1✔
372
            CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
373
      }
374
      io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
375

376
      // Record here in case mewClientStreamTracer() would never be called.
377
      if (module.resource.clientAttemptCountCounter() != null) {
1✔
378
        module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
379
      }
380
    }
1✔
381

382
    @Override
383
    public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata metadata) {
384
      synchronized (lock) {
1✔
385
        if (finishedCallToBeRecorded) {
1✔
386
          // This can be the case when the call is cancelled but a retry attempt is created.
387
          return new ClientStreamTracer() {};
×
388
        }
389
        if (++activeStreams == 1 && attemptDelayStopwatch.isRunning()) {
1✔
390
          attemptDelayStopwatch.stop();
1✔
391
          retryDelayNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
392
        }
393
      }
1✔
394
      // Skip recording for the first time, since it is already recorded in
395
      // CallAttemptsTracerFactory constructor. attemptsPerCall will be non-zero after the first
396
      // attempt, as first attempt cannot be a transparent retry.
397
      if (attemptsPerCall.get() > 0) {
1✔
398
        AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
399
            .put(METHOD_KEY, fullMethodName)
1✔
400
            .put(TARGET_KEY, target);
1✔
401
        if (module.customLabelEnabled) {
1✔
402
          builder.put(
1✔
403
              CUSTOM_LABEL_KEY, info.getCallOptions().getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
404
        }
405
        io.opentelemetry.api.common.Attributes attribute = builder.build();
1✔
406
        if (module.resource.clientAttemptCountCounter() != null) {
1✔
407
          module.resource.clientAttemptCountCounter().add(1, attribute);
1✔
408
        }
409
      }
410
      if (info.isTransparentRetry()) {
1✔
411
        transparentRetriesPerCall.incrementAndGet();
1✔
412
      } else if (info.isHedging()) {
1✔
413
        hedgedAttemptsPerCall.incrementAndGet();
1✔
414
      } else {
415
        attemptsPerCall.incrementAndGet();
1✔
416
      }
417
      return newClientTracer(info);
1✔
418
    }
419

420
    private ClientTracer newClientTracer(StreamInfo info) {
421
      List<OpenTelemetryPlugin.ClientStreamPlugin> streamPlugins = Collections.emptyList();
1✔
422
      if (!callPlugins.isEmpty()) {
1✔
423
        streamPlugins = new ArrayList<>(callPlugins.size());
1✔
424
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
425
          streamPlugins.add(plugin.newClientStreamPlugin());
1✔
426
        }
1✔
427
        streamPlugins = Collections.unmodifiableList(streamPlugins);
1✔
428
      }
429
      return new ClientTracer(this, module, info, target, fullMethodName, streamPlugins);
1✔
430
    }
431

432
    // Called whenever each attempt is ended.
433
    void attemptEnded(CallOptions callOptions) {
434
      boolean shouldRecordFinishedCall = false;
1✔
435
      synchronized (lock) {
1✔
436
        if (--activeStreams == 0) {
1✔
437
          attemptDelayStopwatch.start();
1✔
438
          if (callEnded && !finishedCallToBeRecorded) {
1✔
439
            shouldRecordFinishedCall = true;
×
440
            finishedCallToBeRecorded = true;
×
441
          }
442
        }
443
      }
1✔
444
      if (shouldRecordFinishedCall) {
1✔
445
        recordFinishedCall(callOptions);
×
446
      }
447
    }
1✔
448

449
    void callEnded(Status status, CallOptions callOptions) {
450
      callStopWatch.stop();
1✔
451
      this.status = status;
1✔
452
      boolean shouldRecordFinishedCall = false;
1✔
453
      synchronized (lock) {
1✔
454
        if (callEnded) {
1✔
455
          // TODO(https://github.com/grpc/grpc-java/issues/7921): this shouldn't happen
456
          return;
×
457
        }
458
        callEnded = true;
1✔
459
        if (activeStreams == 0 && !finishedCallToBeRecorded) {
1✔
460
          shouldRecordFinishedCall = true;
1✔
461
          finishedCallToBeRecorded = true;
1✔
462
        }
463
      }
1✔
464
      if (shouldRecordFinishedCall) {
1✔
465
        recordFinishedCall(callOptions);
1✔
466
      }
467
    }
1✔
468

469
    void recordFinishedCall(CallOptions callOptions) {
470
      Context otelContext = otelContextWithBaggage();
1✔
471
      if (attemptsPerCall.get() == 0) {
1✔
472
        ClientTracer tracer = newClientTracer(null);
1✔
473
        tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
474
        tracer.statusCode = status.getCode();
1✔
475
        tracer.recordFinishedAttempt();
1✔
476
      }
477
      callLatencyNanos = callStopWatch.elapsed(TimeUnit.NANOSECONDS);
1✔
478

479
      // Base attributes
480
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
481
          .put(METHOD_KEY, fullMethodName)
1✔
482
          .put(TARGET_KEY, target);
1✔
483
      if (module.customLabelEnabled) {
1✔
484
        builder.put(CUSTOM_LABEL_KEY, callOptions.getOption(Grpc.CALL_OPTION_CUSTOM_LABEL));
1✔
485
      }
486
      io.opentelemetry.api.common.Attributes baseAttributes = builder.build();
1✔
487

488
      // Duration
489
      if (module.resource.clientCallDurationCounter() != null) {
1✔
490
        module.resource.clientCallDurationCounter().record(
1✔
491
            callLatencyNanos * SECONDS_PER_NANO,
492
            baseAttributes.toBuilder()
1✔
493
                .put(STATUS_KEY, status.getCode().toString())
1✔
494
                .build(),
1✔
495
            otelContext
496
        );
497
      }
498

499
      // Retry counts
500
      if (module.resource.clientCallRetriesCounter() != null) {
1✔
501
        long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
1✔
502
        if (retriesPerCall > 0) {
1✔
503
          module.resource.clientCallRetriesCounter()
1✔
504
              .record(retriesPerCall, baseAttributes, otelContext);
1✔
505
        }
506
      }
507

508
      // Hedge counts
509
      if (module.resource.clientCallHedgesCounter() != null) {
1✔
510
        long hedges = hedgedAttemptsPerCall.get();
1✔
511
        if (hedges > 0) {
1✔
512
          module.resource.clientCallHedgesCounter()
1✔
513
              .record(hedges, baseAttributes, otelContext);
1✔
514
        }
515
      }
516

517
      // Transparent Retry counts
518
      if (module.resource.clientCallTransparentRetriesCounter() != null) {
1✔
519
        long transparentRetries = transparentRetriesPerCall.get();
1✔
520
        if (transparentRetries > 0) {
1✔
521
          module.resource.clientCallTransparentRetriesCounter()
1✔
522
              .record(transparentRetries, baseAttributes, otelContext);
1✔
523
        }
524
      }
525

526
      // Retry delay
527
      if (module.resource.clientCallRetryDelayCounter() != null) {
1✔
528
        module.resource.clientCallRetryDelayCounter().record(
1✔
529
            retryDelayNanos * SECONDS_PER_NANO,
530
            baseAttributes,
531
            otelContext
532
        );
533
      }
534
    }
1✔
535
  }
536

537
  private static final class ServerTracer extends ServerStreamTracer {
538
    @Nullable private static final AtomicIntegerFieldUpdater<ServerTracer> streamClosedUpdater;
539
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> outboundWireSizeUpdater;
540
    @Nullable private static final AtomicLongFieldUpdater<ServerTracer> inboundWireSizeUpdater;
541

542
    /*
543
     * When using Atomic*FieldUpdater, some Samsung Android 5.0.x devices encounter a bug in their
544
     * JDK reflection API that triggers a NoSuchFieldException. When this occurs, we fall back to
545
     * (potentially racy) direct updates of the volatile variables.
546
     */
547
    static {
548
      AtomicIntegerFieldUpdater<ServerTracer> tmpStreamClosedUpdater;
549
      AtomicLongFieldUpdater<ServerTracer> tmpOutboundWireSizeUpdater;
550
      AtomicLongFieldUpdater<ServerTracer> tmpInboundWireSizeUpdater;
551
      try {
552
        tmpStreamClosedUpdater =
1✔
553
            AtomicIntegerFieldUpdater.newUpdater(ServerTracer.class, "streamClosed");
1✔
554
        tmpOutboundWireSizeUpdater =
1✔
555
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "outboundWireSize");
1✔
556
        tmpInboundWireSizeUpdater =
1✔
557
            AtomicLongFieldUpdater.newUpdater(ServerTracer.class, "inboundWireSize");
1✔
558
      } catch (Throwable t) {
×
559
        logger.log(Level.SEVERE, "Creating atomic field updaters failed", t);
×
560
        tmpStreamClosedUpdater = null;
×
561
        tmpOutboundWireSizeUpdater = null;
×
562
        tmpInboundWireSizeUpdater = null;
×
563
      }
1✔
564
      streamClosedUpdater = tmpStreamClosedUpdater;
1✔
565
      outboundWireSizeUpdater = tmpOutboundWireSizeUpdater;
1✔
566
      inboundWireSizeUpdater = tmpInboundWireSizeUpdater;
1✔
567
    }
1✔
568

569
    private final OpenTelemetryMetricsModule module;
570
    private final String fullMethodName;
571
    private final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
572
    private volatile boolean isGeneratedMethod;
573
    private volatile int streamClosed;
574
    private final Stopwatch stopwatch;
575
    private volatile long outboundWireSize;
576
    private volatile long inboundWireSize;
577

578
    ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
579
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
1✔
580
      this.module = checkNotNull(module, "module");
1✔
581
      this.fullMethodName = fullMethodName;
1✔
582
      this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins");
1✔
583
      this.stopwatch = module.stopwatchSupplier.get().start();
1✔
584
    }
1✔
585

586
    @Override
587
    public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
588
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
589
      // which is true for all generated methods. Otherwise, programmatically
590
      // created methods result in high cardinality metrics.
591
      boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
1✔
592
      isGeneratedMethod = isSampledToLocalTracing;
1✔
593
      io.opentelemetry.api.common.Attributes attribute =
1✔
594
          io.opentelemetry.api.common.Attributes.of(
1✔
595
              METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));
1✔
596

597
      if (module.resource.serverCallCountCounter() != null) {
1✔
598
        module.resource.serverCallCountCounter().add(1, attribute);
1✔
599
      }
600
    }
1✔
601

602
    @Override
603
    @SuppressWarnings("NonAtomicVolatileUpdate")
604
    public void outboundWireSize(long bytes) {
605
      if (outboundWireSizeUpdater != null) {
1✔
606
        outboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
607
      } else {
608
        outboundWireSize += bytes;
×
609
      }
610
    }
1✔
611

612
    @Override
613
    @SuppressWarnings("NonAtomicVolatileUpdate")
614
    public void inboundWireSize(long bytes) {
615
      if (inboundWireSizeUpdater != null) {
1✔
616
        inboundWireSizeUpdater.getAndAdd(this, bytes);
1✔
617
      } else {
618
        inboundWireSize += bytes;
×
619
      }
620
    }
1✔
621

622
    /**
623
     * Record a finished stream and mark the current time as the end time.
624
     *
625
     * <p>Can be called from any thread without synchronization.  Calling it the second time or more
626
     * is a no-op.
627
     */
628
    @Override
629
    public void streamClosed(Status status) {
630
      Context otelContext = otelContextWithBaggage();
1✔
631
      if (streamClosedUpdater != null) {
1✔
632
        if (streamClosedUpdater.getAndSet(this, 1) != 0) {
1✔
633
          return;
×
634
        }
635
      } else {
636
        if (streamClosed != 0) {
×
637
          return;
×
638
        }
639
        streamClosed = 1;
×
640
      }
641
      stopwatch.stop();
1✔
642
      long elapsedTimeNanos = stopwatch.elapsed(TimeUnit.NANOSECONDS);
1✔
643
      AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
1✔
644
          .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
1✔
645
          .put(STATUS_KEY, status.getCode().toString());
1✔
646
      for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
1✔
647
        plugin.addLabels(builder);
1✔
648
      }
1✔
649
      io.opentelemetry.api.common.Attributes attributes = builder.build();
1✔
650

651
      if (module.resource.serverCallDurationCounter() != null) {
1✔
652
        module.resource.serverCallDurationCounter()
1✔
653
            .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
1✔
654
      }
655
      if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
1✔
656
        module.resource.serverTotalSentCompressedMessageSizeCounter()
1✔
657
            .record(outboundWireSize, attributes, otelContext);
1✔
658
      }
659
      if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
1✔
660
        module.resource.serverTotalReceivedCompressedMessageSizeCounter()
1✔
661
            .record(inboundWireSize, attributes, otelContext);
1✔
662
      }
663
    }
1✔
664
  }
665

666
  @VisibleForTesting
667
  final class ServerTracerFactory extends ServerStreamTracer.Factory {
1✔
668
    @Override
669
    public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
670
      final List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins;
671
      if (plugins.isEmpty()) {
1✔
672
        streamPlugins = Collections.emptyList();
1✔
673
      } else {
674
        List<OpenTelemetryPlugin.ServerStreamPlugin> streamPluginsMutable =
1✔
675
            new ArrayList<>(plugins.size());
1✔
676
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
677
          streamPluginsMutable.add(plugin.newServerStreamPlugin(headers));
1✔
678
        }
1✔
679
        streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
1✔
680
      }
681
      return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
1✔
682
    }
683
  }
684

685
  @VisibleForTesting
686
  final class MetricsClientInterceptor implements ClientInterceptor {
687
    private final String target;
688
    private final ImmutableList<OpenTelemetryPlugin> plugins;
689

690
    MetricsClientInterceptor(String target, ImmutableList<OpenTelemetryPlugin> plugins) {
1✔
691
      this.target = checkNotNull(target, "target");
1✔
692
      this.plugins = checkNotNull(plugins, "plugins");
1✔
693
    }
1✔
694

695
    @Override
696
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
697
        MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
698
      final List<OpenTelemetryPlugin.ClientCallPlugin> callPlugins;
699
      if (plugins.isEmpty()) {
1✔
700
        callPlugins = Collections.emptyList();
1✔
701
      } else {
702
        List<OpenTelemetryPlugin.ClientCallPlugin> callPluginsMutable =
1✔
703
            new ArrayList<>(plugins.size());
1✔
704
        for (OpenTelemetryPlugin plugin : plugins) {
1✔
705
          callPluginsMutable.add(plugin.newClientCallPlugin());
1✔
706
        }
1✔
707
        callPlugins = Collections.unmodifiableList(callPluginsMutable);
1✔
708
        for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
709
          callOptions = plugin.filterCallOptions(callOptions);
1✔
710
        }
1✔
711
      }
712
      final CallOptions finalCallOptions = callOptions;
1✔
713
      // Only record method name as an attribute if isSampledToLocalTracing is set to true,
714
      // which is true for all generated methods. Otherwise, programatically
715
      // created methods result in high cardinality metrics.
716
      final CallAttemptsTracerFactory tracerFactory = new CallAttemptsTracerFactory(
1✔
717
          OpenTelemetryMetricsModule.this, target, callOptions,
718
          recordMethodName(method.getFullMethodName(), method.isSampledToLocalTracing()),
1✔
719
          callPlugins);
720
      ClientCall<ReqT, RespT> call =
1✔
721
          next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
1✔
722
      return new SimpleForwardingClientCall<ReqT, RespT>(call) {
1✔
723
        @Override
724
        public void start(Listener<RespT> responseListener, Metadata headers) {
725
          for (OpenTelemetryPlugin.ClientCallPlugin plugin : callPlugins) {
1✔
726
            plugin.addMetadata(headers);
1✔
727
          }
1✔
728
          delegate().start(
1✔
729
              new SimpleForwardingClientCallListener<RespT>(responseListener) {
1✔
730
                @Override
731
                public void onClose(Status status, Metadata trailers) {
732
                  tracerFactory.callEnded(status, finalCallOptions);
1✔
733
                  super.onClose(status, trailers);
1✔
734
                }
1✔
735
              },
736
              headers);
737
        }
1✔
738
      };
739
    }
740
  }
741
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc